- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
7 c |. J6 c7 @& ~$ w - require_once __DIR__ . '/Workerman/Autoloader.php';* s* j3 {+ n! C4 n0 R
- " e5 T ~' `, L
- $worker = new Worker();* E E( ]) z: D2 a$ }( x, x% `' U
- // 4个进程
. F1 B& j7 s6 f. B - $worker->count = 4;
& n5 b' Z8 ? B) @. i9 |- y - // 每个进程启动后在当前进程新增一个Worker监听
& [) g0 \/ C% X1 e1 {6 L5 q) x - $worker->onWorkerStart = function($worker)
7 r8 f9 ]; e( }/ R - {( x/ s% k& ]0 p ^& {5 z4 g
- /**
- k3 v- w/ Q8 L% X9 w - * 4个进程启动的时候都创建2016端口的Worker
o9 F. Z P& H) I5 g( b; T* X - * 当执行到worker->listen()时会报Address already in use错误
; I, y: a8 S1 U: n - * 如果worker->count=1则不会报错! P4 U6 l( D. [" E
- */: `9 M7 V: D D; p3 C! R: H6 V
- $inner_worker = new Worker('http://0.0.0.0:2016');: M$ W d0 ~2 B! G
- $inner_worker->onMessage = 'on_message';
, B* t3 I: @% r7 v" w) o- Z5 a - // 执行监听。这里会报Address already in use错误+ _2 i2 `' w$ p$ l2 S4 f% {
- $inner_worker->listen();
1 W6 |( U! L. e3 A* b* v5 i - };9 E( P0 G. O- S" W! ~
- 3 P* T2 J5 l7 Z8 _# g7 O/ H* X
- $worker->onMessage = 'on_message';/ L) r2 C7 y# @6 F$ [' z
2 r$ ?) `5 B% D5 p- k9 [$ G, o! v- function on_message($connection, $data)
0 M: m* N% n6 E% m* w0 K- o - {7 U7 v# l( C2 E8 ~' I
- $connection->send("hello\n");: I% c% ^, {. e, l. N* S8 |
- }( D+ {7 R9 l0 v, W
9 u# N0 f% x: [ u- // 运行worker. u" X6 {' g3 E+ u" X
- Worker::runAll();4 C: e' h* M4 [% N
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
* {; f9 P* p6 p - - P" E; f) L! m
- use Workerman\Worker;! e. J$ K3 ?( x0 w0 h2 D7 \+ a& ~
- require_once './Workerman/Autoloader.php';
7 c+ @' L) \* x+ k9 z0 e- w - , \1 @; w; O. l# w) {, \; g
- $worker = new Worker('text://0.0.0.0:2015');
1 i' Y; Q0 v7 H' h' o - // 4个进程
- `) E7 {3 g3 B: w) \% X: u" q1 a4 | - $worker->count = 4;
' h" |5 q* ?( G0 | - // 每个进程启动后在当前进程新增一个Worker监听% {0 Q+ i. y0 J: M/ ], v
- $worker->onWorkerStart = function($worker)' k$ N& G8 b6 V! H+ T5 x3 A
- {3 y! I0 ] G1 U. P4 h: f3 Q
- $inner_worker = new Worker('http://0.0.0.0:2016');
+ |. y0 Y! N, r' _8 H3 C - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)+ ]5 g, a! e |; r/ U
- $inner_worker->reusePort = true;
* V9 {; g$ x$ P! N( d - $inner_worker->onMessage = 'on_message';. b9 F+ v. q* S# G6 i
- // 执行监听。正常监听不会报错
% i/ c: `& P" R y5 Z; A) @9 j - $inner_worker->listen();' A: j" _/ ?, ]$ Y1 n4 {
- };
; Y+ @* j& u8 ?6 Z. P* n - 9 U' X2 D T! {- b7 {. Q
- $worker->onMessage = 'on_message';
! l3 @2 R b) @. q1 j P: O
- }6 m' D4 e( e: v0 ~ U+ h+ U$ V; D- function on_message($connection, $data)
* }' ?) p2 p4 Z$ k$ N - {4 [6 N! }7 O3 E
- $connection->send("hello\n");
- p8 n$ `; G: E. K$ e/ _( A2 m/ ^& \ - }
' K* I7 m" r0 v5 i4 t8 T
9 `0 @4 G/ ] [- a6 M4 s! j1 I- // 运行worker
" f7 J% ^, r4 ] - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php8 `) N6 C+ R8 W, b9 K) J; c# y+ p
- use Workerman\Worker;" a, J1 }( N' E4 Z$ A
- require_once './Workerman/Autoloader.php';7 W$ w! \- A; B g6 r9 C
- // 初始化一个worker容器,监听1234端口
1 L! x- a' B6 V5 \8 _) E0 ~ - $worker = new Worker('websocket://0.0.0.0:1234');1 \# F# Y3 f0 k# L
- ( Q6 x9 ]' b2 h" B/ ^9 f$ j
- /*
% p+ B& d( k# m% g' g/ J# _4 E - * 注意这里进程数必须设置为1,否则会报端口占用错误
- J( O7 `, G' k7 L: A. w - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
& x& W: O8 D' G% w, A8 s) G% X0 J - */
& n) D6 Y$ Z" E: p1 A - $worker->count = 1;8 X! X" I/ |5 u* ?
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# S* {6 m- D7 a, [% D4 c1 ]
- $worker->onWorkerStart = function($worker)
4 }. M) t( r1 a - {8 H6 L# ~6 R6 X; @9 H9 O3 G
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ g7 X: ]: O$ e2 Z8 n& t
- $inner_text_worker = new Worker('text://0.0.0.0:5678');9 q- @! F1 h1 V$ h) |
- $inner_text_worker->onMessage = function($connection, $buffer)( i. v) I+ F6 ?/ F ?% S. }% Y
- {* g* C" k1 r5 v- W
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据4 s F! q6 j2 j/ A" X# F
- $data = json_decode($buffer, true);% M4 x m3 D4 I3 e+ ^
- $uid = $data['uid'];6 S2 I3 p2 a: P8 e
- // 通过workerman,向uid的页面推送数据: k% V2 X. h- v( S2 ^
- $ret = sendMessageByUid($uid, $buffer);, J! Q" X6 d% |9 N
- // 返回推送结果
7 `% P7 K& p S6 x - $connection->send($ret ? 'ok' : 'fail');3 z, [0 A+ Y% P8 v; N/ d: I+ ?
- };
7 a6 j% R/ ?3 k- K- x- k - // ## 执行监听 ##
8 v# ]' z9 W6 n+ Q" H6 p - $inner_text_worker->listen();1 N2 X+ R/ c1 q& y
- };
% e; z& u2 D% H6 B - // 新增加一个属性,用来保存uid到connection的映射! x5 r" T# ]( N- I6 D
- $worker->uidConnections = array();* b2 W) H. D" ^/ L4 A- w0 W# ~
- // 当有客户端发来消息时执行的回调函数
+ j, O$ o4 J2 i. b: |8 B( Z9 ~ - $worker->onMessage = function($connection, $data)- V2 G( l+ y J' ?7 n: h0 m
- {
8 @0 d0 b& I3 M3 I: E - global $worker;5 a q" F8 g' Z4 P
- // 判断当前客户端是否已经验证,既是否设置了uid
1 F( t! a( P2 a- E( P& W' ~ - if(!isset($connection->uid))
# M5 s2 ~4 G! c+ e - {
. ?3 t5 o% G2 y; Q( A' T0 E; K - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& H, I0 k; W* m3 Q4 q4 ~: g& Y
- $connection->uid = $data;; {! H. _! H9 [/ j- E* ?. |
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,3 _+ r7 c1 T8 b0 V9 k6 @8 C
- * 实现针对特定uid推送数据- Z; Z0 J0 F6 A( s8 t, h! a( g
- */
" u! }# \5 Q- V - $worker->uidConnections[$connection->uid] = $connection;
1 w# H; r" a: n" e- G* c+ B& R3 W - return;( S; X0 G9 i o6 s
- }- Q, k+ l# X0 O1 g+ u" c$ C
- };
1 W3 D4 m2 S& @2 l2 e6 a3 e4 C- P5 G - ; q! k1 y+ G3 t1 [4 D) y
- // 当有客户端连接断开时
2 M# v6 W# d# ^: a$ U+ O: o, N/ A - $worker->onClose = function($connection)
% Z) c! F$ q! X! U" ~ - {
7 N C3 h; J; }& j& ~ - global $worker;, a7 t' t; f: c4 `
- if(isset($connection->uid))
9 f% B5 M! v+ S2 E1 n9 x, X - {
$ q2 [* j" b0 W - // 连接断开时删除映射1 l3 y" E( V& C9 u
- unset($worker->uidConnections[$connection->uid]);
4 F: O* E$ n$ W: p- F1 f - }
0 e5 j3 s3 T1 ]: I' F* {5 N5 I/ i - };
* |6 {% m6 w9 j, y
* w" ^9 A' X+ ]/ O- // 向所有验证的用户推送数据; ?6 l4 p* _. j& R0 O
- function broadcast($message)
3 `5 V. E/ \; Z# J9 {9 m4 {, l - {. I( J3 c! C( J2 F; P
- global $worker;! a" o- n( s5 }* b0 O G
- foreach($worker->uidConnections as $connection)
3 W6 o) }8 e$ E/ n- S$ f - {: s! h3 L+ J' c2 I/ t- S( ^6 Y
- $connection->send($message);1 A6 V4 X7 F* J, r: g; X
- }# {7 c2 {. y4 I) s5 V
- }: _. _! n9 M& @/ @, @
- 1 ?: i/ s1 w, V& k% C; Z) W
- // 针对uid推送数据
e N; L4 x% i; l1 c& n - function sendMessageByUid($uid, $message)3 D; A# Q/ T9 x* C
- {& C6 ~% N( `. }4 L5 e) Z
- global $worker;
; |, m5 t8 E' G6 J: D - if(isset($worker->uidConnections[$uid]))$ q3 @- n* }; O/ c8 c* L
- {
6 r) H1 C/ f& d! S. E" Z - $connection = $worker->uidConnections[$uid];
k9 c- g; R. ~) F% X2 l$ i/ o- M - $connection->send($message);
& [& {! y- w) ~- D - return true;
6 [7 E, F& c# T: k: y - }* n% e3 n4 M& t$ i6 e2 h
- return false;" C; W, G, l% S1 k$ G: E5 b( L
- }
- K& d5 R. ^# F1 w w1 b
6 R. h# V) {) Z0 f- // 运行所有的worker$ Z9 F: k7 o9 I8 ~: a$ T- B
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');3 Q& A- L# F! Y. w8 s- e
- ws.onopen = function(){ X6 c* q( k8 O3 W: l' K
- var uid = 'uid1';! Q; H/ O# z! a% O+ K S. O8 l
- ws.send(uid);
' @( p4 o. Q* i( X b- p - }; [5 Z( b7 U: Q* X: A/ `
- ws.onmessage = function(e){
* P# ?( n) K& d$ u4 T - alert(e.data);
4 @$ U. y: E6 [! B6 }7 C/ ^* b - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
& G/ i [9 I: A! O" F1 i" f' o3 u - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);/ x. u R2 n2 g2 }# u0 m9 Q" ]
- // 推送的数据,包含uid字段,表示是给这个uid推送
y# Q, x' b+ y4 @ - $data = array('uid'=>'uid1', 'percent'=>'88%');7 k% \9 B( a) p% A" N8 s6 l% A. t
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
, W. r9 ?7 `( i2 T# r - fwrite($client, json_encode($data)."\n");
+ I8 M/ z6 e( \" [4 Y% } - // 读取推送结果
6 `# ]. d* s0 O! e5 |" ^ - echo fread($client, 8192);
复制代码
: v n0 Z+ ?/ X5 c$ C% d; y/ M" f6 K& b$ [
|