- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;( T3 e8 ^/ {& w, s6 `5 {* Z
- require_once __DIR__ . '/Workerman/Autoloader.php';
2 R7 l6 G8 R9 K8 h' j6 r- Z( i
5 y9 b: b; F" ?' t$ ~5 x- $worker = new Worker();# T; I/ Z$ ?( T/ T5 x2 j
- // 4个进程
. T" M/ o+ ~; K1 n - $worker->count = 4;* c2 O% A" ^- M+ X' v
- // 每个进程启动后在当前进程新增一个Worker监听
. b' x* Y, V- x1 l. U* E" B - $worker->onWorkerStart = function($worker)% B1 D0 }( `+ u
- {5 j# g' i; G7 U" C
- /**
) A6 E1 d6 [) W+ M' M$ C% [5 s - * 4个进程启动的时候都创建2016端口的Worker0 @8 ? I/ V: l
- * 当执行到worker->listen()时会报Address already in use错误' _/ z1 u# C3 Q. A6 x
- * 如果worker->count=1则不会报错2 @& y! G0 `8 ~
- */9 F5 r1 o- s( a0 S
- $inner_worker = new Worker('http://0.0.0.0:2016');, X, G% \" O6 Z. d' l y* U
- $inner_worker->onMessage = 'on_message';. \3 ]) B5 E; S/ u G, F
- // 执行监听。这里会报Address already in use错误- _& F, d+ y$ e/ }4 h( f3 q
- $inner_worker->listen();
' c- p/ w8 m, I8 y) c - };( q- @0 k+ u) L% e
( ] m2 T1 [" y2 R( R- $worker->onMessage = 'on_message';
; i8 Y4 @, C2 d: n - 4 I, e7 A' q/ A! D! b
- function on_message($connection, $data)' r3 b4 f% B5 ~( }- c. A
- {9 h3 r7 k" _, u9 D; p d- \! A
- $connection->send("hello\n");
! Q5 i' [+ P" Z: b/ [* v - }
6 T/ y w( i w7 j: @ - / S- G, }- d& Q+ T6 ?6 |
- // 运行worker" J. b; z1 |1 V' C
- Worker::runAll();, l" ?1 M) f" T- i# p ~8 O- j( q! O# a
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
, D' G/ i6 ?1 u1 T: S' D* k - 4 m; K: N. m7 C1 A! F- A
- use Workerman\Worker;
5 {- U; K/ X' ^& r3 v' f) m/ H, J - require_once './Workerman/Autoloader.php';0 [, Y; Y/ v; m5 x9 K3 V3 p6 F
. x2 x; N8 G* ]- $worker = new Worker('text://0.0.0.0:2015');
. C" S' ^* w2 G2 O - // 4个进程
; X% ~" J, j M% @* Y- T; N$ q - $worker->count = 4;
A0 ~! X6 H& L, g2 U) ` - // 每个进程启动后在当前进程新增一个Worker监听; N- X- g9 _5 c, d# _. s3 U
- $worker->onWorkerStart = function($worker)6 f% D; I* F+ e2 L. ]# @# {
- {0 k j4 G, _) r
- $inner_worker = new Worker('http://0.0.0.0:2016');
/ @0 v; r5 u7 d8 I" ?3 }7 q - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
- L/ f: U7 `% b+ C- e& U- S. G, | - $inner_worker->reusePort = true; |0 M2 n/ o. u; S' o- l- |
- $inner_worker->onMessage = 'on_message';9 c# ^6 G7 ^( ^/ ^) |
- // 执行监听。正常监听不会报错
) ~9 L# L7 u6 w" R - $inner_worker->listen();
% \5 i' u3 M2 |% y: S - };
: F0 H* g+ _5 }3 @( M H1 X. {) d
( X7 N$ n. L. x1 j- $worker->onMessage = 'on_message';
5 A; H) I6 q9 j% Z: ~# K - 2 O9 z6 o9 g7 b- V
- function on_message($connection, $data)
1 G# D* c& X& u" h) E+ F5 i7 Z - {
5 }1 T! j! c% D2 [' y - $connection->send("hello\n");( e0 }1 ^1 {, p9 H, |" k( y
- }
. R- g+ k$ e# U$ \3 l/ A' N& B. V - & S" p$ [: \8 L7 O; W* P/ o. y
- // 运行worker
; |1 N3 ]" I3 v/ N8 v. W - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
3 ^- ]: n( H$ H1 v/ e# V8 ] L7 e* J - use Workerman\Worker;+ d6 K% K# j! D5 ?
- require_once './Workerman/Autoloader.php';
: [5 R3 v) y4 j2 I) L t: X2 h) V - // 初始化一个worker容器,监听1234端口
9 {: A4 i, V0 T" p - $worker = new Worker('websocket://0.0.0.0:1234');
, [( w1 y+ U; F2 b1 I0 i( p5 @ - / \9 R( ~6 x2 ^; u x
- /*
4 U7 T9 m$ N6 V: s) n' g - * 注意这里进程数必须设置为1,否则会报端口占用错误1 G/ S0 L1 H* ?- a
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! w3 L. R3 S$ E; W6 _: ? Z0 a! K
- */6 h) }- R/ k- z4 G9 w
- $worker->count = 1;
i) S2 N" K: R - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口9 _, T! @9 |/ \) @# x7 G! c: R% b
- $worker->onWorkerStart = function($worker), A( W6 K. c& R, ^4 D X$ B7 F: S
- {
1 F% Q, v8 q. \" }) v3 U. @ - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
% Y( @* i8 T" v - $inner_text_worker = new Worker('text://0.0.0.0:5678');
; H( m* d3 ^$ Q$ n9 q: e: ?( q/ k1 s# } - $inner_text_worker->onMessage = function($connection, $buffer)1 B! z: v5 e7 ? |
- {6 V% K& O. D3 @) q. A7 t8 u, w
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
) a& W8 I/ R) {/ v2 M! F0 l - $data = json_decode($buffer, true);
8 k4 ?4 K3 a! Z* i4 b - $uid = $data['uid'];
" l0 {8 v V" g, f+ `1 D* \9 G) x% t3 m - // 通过workerman,向uid的页面推送数据
N0 i' Q% ~# w* N. P1 f - $ret = sendMessageByUid($uid, $buffer);
0 m4 u. X, U9 [' j - // 返回推送结果$ E8 Z8 ]2 f5 b/ U$ Q5 S" \
- $connection->send($ret ? 'ok' : 'fail');
2 `* n# V/ ~9 n' z& _- |4 t4 O - };
9 e0 y' p2 v* r/ Z( C - // ## 执行监听 ##* ^ X8 x6 z( a! Y( s: ]
- $inner_text_worker->listen();# {8 t% J3 y4 @4 `
- };6 b) B& P% x$ G( K, S
- // 新增加一个属性,用来保存uid到connection的映射+ u5 V! k4 V! K. ]4 L
- $worker->uidConnections = array();
! D8 F9 W9 H/ ]+ K - // 当有客户端发来消息时执行的回调函数5 Y( t- x! }" q, y
- $worker->onMessage = function($connection, $data)
! ~! ^# H: K- h0 a0 ~- o" L7 M - {+ S/ u P1 Y& {2 z6 U6 w2 ^
- global $worker;
3 n3 O2 X7 e ^4 n - // 判断当前客户端是否已经验证,既是否设置了uid
0 c: D/ k1 [- ~/ S$ ?- T O( I3 D7 h - if(!isset($connection->uid))5 [& _# R# G; n% X
- {
1 O7 O3 L0 g; b8 b& `0 p0 g - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
! ~% r- b Z8 q7 Q - $connection->uid = $data;8 R, ]1 z2 }2 t4 M) t$ W0 \
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
" q8 Z& `' Z2 H4 Y0 u - * 实现针对特定uid推送数据& Q, c5 V5 ^# S, Y4 d% w0 }. }" {
- */
, E; e* C0 i. r - $worker->uidConnections[$connection->uid] = $connection;; ^; X4 N% z1 ^ @
- return;, g; L: K" z0 w2 ?& n# |$ G( y8 s
- }+ v. F2 t; b* y5 r
- };6 x# ^0 d) B7 X/ h* G; b% J& {
4 h; f: K# q$ F. w9 w7 w, A- // 当有客户端连接断开时4 ]9 E2 `8 C1 u+ a# v" |' o
- $worker->onClose = function($connection)
T7 _+ l) d' j0 ? - {7 `# d. u, \* q- H1 B1 `
- global $worker;
* ]" u7 D, j' g. O6 K9 X+ L- u, y - if(isset($connection->uid))
# V0 q d a8 L* p( N$ A - {7 M7 F) A0 D5 o) r% k! `
- // 连接断开时删除映射( S1 ]% J$ V3 K+ h' Z
- unset($worker->uidConnections[$connection->uid]);1 l9 ~' {# b4 A9 F" f
- }- u2 u+ j _" c: J9 c6 O9 i/ q! [
- };1 _3 `, d2 H5 s O, Q5 }
- # v" ^$ k9 M5 V! [- N; r7 J, ?
- // 向所有验证的用户推送数据" y1 m# A) z( y5 w }6 S# m
- function broadcast($message)- U! F7 E- d, R
- {# W9 ]; A. S' ]
- global $worker;! D/ B! }; Z( [" b
- foreach($worker->uidConnections as $connection)
# R2 r1 V S+ |- W - {7 r! Z+ \! i# I- v+ |+ @# Y& v$ i
- $connection->send($message);* W2 ?5 X# {; D2 z5 K( u) q0 U0 s z
- }
6 l5 D+ h5 w5 R+ j7 S) C0 N - }
/ F: |3 Q- _+ @& Q" c" `& x - 8 k: W( R; l4 S# K0 u6 G
- // 针对uid推送数据
! ~& E1 @' x" f1 Q4 a w - function sendMessageByUid($uid, $message)
* X8 r, Y- g" b1 K( j. [! Z - {
* @% K/ ?) Z' [/ l! `: j0 [2 M - global $worker;
[, h; I2 k$ \# i* l1 i; U' Q - if(isset($worker->uidConnections[$uid]))% [ F4 s+ y" j/ C4 y
- {, j! s& y8 ?# w3 C
- $connection = $worker->uidConnections[$uid];
1 W. u {! c: o/ y* o9 B - $connection->send($message);, }3 T ^; j. t2 a, ^' F: s' {' }8 a
- return true;4 V& D8 E' {- W* q0 d/ B
- }0 R2 T' `6 v, h _ \
- return false;6 V" ?$ `2 g h+ \3 u3 G$ e
- }
; g" Z) l3 J; f& N4 G R - 1 i, k- E' c- z: D$ @
- // 运行所有的worker
6 Q' q3 R( v2 O- ~ - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');4 j5 P1 u5 w" Q1 V
- ws.onopen = function(){
( m# _' y; V- p# \ - var uid = 'uid1';
; \3 @; C% B: _ - ws.send(uid);
" G1 J8 h6 ^2 g$ P - };
4 g) Z0 q7 e4 e9 [4 F# t: C - ws.onmessage = function(e){ G; l" W. C# J$ s- Y) ~
- alert(e.data);
& p; m; b" |- W1 R# y! g% X1 r4 B - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
$ o, X2 L H: d8 n1 m - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);# S" I+ ~7 {2 t' o6 z
- // 推送的数据,包含uid字段,表示是给这个uid推送2 ~4 K! Y! J- _8 ] A9 x
- $data = array('uid'=>'uid1', 'percent'=>'88%');
5 W- z- q8 G' E- z8 d" r - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符4 ^! R3 j! \3 W) X
- fwrite($client, json_encode($data)."\n");
# @& Q0 w5 Q: \ y N9 j5 A9 n - // 读取推送结果+ G5 W# O* ^: C/ L* s4 q
- echo fread($client, 8192);
复制代码 & @5 V/ C! [6 H- q+ W
1 g" @4 }2 m: j
|