- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
6 ^, ^8 F! N+ F4 V5 D - require_once __DIR__ . '/Workerman/Autoloader.php';
5 _4 i* O- {5 J) m& B
! I, [* h# x. e3 j0 f: u- $worker = new Worker();
9 P/ b' g5 q( r: J; c& Z - // 4个进程
+ p4 |* r' f" T# l3 D, p - $worker->count = 4;
- o, w& {" j) h9 o( L: q - // 每个进程启动后在当前进程新增一个Worker监听1 q2 d5 |/ Z. C, x: P0 x& h
- $worker->onWorkerStart = function($worker)$ ^$ V1 A: n8 E% p* c! @8 U6 ^ W
- {
4 p" k; a) M( K6 Q$ B6 e+ { - /**1 U- c1 N2 m6 k" v0 D. K
- * 4个进程启动的时候都创建2016端口的Worker$ ?$ s. `$ f6 H& L8 [" O
- * 当执行到worker->listen()时会报Address already in use错误8 w' B- Y M, D
- * 如果worker->count=1则不会报错
8 r" u, ^: L1 b7 D3 f, } a/ H( E - */# h' Q( b9 S4 e3 Q0 J! H( h0 I
- $inner_worker = new Worker('http://0.0.0.0:2016');; \2 g0 [# w3 R" J- S- w
- $inner_worker->onMessage = 'on_message';
$ G) l6 {0 R, t8 {: M/ l: Z, v9 n6 w8 s, R - // 执行监听。这里会报Address already in use错误
+ J3 |+ s" k4 Z+ L1 { - $inner_worker->listen();
6 F# D' G y+ `9 ?" ?9 e, m* n - };
! @; z0 h# ]$ N+ m" Z
0 j( F" a! B& [/ p! ]: C1 s% O- $worker->onMessage = 'on_message';, _0 [0 Y& d- q( E
6 s2 k5 ~7 S9 F# C; e6 {- function on_message($connection, $data). P6 r5 P" G# c: l7 E2 @& K" i P
- {
3 i* j# e9 t, \& B4 I - $connection->send("hello\n");6 `0 O8 _ x$ Q& Y3 ~; L$ i, c
- }
' _- Z' T, N; u) M - * I! M5 w9 ^2 h4 q: }: d& ^! j
- // 运行worker( N2 n2 K$ ]0 d+ E9 Q7 C
- Worker::runAll();
' e! B1 n+ L" C* s - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 F) ~5 |: o2 a0 F7 [9 W3 |! M$ g
- + R9 t/ \! s5 L7 {5 ]' m
- use Workerman\Worker;3 o* C y; ~, s# r* Y0 E- U* V, l" z
- require_once './Workerman/Autoloader.php';
' F+ r! |" U% m9 r2 z) r
) Z& z' T7 ]) r- $worker = new Worker('text://0.0.0.0:2015'); ]. ~: E. ?% c& B5 e0 K# p
- // 4个进程
( q# m. v \7 D* q* ~" b# B - $worker->count = 4;
3 a4 r3 ^3 L0 @ - // 每个进程启动后在当前进程新增一个Worker监听
6 E9 i7 I" ?" Q q0 l8 z - $worker->onWorkerStart = function($worker)
! a0 [6 R6 j! X0 q5 Z( A% ] - {
8 ^2 s3 y6 y" x! q, _6 R+ v, s, Z8 g - $inner_worker = new Worker('http://0.0.0.0:2016');
( q" o; S7 e; n5 z8 {) I' R - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
+ _8 u) Q4 R% V& d; r/ A& y. @1 M2 Y - $inner_worker->reusePort = true;1 i* |, J$ ~9 o, J$ \, P- @6 O
- $inner_worker->onMessage = 'on_message';$ [* _4 n- J6 ]; v# }( Y
- // 执行监听。正常监听不会报错
: [& Q9 ]. z* [1 o - $inner_worker->listen();
* t# \! B$ F% A9 p8 G. B( l - };
# @* O2 P. ^% R {) [+ i7 X/ w
" o5 w# L. O% R8 }3 x* ?4 l. f. V- $worker->onMessage = 'on_message';7 u) r, W: a& K& Z( p; ^
* b0 I( p( W7 D- function on_message($connection, $data)- m7 J* r& G0 p: f
- {) ~6 V# r; s- v
- $connection->send("hello\n");7 n6 R) x% c+ {) `
- }
% F2 V7 ]+ g R% B( B" K7 a, d& i) M
) M$ `6 i. ^" }# b) d x- // 运行worker( k; Y) |% G9 \4 `2 |4 C
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
* A! f3 P6 i6 a y8 L) `2 F+ w - use Workerman\Worker;* J, H, g# V1 B7 `7 W, B9 d
- require_once './Workerman/Autoloader.php';; b, O+ ]6 h! C
- // 初始化一个worker容器,监听1234端口8 G$ T" G1 T8 |
- $worker = new Worker('websocket://0.0.0.0:1234');
0 ]" C$ D5 d1 L7 h1 ]
6 s; @2 E% z6 L9 F- J3 \/ A- /*
8 G' ~6 ?6 h2 y7 w' W1 S/ R - * 注意这里进程数必须设置为1,否则会报端口占用错误. v% n" w0 R. @ E: I
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
, M& e4 n. k0 H1 X! p U# P) c - */% z* T" A% R+ g
- $worker->count = 1;
3 J9 F6 I1 Y( g4 _( y- h; j y - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
2 Z ?- r2 U! C0 Y- n# e+ e; M - $worker->onWorkerStart = function($worker); X, u2 d$ S+ A( i1 W' P
- {
& U( j5 B8 z- _5 m) p6 V- e - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
% A" Z \; ?& M9 Q: w8 w: f - $inner_text_worker = new Worker('text://0.0.0.0:5678');; G, K1 W0 m- z2 n) n& |
- $inner_text_worker->onMessage = function($connection, $buffer)
1 h! c: [$ r1 K - {8 K" w2 |& D5 p, M
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据* R1 x5 H# h. V! D/ ]' L
- $data = json_decode($buffer, true);" q. e: \/ o+ S8 D' o. J* B( w1 m* g
- $uid = $data['uid'];( E8 C- Z' J/ s9 T; Z+ n
- // 通过workerman,向uid的页面推送数据
8 W+ ?& Q- ~" }; ]# O1 X - $ret = sendMessageByUid($uid, $buffer);5 p1 h* C* P, M' u
- // 返回推送结果) a" u& B! Q% j, F l# `. M% |. X! v, j+ j
- $connection->send($ret ? 'ok' : 'fail');
+ |2 w& t) f, B- U8 r - };
! t- t! u& D8 c" V \: H c( C/ P - // ## 执行监听 ##. ?+ W2 Q4 S! ^& k# A
- $inner_text_worker->listen();3 A h* O8 _1 `& J8 J M3 ^
- };3 U; y$ D y2 V2 T) }% Z/ ?3 `
- // 新增加一个属性,用来保存uid到connection的映射
3 f: n6 v+ K9 z, W% K- ` - $worker->uidConnections = array();4 ]$ d6 y, Y" ?: O/ b0 a
- // 当有客户端发来消息时执行的回调函数" y; X6 `' l/ e3 X1 f; a
- $worker->onMessage = function($connection, $data)
. \$ {! o( G# J( `6 }, P - {
. C5 z2 L F# @7 F q) f) ]$ s - global $worker;
3 K, l& L8 i* A4 Z1 x& k2 K3 m - // 判断当前客户端是否已经验证,既是否设置了uid
. g( B3 V) ]7 n r1 d. u: o - if(!isset($connection->uid)), U" X+ y ^, Q) F
- {2 e, `% T1 M( Z; G% |: h$ z
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
5 e" l) S6 L" G, |' w. C - $connection->uid = $data;- R# l5 Q" v7 U% j
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
. Y& b4 a6 c) o1 l; l* V - * 实现针对特定uid推送数据8 {( R- l* x: }1 `
- */
0 B5 g" u8 F: ^7 f9 y" ` - $worker->uidConnections[$connection->uid] = $connection;
2 l! @' }' g% I Q/ `3 T3 n - return;
: v$ g* `5 x# B! S3 f% m2 @ - }
8 ~6 b( I/ L; K2 Q - };( t5 E+ u+ q1 B1 L
- 2 {& |$ H: Q. [& T; r
- // 当有客户端连接断开时* t* q. ?; u. ?% i I- i5 ]. W
- $worker->onClose = function($connection)
& x( ^. k' v" A* { - {, y9 V) w2 c# ~5 g) X: D
- global $worker;
U9 j& o3 i" l B - if(isset($connection->uid))6 f8 l. c. [5 Y7 ]: O
- { M P! Q: K a- p
- // 连接断开时删除映射8 {( G* O/ ?' k+ M, }1 ]! I
- unset($worker->uidConnections[$connection->uid]);
$ M' n* W3 O9 y9 r& t# u4 g3 ]- @9 W - }/ Q, o1 Z4 I3 ]* _/ L$ b
- };" o+ e: S( m, Q. T# I
& j! J, ~4 u8 W, p/ u0 Q- // 向所有验证的用户推送数据
( a! u( {; M, V1 u2 L1 Y& H - function broadcast($message)9 b$ P8 d/ m0 F8 R% w
- {
4 Z0 K0 c8 ^) C- N" _! m - global $worker;
2 b% c$ f3 q* T2 ]- n/ o8 l - foreach($worker->uidConnections as $connection)
4 w! t9 R& ?, y! |9 Z - {
! P2 n* y& Z6 W) ^. S8 s3 w. A+ a - $connection->send($message);/ D! e2 i9 A; c. `- m9 `6 w( l
- }" w3 h0 M0 ^/ I0 g2 j
- }
1 O0 X& g( q+ e; B" O
0 b* _) J" V! W- // 针对uid推送数据
0 z# p, {" N( _. O - function sendMessageByUid($uid, $message)
" D. w' C3 [0 i- a - {7 R/ F: T8 G. ^0 a! ^; W
- global $worker;
, c( r/ Q8 B4 `" P+ b- G8 ] - if(isset($worker->uidConnections[$uid]))7 Z2 W, F" K9 o
- {
( F5 [ H* S" h2 a# ] - $connection = $worker->uidConnections[$uid]; s) @* q, l9 L1 I! _) Y
- $connection->send($message);" p6 b+ ?0 c8 V0 Y
- return true;5 I7 o- N0 j! n: O! m
- }( Q) y! a/ k9 j4 H3 I3 l
- return false;8 `# A$ b2 L$ n( x' P
- }; v: P' y. a8 W+ r4 L. s
- & L5 ~5 b5 }8 c0 w* ?0 z
- // 运行所有的worker
% E0 R" L) o1 o) J5 h# ` - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');1 v: X* U9 f& i6 B: f0 |
- ws.onopen = function(){
s" e4 {: U+ n& i# N: P - var uid = 'uid1';2 z9 v2 a: `; a$ i% e
- ws.send(uid);' _1 g. R1 `; l% h! Y$ R, }! j
- };+ C- k+ ]0 v* ]
- ws.onmessage = function(e){$ W( {% N! {6 ]$ G4 ^+ f) }: e, h9 k
- alert(e.data);
' s# v, C `: ?# B7 _/ o' H - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
# }- o6 F9 S- s, Y1 ]' e, o I& P - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);& l1 j/ M" d: o G* s" [
- // 推送的数据,包含uid字段,表示是给这个uid推送
# B% p5 o: \# w3 j" K - $data = array('uid'=>'uid1', 'percent'=>'88%');& N! L) q# O! h1 m" p9 k% p0 C/ S4 z; ^' E
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
6 c0 d: }2 C$ [; K. X+ o - fwrite($client, json_encode($data)."\n");
" i( j& y- K- F" S8 c R - // 读取推送结果
2 x$ z9 {: j5 ^5 D! f* U" F - echo fread($client, 8192);
复制代码 / g" `( Y+ Q4 p3 K
$ S) ]! ~2 I+ l X [0 E& E" E0 d9 ]& H
|