- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
- P; A N# M# R z% I; j - require_once __DIR__ . '/Workerman/Autoloader.php';
/ z. B I' n7 ^% J: ]9 _
' u5 p4 Z) |. H5 ^* k# L) k: b" a- $worker = new Worker();- h M2 s% s; ~( `- i
- // 4个进程 f# G( v& P ~4 V8 P9 M
- $worker->count = 4;0 w) m# Q5 n v; }
- // 每个进程启动后在当前进程新增一个Worker监听2 I6 p6 s$ o, `' x
- $worker->onWorkerStart = function($worker)
$ l' u% I7 s5 Y. ~; G/ K( E - {- I. @5 `/ X5 E, X+ q- r k% W7 o
- /**# c; z! i* t- B
- * 4个进程启动的时候都创建2016端口的Worker9 e+ n, J9 z4 K; K% a f# m
- * 当执行到worker->listen()时会报Address already in use错误
9 r: O2 X9 o- W( y% \' t - * 如果worker->count=1则不会报错* u: W S/ y% I7 F; K( s3 u
- */" Y6 O) X: R3 k) G
- $inner_worker = new Worker('http://0.0.0.0:2016');' m% L# S4 I1 Y
- $inner_worker->onMessage = 'on_message';7 E! G# O9 r0 h) \% B5 `& p
- // 执行监听。这里会报Address already in use错误
" O: C& N% A, S4 \' j' k - $inner_worker->listen();
% I+ k; h3 ^ j - };9 G" b5 x1 d) E* ]; O( q% B; B7 [0 E
9 ^: h' R; v8 [! E6 k# i& @- $worker->onMessage = 'on_message';
0 C* M+ m! C0 ^0 F' f; X - ! F5 Q) U+ R& ^& {- }
- function on_message($connection, $data)
. v) H6 M. [7 t3 T - {
: h# ^5 I8 W- _8 i% a# U, P% c' n - $connection->send("hello\n");
) f' ]) n, N( G+ g1 d; R - }4 L" q" S1 b% ~7 @
- , Y/ Q B7 X, A. U0 B4 A; P* ^% V/ p
- // 运行worker: z: O( T. c' c4 Z9 j+ z
- Worker::runAll();. e- R, N0 W' B7 |! j1 I
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 l% z: k0 J* T
- ' _. @+ v( W+ u+ p1 L& m
- use Workerman\Worker;1 `7 {3 C4 B- T
- require_once './Workerman/Autoloader.php';" B: a+ B; T) P" ` }- ?# e3 T
. _! Y4 K" C" M; r/ j8 H/ g9 i, `- $worker = new Worker('text://0.0.0.0:2015');
( |, X$ G' K8 r5 h - // 4个进程
- V0 |5 _6 H2 c% w, ]% T- i* U - $worker->count = 4;
" e3 F4 P$ V' |% g- f: D - // 每个进程启动后在当前进程新增一个Worker监听; y' B o( `4 {/ W3 v. U
- $worker->onWorkerStart = function($worker)* T( S7 F Q! v
- {4 ^1 I1 }% r* j/ u' v6 q
- $inner_worker = new Worker('http://0.0.0.0:2016');
& A3 K& Z9 r; t! E2 d7 a) j" v ?7 ^! Q - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
, {0 n: r5 Q7 P, s) q& I - $inner_worker->reusePort = true;
- J7 S6 V0 c8 b8 X - $inner_worker->onMessage = 'on_message';' U# _& `3 ^+ B: j U% g
- // 执行监听。正常监听不会报错
0 D$ V3 a6 x5 w8 L+ A. L& R$ L - $inner_worker->listen();" V# Z! _7 H5 V( A. @8 {) Q9 L
- };5 N, u) v' D6 B+ k8 `9 ?7 {% @! P
9 A0 {: U/ l* v8 [( H- $worker->onMessage = 'on_message';8 j8 R A3 P8 H
- * ?6 r. U/ p3 |1 h/ V* E4 M0 Z
- function on_message($connection, $data)
% C2 _* Z! {# X - {
. E( ^+ ~2 b( R* H8 S - $connection->send("hello\n"); J9 z: U, G7 |
- }
Y1 t0 B, d5 W$ p - ! r* H4 T. G4 r8 T8 o
- // 运行worker6 |" ~, `8 P2 _: D6 e" Q
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
3 l1 L, k8 q0 L, w - use Workerman\Worker;
; n* c6 v8 J, x* M/ A1 [ - require_once './Workerman/Autoloader.php';6 N; Z; J7 K6 M
- // 初始化一个worker容器,监听1234端口
: w) Q7 g# e" J* a/ ? - $worker = new Worker('websocket://0.0.0.0:1234');
8 D9 J4 ~% ^: N% V# D8 j; Y# }
, M, y9 f: T% S. G0 ~5 F% p- /*
/ ~% n& z' _; V! r) L - * 注意这里进程数必须设置为1,否则会报端口占用错误
9 M8 |7 T$ x$ W: c+ X - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)( _/ r4 f/ x6 \* Z' B) Z& J4 W
- */: \; i* H5 A$ D: `" M: |+ K
- $worker->count = 1;8 C& ?. M- C; V: u2 ]7 Q- P; B
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
% L9 P: s. z- n$ A9 m - $worker->onWorkerStart = function($worker)
, w+ r( x! e" g* \ - {# s4 ?1 A% I" H; U9 V
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
) j! Y( |- A5 z. [# k - $inner_text_worker = new Worker('text://0.0.0.0:5678');
# Q3 g( O& W5 A* ? - $inner_text_worker->onMessage = function($connection, $buffer)
+ E- i+ v) c8 B) C, l5 v - {( @5 ~& b5 h9 Z& ?
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
, F3 M. t# |# ]. U - $data = json_decode($buffer, true);
2 V. [) c0 m T3 ~& g - $uid = $data['uid'];/ d8 C8 f: H7 U8 L/ J7 c. a
- // 通过workerman,向uid的页面推送数据( a2 F* T3 g* c7 u: }
- $ret = sendMessageByUid($uid, $buffer);. q/ r5 I, H8 J- p7 t
- // 返回推送结果$ U/ b+ O- R6 U! x- @( o& H# w
- $connection->send($ret ? 'ok' : 'fail');
$ y! \$ w+ x j$ X" A. s- Y0 d7 I2 | - };! ?# e8 o7 |( Z6 D+ L4 w
- // ## 执行监听 ##
; `. a" l6 f d: Q' { - $inner_text_worker->listen();& O! g& K. ~! g: J6 J
- };
7 M0 S( J" m( a - // 新增加一个属性,用来保存uid到connection的映射
$ V- u; G* l( U& ^( h5 [ - $worker->uidConnections = array();" j6 M& |" W( R
- // 当有客户端发来消息时执行的回调函数8 w/ m5 l, p, r: ]+ |
- $worker->onMessage = function($connection, $data)
7 a6 j" W! q$ V' B6 i* x# g/ l* u( T - {$ \1 g' B* M" G* `
- global $worker;
* y5 m; ?* B! _% C+ b( O2 E( ? - // 判断当前客户端是否已经验证,既是否设置了uid
5 A4 o4 f+ J8 H9 |3 v" @9 d! j - if(!isset($connection->uid))
" S' ` n" n c2 n" _3 E. s - {- t$ u. F' B V: k& S. A& V( ^
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证) ~& T1 _. ?8 M- W7 t
- $connection->uid = $data;
2 w+ B5 @/ ]) o) ^; L* w - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
) c0 E$ u% N- S* ]% |5 k - * 实现针对特定uid推送数据9 D. }( R" c* r
- */8 E$ ~3 _7 t" X" F, X% O- R' N4 O
- $worker->uidConnections[$connection->uid] = $connection;
7 Z8 }4 o" W9 _% u - return;
4 y& B1 M6 Q% l: e( ? - }
& u$ W* O' B8 s - };
8 [" }, o* P* K* |
: r7 z5 g4 c( x7 I: `4 Z- // 当有客户端连接断开时/ b9 H3 G- G- G; E3 L+ C$ j- h, ]
- $worker->onClose = function($connection)7 b, C' \$ F- s1 H7 o
- {4 |+ E" c, x: q+ L$ D5 h& X! J, w
- global $worker;
7 S7 P; S) `" Z [ - if(isset($connection->uid))# { Q Y! G* W$ v G( u
- {
/ ~9 w( e+ W0 \& m3 H. k - // 连接断开时删除映射 S0 X. V1 g- j3 r, L
- unset($worker->uidConnections[$connection->uid]);% ~3 Y4 e3 B0 ]& t1 m
- }
$ O0 M; x+ A( @) b' ~( } - };. {4 b: ]6 x; ?: t
- : H& Y3 L. m$ \3 f8 k& u6 m# A
- // 向所有验证的用户推送数据! P6 ]' Y& U$ \/ D
- function broadcast($message)
9 O. L+ O6 {- {, |" U - {0 W. o- w$ n9 [( S
- global $worker;+ V8 W d) o# k" i+ G' W' Q
- foreach($worker->uidConnections as $connection)! Y9 w2 o5 ~+ r, ?6 Z2 E2 L( [
- {# e z- y2 z8 E
- $connection->send($message);
9 N8 x) E/ W. [8 S; C - }
6 ]3 v- f- @8 n; R8 |0 |. r4 c% C: [& I - }
% E; q$ B$ F$ W5 u - 2 K) [$ Q$ U: ?
- // 针对uid推送数据 k- l: v* b5 m9 u8 r
- function sendMessageByUid($uid, $message)
, k! t' z1 [5 E, F7 P2 @' O - {
" s) p" x/ i4 q8 I) g - global $worker;4 \5 k+ }1 H! j+ K9 @
- if(isset($worker->uidConnections[$uid]))
% B+ e/ v1 Z2 X8 X$ z& M# ?0 h - {
7 s" t4 w3 y) k: g - $connection = $worker->uidConnections[$uid];5 X3 S4 v5 U6 d. l7 _" U
- $connection->send($message);
) Q) \! Q, H$ |0 s, O - return true;
$ F( v E8 w( D2 y/ o - }6 s' G, E. E5 M: ]2 }( [
- return false;7 {5 S3 ~# |: I) m! {
- }
$ W3 b% w% p3 H9 T( V; o - : ~; |8 N/ |+ _- D- I t" w
- // 运行所有的worker
! o' U* c( p! p9 T/ Q9 q$ C - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');$ p- e2 h& P6 W& @" `0 l+ j2 c
- ws.onopen = function(){
6 O6 a! ]$ Z1 f( Z) W4 i* F - var uid = 'uid1';0 E0 C) k5 {$ g
- ws.send(uid);
5 W" B+ S6 n5 q7 o% D - };, T+ K8 r7 _. E( B7 D0 Q
- ws.onmessage = function(e){3 M4 I# d u& T
- alert(e.data);. L% n& U, F# M5 F% k
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
0 q' ^% I6 ]: j - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);+ a: V9 O$ Y; m3 L1 z# p. ]
- // 推送的数据,包含uid字段,表示是给这个uid推送5 |* O1 l) N( G8 I( z; R
- $data = array('uid'=>'uid1', 'percent'=>'88%');
$ R0 ~# W8 U( J - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符7 \6 ]+ r1 n: A6 \7 C
- fwrite($client, json_encode($data)."\n");/ I C4 Q7 {# o
- // 读取推送结果
+ O$ X. T; v; r" q q - echo fread($client, 8192);
复制代码 9 H1 B, R' W9 n4 q# I) F: H
; \; q$ x8 X4 }% ?7 ?
|