- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
3 z, w9 i' m7 i. C7 J- T& C! r - require_once __DIR__ . '/Workerman/Autoloader.php';4 Z, B- q4 h& H8 n9 R( @/ U% n; u& O* ~( h# G
' ?- j- U8 W, w, d0 p5 a- A- $worker = new Worker();
0 E+ d; r) W4 `- q' F& L' o' Z: v4 U - // 4个进程
0 `5 v9 c# D3 I2 p v3 w - $worker->count = 4;9 Y, ~5 p4 a2 U
- // 每个进程启动后在当前进程新增一个Worker监听
% v; D) |% Q; g - $worker->onWorkerStart = function($worker)
" G1 Y: p% p, ?( w - {
$ [6 j, B6 u" e - /**) U8 n; n2 U' j: Y
- * 4个进程启动的时候都创建2016端口的Worker' U' v; g) O2 S Q
- * 当执行到worker->listen()时会报Address already in use错误
6 U; n3 e6 H+ {) v1 x - * 如果worker->count=1则不会报错
% P# D. p Y0 Q) A3 q0 [+ } - */9 j2 l" {/ ~* a( U9 u) n
- $inner_worker = new Worker('http://0.0.0.0:2016');7 ?! @6 V# }3 w: N0 q$ Z% y7 K# d
- $inner_worker->onMessage = 'on_message';
3 r: q! B; d3 {: D5 n0 o - // 执行监听。这里会报Address already in use错误! z1 X; k) x N
- $inner_worker->listen();' Q; y& B6 M2 i) Z# k( K
- };
1 q$ r2 K# I# V* N6 k' X: Y( z( z - $ G* D/ X0 r! W
- $worker->onMessage = 'on_message';
|8 W0 c2 a) t3 f# D! Z
& c" V0 X$ v) M3 s* d/ x- function on_message($connection, $data)8 x2 U$ s6 H0 [: x
- {
7 |1 y. H5 s( ?& P2 O& } - $connection->send("hello\n");
+ Q- M% c: p9 C' x - }! q. a% ?# V+ _2 G. U
- - A; I9 E7 X1 {$ o- X
- // 运行worker
# `+ ^& o0 _4 n* q# _& \" U4 d - Worker::runAll();- {! u6 G& y3 p! l7 |
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
- v+ t) @- ]+ H3 b& k
/ P" f! }$ v1 m+ H" H! a/ t' y- use Workerman\Worker;( v$ j% {$ r2 S1 B, v
- require_once './Workerman/Autoloader.php';& O$ z: c; Y5 V9 P" f1 u
0 Z' E' B$ f5 N0 \- F# Q" g, L- $worker = new Worker('text://0.0.0.0:2015');
" n( u2 a" {7 u1 I - // 4个进程
! Y# W; w9 i4 |) F! v7 @ - $worker->count = 4;! P6 i7 y9 T# J; t
- // 每个进程启动后在当前进程新增一个Worker监听6 F9 @; U* M6 x* I
- $worker->onWorkerStart = function($worker)7 P3 ?; M% H7 u' Z3 ~
- {* O" d, A2 N$ Y" t
- $inner_worker = new Worker('http://0.0.0.0:2016');
7 F4 e4 A, R A, D" }: t. H - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
9 u) ?8 n* w# N. e* b- X4 v# m: ] - $inner_worker->reusePort = true;+ y! J) B* U# V7 E/ Y$ k
- $inner_worker->onMessage = 'on_message';
2 P+ U2 E: ? F - // 执行监听。正常监听不会报错0 v- d$ R- T! X- s6 ]2 \. s* y
- $inner_worker->listen();# Y( u- \% Y0 Y
- };
' c e: F1 f9 f, { - & \2 t8 C* | N: T
- $worker->onMessage = 'on_message';
9 ^1 x1 l8 e( W* V
5 `( z6 d8 m1 q, C- M) o/ f# V- Y- function on_message($connection, $data)
" l6 q3 b# E, M - {
) A2 W0 [; Z- ^6 I' B2 a - $connection->send("hello\n");" B9 i: ?* c& M' N/ n# L6 Q
- }4 S; Z$ Z. R! u0 G" o5 j
- + l+ `/ Z& G: J* ]! y) D2 n
- // 运行worker
/ b8 w' n) N4 d4 \! R4 u3 f, A# b - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php: F' h; h7 O' G0 X& n0 u8 v
- use Workerman\Worker;
) i5 k: m" m" c2 P. f* _ - require_once './Workerman/Autoloader.php';* F0 X: x) s3 O" U1 F
- // 初始化一个worker容器,监听1234端口
1 M9 j8 y) @* @5 ]# X( f2 T - $worker = new Worker('websocket://0.0.0.0:1234');/ \4 w9 p. a E
- 1 D' f" u, {* Z
- /*
8 z3 U7 W1 T" t9 R+ U - * 注意这里进程数必须设置为1,否则会报端口占用错误
8 C; ~9 R, s' m3 \) v% _ - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
4 x+ s' [+ X3 J5 k T, M- S - */
! W7 u% I" A8 _& I% o6 D& d/ t. Z - $worker->count = 1;# G' L) a- H0 E. C, W
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口7 C1 t$ G# g4 X/ Z+ v0 p% l
- $worker->onWorkerStart = function($worker)
: t" i1 X# J7 |0 f- e - {3 r2 G$ y8 o0 J1 G8 |, Q
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
! b- N3 K8 Z, _9 K: o - $inner_text_worker = new Worker('text://0.0.0.0:5678');) n- P. M/ L" n$ z0 M2 ~1 ?1 D
- $inner_text_worker->onMessage = function($connection, $buffer)& ]4 ^0 f& f5 Y V
- {# q9 F0 f. s v! C
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据 h; n4 F# l2 p& y7 r# p- k
- $data = json_decode($buffer, true);
& Y7 k) C5 I* p9 J - $uid = $data['uid'];3 O' _* j! @) j* _
- // 通过workerman,向uid的页面推送数据
! B% h. ]7 u3 M" R4 G( f) t1 n - $ret = sendMessageByUid($uid, $buffer);6 x* e. k+ F* t% H# T
- // 返回推送结果
" e- p4 W4 I; R& W- w. {. R, u - $connection->send($ret ? 'ok' : 'fail');# R5 |* I4 L8 @9 }) q
- };2 f# ^' t0 @* }- K0 m/ j
- // ## 执行监听 ##- P+ X7 v/ f( p5 Z& K# ?
- $inner_text_worker->listen();. d0 z; p6 ?; c: F! x2 u% U
- };
3 O9 W# G8 E+ h3 i& S - // 新增加一个属性,用来保存uid到connection的映射
3 b$ y( M$ q, m; d( f' n - $worker->uidConnections = array();& O6 S0 T" c4 ?% R) f
- // 当有客户端发来消息时执行的回调函数
8 f+ y7 z( q- g) { - $worker->onMessage = function($connection, $data)
; I0 z3 Z. q: p - {
6 q; O* Y' F, q( R& u6 I - global $worker;
/ O1 {% ]0 w, m: q5 E - // 判断当前客户端是否已经验证,既是否设置了uid* ]( t6 K0 S2 z2 G: H$ ^+ N1 T
- if(!isset($connection->uid))( \( @, n+ C- y# `, U' G; H, e! x
- {+ ~. ~7 U& D5 \1 O+ F
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
o# s. t7 \- E3 a - $connection->uid = $data;
9 z- j5 U' L3 _: J4 N5 g/ x+ C6 c - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# g' q6 c: ]/ e# Z0 T% {( M4 T7 |
- * 实现针对特定uid推送数据+ f$ t5 Y5 z6 Q! C, u' d
- */4 `: Z$ v& V, E) P
- $worker->uidConnections[$connection->uid] = $connection;
) w; i% \/ D/ O/ |8 P - return;
t0 @/ w% X' V5 Y. j - }
' K& s7 ^: U% O5 w2 s' T - };
* E) B4 o4 `2 E2 o; i, e; A - 8 a. _+ @! [8 X
- // 当有客户端连接断开时) \& Y* g( `( C7 ?
- $worker->onClose = function($connection)$ {4 m. Z4 |- j& ^
- {7 _: g( B( j$ d2 \2 c: a% _
- global $worker;5 o3 Z) u& X$ {3 y
- if(isset($connection->uid))
/ ^2 m1 J, V/ w' D' b, D" F - {2 A* t Q& c4 q! U# |$ u
- // 连接断开时删除映射0 {7 e) h0 d* V% W
- unset($worker->uidConnections[$connection->uid]);! C' O2 s* z7 i. E
- }
( R" U( b8 v2 J) M - };
7 g. C1 r9 A; c. V9 E0 n0 V1 J - 9 w/ G8 m$ N; P. K+ j3 x
- // 向所有验证的用户推送数据1 p9 w" p( z: y$ T
- function broadcast($message)
& H$ q: Q# W4 \. }% b - {
2 j2 ?4 x. o' ^4 T: K1 q0 Y$ s - global $worker;
3 A7 p( c, R. S( b - foreach($worker->uidConnections as $connection)
; L' M6 h- e) ~ - {( [4 L! H0 t( c! I% e
- $connection->send($message);% X# D% g L2 Y
- }# u: \! a7 g9 P& h' u4 e
- }1 q) x/ ]2 T/ t. N5 d- C
- t7 \/ Q- ~! @- O- // 针对uid推送数据
/ m9 \+ v( {4 M/ K! A. n - function sendMessageByUid($uid, $message)
$ v! u( u+ v3 I" w% l - {2 e6 t! J0 V5 x
- global $worker;
. H* z" \6 _% U0 r6 K- R - if(isset($worker->uidConnections[$uid])) L5 q- ]& e8 `. v5 M
- {
% U& X: H8 g0 ~) W1 Y) ?* d7 o - $connection = $worker->uidConnections[$uid];3 [1 D& C6 }2 i, c! i; o
- $connection->send($message);
3 a6 Q8 T" |$ o% `! t$ D# x9 b" T - return true;1 j) z2 n9 g( h" L$ |, k
- }- I# g H# N3 S" d( {2 j& _
- return false;
% u1 A& T# A! u) j l+ { - }
0 H3 y" n8 `, ]- P# o, V4 Q$ n7 w - - w) } X6 H' s4 S0 }
- // 运行所有的worker+ H' @8 m# u% P. c/ H7 Q- O
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234'); [3 [; Z4 P& n4 `: ]
- ws.onopen = function(){ p- f7 M- z0 O% @0 f
- var uid = 'uid1';+ d6 D+ {/ Q6 _/ o8 c
- ws.send(uid);& O( s$ I; B# i B. D3 _! i
- };
1 p7 l# N; i) @ - ws.onmessage = function(e){
% Q1 l, G; }& D" w& M - alert(e.data);
. Z; \* m; K: G - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口& W3 ]7 Y+ N2 u4 O: I; a- X" [2 h2 l4 I
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
9 y4 k m$ U3 I( L+ |3 T - // 推送的数据,包含uid字段,表示是给这个uid推送# _% O- k) [+ o# o# K4 w* t l
- $data = array('uid'=>'uid1', 'percent'=>'88%');$ G2 ]3 j0 `$ p4 p, t0 W3 E
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
! k' p' p" K9 H7 P - fwrite($client, json_encode($data)."\n");
8 F$ v9 ^" L0 M% z! [, m; ^! E - // 读取推送结果; m7 y' V) Q+ u1 k0 j/ |. f
- echo fread($client, 8192);
复制代码 + \, T+ L' [1 L: e/ @& w
! i( q# p7 k( D/ C ] |