- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;7 Z* m" V0 T2 A$ K
- require_once __DIR__ . '/Workerman/Autoloader.php';
- o( ?8 U3 X+ {3 d1 B0 ]- H - 6 I4 l; i" h% Y$ H; B+ y3 D
- $worker = new Worker();/ t( q) a; E4 P( |2 n% P
- // 4个进程
. d. q& ?% y9 `9 V - $worker->count = 4;
2 P4 M& Z, Z& b/ ? - // 每个进程启动后在当前进程新增一个Worker监听; R& {& ^" P% \) w- z
- $worker->onWorkerStart = function($worker)
$ z/ ~& q2 d, B: n# |' l N - {
- J o% T$ `/ w" k - /**# I7 G9 l2 e, o
- * 4个进程启动的时候都创建2016端口的Worker: x3 m- U! C4 j& U* ]2 D
- * 当执行到worker->listen()时会报Address already in use错误0 u4 i" j- y, s g$ N* s3 J
- * 如果worker->count=1则不会报错! q% N3 U% D9 D' U# G
- */
8 b0 e/ Z6 K) `: s$ \ - $inner_worker = new Worker('http://0.0.0.0:2016');
8 i9 C3 J( A* {, x - $inner_worker->onMessage = 'on_message';
# [' C: Z# Y6 ]7 @8 H ? - // 执行监听。这里会报Address already in use错误
" u+ m5 I; e2 Z( V1 U - $inner_worker->listen();
7 G2 F, X( Y- P9 Z% \' \7 N - };
|2 s- Z& n, @+ |1 A4 z' O- ?
( T p: X9 o, q9 R- $worker->onMessage = 'on_message';
: @% @- ~ `0 o8 [5 k - , t. v; k8 `& ^! z
- function on_message($connection, $data)
# m$ M$ B/ ^" }4 {# \- |! h8 z - {
( `( K# t/ [" S) E - $connection->send("hello\n");
- i+ P! I3 E+ s; \ b - }8 Q" l; y8 H" j3 S# b
- ! h. m. ?" Y3 G% l
- // 运行worker. q W% a6 e' Q- ?2 w: {6 J6 c ^
- Worker::runAll();
9 b6 G0 \$ E# v1 w2 {5 d* K1 M5 E - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
- C# Q5 |/ @- }: X
+ U+ d- Z* x2 q; O4 H- use Workerman\Worker;
: j3 q( B' S9 d7 a, L - require_once './Workerman/Autoloader.php';
9 P# P4 }5 S/ Q1 x3 I6 W! d5 D6 _ - ) b7 R1 U* { N( ~4 }4 i& B5 k8 x5 G* o
- $worker = new Worker('text://0.0.0.0:2015');( B3 x8 N7 e7 B& Q6 H# v5 S$ T
- // 4个进程
/ o0 }9 x4 p6 ~0 T - $worker->count = 4;
+ o: @9 K7 M4 l( G - // 每个进程启动后在当前进程新增一个Worker监听9 o+ g6 ]& Z+ i* ?. m
- $worker->onWorkerStart = function($worker), |% A$ t% [$ r
- {
3 ?( d7 r; V, y( } - $inner_worker = new Worker('http://0.0.0.0:2016');
& r5 n& j& v, Z$ `8 R - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
6 y) P" f* [% `% f9 j/ e/ C - $inner_worker->reusePort = true;- ?7 T4 C7 Q; @5 L
- $inner_worker->onMessage = 'on_message';
5 y% u6 k8 h9 g3 @& k; u! ] - // 执行监听。正常监听不会报错
5 S, _7 H; w8 z6 ?9 d, L" D7 M7 [ - $inner_worker->listen();( S `! ~9 _/ x# T/ s+ |+ G! ]: p" S
- };
5 @5 ^4 N9 Z% { - : Y1 ^; H0 M2 R
- $worker->onMessage = 'on_message';
, ~0 C- L) z, y A* q& q - 9 J) J# @+ T; v/ A4 [* W: c
- function on_message($connection, $data)/ c* \. e% r1 P! e9 v- e
- {
5 h3 T# y6 k9 A/ A - $connection->send("hello\n");1 `% I" O4 g7 B+ u. v) X+ n$ g: M6 V$ Q
- }! w2 N% y: ]. Y: q& v5 v1 @7 [
$ Q7 ^- E. t7 Q/ k( J; ~0 a* h/ u- // 运行worker
T* x$ x# d9 Z8 H( V5 W) G - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php& M( u9 J: J) m4 k7 x: y5 b/ n
- use Workerman\Worker;& V4 N+ e! |7 o0 x2 [3 ^0 J
- require_once './Workerman/Autoloader.php';1 t1 ?# B! I- i
- // 初始化一个worker容器,监听1234端口! C# L8 H$ h* f/ m* ~) I
- $worker = new Worker('websocket://0.0.0.0:1234');3 ?$ _$ I/ j& x$ C) h
- 2 j% F- U+ X% m R8 M* x4 F
- /*+ w3 \) J2 e0 {- _( ~, l! C0 m
- * 注意这里进程数必须设置为1,否则会报端口占用错误' Q5 Q. R" l% K8 `
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
8 n n. B y- a - */% s& d; x6 O! Z- n3 D E# E* F
- $worker->count = 1;
3 F9 `. x! d( J0 }* E1 `6 F( [ - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
# a: N) V: A& D, O - $worker->onWorkerStart = function($worker)
- X" t4 Y# g" r* m6 f6 |$ g - {5 p' ?7 d( R" Z2 j7 w; W3 L6 k
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
2 J# g; E: e; t9 F' \0 l m# s - $inner_text_worker = new Worker('text://0.0.0.0:5678');
2 H, g! [3 B; `0 M - $inner_text_worker->onMessage = function($connection, $buffer)
# E6 l. T; R/ R5 a2 T# Q- d - {
9 y' U i+ x1 b9 V( } - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
( d- Z7 f4 u7 J) A$ n - $data = json_decode($buffer, true);9 I j( n" q# R+ p
- $uid = $data['uid'];
) H) L# v/ X9 u& H7 s- X- g# H1 h - // 通过workerman,向uid的页面推送数据' h3 Z$ d3 P1 x6 d! ~' C
- $ret = sendMessageByUid($uid, $buffer);! m# I$ }9 {9 ]9 _2 {0 A& _# o
- // 返回推送结果1 V3 H) {7 C% V* a7 ^
- $connection->send($ret ? 'ok' : 'fail');
! w6 X% W+ W: n+ v+ T - };# u; e# c0 s- }: B+ X
- // ## 执行监听 ##3 C1 z1 x- i% T
- $inner_text_worker->listen();( K( O- z3 V& i. U, h2 O
- };
* d5 }1 |' x; `1 j - // 新增加一个属性,用来保存uid到connection的映射; n* |# ^6 E0 W! d' {
- $worker->uidConnections = array();
! ^/ v* w- c. e/ u; D$ E0 p - // 当有客户端发来消息时执行的回调函数
8 d! S% u, f+ b - $worker->onMessage = function($connection, $data)
1 u4 e. n1 w3 i$ c - {" |9 |2 Q; ^) l+ x8 P4 k* l
- global $worker;; K# x( y G, [2 |2 E/ ?! K
- // 判断当前客户端是否已经验证,既是否设置了uid! l( g6 s( d+ E' ]
- if(!isset($connection->uid))
K# F) U2 [/ \9 V1 N - {
* g R3 p$ z: o4 Q8 y$ i - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证) {7 @$ Q9 h3 [1 |0 h' b
- $connection->uid = $data;
0 I& g* W% m8 O& v* o. P - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,1 N- u9 g# S) w# P( b5 P
- * 实现针对特定uid推送数据4 W/ a8 Q$ O0 _. Z" g5 ^
- */
, k* K2 \, w/ G" C- X$ X - $worker->uidConnections[$connection->uid] = $connection;
# t0 Z, e1 r! V* S - return;
/ `: ^4 v! X8 @( @2 \9 W% y' B - }
3 ~: v+ O+ f6 l& I& V) T( [) j - };
! ]8 p6 S( X5 _4 h, q) ]# n - 4 W% o4 z3 X0 J1 s# B8 g+ `, |
- // 当有客户端连接断开时/ D9 ^. q' q5 |* H+ W4 G- U
- $worker->onClose = function($connection)
: X! f6 O) I# |7 y0 n& M - {, Z/ s+ S! K T/ E% }. P7 J
- global $worker;2 j2 q. P Z! {8 o+ k
- if(isset($connection->uid))
# j0 i9 D* A- j - {, p# `: y, K- c' l
- // 连接断开时删除映射
: X& W O! O( \2 d - unset($worker->uidConnections[$connection->uid]);
6 e: K1 V0 C& W- z - }: T2 P" ~$ {7 ^0 ~4 Z0 P
- };
" t, L a; s/ V0 X- k/ Y
7 L9 P A/ v9 p- // 向所有验证的用户推送数据0 n% m( y9 F: j4 W
- function broadcast($message)
+ W" W% L: ?" K* c - {5 {1 u& H9 m+ D- p+ ?( E0 P: P/ `
- global $worker;7 S# x0 j7 W6 v% A+ k$ F* ~% ^
- foreach($worker->uidConnections as $connection)& [" m7 G! K. {5 A) @ C
- {
5 B m. U; ?, N2 g! c" ~$ j; H - $connection->send($message);; o! H6 d. t1 f0 y. Z+ m
- }: w. F: M0 b) f" f8 ^
- }/ }1 ]; x$ n. y& ]) h
- 5 T' J) y) K! I* ?. r( _0 B
- // 针对uid推送数据3 N* J0 L+ F( K' [! F; t! F
- function sendMessageByUid($uid, $message)/ O* G1 w' q! Z! A; z
- {
$ x3 J0 a2 \6 H" C - global $worker;! D9 T9 G' c* n/ w! h* V/ s
- if(isset($worker->uidConnections[$uid]))9 l8 b! Q4 S4 W. J2 P6 b" t6 G$ \. q
- {' z% N' n9 v* C3 I5 P" D" M, G
- $connection = $worker->uidConnections[$uid];
2 }/ }5 W+ N; v& b - $connection->send($message);
7 W% I+ Q3 D$ L1 s5 T r* v - return true;
( z9 C/ b, K2 c4 k5 o; _: ] - }
' k' Z1 M. }0 @! A - return false;+ [) H- u2 B( y1 A, b' f q
- }
# T# R. m: R }& z- a - 7 K& r' V) _, `7 L) u
- // 运行所有的worker$ [/ Q# W( U4 v( ^- I
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
$ J) k$ b, @% E' \) ?2 T: Y - ws.onopen = function(){/ t; U% v& I7 ?
- var uid = 'uid1';
; t3 F5 R/ L* E, g2 V - ws.send(uid);- d4 H1 Z/ k7 @0 v2 i7 |' t8 l
- };
3 L0 I( V0 A# f - ws.onmessage = function(e){
1 R$ G+ j; @; V - alert(e.data);
a* n8 L: p# B( R% j - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
& M8 r" v K# t ]" T - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
7 Q h. g/ O4 D9 Z" c/ Z& z1 O8 w( H - // 推送的数据,包含uid字段,表示是给这个uid推送, n4 @$ H6 u0 e5 \% V
- $data = array('uid'=>'uid1', 'percent'=>'88%');# P9 Y }: g+ _9 P- x
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ _# p% h% W2 o) o3 F2 ~6 Z
- fwrite($client, json_encode($data)."\n");# y: n$ v$ S$ H D
- // 读取推送结果
% P5 F4 v+ V+ U+ R* } - echo fread($client, 8192);
复制代码 0 i7 g2 J. z( x* A
. `! Z& j6 G' v% ^* F+ d: w |