- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
- F: ^+ h* \& R. n - require_once __DIR__ . '/Workerman/Autoloader.php';
6 |) L" z j: ^; D2 G" v6 c - ( a7 |2 Z: u2 X0 z3 b8 N
- $worker = new Worker();6 F0 A& a# B; s Q. V
- // 4个进程. }- z& y4 F, b" W ]1 L2 @
- $worker->count = 4;/ ^" ^: W# M% B" Q- c
- // 每个进程启动后在当前进程新增一个Worker监听4 ]: P+ D& |& C( @7 Y/ M
- $worker->onWorkerStart = function($worker)
4 N4 b; G9 T; N, L. l, P - {' b/ E% y: {" V9 V2 D2 ]
- /**
, |' ^# d7 T/ l0 B O - * 4个进程启动的时候都创建2016端口的Worker0 o8 B7 Q H! Z2 `5 |2 W
- * 当执行到worker->listen()时会报Address already in use错误
. v/ {7 _" W" J; Q- t" G - * 如果worker->count=1则不会报错
1 S9 N% M' }# \8 l - */6 F2 p% _5 o- H6 f9 P* Q5 K: ]
- $inner_worker = new Worker('http://0.0.0.0:2016');3 F6 | J# r3 p9 `! ~! y
- $inner_worker->onMessage = 'on_message';' M' {. a) q2 H$ M$ b
- // 执行监听。这里会报Address already in use错误9 W. Z9 {4 a% `; `9 {/ B$ @. c
- $inner_worker->listen();# d p* L e9 s1 H" u+ G
- };' T2 U, O- g+ ?" H2 G u! C I
- " r" _% u9 K3 L4 C
- $worker->onMessage = 'on_message';
% c( J- Y/ F) ^$ H) g( S+ E9 N - ) Q( Y5 |6 l" x9 ^
- function on_message($connection, $data)2 l6 ]: a* R* N# w- {2 q4 C. A- D
- {
7 t# m/ q5 }8 O, f+ R4 w4 { - $connection->send("hello\n");
3 S" x. R: [* [ A- B - }. X1 b' Q$ c- Z3 A1 u3 R: J- _7 Z
- - e4 t G& U' P7 F! @/ L7 x
- // 运行worker
& L) C) I8 V8 `# w4 k - Worker::runAll();
7 M! Y- q2 P- v; H( @; X - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
6 i, n3 J) K M
2 Y' r a0 |% Z( a- use Workerman\Worker;5 j' }- V5 `, f. F, `
- require_once './Workerman/Autoloader.php';
3 D6 _* I r6 s! ^
, K: V5 |$ ~4 u% Z) l- $worker = new Worker('text://0.0.0.0:2015');
5 F$ {# {9 B* Z. r# a - // 4个进程
# b Y" u3 _! ~- K - $worker->count = 4;
+ X( M0 x; ^) {5 j2 c6 T" ?. m - // 每个进程启动后在当前进程新增一个Worker监听
% G; u9 V" N4 B: L4 h# b - $worker->onWorkerStart = function($worker)$ V& e' G [' V
- {
4 @7 H# x( s% ~! c" ` - $inner_worker = new Worker('http://0.0.0.0:2016');4 E. P+ H$ ~- i6 f# L
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
" O' [! O6 V$ _8 o M) r# t9 M - $inner_worker->reusePort = true;/ w w0 O/ Q2 u# o! f
- $inner_worker->onMessage = 'on_message';
9 y3 h. Q. @( P9 G; H [ - // 执行监听。正常监听不会报错( k! f w& P$ f+ R4 Z
- $inner_worker->listen();( z' P H4 L; n; m# d
- };
( C3 g! K6 D, I% o+ c$ n - & `; v2 A0 u E+ m
- $worker->onMessage = 'on_message';
3 O9 l5 X [. d2 e# u7 \
8 k5 j, O- @" {: g0 I- function on_message($connection, $data)
2 v/ q0 e8 h! }, z - {
D. E4 D- |4 ~ - $connection->send("hello\n");
8 y% p6 z8 a' P7 @: I - }
8 g8 Q) ^! e3 i6 B8 C8 L - $ n, h9 X9 F# ]3 }4 T" |! l% d
- // 运行worker
. ?& Y9 D2 X6 y/ g - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php/ ~1 {* |8 b- Q0 u
- use Workerman\Worker;: D! |6 s& g2 g, x7 t+ U
- require_once './Workerman/Autoloader.php';
& J0 P% ^$ \% {; b0 x) B9 b6 t - // 初始化一个worker容器,监听1234端口, Y* z3 w; C' Z2 L0 k+ D
- $worker = new Worker('websocket://0.0.0.0:1234');/ W; D0 J* y8 x% A: A) K) p$ ~) ^
, j& n3 O" F- B9 m, o6 Z. O3 h- /*
& G; s# B4 z n; r' O+ Z0 @ - * 注意这里进程数必须设置为1,否则会报端口占用错误
& ^6 q* ?& V d6 R - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
* S$ f. U* D5 m0 E3 H - */
1 g# K2 x; |9 z3 u b) D - $worker->count = 1;* h: ?, D8 D' `$ t) L
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
q7 D+ |, V H ~& n! ^ - $worker->onWorkerStart = function($worker)
1 J7 D4 ]6 d. i+ `4 ~, h - {+ }" V" L1 L+ a1 c
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符) D5 R) T- Z, q7 V4 k: @
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
8 Z/ @9 [( E6 B* P, t - $inner_text_worker->onMessage = function($connection, $buffer): T9 g* l9 M6 o7 S$ E
- {( z# ]% j- P- T) g) Q7 T
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据' X2 q/ k$ n# E0 y8 Z! U4 X
- $data = json_decode($buffer, true);
) @: m& \; T2 `- C' s/ S0 I - $uid = $data['uid'];5 \- O1 l5 }( f! X( x( `
- // 通过workerman,向uid的页面推送数据
; B6 l1 B; w& B" |; d; v - $ret = sendMessageByUid($uid, $buffer);
9 q$ q7 n3 L6 Z; C - // 返回推送结果
- Z0 N) _ [! U u) H6 \ - $connection->send($ret ? 'ok' : 'fail');$ }( p ?; \4 C( d8 W, _% y8 ]
- };
5 ]/ |: I! A. i - // ## 执行监听 ##! O: Z; T4 q; r z3 n
- $inner_text_worker->listen();, N5 d7 a; b }+ O* h! o r9 P
- };
) a7 x; x& _, `8 f# g3 R) g - // 新增加一个属性,用来保存uid到connection的映射 D: Q- A: `" x; C3 J) i$ s
- $worker->uidConnections = array();% o$ K. k! p }( v0 O a
- // 当有客户端发来消息时执行的回调函数1 {4 J+ ~8 c& H8 r8 m( b
- $worker->onMessage = function($connection, $data)7 e3 L& E) L, r4 i* Y5 K' m
- {6 L; d: \6 W* d( T
- global $worker;: w3 U7 f4 Q: Y. _( E7 ]
- // 判断当前客户端是否已经验证,既是否设置了uid
; W9 [; |4 h# c7 `4 t, Y - if(!isset($connection->uid))
; _' P& W/ o. a; F1 T' S - {0 y# J, R7 @9 y; ~7 U8 c. T
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)8 ^3 C. @+ }6 t1 r, J
- $connection->uid = $data;1 k+ \* \' c" u% l4 k1 P7 c
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,( w( Y" u0 \2 G
- * 实现针对特定uid推送数据4 d# D1 A$ ?' ?# C7 E
- */
% s' i7 p$ o# @/ B7 y. G9 u( K - $worker->uidConnections[$connection->uid] = $connection;
$ y( i; |4 H) K, ]) z/ x - return;
. G! w' M: F" u$ f7 A/ [& { - }9 N- s+ r& k( s6 C- U
- };2 F6 x, a7 N. t6 T
- ) g5 _& \/ S+ q) T
- // 当有客户端连接断开时
- c, X2 G+ e$ I' c - $worker->onClose = function($connection)
( _. g% }. B: Q# d; Q4 \ - {4 @7 n. s" a O5 T0 ?- B
- global $worker;
% F' q. L% o# V8 ~% Y7 {+ u - if(isset($connection->uid)) V; N; l0 N. i) d& X5 q& G
- {
4 ~6 C3 ~6 G: r, i; Z: H - // 连接断开时删除映射
% O: k7 I" w6 W% c2 @& S$ c - unset($worker->uidConnections[$connection->uid]);" V: \1 u/ ~: Y1 e7 r- }+ h/ E% |
- }: N' B7 T5 M" E" w8 Z5 A/ M
- };
; s3 W8 ~3 Z Y9 m6 c. Z7 W - $ J2 [9 D/ c" [7 n6 w
- // 向所有验证的用户推送数据- u( d; b* n. x' M0 O) Q [4 S
- function broadcast($message)) Y; }* a7 j% E- v9 P
- {# K7 N$ F: j$ j
- global $worker;0 K' k2 o- W/ \9 w4 R8 X) a m
- foreach($worker->uidConnections as $connection)( x& x9 T$ `) |5 |- i) c; s% M
- {
+ F! K% I( I8 [; z - $connection->send($message);
5 L5 K$ v" ~! _. W* r3 N( i; k - }) p) c7 }* C2 T- ?/ F `
- }
& }2 k7 n& P/ [; d
0 S2 Y6 `1 d W$ h1 p+ z- // 针对uid推送数据 q5 s. A$ N$ u5 v4 _$ D
- function sendMessageByUid($uid, $message)3 @1 i7 K: i' o) l1 f( F
- {
, ^9 C5 |5 @6 ?1 m: x - global $worker;
& U8 m0 \5 j0 K# ] - if(isset($worker->uidConnections[$uid]))
6 M2 T+ n6 p/ A& I - {
' E5 k2 G/ K/ M* w! \: l& [ - $connection = $worker->uidConnections[$uid];
0 U; N! u, T/ a& ^$ K1 a: o% C - $connection->send($message);
( u b. z; r" [( H: \/ y - return true;
/ H, C4 j8 Y8 B. s2 z+ a7 h" V+ ]. q - }
. f% j5 G( c, f, e9 \, W: [ - return false;
4 s* S7 I4 d* l( A( ? - }
# ]4 }7 s9 S }# U# p - $ D" s+ a3 A1 C4 t' F* V; `( p
- // 运行所有的worker Y+ U( f" Q' {4 ]8 Y
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
/ @+ W) y( R5 m - ws.onopen = function(){ K. n Z$ V9 B4 d u' l
- var uid = 'uid1';
6 v6 G9 w+ h8 B2 F0 O6 D - ws.send(uid);
, |! p& H$ k& g$ F; [- \0 a - };/ N, `- o* _# I3 I, r4 s8 _7 U1 j9 A
- ws.onmessage = function(e){; A4 {9 z+ ~' i" D; {6 t) w
- alert(e.data);
# _+ h* Y. L- n ^- ^4 i5 T! i - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
( a' d+ S& {% O - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
) m$ M0 _' S u2 P - // 推送的数据,包含uid字段,表示是给这个uid推送
' ]( I: h& u( `) D. c - $data = array('uid'=>'uid1', 'percent'=>'88%');
$ J* f5 L& o- N1 n - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
% _6 S0 `( `1 ]( k2 r8 q5 \) f - fwrite($client, json_encode($data)."\n");. M4 X$ W/ J* X) r, b
- // 读取推送结果
8 O; V1 R0 {4 |! ?& k$ x* n - echo fread($client, 8192);
复制代码 1 L! u. ~6 t" ^2 T& E% m
0 c h$ B' B8 p+ A" Y. q3 N |