- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
3 G% b3 O5 d; [1 C4 ? - require_once __DIR__ . '/Workerman/Autoloader.php';
( I( f8 s5 ]- R( F& O; \ - 2 k# P/ b) r9 p; \2 b
- $worker = new Worker();2 j: B# o$ {( p. }% n) g V, D% P9 ]
- // 4个进程4 A) S2 `+ R* w1 C% Y6 S8 O# T8 y" b6 J
- $worker->count = 4;5 n! G6 N0 e! U. A/ N
- // 每个进程启动后在当前进程新增一个Worker监听5 J- Z$ z/ j( }4 w4 B
- $worker->onWorkerStart = function($worker)3 L* r9 v/ D' v$ K8 f
- {; q/ T# o. V- H0 Z8 p# m! m5 s3 t: w
- /**9 ~0 f. C* F: J4 t0 i
- * 4个进程启动的时候都创建2016端口的Worker
% @2 \2 m. V9 ^0 o - * 当执行到worker->listen()时会报Address already in use错误9 @( S; x0 r0 I* |) m: t
- * 如果worker->count=1则不会报错
9 b3 r6 x! L" t - */
* a' ?& ?$ g" @) R - $inner_worker = new Worker('http://0.0.0.0:2016');
6 |+ P# }1 L& c' V - $inner_worker->onMessage = 'on_message';" F# |/ G, q: T& x! L
- // 执行监听。这里会报Address already in use错误
4 ~2 ^7 P6 f9 ~* [: x) s - $inner_worker->listen();# I& S. k4 d3 h) O( y% m
- };7 d# h o) {2 V7 k. V3 b0 \
. q3 y8 h, y$ T: W- $worker->onMessage = 'on_message';' I! U; w. j/ i% o
. M2 N2 R( N' x& R; e6 s/ l" O' X- function on_message($connection, $data)
" d9 ?, a% d0 O8 {2 M# J# F- ~( i# T7 f2 u - {9 A3 \2 T/ x5 u3 k
- $connection->send("hello\n");! \! f# }' h) t- ^
- }8 S: ?" g& D+ T; `5 |! c0 I1 Y' {
- $ Y1 L" t2 @- ]( v
- // 运行worker3 ]% @7 ?+ t) G, `+ w( Z' Q* B% E {
- Worker::runAll();+ n6 G6 K( p; J' L
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:3 u5 i" c" y; k; S/ h1 J) ~: _
- * D0 t. G0 n5 D Q
- use Workerman\Worker;
' m: K; |$ n# G$ q& ^ - require_once './Workerman/Autoloader.php';/ f* G$ c% c. q8 _" [
- 1 o2 E, q) a" G9 g D+ `
- $worker = new Worker('text://0.0.0.0:2015');/ N3 F. X3 D7 g' E- Z5 Z( \8 p
- // 4个进程
& j4 @# l: m. K* A- v2 _ - $worker->count = 4;1 ]1 A; ]4 f8 U2 r( x
- // 每个进程启动后在当前进程新增一个Worker监听
( _( D/ H, r0 i# v) u - $worker->onWorkerStart = function($worker)
5 r0 X2 [" ]( |1 O* P) ?1 a% N - {
) X) ~- O! u6 |0 `# }% M - $inner_worker = new Worker('http://0.0.0.0:2016');
$ H" Q2 `4 }' P6 ` - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)( ?: s: F p( B7 r+ a9 j+ v0 T* `
- $inner_worker->reusePort = true;
$ y: U9 p5 ]+ k- O7 @/ |# d - $inner_worker->onMessage = 'on_message';9 K5 X' r' [: j7 Q+ @. x; d
- // 执行监听。正常监听不会报错
; z/ [0 T9 D& }6 M0 y - $inner_worker->listen();
6 W# x, j }" A. p# T9 `: d - };( P4 Q. ^% {, Q/ f. X0 `: W( e2 ]
- ! n" X Z4 s5 S
- $worker->onMessage = 'on_message';8 u8 }8 e# z5 ~$ l
! a6 H$ e* V$ A1 ?- function on_message($connection, $data)
/ P2 a4 J; h9 v, j" `5 g8 w - {* d9 }) t( ^! y9 E+ z. N0 o
- $connection->send("hello\n");
. ]4 F% p1 r! _. q: S - }
% a1 U' ^& Y. c! Q3 z - + p4 {3 Q4 D4 M9 @/ P' j( T! b
- // 运行worker" x s- ?) z- d7 T& \1 |6 F1 w
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
, M }# ^5 x" V% T, J$ q. A - use Workerman\Worker;
8 \! n ^# G$ j, z! G - require_once './Workerman/Autoloader.php';4 a4 x3 C4 t( J d
- // 初始化一个worker容器,监听1234端口2 M7 Z [( W4 L1 z/ J
- $worker = new Worker('websocket://0.0.0.0:1234');( P8 S1 v( R, e% ]
- ; Q+ [4 K% S1 O4 K$ W* v. g
- /*
1 z. `- E; |7 {% o! y" a: K - * 注意这里进程数必须设置为1,否则会报端口占用错误
" U* f; t q7 b, p2 ]" } - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)7 h$ K9 M! o. H' l$ @
- */: Z r3 E, a0 S4 g# h
- $worker->count = 1;
0 f0 ^' r' }% h - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
( H& h, Z$ D6 A0 P+ l3 E6 r - $worker->onWorkerStart = function($worker)" b4 a, M. y M- M: ~' x
- {1 V, h$ w; {, Q8 _. D
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
! V$ W$ U( |! l3 H/ \ - $inner_text_worker = new Worker('text://0.0.0.0:5678');1 O9 b1 l; r) W, Y. c( P! T
- $inner_text_worker->onMessage = function($connection, $buffer)' N/ A6 s" a8 k d5 F
- {" ]4 f6 K4 S. x- ?# c4 W
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
; e8 |- A* Q/ I - $data = json_decode($buffer, true);; T* I+ y7 a8 W7 ^
- $uid = $data['uid'];
b4 g7 d0 s4 |' u6 A7 W - // 通过workerman,向uid的页面推送数据
) b6 d' Q4 v0 b5 N& a - $ret = sendMessageByUid($uid, $buffer);
7 E) \7 h% u( h0 {, k- d - // 返回推送结果
B8 S7 S& y/ }! P - $connection->send($ret ? 'ok' : 'fail');
9 H+ q# X w- \ - };1 [+ p! _. i$ v1 Y9 c4 h/ B% b0 o
- // ## 执行监听 ##4 L& _) c) a/ P) W/ K7 J: Z
- $inner_text_worker->listen();5 L% g0 L5 |0 I
- };) o% a2 g2 S# c% G$ x6 Q# q9 d: q5 W
- // 新增加一个属性,用来保存uid到connection的映射
: n F, s' }. u7 L* d - $worker->uidConnections = array();
. y, Q' v, I+ e3 O% T- w - // 当有客户端发来消息时执行的回调函数
N% G9 Z- D$ t2 m - $worker->onMessage = function($connection, $data)
4 X6 Y2 r7 w8 j% a5 y( m. F - {1 p9 N" _6 m# u
- global $worker;2 |; v- Q; `3 n
- // 判断当前客户端是否已经验证,既是否设置了uid9 j6 x Y# `& X0 e1 g* N8 p
- if(!isset($connection->uid))
! Y9 p( T/ Y7 @ - {
, m5 L) d+ E8 E/ M - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)( F3 B) `8 r9 @' l% w; X! S: e
- $connection->uid = $data;; i, a; U5 \1 @& t6 `
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
7 ~5 P& R2 P1 O4 t: ]9 z0 e( D; q - * 实现针对特定uid推送数据/ ]* g, S+ u# A$ y
- */
& z9 U, S6 d' X& y2 s3 c4 X - $worker->uidConnections[$connection->uid] = $connection;0 n) B4 n6 {5 J0 L+ d3 k
- return;
# P( z. l" ~6 i7 ? t - }
7 d2 O. h; f2 n O) }2 |7 k - };
; m+ f3 R6 k' x: f# | - ( M# M- g" `" J( A1 m, r
- // 当有客户端连接断开时! b) v; H0 Y/ I$ U. G* q C5 |
- $worker->onClose = function($connection)
4 V, q9 }+ A& {, \% G9 V! ?" A - {
2 l/ B) D% [! @ - global $worker;
8 k3 i% _) {" I; Q - if(isset($connection->uid))
! J7 g" E# X9 { - {2 f" |, o( }; z9 `3 `3 X- m
- // 连接断开时删除映射/ @8 ^' k/ M9 G& D
- unset($worker->uidConnections[$connection->uid]);" X3 J8 X4 a7 q2 o# r1 T
- }, k6 |! ~: G8 B- N B, }) P; t" D
- };* [9 \ V: o0 s
# z" }) g2 ?8 Z0 c7 r3 B+ V- // 向所有验证的用户推送数据) d7 D# T! W$ f( k/ Z7 e" {7 c
- function broadcast($message)2 C1 ?- @: `' H* _4 i- B0 R
- {/ r/ s7 \$ G4 I8 ~* a, k2 l
- global $worker;) R2 g3 t# L! `: J; a2 E, M
- foreach($worker->uidConnections as $connection) u8 l6 l4 D7 X2 J, s5 F3 h
- {
& l/ \5 g. @2 ?4 d - $connection->send($message);
6 L3 c4 N" R3 p, { - }2 L; t4 d g* H! V' D6 T, J
- }5 y: }& L& p/ A: P! g; \! E
0 o4 U6 ] n/ A" W% ?- // 针对uid推送数据 C" Z/ u [/ ?/ y' |
- function sendMessageByUid($uid, $message)/ v6 q- K' Q2 |0 N* z0 Q9 B
- {
# g/ c y. ?1 @9 q4 F9 M - global $worker;
: A Z2 c# j% B' \. ? - if(isset($worker->uidConnections[$uid]))
. s% O. P( e) u - {
; r% C4 H8 u8 z7 G0 R, z0 x8 z - $connection = $worker->uidConnections[$uid];/ J2 Q% L& O7 S' t H
- $connection->send($message);# l3 M. G' S, D, p% H
- return true;9 I. _1 j6 E3 o' d
- }
+ E8 W3 F: Y% p+ F1 t: v2 q5 j - return false;
; ^! o3 s% l; v+ ?1 t - }
8 ?' H# i, }; b, S" P& E- _ - - ]5 ?3 n8 ^# t
- // 运行所有的worker
4 X- U& c/ y4 |+ R - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');1 |# ~; |" t( }5 T4 @
- ws.onopen = function(){; J3 R7 K0 a6 d
- var uid = 'uid1';
2 I4 b2 w9 b( H& f3 C - ws.send(uid);6 K" E. F. Y: J8 c$ L* f* _ o
- };( L0 {2 x" o6 g2 i' B1 J
- ws.onmessage = function(e){' `3 T1 u' \( R" W4 f! o, V
- alert(e.data);8 j9 C& s8 W1 m2 W' k
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
w; S2 P1 M, X. X - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);' r: H5 u4 v' |
- // 推送的数据,包含uid字段,表示是给这个uid推送
0 O- r( @! u& J$ N - $data = array('uid'=>'uid1', 'percent'=>'88%');
, a, e# r3 `- z n% s - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符7 J7 X. A# ]8 P& m
- fwrite($client, json_encode($data)."\n");; h2 P6 P$ ?; q5 u; s# B# x' a1 u
- // 读取推送结果% X5 s. _) K8 y: [4 U8 g
- echo fread($client, 8192);
复制代码
/ I4 Y7 @ h( X; m# Y; R
7 Z8 P! ?: F3 I$ b" H6 H! y |