- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
7 I. |1 J0 t% W2 m4 z0 \ - require_once __DIR__ . '/Workerman/Autoloader.php';
5 @: F' l9 o D6 s1 a# |7 u - ( i; |: t; S$ k3 J
- $worker = new Worker();; E: N) G* P" C; r
- // 4个进程2 |' K! P" V6 @: G- ]) Q
- $worker->count = 4;$ N, c6 B4 |/ h
- // 每个进程启动后在当前进程新增一个Worker监听
+ n& K! _; d+ C - $worker->onWorkerStart = function($worker)
! b: J- }; J" S, L1 R' W0 V! { - {. H5 h3 Z+ p3 T% @
- /**5 _# G2 x7 L C) m: b
- * 4个进程启动的时候都创建2016端口的Worker3 g& A/ D2 K1 H: p2 J' S
- * 当执行到worker->listen()时会报Address already in use错误3 s# a8 s" `8 s8 U _0 Q
- * 如果worker->count=1则不会报错
5 Q9 P/ D" c1 ] - */
: a$ i) i2 V* w/ @, g5 k, Y5 a - $inner_worker = new Worker('http://0.0.0.0:2016');
' T+ q8 Q0 ^6 V1 e7 _8 \" Z - $inner_worker->onMessage = 'on_message';
* |; `0 V, l8 | - // 执行监听。这里会报Address already in use错误2 Z: x( H3 r }9 R# w* ]- F# I! b: i
- $inner_worker->listen();# R) P" n V0 g0 B8 e' @
- };
& {3 U6 S- v9 s/ f1 e; k - ' h2 T3 { T5 h- {
- $worker->onMessage = 'on_message';( n3 L: y6 M4 a9 S
- 0 M! K% G! p% K% |1 V- S1 j7 s4 t
- function on_message($connection, $data); c0 k+ z5 a3 n" @
- {
" h0 h r% K6 A4 _& p - $connection->send("hello\n");( N* c* u, Y: `
- }) Y# C5 P! M7 c8 \9 c5 r& o
" j5 T. a8 f" r$ T* v, t- // 运行worker
1 L& P9 S' Q" d s* I/ J - Worker::runAll();
7 v F3 F. s9 g# p' y - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
" \2 d# H: y- z
3 n F* u6 ]( I& V* S. _! H- use Workerman\Worker; c/ Z' M: D9 J6 z
- require_once './Workerman/Autoloader.php';& l q, c0 d* E9 V8 j x* u9 E
- 5 e- r2 R u' s1 v# `1 D7 r
- $worker = new Worker('text://0.0.0.0:2015');9 e P' W. ^" n8 F R3 T3 t4 B
- // 4个进程
( ~" D1 c5 U$ I" z - $worker->count = 4;1 [+ z5 ~' N* E/ ]( g
- // 每个进程启动后在当前进程新增一个Worker监听
7 w- Z1 V8 b4 s( _5 q% p3 Q - $worker->onWorkerStart = function($worker)
" Z& @; X! n3 c1 }+ C, {1 ` - {) u; G. T3 \+ w: {: y' A
- $inner_worker = new Worker('http://0.0.0.0:2016');
+ ~/ x- ^- Z. J, a - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)0 N4 U* x; y/ E' i0 N! M$ @
- $inner_worker->reusePort = true;
7 V. Y6 D! p/ T& |7 u8 f3 V - $inner_worker->onMessage = 'on_message';
' a: z( u) V2 T! B - // 执行监听。正常监听不会报错
$ O) d; \; ?. ^, F& ]# ]; q - $inner_worker->listen();
' M/ A5 U. ^; B" W - };( E' A# Y; M4 T8 `& c% i! w8 P' w
+ d. ^; x1 \5 T- $worker->onMessage = 'on_message';5 G7 r0 m" Q3 P; v
5 z# A0 w3 @- _* [' S5 u; i- function on_message($connection, $data), r$ T4 n4 i# a# d% I! J
- {! W( m( e- |: A g# `/ X2 w
- $connection->send("hello\n");
- w* T5 U1 i# |6 |+ P+ V - }" d: ^/ h9 {: I6 Q# j% c8 s
' y/ W" ?" O. ?% G+ z8 {2 T- // 运行worker
0 O8 ~6 a, C7 Z; x! x% E) @ - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php/ E1 _: k. r% B& y# X4 O6 B
- use Workerman\Worker; j& T3 h2 w ^. g1 L# u E( U
- require_once './Workerman/Autoloader.php';4 J/ @0 K$ \9 f s) E
- // 初始化一个worker容器,监听1234端口
/ l9 l1 C# q' O% P |" f0 Z- s - $worker = new Worker('websocket://0.0.0.0:1234');' T* o9 i) E: o" z1 o/ H# E% V0 |
( J% b' |9 f% s- X% G; J- /*% r: n7 L2 K8 B, z
- * 注意这里进程数必须设置为1,否则会报端口占用错误0 F1 ^- c! U. z4 O3 z
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true); N! E; S r, q. }, m+ H" j
- */
% M) _5 w% M: L% O1 g% U, v5 R - $worker->count = 1;' c% B+ G- M% K1 [" g
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
3 Y3 x0 K% k: J& { - $worker->onWorkerStart = function($worker)
" {. k; j) a$ _+ R1 B' H - {3 b. V- f9 W9 M
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
7 |; k- Z k% M; Q8 V! d - $inner_text_worker = new Worker('text://0.0.0.0:5678');( m7 }! K. D$ y" m# w# B
- $inner_text_worker->onMessage = function($connection, $buffer)9 V1 f' t7 U% m X: y
- {
8 V* l( l; F1 M0 ?1 o6 o - // $data数组格式,里面有uid,表示向那个uid的页面推送数据% B1 T+ C- ^& p7 F8 N
- $data = json_decode($buffer, true);
2 o% b% c! F4 X8 k - $uid = $data['uid'];
% y. Y& `9 u# g6 G$ ` - // 通过workerman,向uid的页面推送数据0 l, P8 P3 ?; q
- $ret = sendMessageByUid($uid, $buffer);
' p. t& n3 h/ L }0 y# ~ - // 返回推送结果
z; W) w. Y' m% l - $connection->send($ret ? 'ok' : 'fail');
2 A3 x& h% @$ x; g - };
" B5 R9 c8 v& X5 d. z' \" B: `, S+ _. X - // ## 执行监听 ##
3 }& W2 R; q; Y5 G2 T - $inner_text_worker->listen();( k6 d. v3 X9 d! S/ g3 ~
- };
. H& t& `) X1 k$ O) z2 D6 D6 h/ h - // 新增加一个属性,用来保存uid到connection的映射 I' P8 T+ l, b1 i6 U8 @
- $worker->uidConnections = array();
% l! a& d/ [* d1 L* ?( k! Y# s - // 当有客户端发来消息时执行的回调函数
, ~( O$ S- x- F: Z5 ] - $worker->onMessage = function($connection, $data)& ?8 P4 v5 }. T/ U, v* }
- {
* `/ W! }9 G. j7 N. L' q4 a. O - global $worker;; j* E3 N6 V! P9 V. V
- // 判断当前客户端是否已经验证,既是否设置了uid
8 `( q" N1 O* `0 f9 Q$ A - if(!isset($connection->uid))
, t2 S* Q2 }) N% P+ m- q - {$ `# k' V4 p& f4 i
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
]7 L5 M7 t2 ~! r, k% Q, m$ S5 F1 m F; J - $connection->uid = $data;8 H% A: f( x& p$ K% D
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
/ ?7 ^/ `/ V6 {( Z/ X - * 实现针对特定uid推送数据
, E9 i9 d$ u+ H9 ~' R" w: u$ l - */) x- T3 | Q6 O, Q9 v% W; n
- $worker->uidConnections[$connection->uid] = $connection; u3 w$ J4 h9 N8 J
- return;
* [! M j# S+ P+ r - }
8 J2 Y& ^( B+ J - };
3 a1 H3 M; q# r - + V3 m( ~1 Z( a4 W2 n
- // 当有客户端连接断开时( y b* v* C, ^& y5 w. I5 [* G
- $worker->onClose = function($connection)
# F. y+ J% m w# K$ y - {7 R) N. S1 _3 R- |3 d
- global $worker;( X- G2 J6 g& C( h1 U
- if(isset($connection->uid))6 u6 y2 y2 B' q5 G( b, N
- {! c( A5 G7 q! \/ d/ [
- // 连接断开时删除映射
) @% t E i' c - unset($worker->uidConnections[$connection->uid]);8 X5 ?4 U% q) \7 t% a1 s" U
- }
$ _8 z0 p. n/ Z+ a9 k - };5 S& [. x9 p- @" F' u3 J! i+ ]
$ D. `' F/ t# W$ P- R- // 向所有验证的用户推送数据
% Y& ]5 J) T8 U9 {* \ - function broadcast($message)" ~4 a8 n5 w; f. s; m% r
- {
% s) `/ _* l- x/ V - global $worker;: Y& N/ g, m4 l/ s- g7 ^
- foreach($worker->uidConnections as $connection)
1 v3 B6 r( o# ]/ N U6 P3 h7 k - {
& h6 U: P8 L, j, n" Y, s - $connection->send($message);, P5 _! g/ |9 B
- }
- P# @$ G) U t0 F& u8 G - }
) J8 t6 W1 Y# @4 F2 Y+ W - ! p0 C- d, G D- F: A2 X" V
- // 针对uid推送数据8 D0 a9 s1 @2 m% Z0 _- k, ?$ K- L
- function sendMessageByUid($uid, $message)0 n3 D( w; w9 o# m
- {$ L8 M* M& T- r( n9 M. j6 k
- global $worker;: m: J. |* e o3 b* j% J& J& ]
- if(isset($worker->uidConnections[$uid]))5 Y% | _- E" \' k$ ?3 R
- {. w0 V# X2 k, [3 {. j
- $connection = $worker->uidConnections[$uid];
; u. W, D8 o+ T* l+ z; U# i1 G) S \ - $connection->send($message);0 a3 V, Z) x: l, ~9 K$ `
- return true;
* p6 s8 k$ u7 H- T/ N3 ] - }
3 e- {+ }% M# @- i - return false;
# `( j/ Y# o3 h+ q - }8 T6 Y. g, k. k, d( n1 @1 D
, g$ V' K8 f) K# y- // 运行所有的worker
8 h9 J" a/ k* I; n, m, j3 T - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');' Y% I5 g3 M0 V' t
- ws.onopen = function(){0 ?$ J* H# X# n
- var uid = 'uid1';
. C+ h& ~1 q: K) H- [ - ws.send(uid);6 R ?, s# @7 e* M Y! X
- };' \8 K1 o# P% y4 W# ~2 ]" K, f
- ws.onmessage = function(e){
' |# c" C$ w+ J- Y+ o9 v& s" t# Q: X - alert(e.data);( F9 t5 `- i4 ` S f ^( P4 G
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
- i' v$ X/ [# l& c8 K1 J - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
2 _9 C' T% t/ Z - // 推送的数据,包含uid字段,表示是给这个uid推送9 M+ X2 a# L- [' E
- $data = array('uid'=>'uid1', 'percent'=>'88%');, N3 p2 R1 g# w0 Z: ^
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
( X& q7 N* G" I; T3 x! z* q - fwrite($client, json_encode($data)."\n");
! m) J" M/ X% F2 }) u8 D2 p; Z- a+ d - // 读取推送结果1 a* N5 W G( n8 l% S( y
- echo fread($client, 8192);
复制代码
! G1 D7 U$ ?+ h2 q
5 I( L# X2 B0 X2 A1 @ |