- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;7 a# h" W5 P6 ^
- require_once __DIR__ . '/Workerman/Autoloader.php';* J: U; {0 B' j( l; Z1 R3 `
- ( h' J7 z2 `2 I
- $worker = new Worker();
3 x- d; [/ g, y - // 4个进程
# E( G/ {* J& t - $worker->count = 4;2 J- y' b$ [) E& c
- // 每个进程启动后在当前进程新增一个Worker监听
7 c( e" K5 z" n/ |6 O* | - $worker->onWorkerStart = function($worker)) O* n o) L9 M" O3 x
- { ^ R9 n" J5 O& ?
- /**
6 t" d* {* R) G/ X* Z - * 4个进程启动的时候都创建2016端口的Worker
5 o& ~: x, Y% M# U- | - * 当执行到worker->listen()时会报Address already in use错误
0 a; j2 i& E1 n( _, U - * 如果worker->count=1则不会报错
8 P8 j/ I" R# G6 i - */! H( \. \; f. F% l: M- P' e
- $inner_worker = new Worker('http://0.0.0.0:2016');( ?: g8 z1 @7 E
- $inner_worker->onMessage = 'on_message';& K5 d: [3 {8 n$ i) }# Q! W& l
- // 执行监听。这里会报Address already in use错误
8 L T$ c* p8 u/ l( k! D - $inner_worker->listen(); E" B" [" e8 N t( |( y( V2 Y
- };, [0 v, g+ {9 w) V" K+ q7 S* G
7 O) U* ?( b# f. Z- @$ J2 q- $worker->onMessage = 'on_message';
9 K; f. r: f$ v/ u7 H - 2 Q2 r+ a# o+ A& D- T' E3 G" d; z
- function on_message($connection, $data)
- D4 |' u- n4 P! m9 t - {7 m$ D" b+ N8 y* P1 V9 `4 u
- $connection->send("hello\n");
$ z3 m+ L5 ^) n9 B; Y2 L) w+ G - }% s" v6 \3 ]3 g: m7 _; Y/ @' s. D
- 4 c1 q2 E2 o" e1 B; M
- // 运行worker& T/ o2 W% E) I+ I3 N
- Worker::runAll();
) `8 P8 H" v) h9 g# a/ m - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
2 x m8 a$ e5 v4 k+ K - X8 D9 a$ i, v5 | B; c
- use Workerman\Worker; ?: j( [( S8 ~. w
- require_once './Workerman/Autoloader.php';3 X! R( R/ \9 _& s% D
- 6 }7 s9 w" g N
- $worker = new Worker('text://0.0.0.0:2015');* m# N) S8 t5 d+ [
- // 4个进程
4 m' b' U4 j9 o( ]1 C% N$ u$ b$ N - $worker->count = 4;" t% X0 i1 n' d3 K* j& b# F/ N/ ^
- // 每个进程启动后在当前进程新增一个Worker监听
3 h6 l5 v _; n; |$ V; q - $worker->onWorkerStart = function($worker), T8 f2 p+ ~1 Y
- {
5 C( [% s9 v; v/ x - $inner_worker = new Worker('http://0.0.0.0:2016');
9 G# \; _% u) P0 V4 F% X - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)) W0 s; W5 H+ i% P3 P b
- $inner_worker->reusePort = true;
1 f$ k0 r% k1 D$ h% \4 A$ J: l) ` - $inner_worker->onMessage = 'on_message';, {' W$ F6 h) O2 A9 A. [( r
- // 执行监听。正常监听不会报错- b D2 R2 p6 }' U/ g) b
- $inner_worker->listen();" i4 N2 D: w* U% ?5 s9 ~) V$ X
- };
6 I( M5 J/ @, J0 e( X - 8 e, ]! `0 O! X5 f$ L/ u
- $worker->onMessage = 'on_message';/ ~8 v1 P- Q, c- ?9 D& E( l
- / r2 k# I' @0 e E+ R2 w
- function on_message($connection, $data)
9 N5 K' M5 @1 k. w( R5 `0 h; e - {' ^3 j/ w1 T1 `& `
- $connection->send("hello\n");
: S. g* a5 j9 R - }
- v5 p8 A5 Y( d
7 J0 V% Z8 u4 C9 U0 T1 G- // 运行worker
: b/ R! C" ^7 F5 }, [+ ]# ^ - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php) A8 m0 y* B2 s1 |5 J, n
- use Workerman\Worker;
4 f" ]3 }) o# R- G& h - require_once './Workerman/Autoloader.php';
' h8 ^! f# C S3 b/ |" |4 j - // 初始化一个worker容器,监听1234端口
2 ]% E2 W V# {7 n7 \/ ] e - $worker = new Worker('websocket://0.0.0.0:1234');2 J% q7 F! o7 ~3 ?6 p. h b$ Q
* w+ u4 }3 l' O; b; P- /*
3 E! j; R+ c- k: k - * 注意这里进程数必须设置为1,否则会报端口占用错误
" k+ U% `* Z) w7 t - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)6 n( d- {7 Z# f, y- n
- */
* y% I# k* j2 ?, u) ?! z - $worker->count = 1;
$ \3 I9 W. y# b: W6 p6 {; D3 C! B - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口3 q5 m3 C, k) O7 |# E( r6 H
- $worker->onWorkerStart = function($worker)2 v: ~. c* O- d1 V# W
- {
& c3 V! p& `9 ? - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
9 K' k! q+ h9 A5 ~8 E - $inner_text_worker = new Worker('text://0.0.0.0:5678');2 F& j( s5 U! m. ]
- $inner_text_worker->onMessage = function($connection, $buffer)7 ]$ L; [6 F2 a8 b9 v/ B# q) ]9 I
- {
) I, L5 C, O b. } - // $data数组格式,里面有uid,表示向那个uid的页面推送数据' N. b) K3 i# t. u M
- $data = json_decode($buffer, true);2 ^! T: }1 a3 K, k1 S
- $uid = $data['uid'];
, V" Z# R( T. m- Z& K: X- ` - // 通过workerman,向uid的页面推送数据& v4 r, a! I; M* \& j! \+ W' g
- $ret = sendMessageByUid($uid, $buffer);
$ k! ^5 K9 M6 W4 ]( O: q& r - // 返回推送结果
. j) _; S6 r( M, n3 z - $connection->send($ret ? 'ok' : 'fail');. [; h$ ?' _7 ~* }, N+ \: n
- };/ i2 L1 N+ ^2 X+ l: o( I; m) W
- // ## 执行监听 ##
N! R* G7 E9 q# Y8 q3 ^6 j" J - $inner_text_worker->listen();3 w2 c( w N: b5 ?' E6 Y2 h
- };
( s+ X: K& k1 w# D! H$ r4 |( w& [1 c - // 新增加一个属性,用来保存uid到connection的映射- @) n1 W. x8 p e2 x
- $worker->uidConnections = array();
2 ?% r( r+ j7 B' L$ Y - // 当有客户端发来消息时执行的回调函数
( T; v6 b% e& m8 g0 o2 x' S - $worker->onMessage = function($connection, $data)
5 }4 I3 w& S! N% d' r - {( Y0 ^4 E7 j' m/ ]0 f6 w
- global $worker;
. t( u: m) y h2 F1 m" x$ O5 F( D - // 判断当前客户端是否已经验证,既是否设置了uid: P& t4 w0 |9 r2 i1 j. D: R# R
- if(!isset($connection->uid))
, l4 Y2 ~ a0 u/ R; R4 t# @, W - {9 \* M& u# V/ f3 N( ?
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)" m+ C9 _2 o* A
- $connection->uid = $data;
+ K5 _0 E* Y) q9 B - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,: e- y i: \8 J9 J
- * 实现针对特定uid推送数据
. k3 E/ o8 V8 D+ m5 z% s - */; {& X- i+ Y$ X5 |! v! r
- $worker->uidConnections[$connection->uid] = $connection;
# E9 M9 s% U( v: R \' W7 i# k - return;
2 L( e: B- g6 L+ Y) s - } }# ?; F& s. b2 S/ z
- };$ |& W$ C `7 p$ {6 B& |% u
( \/ g/ J! C; ]' I3 K- // 当有客户端连接断开时: }- U: B* f5 X9 t) H
- $worker->onClose = function($connection)
( D& D" q5 g! \. L7 X3 v# `" p - {$ x* p l' L' n, q$ r. ?
- global $worker;
( v) m9 F& f; w/ L8 z5 Y - if(isset($connection->uid))+ s/ l2 g0 j" B# n
- {3 d! ~0 j( g5 e- n& [
- // 连接断开时删除映射
7 [% F% G$ t' h, b N - unset($worker->uidConnections[$connection->uid]);
9 z- @& H, W# L - }
6 O6 }) E+ e0 Q9 y) X - };7 v! n" V# W8 `) j' j
/ g9 k1 `- t: g. c- W6 {( d- // 向所有验证的用户推送数据$ ~) x" d' f. F: X- [! l0 K% r' J
- function broadcast($message)/ Z6 R& ^' D( H5 U* Q; G
- {% q6 T6 J5 h$ v) k7 x
- global $worker;. e% O. `0 O* e4 D% B6 ~5 b
- foreach($worker->uidConnections as $connection)5 s% r2 |7 j' S9 t
- {
# t9 J1 f: q% S: P- ?- o) \+ y - $connection->send($message);
# H! W: B% X) ^ - }
0 r, w: G6 x4 X2 d, N1 h2 U( ` - }
0 `! [3 e+ n8 ~1 X" x - 4 O% I5 l2 g8 n- K
- // 针对uid推送数据
9 Z* q' h3 m* g - function sendMessageByUid($uid, $message)
0 N% E k a- ? - { `; Q1 L# i, b# U- H
- global $worker;
+ t8 R' D- G; l/ V - if(isset($worker->uidConnections[$uid]))
' b: C' f; B/ w3 E/ g - {2 l: P7 J* l1 `1 r) {) a q
- $connection = $worker->uidConnections[$uid];
! V9 D! \( Y: Q1 J - $connection->send($message);+ _; ?$ V6 N, K! Y& a7 M" A7 s4 k1 T
- return true; B1 w1 S/ P4 b% c \5 s4 i
- }( q3 ^6 m1 x/ z" W& m3 `1 r
- return false;
& O& x$ U' J& |* y# c, R* C - }
/ {5 Y: O6 T# W% r* t
" \7 i0 X; i5 W! p- // 运行所有的worker; G1 B/ { {" L' G
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');- Y. {! n4 _% h" u/ O7 ]3 U3 z
- ws.onopen = function(){
. F4 V# h+ U1 l2 v* D - var uid = 'uid1'; S$ A3 r% Q; x' g8 Y
- ws.send(uid);, C$ E7 s' u! O7 G6 X
- };
|- }$ ~7 b7 {& }- k7 x - ws.onmessage = function(e){- f* ]0 U/ g/ l% V1 y3 @2 F1 O
- alert(e.data);
. [6 L' ?/ `5 c( @6 G# J - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
. X$ e5 z p& n0 E$ n b0 i - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
' H7 m) A: H7 n# Y- M( K1 B( M - // 推送的数据,包含uid字段,表示是给这个uid推送# J5 R1 q/ I4 q! l* `0 M( Y
- $data = array('uid'=>'uid1', 'percent'=>'88%');" V- x( m# m0 f
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符 D, Y; v M9 w+ n) a: Z* d
- fwrite($client, json_encode($data)."\n");$ ?; Z) v8 @' l9 g
- // 读取推送结果) _9 q# V7 u! ~7 }2 e* x+ W1 t
- echo fread($client, 8192);
复制代码
) u5 }6 z6 g6 ~, f) c
0 {; Y; D$ i d- P3 _, a' H% p, p! N |