- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;8 Q; c u+ K# Y& `
- require_once __DIR__ . '/Workerman/Autoloader.php';4 q4 N( I4 z7 ^- ~
- ' o1 P/ K, T( M' ]/ A' \
- $worker = new Worker();; X1 ~5 i, L; |: P( u7 a# h+ U6 T
- // 4个进程5 ~6 ^% C H& ~/ ]
- $worker->count = 4;
# ?7 U6 t0 d) [4 t2 } - // 每个进程启动后在当前进程新增一个Worker监听
) k9 g \+ I$ @) J - $worker->onWorkerStart = function($worker)* L! k1 E- z: d% W
- {
/ L, ^% f% P8 A4 w) x4 m0 ` - /**
: ~& T! L0 L" Y$ H, p - * 4个进程启动的时候都创建2016端口的Worker. O. K3 u c% m
- * 当执行到worker->listen()时会报Address already in use错误+ R1 ]; ~; r4 V, C0 X0 F
- * 如果worker->count=1则不会报错4 c# i4 u5 n1 g$ Q
- */
9 d a6 ?. x1 ]$ U) ^ - $inner_worker = new Worker('http://0.0.0.0:2016');* g1 g( c! ^3 D" l5 K& w4 ?
- $inner_worker->onMessage = 'on_message';6 P3 r( M9 ]# Z
- // 执行监听。这里会报Address already in use错误3 y9 j# z+ X2 z
- $inner_worker->listen();
2 c+ \7 x# m+ w - };$ b. ]! ]( O$ S
- + r( A S( ^5 E+ m; h0 @
- $worker->onMessage = 'on_message';2 F, G& {5 i: Y4 B
) L1 [$ X$ P% h6 @, q6 W- function on_message($connection, $data); K6 ^, z$ u. b/ E
- {! q/ y" }) V& ?" S1 w1 _
- $connection->send("hello\n"); S8 D0 @/ Z4 U- f& P& l8 d
- }
" J( t" y+ c/ H7 p! |' B3 N8 U' S
8 p# k i' w1 U3 w- // 运行worker
6 [6 `* `& ^: [ - Worker::runAll();) F+ N+ F8 j: w( a4 K
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:+ y+ {+ {2 q7 j& q( O8 `: R
& P8 x, w8 x* w# R5 Y3 V. `- use Workerman\Worker;
: o) D% F, Y8 e8 ?' F - require_once './Workerman/Autoloader.php';6 E/ F! e7 q& T# x4 j/ s3 i1 a
- / W- F8 j3 c I' l% ] m8 i' A9 Z
- $worker = new Worker('text://0.0.0.0:2015');
# Z+ t/ I5 B* W' P: n - // 4个进程) f3 w# l0 v. a& I, G- S" N
- $worker->count = 4;
8 i7 P1 b, Z* H( Z, }4 Y7 S - // 每个进程启动后在当前进程新增一个Worker监听
! S. J" S0 t9 ~' f% { ]" I: } - $worker->onWorkerStart = function($worker)1 |) V3 U8 K- y" b0 p# z/ E
- {' j% V2 V1 Y+ o( C
- $inner_worker = new Worker('http://0.0.0.0:2016');
' c1 v' q( u' v4 i* q/ c - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& q6 ]; i& K0 J2 t- C7 f; C6 T
- $inner_worker->reusePort = true;
( V9 Y6 ~% Y4 j% X. O - $inner_worker->onMessage = 'on_message';" J0 v" P; } C$ K/ d
- // 执行监听。正常监听不会报错
9 y' ~3 [1 B% h4 |0 w - $inner_worker->listen();0 b+ A" t3 o: k; G8 ~6 ?
- }; u5 c' R( k, S9 L' J
- * [4 `7 m7 g1 ?4 w8 Z* Q6 g" x/ s1 u
- $worker->onMessage = 'on_message';
5 D. O D5 {3 w7 c/ j$ w7 ^; X2 y
3 d# z7 h# Y1 Y& v- function on_message($connection, $data)( ~( n3 [0 O4 H
- { r' x' z3 m/ g. M
- $connection->send("hello\n");* ^" O" q7 y0 `. P T
- }! U& L u- F! M, Q/ K. |' \
- ; p: b- i3 A& e
- // 运行worker
p4 m6 Z+ D1 j I8 x - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php, _! r9 R$ z7 ]
- use Workerman\Worker;1 w2 h' O# m& }# [/ P
- require_once './Workerman/Autoloader.php';
" E: c% ^' A# `+ Z - // 初始化一个worker容器,监听1234端口
5 _- k6 I# j! U. \. ` - $worker = new Worker('websocket://0.0.0.0:1234');4 @0 Z! M; f Q- k
- ! R+ l5 O! i9 q" q# k2 h
- /*
0 j E4 Y% e( A( E' | - * 注意这里进程数必须设置为1,否则会报端口占用错误8 t) p* A+ K# D$ B
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
& S, a5 B% E4 D K8 z - */# n ]) T, z E9 S# t: y
- $worker->count = 1;
, G8 F7 T( `$ R3 |) }& i( j3 K1 [ - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
' S5 Q1 { T7 k- g Z - $worker->onWorkerStart = function($worker)
5 Y2 i: q! j @+ S - {5 i1 H9 z) K$ B9 I, o4 q
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
" U# S7 y5 o7 H n7 B - $inner_text_worker = new Worker('text://0.0.0.0:5678');
G- j; W# i/ y1 D% u0 ]8 [* s - $inner_text_worker->onMessage = function($connection, $buffer)* q; Z2 S7 C2 M) R+ P$ o' p. U
- {
- I) @/ G; V2 K' g - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
1 d" H# P, U' M& z; ^1 q - $data = json_decode($buffer, true);
" @7 t# x8 d& ]. H% ~+ c4 D5 {6 _9 }/ A - $uid = $data['uid'];
; |* [! t' e5 }. S3 g- T - // 通过workerman,向uid的页面推送数据
/ ~. f; w* `; s$ L! b3 Y9 u - $ret = sendMessageByUid($uid, $buffer);1 e1 l) u7 c1 A* U( i: M+ b0 T
- // 返回推送结果" z4 y" T3 \9 u. Y: O
- $connection->send($ret ? 'ok' : 'fail');2 R6 m. |: x1 }8 U% s# {
- };
# R. L' c, i7 j: d' R - // ## 执行监听 ##$ C5 p5 C8 U8 }/ g2 d- G6 u6 W
- $inner_text_worker->listen();+ w$ m5 H: |+ A0 J( @" k8 ^9 z) i
- };
4 v- F% u1 s4 M. s3 N2 P - // 新增加一个属性,用来保存uid到connection的映射) F! z9 k4 Z5 k L9 U8 J4 n
- $worker->uidConnections = array();
2 P* P% v( h% R3 C( ~ - // 当有客户端发来消息时执行的回调函数! p5 U j) C' z0 B) R6 E
- $worker->onMessage = function($connection, $data)
. O; q8 J2 Z$ B( ~1 r+ D - {$ K( P5 ?7 F! g9 b2 `/ {! n
- global $worker;
2 e. G# {- Z& O9 Q# j - // 判断当前客户端是否已经验证,既是否设置了uid
& K' P3 I5 ]# Q1 p! F - if(!isset($connection->uid))
7 N X2 R4 q$ X8 b3 E: f - {4 C6 C& A1 }- \. g9 }: \
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
% e& e7 G. [# d - $connection->uid = $data;$ E9 w6 v. V) }% N' o' t4 V
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
+ x3 b# d- @8 C+ }. H" ^ - * 实现针对特定uid推送数据
, H- w9 w; a9 ?1 T0 Y- J& i - *// \, e) M9 v3 L$ j. x+ l
- $worker->uidConnections[$connection->uid] = $connection;% i$ B; Y1 q. Z$ ^: N% p
- return;
0 B* D, Q1 y, u0 o - }$ K7 l. b5 B; J+ W
- };* ?4 o$ p# V1 i$ `, f
- 2 u5 E1 U3 P7 j6 }5 l6 [* Z
- // 当有客户端连接断开时
; @* o0 y/ F. d2 G6 i% k - $worker->onClose = function($connection): V# x2 m0 P# e7 w
- {
0 W. j h. i9 j( Z6 O) } - global $worker;
' U' X% p+ `8 v: s! g; e) l - if(isset($connection->uid))4 S0 `" ~6 m" P( c! a- M- u' Z
- {" j5 X" I+ A8 _; }$ J
- // 连接断开时删除映射* y8 T6 X' H: e
- unset($worker->uidConnections[$connection->uid]);
/ V: E! a* \! @& x$ C; I - }
$ G! s! }2 g& e; v) N# { - };. N5 S1 B) n/ c9 ]8 S9 c8 y% [
- - x: ~/ [! `2 P7 p- s% h% R
- // 向所有验证的用户推送数据0 X$ X8 X( P, B
- function broadcast($message)* @, }- K, f. {# O X9 W# I; k
- {% E& A7 a& I8 E& U6 v) {" Q
- global $worker;: L: @9 C8 R t! y- \
- foreach($worker->uidConnections as $connection)1 g4 O7 |8 K' o. Y8 \! T0 f5 s
- {
0 x5 u/ R3 F* F% ^' u) s9 O* i; @ - $connection->send($message); V8 U& X) w2 n
- }
; T' \ C) H0 P - }
* }* G/ _& I9 X2 j9 h, l/ D
4 M. e$ G% f3 Z3 Z% S- // 针对uid推送数据
$ ]& E' a, b! `; I - function sendMessageByUid($uid, $message)
* g* k! q8 @5 O9 y! T - {2 h; u& ^7 q8 s# ?* F
- global $worker;
6 k7 f8 Q5 T. ~( f) _, {) s" g - if(isset($worker->uidConnections[$uid]))+ N% W% n0 M$ B- F2 _6 f' h
- {1 b9 _4 A6 M" r
- $connection = $worker->uidConnections[$uid];
6 s$ J7 Y* y0 d, O - $connection->send($message);9 R; a) y0 S4 ~8 q x% V: m
- return true;$ G' r3 d/ c5 {
- }
1 P! g9 O- Y; E; o# [& @ - return false;
5 A; u- r& Q& P; f2 S0 f' F - }
- R& _- L8 B6 @$ p. w' |
8 r. [" C8 z* [/ m9 {5 \- // 运行所有的worker
, L8 E) F9 b0 x7 O, U- e - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
0 A) q- R. r/ I8 M- @ - ws.onopen = function(){1 I/ w/ t: n1 J. W/ h
- var uid = 'uid1';, b4 Z( v; k3 n: W, c" B w8 I
- ws.send(uid);& H" C2 Y/ Y9 E# \+ s
- };# J( k9 S; B; L8 ?/ _
- ws.onmessage = function(e){! v' B) O4 I1 R# z- v
- alert(e.data);* ?: W4 b$ g% G
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
9 c5 f8 A9 U8 h- U$ r6 L - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
8 n6 T5 W! B3 Q - // 推送的数据,包含uid字段,表示是给这个uid推送
Q3 a6 s5 x/ K) b - $data = array('uid'=>'uid1', 'percent'=>'88%');2 B# P- p. n+ O
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
* f8 @" |5 a+ F2 B4 P - fwrite($client, json_encode($data)."\n");
' s) o6 c" z8 x9 T/ ]" k - // 读取推送结果
2 o7 b/ I. }+ I/ @ - echo fread($client, 8192);
复制代码
6 l# z7 K* m9 i) I4 n N3 \' m
4 g) e) ?+ r( ^) u |