- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;8 u- i% T- P" u/ U; d( o
- require_once __DIR__ . '/Workerman/Autoloader.php';4 R! |/ n. `. A# @% S2 j
- 3 ]. C* ?6 f- |/ [0 }; D4 c
- $worker = new Worker();
+ X- S8 A, C7 ]5 I# O' @! I& x0 q - // 4个进程
j) N( w4 [4 o* ?7 T- Q) U) U - $worker->count = 4;
! w$ j1 t# f& [ - // 每个进程启动后在当前进程新增一个Worker监听
7 A/ |3 Q' d/ S3 j: C - $worker->onWorkerStart = function($worker)
+ L! I- b2 Z, F: X - {5 k t3 @2 e. C/ ] R7 \/ ^
- /**) G5 T) K# V) b. ?- M7 z
- * 4个进程启动的时候都创建2016端口的Worker
% h: ?& p0 |7 b' k - * 当执行到worker->listen()时会报Address already in use错误
; u$ y% M7 R+ r6 Y$ y( S4 l( p+ X, N - * 如果worker->count=1则不会报错/ r$ D+ s7 z9 {9 t: ] N
- */
$ s$ g9 o7 m7 F* S z( n( g7 Z3 d8 H' k - $inner_worker = new Worker('http://0.0.0.0:2016');
3 _2 ]. o) P4 u& Q$ Q# t6 d9 W - $inner_worker->onMessage = 'on_message';
* J% o- ]. G( { F6 Z - // 执行监听。这里会报Address already in use错误4 O4 P) O: u3 M; [8 q
- $inner_worker->listen();; y9 h& _- m$ W. r4 x0 d
- };
]. f) ~) s0 U
" a! T3 y8 s: d1 `) ^# u" h- $worker->onMessage = 'on_message';- o9 E& V9 c( B; a b2 L
$ K A6 t: {- Y5 y3 }6 k% [' g- function on_message($connection, $data)& Z: K2 L- y/ z, ], c& _ N8 ?/ O% I
- {
% e1 v( n8 s% l6 P9 @* T - $connection->send("hello\n");
: R5 n. x6 s3 R# h9 i6 j9 l; _5 f1 _* c - }- J5 U4 z# t3 i2 Z) r' l
- 3 g% a4 V$ Z; T1 V5 h: M' `+ _$ d
- // 运行worker
3 Y4 d* r( t# T a% D. p8 @ - Worker::runAll();1 Z" ] h0 Q. L' n8 |' Y
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
! S( {2 j+ ?2 P: b5 ?* J; p. }7 v) W - 5 q/ ]( \& k9 F' W
- use Workerman\Worker;
$ `: A' d- G% F4 y$ ?+ T - require_once './Workerman/Autoloader.php';- g* B7 b* W1 R* c4 r
x( d9 T. l/ L) R. A2 J" s) Q- $worker = new Worker('text://0.0.0.0:2015');
i% h( P5 |% @7 s, f$ Q - // 4个进程
E8 i; j5 w) E1 n* p- o4 a - $worker->count = 4;
# V6 B3 Q- K) T9 y- ^( y a! J - // 每个进程启动后在当前进程新增一个Worker监听2 c0 w% T+ X" a8 P
- $worker->onWorkerStart = function($worker)
4 E, X5 C- |" k9 [6 i s2 [& d# \8 x - {$ h3 u0 c& g5 h7 C6 L9 }6 ^( s
- $inner_worker = new Worker('http://0.0.0.0:2016');, D% l( ^( b: X u6 P- r
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0); V) G, H, L( O
- $inner_worker->reusePort = true;
/ t+ K6 V2 x) U7 H& b - $inner_worker->onMessage = 'on_message';
( D; ?4 U0 F& w. j2 D; R - // 执行监听。正常监听不会报错0 ]+ z* t. g% O2 G
- $inner_worker->listen();) m, n- Q. o0 c. g
- }; D2 z4 K& k, A5 D7 f, ]" u
9 r1 j9 {6 s4 e' c5 Q: z+ C- $worker->onMessage = 'on_message';
) M2 |6 P* K2 U
- P" _7 J% M+ C: _5 k- function on_message($connection, $data): X! o& X: a& ]3 u
- {
! d7 F2 | ]3 c# f - $connection->send("hello\n");
0 m: G1 E+ |$ K - } n6 H/ x3 F7 h7 G3 c
- ( P" L0 F/ q3 e; }$ y( p
- // 运行worker: i+ `' E' ~7 A, r6 `# f4 j
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
$ ^# P7 Z1 r1 `" f" s2 e8 h* f - use Workerman\Worker;
3 O" D# L: k) H8 F* S - require_once './Workerman/Autoloader.php';2 H+ E y& z! w$ n. y
- // 初始化一个worker容器,监听1234端口
/ w2 r; w3 E- o) I, o. \( e - $worker = new Worker('websocket://0.0.0.0:1234');- y7 p, F$ k% E9 f2 H9 e5 m3 ?. E
- 7 e7 S: A4 K- N- R! R3 K+ t
- /*
" U6 _* F R1 b; Q6 v - * 注意这里进程数必须设置为1,否则会报端口占用错误5 o- ]+ }& v' w( u2 s0 e; e
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
* m7 X3 {$ p( }/ C$ ? - */
4 V6 j3 y6 `5 E* _1 x - $worker->count = 1;5 J: j7 _- W. z" t
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# G* p, h" ]& W, C/ V
- $worker->onWorkerStart = function($worker)
9 o& |. }) s% Q; b) K - {9 ~7 D; G* a U" H% Z/ [3 X
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
- V3 i) b* L5 N - $inner_text_worker = new Worker('text://0.0.0.0:5678');$ |( ?% O* e8 {
- $inner_text_worker->onMessage = function($connection, $buffer)/ V3 F4 @! Y# J- i
- {1 w0 g" ^7 R6 P
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据- z z3 y) e2 D& Q- ]0 p# C$ o2 p
- $data = json_decode($buffer, true);6 X8 `/ M3 v7 H# U" X( q! o$ \
- $uid = $data['uid'];4 q; {0 _8 q2 B& k7 e: U
- // 通过workerman,向uid的页面推送数据
. L7 \( U3 a9 N4 w) g+ Q4 W - $ret = sendMessageByUid($uid, $buffer);3 W" d5 O3 y: ^8 v) W6 ^5 m6 P
- // 返回推送结果
6 ^/ t' l4 s3 x4 I0 v - $connection->send($ret ? 'ok' : 'fail');
1 ~5 v9 Y* D# Q2 ]# l1 U - };: z0 Z \9 _2 ]& W- ~
- // ## 执行监听 ##
4 ~5 V. Q' z5 W" H% `# E; g5 e - $inner_text_worker->listen();
1 }3 z9 J* h( r. S. ~ - };9 v5 H- H. s3 F2 l5 i
- // 新增加一个属性,用来保存uid到connection的映射+ L7 d8 n7 ^* f4 \$ F8 b/ k
- $worker->uidConnections = array();
& `% Q; W/ m7 Y1 {6 \ - // 当有客户端发来消息时执行的回调函数6 ?3 Y+ W |9 S. w
- $worker->onMessage = function($connection, $data)
% j; H$ P2 E: a# x1 {8 [ - {
$ Y& t* K* {: P - global $worker;
! Y5 a+ r6 x" f7 n3 N# F, D4 P$ j - // 判断当前客户端是否已经验证,既是否设置了uid& D6 @: k2 f* J! S3 G% H" k
- if(!isset($connection->uid))$ T$ r w; `6 P8 n, @
- {/ w1 {1 `9 |+ T1 k* S; P( f
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)% F) S! T4 E2 P) n: d" Y0 R" i
- $connection->uid = $data; n: i) @6 A+ _, Q
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
4 J4 S2 t( p0 U, `# V* y - * 实现针对特定uid推送数据
& Q0 {( S) j+ A - */7 {& {7 z% V0 k, ^$ y
- $worker->uidConnections[$connection->uid] = $connection;- f: V D v$ s4 X# ]1 ]
- return;
/ g. ?. p0 R, r g+ J& ^ - }
! |- _ Y# S, n1 p: Y - };
/ v0 }# J1 N6 B( h( Q7 L. c% L9 r( ?) l
, j+ t9 i+ U$ d0 h' ^( d- // 当有客户端连接断开时$ G2 T% |; C! ^# H
- $worker->onClose = function($connection)
3 E; X0 D2 |" R4 q S# F1 u, V! E - {
D4 m- n: P+ V, q - global $worker;& T( s$ T, w' n9 o
- if(isset($connection->uid))' ~) K5 ~. _) m$ N
- {
* C: ]+ U2 J4 `# {' e" H0 i - // 连接断开时删除映射8 e1 l# ^2 i" C- @1 o
- unset($worker->uidConnections[$connection->uid]);
& h9 ^5 Y7 u3 a: F - }% m6 t8 Z& q) u+ `7 V
- };
$ ^1 M- F; \2 y" L/ c - ! h8 Q ?4 @7 W: Q5 m4 }6 U6 {
- // 向所有验证的用户推送数据1 ] u% a9 ?6 T
- function broadcast($message)' |+ B. b; s+ z# K
- {
4 e, |6 S/ T0 |: s: Y - global $worker;
7 _2 U _& }; k, m2 ^# l - foreach($worker->uidConnections as $connection)( G$ r3 _, a, ?0 t- h
- {
$ N9 Y# k% q% p - $connection->send($message);& a. b3 r6 Z/ r( A* O# L
- }
4 N! n9 K+ \: _( z- y - }- j2 \9 z# Z" r' y! Q
* C1 z) N" R/ @2 ^; C3 h- // 针对uid推送数据
G7 z$ B9 R$ H. ^5 D2 R7 d - function sendMessageByUid($uid, $message)/ w3 e' G2 U/ D$ {
- {
" E2 ^+ ^& o! ^2 ^ - global $worker;; ^) I2 x% T4 F2 B y6 ]( A7 w
- if(isset($worker->uidConnections[$uid]))
5 q7 L6 ~( u# X! j; C( T2 R - {6 Y+ D/ v7 ?* c
- $connection = $worker->uidConnections[$uid];
% \, b/ [+ a7 {6 c# S - $connection->send($message);
) B y" a" i' P5 U1 Z% Z F9 M! G3 a - return true;
3 C, E3 I Q+ \. M+ o1 ] - }. T6 Y1 w2 a1 h/ j% C% S4 |
- return false;
: ]$ b7 L+ i% P! ~ {' t - }/ F9 `8 A0 e- Q$ ]: L
7 C' i/ E8 h; P7 g1 z- // 运行所有的worker
Q# l- ~! p! E; l; N& r o - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');/ m- ^& X9 t& V2 C
- ws.onopen = function(){
2 j+ e1 P5 W% ^ - var uid = 'uid1';
5 _: P; N9 U) Y! ?3 S9 M - ws.send(uid);5 `3 G- E% r0 B1 f! q
- };
d. J; \7 L. }! f( N2 F- M0 S - ws.onmessage = function(e){
4 q# h/ }2 G9 |& Q - alert(e.data);
7 N# U; ] N' p% y) V. k/ A - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口# s X) D( G0 g& p# x
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
7 }* _, n/ \) I% c; M4 y+ } - // 推送的数据,包含uid字段,表示是给这个uid推送
9 R" J8 h, ?7 p, b( @9 P5 x - $data = array('uid'=>'uid1', 'percent'=>'88%');
: c, e r/ u' L0 C$ ^. f - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
( q8 g& S e2 t; ~8 L - fwrite($client, json_encode($data)."\n");
/ J8 J ?; n! o) n - // 读取推送结果& ^% x U, T, _' k7 p9 p
- echo fread($client, 8192);
复制代码 2 T8 t. i. B3 {% X# n) e# a. W. J7 U
4 L9 j% o5 D& |6 f; L* a9 o |