- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;# C0 X$ |* H3 Z
- require_once __DIR__ . '/Workerman/Autoloader.php';& J0 T$ ~) @7 o5 C& G" M
; U; |3 e5 O& n: o$ n: F- $worker = new Worker();
/ F( {% L2 @+ L$ s3 q - // 4个进程1 k$ l I) F4 ]: Y
- $worker->count = 4;
8 w- e% c L: e6 ] - // 每个进程启动后在当前进程新增一个Worker监听7 @- _) o0 ?; ]5 G6 t f
- $worker->onWorkerStart = function($worker)) J/ k0 H i g4 `; K9 W
- {
; g) V6 i- @! { - /**
) M; O6 p+ H9 D6 f! |. J1 H' U - * 4个进程启动的时候都创建2016端口的Worker8 Z. Z6 M1 g- }- h3 C
- * 当执行到worker->listen()时会报Address already in use错误
9 y1 E4 S4 L# P& i8 V- v* H9 P - * 如果worker->count=1则不会报错6 @, k$ }0 v$ G3 u6 r
- */
- w4 W! A; g! C6 W7 m3 _, v2 \ S+ S$ r - $inner_worker = new Worker('http://0.0.0.0:2016');
3 L" c' x- k# P" J# [0 ] - $inner_worker->onMessage = 'on_message';
9 F3 L- W2 P$ m/ O - // 执行监听。这里会报Address already in use错误
1 q6 N( F- j; [2 Y6 I - $inner_worker->listen();
2 R- a8 E5 W+ a* Y% t - };
8 A: f3 F" a4 n - , O& v4 i' g1 C; {' T" V
- $worker->onMessage = 'on_message';' [$ }9 `3 [$ y* h1 B0 A) c
- / V2 m+ H; c _0 ~
- function on_message($connection, $data)
0 m& z, ^" I2 e6 u$ S/ T - {0 i" x4 o: [; T# ?8 ]% |6 l/ k
- $connection->send("hello\n");2 w8 R+ f- M% |+ D5 f5 u
- }
0 Q% H# [$ r$ H% D! d
8 Z) I) `! m! b( m9 H- // 运行worker
: z2 W. [3 f, Y* K2 G - Worker::runAll();
0 ]' I$ l) @3 E4 _9 w2 F - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 }: U f" B* n! b% q* w
- & I) o9 R& p' x1 d: t5 U6 m
- use Workerman\Worker;# l2 a* @: U N$ d( J
- require_once './Workerman/Autoloader.php';" I: i2 X: i% t/ k& B! V
P; M+ Y* n/ I( ^& E7 T- $worker = new Worker('text://0.0.0.0:2015');5 K8 K3 m" _% ^2 u! I
- // 4个进程! ~8 u, B. w3 c8 c3 I
- $worker->count = 4;
+ _( ?% D" d K! ]3 q* v - // 每个进程启动后在当前进程新增一个Worker监听$ X6 H0 K5 ?' z% w
- $worker->onWorkerStart = function($worker)
( U! |: `% x( X; h - {5 A& K) l+ v4 |6 N& B1 b
- $inner_worker = new Worker('http://0.0.0.0:2016');
+ x/ Y+ H, N0 x2 ?# `- s - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
% G" w/ B" m# N. `2 n t( r - $inner_worker->reusePort = true;
- H& D) |6 W# m3 i2 y; |/ l - $inner_worker->onMessage = 'on_message';' Z, _5 v6 W. f. C5 ^( O
- // 执行监听。正常监听不会报错
: _1 A6 I) c7 o1 y# |/ y - $inner_worker->listen();$ P! y: q. J$ L1 q. c1 R) o
- };7 A. R* r2 }1 [ S V- i
- , r% ~% I- u9 {2 a
- $worker->onMessage = 'on_message';
* W. e+ f+ E, }# Z L! I - 7 E6 w; y% ] h
- function on_message($connection, $data)0 _7 {3 |/ R" s. O
- {" ?" g8 [5 N7 s! A9 h+ J
- $connection->send("hello\n");. H. g0 {+ o0 n0 S0 i* ]
- }
4 c! I- M( F" s( R
8 C, I; k% |1 A- // 运行worker* |, J8 w: U% R' \+ {- p
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php! w8 {; g% R- R) C; `$ Z l
- use Workerman\Worker;
5 ?( c' D" D$ o# `& L0 v* B - require_once './Workerman/Autoloader.php';
6 W6 @! U9 k4 Y5 \ J* A* ^+ B - // 初始化一个worker容器,监听1234端口
+ _2 W( U$ z8 K" s1 ~+ T - $worker = new Worker('websocket://0.0.0.0:1234');
3 s" ?3 o; H% v2 D: g - * K; f6 K4 t& {! n1 W' E) F; n% D
- /*- A7 d* I/ Y: X$ h9 }7 L3 K/ e/ f
- * 注意这里进程数必须设置为1,否则会报端口占用错误
6 @. R7 i( A' P# B4 Q- R- e - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
1 }; O# N( q4 t0 r W! x - */8 `4 J U4 x; m+ Q% J$ \- {" I! v+ w
- $worker->count = 1;
% T2 C. P/ V c \) h2 r. L1 p - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, d J3 x! Q. o
- $worker->onWorkerStart = function($worker)
" U& a: }! y n+ N& G" Y( Q6 \ - {( B: G! g; Y, z
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符: M2 f) ?2 A0 j/ U* a
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
8 h: D! [% t# R O3 F. m3 {* i0 e - $inner_text_worker->onMessage = function($connection, $buffer)( |$ \7 s! R) ]- _
- {5 [* _ m7 c* \: U+ }
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据# l8 E9 K& V. {# r
- $data = json_decode($buffer, true);+ D/ G) H, M" q* Z: O
- $uid = $data['uid'];
5 b/ ^; L+ J0 R) o( }5 B8 s h* Z7 P - // 通过workerman,向uid的页面推送数据' }. j, I! \0 k+ J% E* Q
- $ret = sendMessageByUid($uid, $buffer);
6 G7 J& x$ c1 z; a$ V! E* A5 G) k! Z. ^5 h - // 返回推送结果
5 k0 B2 E7 ?' F4 ? - $connection->send($ret ? 'ok' : 'fail');' t/ j: l: ^% c% J, O }- s
- };
; ]6 M( e; r) c7 b8 J - // ## 执行监听 ##) Q4 `8 [- l' s, T
- $inner_text_worker->listen();
% }' `* N9 e7 i! E" J: u) z- x - };6 {2 o) a6 _9 A9 W
- // 新增加一个属性,用来保存uid到connection的映射
" I! h, f# M/ j3 ] - $worker->uidConnections = array();
9 J) k- Z5 P$ M& S - // 当有客户端发来消息时执行的回调函数
2 c5 B- f* \+ b% \* \- o) R - $worker->onMessage = function($connection, $data)- |6 \* |! x7 v: ~. A) S
- {- Z1 D3 v1 n8 ~; S+ p( k
- global $worker;$ S5 t: b2 ?; N& p$ T4 P) S: g7 P
- // 判断当前客户端是否已经验证,既是否设置了uid
1 P" ?; W* p/ p) t* ]6 H - if(!isset($connection->uid))" b( j9 v( g5 D) H/ y, c4 T8 r
- {( @$ ?' R2 A6 |4 k" G# L
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)! I" V$ j9 H- H$ F" F% [
- $connection->uid = $data;* x# ?1 r: C% N# {, Q7 `1 [
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
+ u: v7 m* F7 _! V) ^: o% A - * 实现针对特定uid推送数据/ q/ P9 P2 j6 K
- */
# O# \0 \5 M9 Q" L- [7 R' w - $worker->uidConnections[$connection->uid] = $connection;8 o J+ q' B) q0 [- d, g X
- return;
\9 j% T* u1 L) H) M - }
0 q, B0 o# [( d - };
/ y8 K9 @ H( F& h- v
& [! ^- u7 c& N ~# R- // 当有客户端连接断开时( Z9 Z9 ?2 i6 ^) `3 n+ t4 H) m+ X
- $worker->onClose = function($connection)+ k$ `6 W9 Y+ j) P- j
- {- c8 T: v$ U# l- m
- global $worker;" a2 e4 e @* v1 X
- if(isset($connection->uid))# D, F( a8 _! `
- {
- g9 V3 R1 a7 I% c - // 连接断开时删除映射" _6 P& @* o l% Y, L
- unset($worker->uidConnections[$connection->uid]);/ Y; T) U; u; E2 l$ S
- }
* g7 R# Y, h5 l+ U8 { - };; H% U, {) M6 S9 h7 \8 p
- " A, u; w% D$ H
- // 向所有验证的用户推送数据8 A# [- d/ `+ _) N. h( I5 `
- function broadcast($message)
* [- k. z# e: C# L3 a$ H& n5 s - {! `# ?2 k2 I; c, v0 Q
- global $worker;
# o$ |4 P: }" `, x) l - foreach($worker->uidConnections as $connection)
4 I) ]# Q0 g6 U$ Q0 ? - {
" m7 }- T. i2 |& d. r - $connection->send($message);/ m( v/ A, w% P: {3 E1 z5 d
- }7 a0 s! d( H8 B7 y, `
- }0 v2 l/ C' H6 P l$ i8 Q( j
- ' k1 i. [& X, _6 e) s* A
- // 针对uid推送数据. c: F; v% V- i9 N1 r
- function sendMessageByUid($uid, $message)- y4 q8 F: n- i- o' ^) n
- {
8 f+ a# S7 X+ R9 b - global $worker;8 h7 w9 s. x' N, B2 j! o
- if(isset($worker->uidConnections[$uid])), Z3 `/ f) o/ U8 H& \
- {
! S _# P2 i" Y8 @, {- g- }/ u - $connection = $worker->uidConnections[$uid];
; P( ~8 q$ r) `" a, B1 D - $connection->send($message);
7 s9 c$ {1 {& n7 v( ~" |/ P - return true;$ A" j: C* o* P2 S1 `' J# i; M
- }
# H$ z7 c0 s4 s+ k1 g7 {& Z - return false;. c3 k; f- h% e% L: U( V
- }
/ @; O: g; Q h; Z! @
% |! Z) ]9 O% c$ C ~- // 运行所有的worker
9 Y+ w% j2 x6 c' s X - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
* m: H/ s# T2 ~" ? - ws.onopen = function(){
9 T$ T1 h+ \3 d- A* ^0 S2 p - var uid = 'uid1';6 O$ u% |7 |0 Y9 r
- ws.send(uid);
9 ^: R$ t+ ?/ W2 s/ D' O5 q* G+ A - };1 x' X; K. L" p5 X4 t2 _
- ws.onmessage = function(e){( H9 h, y% z2 q' q5 P2 H) I
- alert(e.data);
' f5 E9 H, H8 N- h) h( k - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口7 o! C% E7 J: H
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
/ }& Q' D/ X3 [: | - // 推送的数据,包含uid字段,表示是给这个uid推送% o: R9 `% j# o5 M+ E
- $data = array('uid'=>'uid1', 'percent'=>'88%');
! d' O) u- U+ N; ~- Z - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
) h9 x1 ~ Z9 H) r% t f% t" M$ D - fwrite($client, json_encode($data)."\n");) `% L. P- n/ E4 r
- // 读取推送结果" [7 L% \/ Q/ Y7 I3 c8 N9 u) _" \
- echo fread($client, 8192);
复制代码
9 v2 C6 r& }9 k% Y- i$ y& T+ d7 [$ b _7 w3 P8 C# \+ ^
|