- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
5 z* W, S O& U4 d( n. a7 ^) z - require_once __DIR__ . '/Workerman/Autoloader.php';2 U2 l- ^0 e7 T1 ]1 s1 ~+ W4 z
- - x9 x0 V' J8 Z- P! A
- $worker = new Worker();# O8 w! v& M: z" ~5 `8 D- W
- // 4个进程1 E- P4 c& Q6 V- `- R& K3 A. ]2 P
- $worker->count = 4;
4 o8 Y6 a' l) z; N, E7 q - // 每个进程启动后在当前进程新增一个Worker监听! @2 n: z9 `$ U, I: |% X6 d, P( y! \ ~
- $worker->onWorkerStart = function($worker)
5 g) d7 u) V. o/ T) p2 U6 k8 i - {7 |1 v7 v! d: |4 _* \
- /**
3 R3 ~+ E; l$ l - * 4个进程启动的时候都创建2016端口的Worker
( F, J5 \4 X z3 `5 X; t( a0 C - * 当执行到worker->listen()时会报Address already in use错误
, S3 m$ c! Q" W! {) e2 a. K - * 如果worker->count=1则不会报错
1 L0 O. y" w0 K/ j - */% z& M$ t1 S8 B$ p/ X: @
- $inner_worker = new Worker('http://0.0.0.0:2016');5 F: O5 l2 X9 c" B, T$ }
- $inner_worker->onMessage = 'on_message';4 c/ W2 q. E! }6 P" a
- // 执行监听。这里会报Address already in use错误# ~" {. C7 N# p+ b# V
- $inner_worker->listen();# A& U2 y7 n( ?' U0 s9 W
- };
9 i$ P" k8 M: Q, f8 v
5 W. j3 R: }& r% `8 I- $worker->onMessage = 'on_message';
, v% Y' e6 e: L: W7 h- _
3 D% o, k' [* @- function on_message($connection, $data). c, N) W, d( I! x, s
- {5 b0 Y5 d+ z- v) s3 x a
- $connection->send("hello\n");5 z$ B6 L ~+ w0 H' ~
- }* e' t! z2 I9 P3 h
- : k3 W9 I9 h: G S( D
- // 运行worker
0 s* _, s7 K4 |# x5 G1 L - Worker::runAll();5 u( X$ v) ` v) x, q$ Q
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
% z1 j% `4 ]5 D% V6 F - # b4 M- S V* _5 ^( Z" d% v1 Q _
- use Workerman\Worker;# y4 @( T9 G! L3 R
- require_once './Workerman/Autoloader.php';0 `. a/ d& d5 X8 m! M
- V& v* L5 T0 @1 P( @$ k/ V: Y, E- $worker = new Worker('text://0.0.0.0:2015');( M9 c0 ^# _# t8 M; A. _/ H' P
- // 4个进程2 b: }* @- H( e* V
- $worker->count = 4;
: _7 P3 a- `4 m9 ?: _8 u; S - // 每个进程启动后在当前进程新增一个Worker监听* k. i" p/ T, z* o" E9 _
- $worker->onWorkerStart = function($worker)
' ]7 f( c( U6 X) a" r' | - {
. v& k/ v) E- k - $inner_worker = new Worker('http://0.0.0.0:2016');
. v0 j# `4 ~/ m - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
+ h( M! \9 g" j - $inner_worker->reusePort = true;
/ F+ Z9 g1 |' |7 f - $inner_worker->onMessage = 'on_message';
1 A$ B, {' B4 R2 N( X' X - // 执行监听。正常监听不会报错) {9 E1 G# V9 f1 f* h( d! N
- $inner_worker->listen();9 K; E% U) a! B
- };# ]7 {3 a8 A, {1 V1 ~1 h/ \
- $ h$ H# a$ y+ p3 d. O# A7 U! l- N
- $worker->onMessage = 'on_message';
# G: n' a" a: N1 p' t% @& g1 d
/ _0 e& i1 l$ ^+ h+ \) _5 W7 Q/ Q7 ?- function on_message($connection, $data)
4 M# ]! K) x* F6 x6 j - {
/ ^: F/ b ]# H8 F- y/ \ - $connection->send("hello\n");/ M$ }2 X8 C; w' o- C, h" E- t7 E5 K
- }
% I9 G3 T8 j; q8 _
2 T: k" k- _" y% ]! u2 |- // 运行worker* }' ? ?4 k: r* M# C
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php* f+ k) C! c( r$ W) h# a3 Q/ G
- use Workerman\Worker;( Z s/ \9 [& o$ U; f; g
- require_once './Workerman/Autoloader.php';
* j; J( q8 U( N2 e; ]3 Y& b2 j4 ` - // 初始化一个worker容器,监听1234端口3 o+ ]6 G, h) k, g) M4 D' s
- $worker = new Worker('websocket://0.0.0.0:1234');8 U. m2 d4 a, E! F; ~/ b- L+ s6 M
- 7 ~3 C# U1 {8 e2 ?& M5 R9 b
- /*1 n* q" A- Z1 \$ t
- * 注意这里进程数必须设置为1,否则会报端口占用错误
' i9 H/ O. a+ K0 W1 F4 `& L0 } - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
$ ?! f) X1 t* ~! D1 x4 y7 L - */
. T, Y* }+ c b) c; Q5 B1 E - $worker->count = 1;
6 N! ^' B# [3 o- `3 p# Q - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口' _4 ? O' U6 {% p5 A& W+ _. A
- $worker->onWorkerStart = function($worker)
3 z0 P L3 L$ x, F$ w - {
1 R* Q/ T. z6 I+ j! A i - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
0 W% E6 U1 H. D - $inner_text_worker = new Worker('text://0.0.0.0:5678');
- v8 ~$ S) Q+ `5 h# H0 t2 {, g - $inner_text_worker->onMessage = function($connection, $buffer)
6 P# `& v4 U7 R, _; G8 i$ O k$ ~. e - {
) A7 u9 H n- N6 F3 |4 g5 i - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
# E/ z; @; l; _% H3 V - $data = json_decode($buffer, true);" Y$ B% u! Y* w
- $uid = $data['uid'];
/ @8 j* t, X" _- v - // 通过workerman,向uid的页面推送数据1 L! f+ `' R0 e I- @6 Z
- $ret = sendMessageByUid($uid, $buffer);+ e- v* Z9 [/ @* f: w8 Q9 B3 `
- // 返回推送结果( l0 X1 ]3 [$ u
- $connection->send($ret ? 'ok' : 'fail');
2 Y2 [0 u7 ~: f3 e - };
* M/ H+ M) `# y$ ^: \6 `) V1 [' l - // ## 执行监听 ##
9 M) R1 h& r2 Q5 h4 O - $inner_text_worker->listen();
4 Z: Y- i1 L9 A7 r3 G; r! C - };8 j6 i5 f' a$ a1 [; ]. u
- // 新增加一个属性,用来保存uid到connection的映射
* j7 n, S3 ~4 p0 o, P - $worker->uidConnections = array();
* o- }* I D' F$ O - // 当有客户端发来消息时执行的回调函数( _0 E0 z1 C. X$ C9 B
- $worker->onMessage = function($connection, $data). C# r9 C! ^6 U) a& ^- }* y
- {6 U6 s% o# q( C5 g ^ L
- global $worker;
/ n" Q: T: S+ T1 s4 N1 _2 W - // 判断当前客户端是否已经验证,既是否设置了uid
/ H5 g1 b1 {1 p# W0 |" N8 N6 ^ - if(!isset($connection->uid))
# h3 V7 k4 W* ^& ~, G2 ~' w" [ - {$ j7 {; ^ c* a l: l
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)$ _+ L/ m$ H% i) j4 W4 R1 e2 g
- $connection->uid = $data;
# W* [9 y) f% x8 I& r6 l! Y# J5 | - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
3 c* y3 X0 L/ x2 s( x - * 实现针对特定uid推送数据
( _# f( T: i6 s2 h2 b6 x - */
, X. X# a9 x k - $worker->uidConnections[$connection->uid] = $connection;
+ K: I' K* ]+ D- j - return;
0 b4 }$ x1 A, S" P: P - }
c1 e. r) l# u8 T# |/ u - };
% g9 I$ i6 U, A# i
' C7 Y* \8 T7 }$ E+ o- // 当有客户端连接断开时
/ p- G2 u# S' Z- q8 p - $worker->onClose = function($connection). ^5 Z z- S( K4 R7 U9 x
- {# G- o s7 u3 |* F8 y
- global $worker;
6 \6 F( ^2 R- o' j; m: `9 L - if(isset($connection->uid))
! v, g x9 ^4 e# G6 t) W0 W - {
5 |) j" C# i3 }* V& ?7 Q - // 连接断开时删除映射
. V% ` @ Q" R, ^) ]+ C - unset($worker->uidConnections[$connection->uid]);
8 C" {, x8 [4 v0 W1 G - }
. Z8 S% p8 ~5 V6 J# R4 a/ Z - };
; v5 `% m# \4 ~, c; P* b - ; I$ ?3 \& k; [- i. x4 p
- // 向所有验证的用户推送数据
9 c A' \ j* L/ [/ {, F - function broadcast($message)
/ n' K d/ K. ]& c - {( }; ]: Y7 W' p9 d
- global $worker;
! \9 G* R" \. R" D% O! b - foreach($worker->uidConnections as $connection). \( J. Q$ | E( \1 t, }
- {
1 y/ o4 P& G; [8 Q - $connection->send($message);2 X+ K1 h5 i0 m6 W3 r( n% v
- }) ], K8 \* P. x
- }& [ i6 V& d. ]6 [' Q5 e3 ?7 m! L
- . T9 Z9 i P$ H# u
- // 针对uid推送数据4 \0 {- k, k; b- K+ _: M: f
- function sendMessageByUid($uid, $message)$ k1 s+ E6 f' r# ]& t x9 B1 y* i
- {3 q6 y3 R" H6 N+ \/ j7 s
- global $worker;3 }- W! |0 [1 b, W; J$ p
- if(isset($worker->uidConnections[$uid]))
7 c" z' A r( w9 K7 O# L - {. H# O% |* F1 F( M
- $connection = $worker->uidConnections[$uid];
* J, w' \$ v' m* ? - $connection->send($message);( y T5 M& h" @, C2 K6 R4 B/ v
- return true;
& i: f, }# j! O) ~# v - }1 X7 C" j! ]: N2 A
- return false;, ^8 Q: R p+ R5 j' }
- }1 R6 a& @ u) c* l
! }' J1 U( H, t$ z, a( r- // 运行所有的worker4 V9 x4 n5 s- y5 v
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
5 k7 n9 i1 O5 ]1 i* W - ws.onopen = function(){1 ?2 b' ^7 ^ K0 P) M+ g
- var uid = 'uid1';
7 T$ e0 I* r6 _& @ - ws.send(uid);
$ ?" L: n9 B3 K) j3 L; Q - };- Q$ q$ H% V/ P5 i* K" K
- ws.onmessage = function(e){
4 I$ {7 P' n* U5 x8 p - alert(e.data);6 O0 e A) ~0 z7 E! k" r
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口' z5 z8 Z+ B$ T; F
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
9 J- u9 \4 V( f0 O1 q+ [% \ - // 推送的数据,包含uid字段,表示是给这个uid推送
- i0 n+ x. q1 D - $data = array('uid'=>'uid1', 'percent'=>'88%');0 x# n0 d2 p% |4 Z
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
) }5 Q" ]0 } |1 ^0 d0 z( a - fwrite($client, json_encode($data)."\n");: U% u [2 s% i8 c/ P
- // 读取推送结果# x( }! G8 M3 |" {
- echo fread($client, 8192);
复制代码 0 m& Y1 V# B2 m! @6 Y7 j
" i% \: t6 G, R5 F9 ]* z& K! } |