- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
w* A9 N c A0 e) w# F" l - require_once __DIR__ . '/Workerman/Autoloader.php';+ s* i, Y R& N1 j
- 5 N/ N& ]$ c% |4 F/ S; n& t
- $worker = new Worker();
) `/ G5 A" Y0 m; E - // 4个进程- F# w v. u6 ]/ [
- $worker->count = 4;% y8 q7 n* j. ?
- // 每个进程启动后在当前进程新增一个Worker监听
$ h2 R: h0 [+ }+ P- E v - $worker->onWorkerStart = function($worker)
, k# G, ]; D G. B. ?. C4 | - {4 |8 I5 O: k1 k5 i t; `+ x% L
- /**9 u' }; n4 |5 q& T
- * 4个进程启动的时候都创建2016端口的Worker
) E6 H1 g2 \7 g - * 当执行到worker->listen()时会报Address already in use错误
: z0 }( ] a5 G3 } - * 如果worker->count=1则不会报错; _! ^; q$ \3 f+ n
- */& g' h& r& }5 d5 g! {: r. k0 a
- $inner_worker = new Worker('http://0.0.0.0:2016');
0 p3 {5 n- w5 \7 K* o - $inner_worker->onMessage = 'on_message';
' F6 p- H0 f+ j( E - // 执行监听。这里会报Address already in use错误
4 g1 W- B" h! @+ S# X - $inner_worker->listen();! n& C; Q' ]$ R$ }
- };, V* E' M% g8 R
9 R$ L5 E2 {/ @7 [3 W- J8 e- $worker->onMessage = 'on_message';! A6 T W2 i8 p+ R7 f
( y# ~8 l' m; ]- function on_message($connection, $data)
8 A6 v. k% _9 c% V2 A6 @& N - {
! @3 H1 Q; T7 K4 Z$ Q* q - $connection->send("hello\n");
; }; c7 r+ g6 | - }
( I- A) t5 S3 J0 A0 y
J; \' r. Y/ |2 e8 y( @- // 运行worker
' n: r5 W8 t& W# I6 G - Worker::runAll();" n( N! c! p3 r) G6 M7 l @
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:2 i5 w7 |+ c- R9 b
- ; N' k; }* J0 B0 e. [4 v7 T4 X
- use Workerman\Worker;# v: L( o% L$ M: `/ X! I; O# a
- require_once './Workerman/Autoloader.php';/ g8 p8 B- j; ~$ W* F! F
- ; o$ W; A% e, `0 L6 k) n6 d# p/ _
- $worker = new Worker('text://0.0.0.0:2015');
* @0 b( `1 }9 r8 a+ @" ]/ v - // 4个进程
# j5 ]6 \% u+ P8 ?! J5 C - $worker->count = 4;
- v+ K/ ?& q% ^ z2 m/ M - // 每个进程启动后在当前进程新增一个Worker监听4 V- `" f; K: h* h0 m" {! a
- $worker->onWorkerStart = function($worker)) j* e/ V% |' g& ~
- {
& b. c9 Z, S' ]* s. G9 f - $inner_worker = new Worker('http://0.0.0.0:2016');: s; N% Y7 z. z
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
' ?& ^/ m: I$ { - $inner_worker->reusePort = true;3 \' v1 e/ T* v* P& I2 ^2 i. p( ]
- $inner_worker->onMessage = 'on_message';1 H- {; i( D' O
- // 执行监听。正常监听不会报错
4 D6 l O4 Z' n5 L' E# c* L - $inner_worker->listen();
0 n3 z/ T* k% ^9 C, m u m - };, U* J) X; t0 T) e
|: c7 Y, z# x4 d' b; a) t- $worker->onMessage = 'on_message';
4 O( g! ~3 C( A - ) A1 o6 C& H5 k1 i4 b
- function on_message($connection, $data)" M1 {) q% {% D# d2 b( S8 [
- {$ o4 T, }% T% R/ E
- $connection->send("hello\n");+ j6 J1 X8 ^: h& c0 L* @
- }, B" o& y+ H0 z- X( F4 O
- 1 a3 @% P4 z7 C5 m1 x
- // 运行worker
. x% C' m; B! {: J$ m( }4 W7 q - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php0 A) k9 k* t5 K8 I! S4 G
- use Workerman\Worker;" e! o+ m: M: u E" G. I
- require_once './Workerman/Autoloader.php';
0 r! Z$ L4 G' |6 K4 U9 C9 x. E - // 初始化一个worker容器,监听1234端口2 ~$ P, i: R6 f' Y
- $worker = new Worker('websocket://0.0.0.0:1234');& ?" }/ r& Y4 h( l* q1 U
- 2 ]2 J6 M! D- K
- /*0 B' o9 t) ?0 K9 F+ ?- B6 i. c
- * 注意这里进程数必须设置为1,否则会报端口占用错误5 M/ V7 o4 x+ y# n* Z( a; X
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
$ |" D) S& C$ J4 U5 M - */1 y2 _* N& j B
- $worker->count = 1;
2 A8 U: q5 K% c; G: f% \ - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
9 t2 [5 T e- k* F2 @( ?- a - $worker->onWorkerStart = function($worker)
& |: H8 @' V0 ^$ n+ @5 f - {
5 S- [+ w( j1 r" J - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
8 C+ `; D2 Q1 C$ `: ~- c! y - $inner_text_worker = new Worker('text://0.0.0.0:5678');+ l) s' M8 f- t* E- H" E
- $inner_text_worker->onMessage = function($connection, $buffer)7 S) I, p4 l1 V2 f
- {4 N/ n8 L* b. I$ a% h4 |
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据$ K# c3 G$ G4 z2 o$ v6 x
- $data = json_decode($buffer, true);) G# ^, ?; l; x0 ]! g
- $uid = $data['uid'];$ ?3 X7 N: H$ T! J0 K/ U
- // 通过workerman,向uid的页面推送数据
# T/ u* x* ~0 b- ?- t) D - $ret = sendMessageByUid($uid, $buffer);
' \7 v' x' Y2 q( O: H - // 返回推送结果; G9 f- Q/ L B5 X
- $connection->send($ret ? 'ok' : 'fail');( [" _7 m3 L' S3 F8 } U
- };
! U9 _! A# G. J9 Q* o" W6 s - // ## 执行监听 ##
9 R- F. {9 ]9 a# Q - $inner_text_worker->listen();6 t: G* A2 e6 g0 w' a* R% ?( t
- };
8 W( S9 Q K9 R! d* R# ~8 k8 I$ }0 D( _5 b - // 新增加一个属性,用来保存uid到connection的映射% F" Q& ~" U/ Y; n) E
- $worker->uidConnections = array(); m3 _5 K7 u/ S) J
- // 当有客户端发来消息时执行的回调函数* T+ q; f, C8 X6 d5 O0 {* N
- $worker->onMessage = function($connection, $data)0 e, b- `, k7 v
- {
3 N8 M6 R) |) V( {$ r* k0 i+ d - global $worker;" D3 f6 e! B; ^* c5 l1 ?' Q
- // 判断当前客户端是否已经验证,既是否设置了uid4 e: N& E7 Y) q6 n/ k
- if(!isset($connection->uid))( b" |, f8 ]1 V; |: o3 A) T7 B
- {
* W; c- ~1 k: [; H1 [) w* L - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
, z$ d" q) Y# @5 g* a1 Z# q - $connection->uid = $data;* y& n4 ^6 D$ k5 m& E# _2 Y
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,- {! A5 ]/ [2 Y6 U+ S
- * 实现针对特定uid推送数据1 }3 @& R: l* U3 ~% w( [9 ^
- */
$ t% a( g, d8 C, u( L - $worker->uidConnections[$connection->uid] = $connection;
0 t$ _: {8 I/ [& b - return;
% H' ^, O& L9 k - }
, h0 h% g) F2 K2 T0 z - };
6 y4 F$ @% j5 {$ s! O - : s3 l8 K7 n6 h9 T$ x3 `! v
- // 当有客户端连接断开时/ m% t9 O/ n U
- $worker->onClose = function($connection)8 x+ T- f6 M' y- u- r% b
- {
* Y y; d$ q, H7 B1 C+ M8 T' A - global $worker;! G$ n/ ]7 j6 W$ s# M3 h/ c" U
- if(isset($connection->uid))2 x; J; Y0 R( N3 s( `8 E! B! O. Y
- {
8 ` Y; G/ P: K7 z# `1 I9 K - // 连接断开时删除映射
( H1 F! ^- [7 D# V( Y! ` - unset($worker->uidConnections[$connection->uid]);
8 ]+ V7 |% o! ~6 S, R - }
& z! ]; F; H: X - };
, V; A& }" a9 o# x2 I3 A0 ?$ s - - u7 [! C e* h: n- ~
- // 向所有验证的用户推送数据& |3 V. I8 C8 _0 R
- function broadcast($message)
2 ]2 Y# `, @2 K3 {+ m( z - {5 x' G8 k1 r' ^# L8 }% T6 `
- global $worker;
" K3 e4 h0 }, M8 R/ N% P - foreach($worker->uidConnections as $connection)" D3 u' W9 f" [( O
- {
' D* {" D( m+ Y4 W, n( | - $connection->send($message);
$ u6 m- }: o, k, }5 u - }' e1 j" t1 t C7 P% S( O
- }
- u3 B2 N s6 E! a) Q) D# A) A
6 U; |7 ?4 k3 ~$ C; P2 v: a- // 针对uid推送数据
. Z; e! M6 v! H: n% m4 p - function sendMessageByUid($uid, $message)3 K% {3 i! i& d/ E3 R
- {
1 w3 J. @6 p' u; c" Q& _ - global $worker;/ \# z5 X+ r, T2 X+ g/ ?) U8 w
- if(isset($worker->uidConnections[$uid]))
1 l" l' t! g" Q2 @0 i - {
: ]/ ]$ P8 }$ W" c - $connection = $worker->uidConnections[$uid];; x& o0 I' b# D$ ?# l
- $connection->send($message);
$ T$ j' G1 U! o - return true;+ p% t% i: f2 o1 K
- }6 P+ r' e; H( z8 W0 e+ U
- return false;9 N! o/ x S0 N& n" m
- }
7 o" _7 W" `+ B2 `
9 E @9 h. y" a, k" `5 H& L- // 运行所有的worker. c0 _5 x$ J2 Z! J" I* I
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
" N4 \8 A( B1 b. g$ X0 J: w' `; j - ws.onopen = function(){9 A4 B. C% C6 Z& X j4 @( L
- var uid = 'uid1';
* D. ]; `( t" n3 d% J) R5 a0 t2 P" d - ws.send(uid);
% B2 b6 r. |3 J- ]' L - };' B5 B% T) q( L4 m3 {( r: v
- ws.onmessage = function(e){4 q2 a! I7 E0 z9 v6 X* G( f9 S2 ^# ` k
- alert(e.data);
, V. u8 T* h' g( d P - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口1 z" k; Q6 w! A
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
- {& W0 J0 h' r$ N' g9 m: X0 n - // 推送的数据,包含uid字段,表示是给这个uid推送. h0 S9 y/ q( }3 z
- $data = array('uid'=>'uid1', 'percent'=>'88%');
% u D. W8 E' |5 |+ \ - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ c& p5 S1 W3 L6 K$ k
- fwrite($client, json_encode($data)."\n");2 H) d; O0 t0 o2 V8 V) g, v( A* X8 x1 I
- // 读取推送结果& }, ]' J1 l j) P0 n6 ]
- echo fread($client, 8192);
复制代码 " q( @: P, ^/ }& X% N, P% O
+ H* U6 D- ?; ]( z/ H& K/ V" A
|