- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
+ B2 A4 S; ~+ M* [5 \5 J" B - require_once __DIR__ . '/Workerman/Autoloader.php';: O* }/ U! X/ [
- ! N/ o5 n n- J& N% s- O
- $worker = new Worker();
) B0 ?) ^3 M6 s - // 4个进程
, ]$ @0 ]/ x+ \2 _( b* [0 B8 z6 @ - $worker->count = 4;
" C" g4 d w/ C W! Q5 [ - // 每个进程启动后在当前进程新增一个Worker监听
) z* |3 Q0 D1 g' Z2 I4 Z9 a - $worker->onWorkerStart = function($worker)
3 O9 p6 Y2 R' I9 z0 h0 A/ P - {4 c& e0 Y0 y# J) Q- S+ ^7 e# }6 K
- /**
- y a; s% \5 C R - * 4个进程启动的时候都创建2016端口的Worker9 o. W& I; X: Q( C
- * 当执行到worker->listen()时会报Address already in use错误9 ^& z6 B" ^: O H" G
- * 如果worker->count=1则不会报错
9 ?' S! G8 W: Q+ g - */
, o" ?! W/ l% ]' E" m5 u. w - $inner_worker = new Worker('http://0.0.0.0:2016');. Q x _) w8 P& Q
- $inner_worker->onMessage = 'on_message';4 A8 f k5 m2 P- ]
- // 执行监听。这里会报Address already in use错误
1 ^/ l+ G, z. E8 k2 \ - $inner_worker->listen();
1 Z+ P' h8 f3 r( U1 X4 k. F# s - };
! T+ j& E$ \$ e - ) M" H2 J4 y5 y) J P2 v
- $worker->onMessage = 'on_message';+ H& O( S8 I6 T) Q2 E$ i
, m" k( T1 l- H! o* k- function on_message($connection, $data)
" W) X) R& }, D! ]( V9 E( b/ k - {
% `2 b7 { f7 ]. d - $connection->send("hello\n");
/ M% d( V$ M4 k1 l - }
. A( L0 c. x) Y+ Y, v
7 e4 @5 f3 I9 s- // 运行worker: d/ l# ?8 V, T$ \% R8 U
- Worker::runAll();
3 L. [7 j0 J6 ?) Y* h; @! k Z - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
6 w7 c# r. h. S1 Q
`3 W6 M# N1 a3 J& H- use Workerman\Worker;
6 ^# H' l6 c4 S+ i - require_once './Workerman/Autoloader.php';/ J' w$ o, k- g! n7 H0 B
- 0 {, v7 }9 F/ S9 h2 P+ o/ ]
- $worker = new Worker('text://0.0.0.0:2015');
/ C: Q0 r8 {: D& ^& Q# u# g - // 4个进程
. l: e+ {1 k4 f6 m - $worker->count = 4;4 n ~- S( v4 x) |3 X
- // 每个进程启动后在当前进程新增一个Worker监听
2 c3 x0 _8 k" f. _ E, z - $worker->onWorkerStart = function($worker)7 M# C+ @9 ^" J6 K
- {
1 \; t. a* Y0 M; T6 p6 O. e - $inner_worker = new Worker('http://0.0.0.0:2016');0 h9 e: f) V/ o( p; Y3 N
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& _ L) U, v t! m0 d7 H
- $inner_worker->reusePort = true;7 v( E, B' [( O% F$ J
- $inner_worker->onMessage = 'on_message';# W0 ]% y1 x, ~8 o: c9 O# D
- // 执行监听。正常监听不会报错
( X0 d+ B. s) h9 K- L9 a - $inner_worker->listen();0 W0 r9 L( y7 [
- };
/ }$ ^* @9 ~$ v) J - # u3 u4 H( S. {3 N
- $worker->onMessage = 'on_message';
: t1 X2 B7 m4 x) K. ]& j
/ L. V1 J1 T# r& y8 c& ~- function on_message($connection, $data), |+ g5 \0 h6 { b0 w+ g/ H) `
- {! W, Z0 ~6 j9 ~% x3 J
- $connection->send("hello\n");2 k$ L- D N e0 _: j) _9 e
- }
) C/ U: v% z% x& Z' G
% I$ v7 o a! r1 m$ m- // 运行worker/ [. V9 c4 {7 s% D( J$ m
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php' m# x" l8 `& l! x
- use Workerman\Worker;9 H% T5 |6 a7 K! T; {% Z
- require_once './Workerman/Autoloader.php'; E, Y1 m X2 `- H
- // 初始化一个worker容器,监听1234端口2 V2 C7 Z& T, ]! D# Y7 z
- $worker = new Worker('websocket://0.0.0.0:1234');
! ^! [' G1 }8 H. o- N( v- d# R
9 s0 ?/ m5 q* r A- /*/ |1 I6 u; c3 x) g. M
- * 注意这里进程数必须设置为1,否则会报端口占用错误 w1 Y9 G" ?" C& f" q2 o _ d. v
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
9 g* I% N4 M8 x; E$ \" F2 p5 i - */
' e5 R2 _2 \9 U; o( ~$ v! { - $worker->count = 1;6 j6 }5 Y ?. E @1 [& s% O9 W7 l- o
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 z- I5 f) B* R( A
- $worker->onWorkerStart = function($worker)* j, i* e& Y8 d; q
- {+ U {7 ~( s! _8 s
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
9 R- @$ ~2 K/ | - $inner_text_worker = new Worker('text://0.0.0.0:5678');
7 p$ B: E9 R5 H - $inner_text_worker->onMessage = function($connection, $buffer)( m: h! w2 B3 `2 l( {# r
- {! ], i' E9 g" l$ a3 O. N
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
2 W! G) O$ h# L! j& ] - $data = json_decode($buffer, true);
8 q! T* y+ v5 ^& d: \+ a& I - $uid = $data['uid'];- l4 L& _% _/ J
- // 通过workerman,向uid的页面推送数据
! }" }$ \; p3 H+ F% _2 n; n - $ret = sendMessageByUid($uid, $buffer);+ l4 l9 B. D6 j4 s, \; o7 {
- // 返回推送结果
0 p2 C6 S+ P3 p, @ - $connection->send($ret ? 'ok' : 'fail');
, m' t9 E9 n0 j" F! _* A7 s( ? - };- t" i2 `2 w' T
- // ## 执行监听 ##/ \$ M4 U2 k9 p& w* `* M8 B
- $inner_text_worker->listen();6 ?+ {+ M0 X0 p6 ?4 w3 k
- };4 Q N6 G# u$ _8 d4 O
- // 新增加一个属性,用来保存uid到connection的映射7 N3 l% a, M# T) e& e9 T0 g! i
- $worker->uidConnections = array();
0 [) m# v) y8 G' H - // 当有客户端发来消息时执行的回调函数
* K7 ?! Q' F3 t, [* X0 B% ^' e - $worker->onMessage = function($connection, $data)6 y- H6 h) J- n
- {8 y8 K* `1 i' S9 X7 _
- global $worker;
" C& Z: ]' G! ~/ W0 G - // 判断当前客户端是否已经验证,既是否设置了uid' Q9 ]' L4 T. F8 F
- if(!isset($connection->uid))) G& h/ r' N( |+ Q4 a
- {! H1 A2 J4 E7 G
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* |- e6 }* m1 P: P; m7 ~
- $connection->uid = $data;
/ c) k3 d4 f( F3 C. M& l* T - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,: l. l+ ]& o' f: Q6 C k) l
- * 实现针对特定uid推送数据% h' c! d" ?+ y/ ?; O
- */( F9 T6 r o( g. ?
- $worker->uidConnections[$connection->uid] = $connection;
: v8 K3 }2 C; R \/ k7 V* h - return;8 u5 i) `; ^7 g8 Q5 b+ l& T# i
- }9 @; A( s" o8 O0 G K' f; A% i
- };
, x3 A. u% B6 q. f+ |2 e - 0 D# \/ t) [+ Y1 n
- // 当有客户端连接断开时
0 E( ?) s8 f) O5 K8 G0 I - $worker->onClose = function($connection)
; A s! h |# Y4 z; L8 T; J) o% t - {' A" ?$ ], @7 x: U, [
- global $worker;, Q0 g+ v% J0 e4 I8 {
- if(isset($connection->uid))
0 ~! {: I4 a6 D0 [4 j - {9 R' Z: ~) `; c& q; C9 K
- // 连接断开时删除映射- ?/ z1 ]7 f6 T! ^
- unset($worker->uidConnections[$connection->uid]);
4 K( k" S2 P$ ?' F - }; @: ]* g* V) j3 Y1 A
- };
. d3 y0 t, {$ t/ N- n - ) e- m% R5 K9 ]
- // 向所有验证的用户推送数据
/ h0 y7 s0 J+ ^. L7 s( i9 K/ [ - function broadcast($message)
2 R8 _4 S8 d8 D. n: o+ A; | - {
5 }. a; ]! O; R) f3 e( M: x' s - global $worker;
$ L% p# O6 N. Q9 r+ e- W - foreach($worker->uidConnections as $connection)& r; Z1 F! y! g* O$ S% ^
- {
0 V1 e# M0 V! v5 K - $connection->send($message);9 o: O. |' |# v7 C
- }. @3 @4 S2 e7 o; d4 H9 W, R
- }' Q0 V- A' @0 s9 u0 p
( |7 J+ Q/ J0 Q! \: ^- // 针对uid推送数据8 f' D3 Z$ Q2 M* j' i& z; [
- function sendMessageByUid($uid, $message)
2 F+ h9 z0 p( b - {
/ v0 I, s% P, ^5 ?( R - global $worker;
! F8 d0 z, l$ j8 M h* @ - if(isset($worker->uidConnections[$uid]))' L8 {& j7 ^% o2 A* i- Y
- {
$ P7 @5 a8 M- B( M - $connection = $worker->uidConnections[$uid];7 u; P3 W9 R& P1 x
- $connection->send($message);8 w S* c+ L: {6 ~& {+ _, c
- return true;
' O3 I+ S6 A: P- }1 g - }! v7 w, \0 }6 n' ]$ c' V( B7 z# b* p
- return false; p4 l1 j4 ]. H( M s
- }
7 Q6 p# d! B6 E( C# y3 ~* v
: m; V) H" d V: w- // 运行所有的worker7 k7 p. u K9 k% ?
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');: A& i0 Z# a g$ q ^2 A
- ws.onopen = function(){: t" W5 ]% G0 v# L1 y& F
- var uid = 'uid1';& e0 ?( s7 Q% s6 V, S. B/ a- X
- ws.send(uid);% ~* c& f" H3 l+ j, D
- };
5 U+ [* h9 T/ Y, D: {' A - ws.onmessage = function(e){* b Y4 [ H5 M
- alert(e.data);
5 W9 p7 |+ O8 L+ y5 p# A8 a - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口' M4 J N4 M. V5 v
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);- q9 {* j& g0 P( m# a: a
- // 推送的数据,包含uid字段,表示是给这个uid推送9 y5 v m: b" j, n
- $data = array('uid'=>'uid1', 'percent'=>'88%');
' v( u( S. k# O! d! i4 d U - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ o8 {: N4 v9 w/ w
- fwrite($client, json_encode($data)."\n");
) X1 J1 P# v, G - // 读取推送结果
, F0 E1 R7 Q N. `: { D9 i$ n - echo fread($client, 8192);
复制代码 9 I1 }* m- C. B
; Z' a( i- f0 R( Q z |