- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;( F* [) G5 M. j# Y- ^4 S: v
- require_once __DIR__ . '/Workerman/Autoloader.php';
/ Q+ h' q C' X4 Y7 m/ r - # A; ~, j3 q+ I0 [# G; K/ g
- $worker = new Worker();! B2 W" H, f( N7 }0 Y" j
- // 4个进程5 Q5 ^; U0 x6 s G: S8 j2 B7 u
- $worker->count = 4;
! O" C) A ]# O% L4 [ - // 每个进程启动后在当前进程新增一个Worker监听
; X) j9 }6 m- ^ - $worker->onWorkerStart = function($worker)' P; ]( h, u+ T% P' f4 l+ X! P- [+ j
- {8 [4 ^3 V$ U0 q& B) [
- /**
. K- {6 Y+ M) G" U - * 4个进程启动的时候都创建2016端口的Worker/ C, P8 D8 o% G+ P# H
- * 当执行到worker->listen()时会报Address already in use错误6 z6 }! M3 k8 l. l
- * 如果worker->count=1则不会报错! G$ q; p4 U+ x9 N
- */
/ X/ Q* s- g9 o! f) {3 [9 c - $inner_worker = new Worker('http://0.0.0.0:2016');2 C/ F5 j* S& P7 z7 M
- $inner_worker->onMessage = 'on_message';
2 X( o( n8 l: \( W2 Y1 l - // 执行监听。这里会报Address already in use错误' X/ }5 [! @( |2 V: C3 ~
- $inner_worker->listen();
/ f5 v# P6 F+ u1 U; r8 E# B - };+ L {9 v; j3 U
; y5 s( n4 M' Y8 Q- $worker->onMessage = 'on_message';
+ z. T8 g# m- S0 T7 z
- ?1 w2 A3 Z' ]6 z& [) N- function on_message($connection, $data)# k2 O6 _# U& M! \
- {
" M; g* B; K7 q' l: N - $connection->send("hello\n");
- M6 R- L7 h2 Z9 ]# c - }/ O2 E0 G- f) h6 C0 i
- # n- ]% c0 K7 a% C2 \( _) w
- // 运行worker% y0 M9 X1 M. g0 l
- Worker::runAll();
5 q( T" ?8 ~9 M3 R1 J0 ^ - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
3 z. O/ o4 d5 O; `, P" b0 R- J - 4 o: M. n) d& ~6 m
- use Workerman\Worker;6 q+ m5 i, m$ {0 G M1 N# m9 n
- require_once './Workerman/Autoloader.php';# g9 o% [- d0 O [8 r$ d3 S
) `3 Q: {! ]0 f- $worker = new Worker('text://0.0.0.0:2015');
! s# i2 i0 `6 X0 ^( _* m0 q, p: k - // 4个进程2 Y+ ]; n: M- Y1 n5 I' g: E+ o/ T! z
- $worker->count = 4;* L1 z* w! s% s9 G# z% s* I
- // 每个进程启动后在当前进程新增一个Worker监听/ e2 ~. c/ [! ]8 c0 y+ N
- $worker->onWorkerStart = function($worker)( F, K1 V4 F4 g$ G3 j4 z5 Z
- {5 V( Q/ ?4 c9 s. {: [
- $inner_worker = new Worker('http://0.0.0.0:2016');+ F' n: W3 ~( t) P
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)3 M- H8 D1 @( I% Y9 @6 T- ^+ f' d5 e
- $inner_worker->reusePort = true;& V) N0 e/ u1 K6 ]) z
- $inner_worker->onMessage = 'on_message';" U: W9 G) a$ t0 \
- // 执行监听。正常监听不会报错
+ K! s9 J" s, Q3 E - $inner_worker->listen();
0 Y2 E/ z5 S0 o/ l& [ - };, O1 Z0 Y( R4 q W" Y6 Z6 F7 _1 I
- % G" S% F* W3 J6 M7 j
- $worker->onMessage = 'on_message';
% @; [' F& r# S$ X5 T1 l
# d4 f3 A; k8 d% L( b- function on_message($connection, $data)
* V2 S6 T \! L( M - {, C0 ~" ~) A9 {8 h
- $connection->send("hello\n");
, c0 e; M3 W' q C4 W& z' `: ~ - }
2 Z0 @" i8 D: a9 g& S0 H9 ]
6 r3 O* E: r0 Y. ]4 C; C6 s) F- // 运行worker
% B& k- C3 r0 S! d! z- K9 j, k - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php) }1 L! x; w. a" m
- use Workerman\Worker;4 ~. S+ w/ `3 z, z
- require_once './Workerman/Autoloader.php';7 P: s( S4 g& p" d# H' Y3 z6 w* K& U
- // 初始化一个worker容器,监听1234端口3 K% h! e; v* T2 r6 @- V
- $worker = new Worker('websocket://0.0.0.0:1234');4 O3 R/ D; u. x! G7 X9 o! X0 Y
: w6 F% x8 G( i- /*
" \, n0 N/ ^( k9 P+ ] - * 注意这里进程数必须设置为1,否则会报端口占用错误& u6 g' ?0 X( c3 ~4 s2 t7 N ?
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
9 e3 f7 A1 }# H" y Z7 b0 T. ?9 S4 d0 H - */
* w6 `; g/ i1 V+ d2 i" { - $worker->count = 1;6 m; k# N2 {( }
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 g+ ~# k$ ]5 }, a! E% G7 g
- $worker->onWorkerStart = function($worker)
; p7 v& I- m- ]0 i) H2 O - {. n6 m" i4 Y: t K" f
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符9 U: R2 ?5 d1 ]1 s( k, P# ?
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
2 @) z! ~, [. p# M, W* X - $inner_text_worker->onMessage = function($connection, $buffer)
9 Z4 n5 e; u$ F$ Q2 N7 I - {
5 `2 r' o/ A# |4 B8 J4 j n - // $data数组格式,里面有uid,表示向那个uid的页面推送数据 T! u/ g' B% R: l6 r
- $data = json_decode($buffer, true);. R$ O, O& F- P. y$ U) ^
- $uid = $data['uid'];5 t; m8 N3 ?9 B/ ]3 t
- // 通过workerman,向uid的页面推送数据
, N- _( _5 x/ f6 g, F0 L1 T! q - $ret = sendMessageByUid($uid, $buffer);
) E; `; B5 u7 H- @ E - // 返回推送结果) c) g+ q2 u3 k" S4 k
- $connection->send($ret ? 'ok' : 'fail');
" Q' m2 I4 a1 H' ^: J! t/ m - };5 J: A$ i1 B) R- E# a% M
- // ## 执行监听 ##
$ M( h5 ^0 L& [* }' K; B5 |1 I - $inner_text_worker->listen();2 Y1 ^' i6 ]5 { m" `
- };( g3 D1 N* E: S
- // 新增加一个属性,用来保存uid到connection的映射# K) L1 X4 J7 |! }# g
- $worker->uidConnections = array();
( J" z6 @/ b% m& U - // 当有客户端发来消息时执行的回调函数/ L S" `* t5 j9 W8 B6 U: s' I
- $worker->onMessage = function($connection, $data)
) A3 l: M5 F: {2 G h: K - {
: d( m0 q; @7 \2 @& \ - global $worker;
4 T9 u2 }. @( |+ Z: m4 M - // 判断当前客户端是否已经验证,既是否设置了uid
" W% n& @% K3 d7 c7 \5 Q0 M - if(!isset($connection->uid)), @! Q, o/ M+ P! v
- {
6 V( @; p+ p! @( y - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
9 \" Q: D/ C2 B6 a0 j - $connection->uid = $data;
9 M" y u7 J/ ~# Q - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
' Z) F4 V* ^/ K - * 实现针对特定uid推送数据: O- b2 _: L* Q( L3 ~3 O
- */( I8 C( H- T) n. j
- $worker->uidConnections[$connection->uid] = $connection;9 W- |0 M1 ?2 F0 b5 ~
- return;
r( g7 D2 k8 x5 E# z- r& }3 d - }1 b/ x) y# Y. ?3 M. w, [: K M
- };
6 U7 X0 i/ C* k( ]
* b$ x* }. S$ r& j1 ]) U6 f- // 当有客户端连接断开时6 A" w$ i0 |) {2 W
- $worker->onClose = function($connection)3 L1 l& Y# |; O+ e) E
- {# f b0 d+ I: Q0 D$ \, r" q3 n8 ?
- global $worker;. u; Z- @9 @* R4 y
- if(isset($connection->uid))
' [0 v% n I; L$ i8 S; O - {$ s) X0 M$ B! |( l. r
- // 连接断开时删除映射
+ O3 y$ F, p9 O+ L: l2 Y - unset($worker->uidConnections[$connection->uid]);; l. \" o' L5 Y) @
- }
& D/ S; w4 E9 c8 |: F - };( v$ R a7 d! o- P6 [' p
; T0 _- U# ^/ N. `0 l8 ?$ w8 A- // 向所有验证的用户推送数据
7 K: D( c7 }% a$ F" O! t, } - function broadcast($message)
- Y2 A l0 b9 h4 n2 H - {
+ M7 W/ y3 @) K3 g' J+ ] - global $worker;
6 f% d+ T+ m$ u4 V7 F6 K& C3 F8 E - foreach($worker->uidConnections as $connection)) P5 n5 N% `" I! S& z% s6 ?
- {
1 o* B4 o% v3 k2 _5 _$ @% s - $connection->send($message);# @$ Z/ \' T0 ^7 j* ]% T- b" A$ q8 t6 {
- }
4 S) J2 Q1 {) I9 X! E1 d - }
) e! i% F$ q' Q - + b# A6 q: `- {$ [
- // 针对uid推送数据7 I8 A0 _ e; \9 p
- function sendMessageByUid($uid, $message)
3 Z4 q5 E( @' F Y. Z - {( T. H% i) _, g2 }8 }
- global $worker;
( b. _- d4 f { - if(isset($worker->uidConnections[$uid]))
% N# h: T& A1 E t - {
& k1 Q7 c: p; d; h9 [% ` - $connection = $worker->uidConnections[$uid];* x3 w+ R! I5 J* Y
- $connection->send($message);, b3 A8 [* z" V7 u% U
- return true;! F0 W" V* {3 v
- }' x+ [: G$ U; J* A
- return false;
. O6 ?/ T6 D7 S - }
7 v* ?/ g7 w6 t# d4 M" F% G- t7 r - & z! y+ M& E% K, \9 J. W
- // 运行所有的worker
" e8 h8 G5 l9 v% U) ]8 i - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');& p j) |' V( e; p& d) d) X$ \- S
- ws.onopen = function(){
% f6 a& R# G3 B: ^ - var uid = 'uid1';. |" L& q6 l2 {1 M* n
- ws.send(uid);
. y6 n3 B! h; n - };
2 H. l' }, U1 E) Y: ]5 g - ws.onmessage = function(e){
/ F8 A2 h$ j$ K# K9 ` - alert(e.data);
* [( k0 a) S3 z6 _9 n1 K8 r% F4 R - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
1 J$ e: |# z; Y4 L - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 R0 L, D2 z. D" l h
- // 推送的数据,包含uid字段,表示是给这个uid推送5 \1 z$ o* s" b
- $data = array('uid'=>'uid1', 'percent'=>'88%');" b- K; u/ O. w. j
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
. w- J( J7 b5 G& }$ f - fwrite($client, json_encode($data)."\n");
- g3 @: K1 T, e4 T% E - // 读取推送结果
' U0 X5 L2 R5 m( p7 } - echo fread($client, 8192);
复制代码 ( _3 L. M; J* w( o2 B
; ~8 e, S7 s' |' @. }/ z; q
|