- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
( f% C. }! U, k! S( M, ] - require_once __DIR__ . '/Workerman/Autoloader.php';
3 K: M7 s2 o- W4 m8 c, q
5 n5 R0 o8 j6 O5 b- z* y( W- $worker = new Worker();
; R; C# t# K8 l, h - // 4个进程
+ X" X" u# v& V6 t5 C! T8 h - $worker->count = 4;
- G& \; E$ q _5 q2 w - // 每个进程启动后在当前进程新增一个Worker监听* I/ g& n5 g% D4 _0 V
- $worker->onWorkerStart = function($worker)1 T8 E7 L: ]8 A& E
- {
$ h1 O8 t2 s1 n - /**' y8 F w3 C3 z2 `# Y1 L) C9 D) q
- * 4个进程启动的时候都创建2016端口的Worker
9 h, N& K5 y9 I1 i - * 当执行到worker->listen()时会报Address already in use错误
& H' S6 Q. Z+ H R" H6 ]% U - * 如果worker->count=1则不会报错4 J+ p8 R: Q' o8 ] ~; s" H
- */: t0 Y* H. t3 u: X
- $inner_worker = new Worker('http://0.0.0.0:2016');9 |8 A7 q, A( a+ I' Q
- $inner_worker->onMessage = 'on_message';
( b8 J5 I4 e9 g' { - // 执行监听。这里会报Address already in use错误; _- x/ ^. v6 F( H2 R1 c
- $inner_worker->listen();8 z$ _5 X3 g! l9 `- }
- };
0 r( n: E+ Y0 J' ^ q
+ v$ @8 c3 `; l/ d/ W, i0 x# c- $worker->onMessage = 'on_message';# k- R2 s% X6 R. t) [
X: A x' M% _" K' E1 M# E. U& Z. j5 d- function on_message($connection, $data)
! g6 J3 K6 _- @, l5 T - {
3 s$ g2 g5 L; [8 @ - $connection->send("hello\n");
: }0 d9 ]! `# ~ ~& g( i' W - }
; M; E2 t; x5 o4 O7 ~) c - s9 y6 u, l8 U/ |
- // 运行worker% J) z% `7 @1 k4 M' k
- Worker::runAll();. {( ~5 q' Z# [: K0 Q
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:% f% p) R$ R- m/ d
# N2 `8 h+ J3 T0 M+ ]0 c9 [* k9 o- use Workerman\Worker;
* z! c; {, T4 X1 O" C% @% O. @ - require_once './Workerman/Autoloader.php';
c# g$ i% S! O; s1 a: J - 9 M/ _ M0 H% w& x/ T8 x" e& I8 q
- $worker = new Worker('text://0.0.0.0:2015');
b# t9 v3 b* n) I( R( _9 e - // 4个进程
7 ]. ^/ b* X+ ]* K7 n - $worker->count = 4;1 L2 D8 f6 ]4 d1 N5 d+ F3 G
- // 每个进程启动后在当前进程新增一个Worker监听
3 K. I* f2 Q+ R/ m - $worker->onWorkerStart = function($worker)3 P) Y n5 f4 J' ~3 m9 a1 }
- {
- q$ c$ p/ c/ X1 z# n - $inner_worker = new Worker('http://0.0.0.0:2016');
0 s7 W5 u# v, `# v# U' O - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0). B9 y9 I2 c! r$ Z% t0 h3 Q
- $inner_worker->reusePort = true;! h) S `( o$ o7 [
- $inner_worker->onMessage = 'on_message';9 O. S# c5 k( W2 Q, C0 H
- // 执行监听。正常监听不会报错$ t; O0 l2 ^8 R% k+ E
- $inner_worker->listen();
% \9 k y& n# {& T7 x0 ~ - };
# u b7 N% R0 \- v, J& S" a - : N5 o4 O! k$ S& R/ G6 l
- $worker->onMessage = 'on_message';9 { C; N m$ O' L
6 E6 C$ N5 @8 B* z" a+ X" f- function on_message($connection, $data)# ?2 J0 Z$ n; g, O
- {
, Y3 Q& {- R7 y6 V$ N) S& ]. r - $connection->send("hello\n");
7 \- t# D$ o/ B - }
6 K4 _9 o8 J4 p! E4 K2 f3 i) A/ j9 y - 2 e" A- ^& W; L$ i1 a+ w! _
- // 运行worker( p, _% h x5 R# i
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php0 q: U# e8 F; f0 f6 Z% g
- use Workerman\Worker;
: O1 L& G/ U k: H - require_once './Workerman/Autoloader.php';! r6 }/ z. l5 o2 j2 g, f7 W6 h! p
- // 初始化一个worker容器,监听1234端口
% n" T/ |0 D; m+ H6 v6 P - $worker = new Worker('websocket://0.0.0.0:1234');) B. I& m. p) p) u$ |& H
- 0 P# l& k3 v$ A7 i! h' z1 \+ V
- /*
/ n4 q% b; V, ] - * 注意这里进程数必须设置为1,否则会报端口占用错误3 R5 c6 j p' j: e) q# J2 h
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
" V }# ?7 ?: }9 I - */) J+ w- a4 M+ y. Z3 K3 u1 J, D2 R
- $worker->count = 1;/ A# e; ?1 A7 q: A
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口" G. e5 _3 f% r, I: n0 m
- $worker->onWorkerStart = function($worker)
8 V& u2 U* D( z- V; ]2 p5 m - {
- B% i: N# j f* k - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ u. J A9 a# ^' t, r6 s
- $inner_text_worker = new Worker('text://0.0.0.0:5678');' h# L1 u% |0 Q0 T" r' x8 J# q
- $inner_text_worker->onMessage = function($connection, $buffer)- @# m1 \: b3 z
- {
& _ b% R" ^% ]( K. F8 y - // $data数组格式,里面有uid,表示向那个uid的页面推送数据% x! G/ ]& U9 R3 t0 u
- $data = json_decode($buffer, true);/ H' A5 k, b+ p% E- T
- $uid = $data['uid'];* D2 d, |% u" P; G
- // 通过workerman,向uid的页面推送数据
2 Q- }4 j" W z) F9 n - $ret = sendMessageByUid($uid, $buffer);
D+ d! C, o: x# v6 Q - // 返回推送结果
+ V2 V3 ?( l3 l9 U: [ - $connection->send($ret ? 'ok' : 'fail');6 h3 ~# h: A4 j" e, r
- };4 c9 t+ C, |# y( u- i: R* l" Q
- // ## 执行监听 ##
4 ^4 B7 Q9 y8 F7 @ - $inner_text_worker->listen();2 B1 ]+ \! c; G" b2 b
- };3 y/ r+ I, L- @$ h& C1 u8 o! q( i
- // 新增加一个属性,用来保存uid到connection的映射6 L3 h7 k2 Q7 ~
- $worker->uidConnections = array();
- d/ W" ?' u w2 T4 v - // 当有客户端发来消息时执行的回调函数
9 ^ U8 [8 M6 X2 V8 X4 H3 U - $worker->onMessage = function($connection, $data)' h! Z; t8 n6 D- K, i l5 n% @
- {
: l+ @4 i" o3 H+ ~ - global $worker;
& e. t) w0 n* c9 E% J1 f - // 判断当前客户端是否已经验证,既是否设置了uid
% V l9 T! b I5 _ - if(!isset($connection->uid))2 `8 C3 |# i1 ~1 E
- {
" J/ d& Q! [ u - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
$ E3 J$ O5 }$ L6 V' @, @ - $connection->uid = $data;1 N( |4 n1 h7 b L7 A: t5 l
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
& e9 l3 h4 ^, v' @+ r - * 实现针对特定uid推送数据
4 Y+ w0 ?$ P S6 Y2 C0 r - */0 L: \0 f) w: J- y& M9 k
- $worker->uidConnections[$connection->uid] = $connection; y5 m# U5 W9 t' ^7 d
- return;
; O C( J8 s2 [$ `2 E5 o - }
$ ]4 B- t9 B" N - };* `$ K6 A' {( P# m. ~
1 _ W U$ _% l3 R _# I( U2 W- // 当有客户端连接断开时
3 V6 t2 M, i3 [ m% n N - $worker->onClose = function($connection)
& q' k/ ]( R4 m - {
( e) F) M! d1 d6 L% [; I# P# x - global $worker;
9 o) _) |/ h' l: R5 k3 T1 n7 f- A - if(isset($connection->uid))
* L9 u$ S4 ]( @2 T, v E+ `: P - {- K8 z: d0 ?4 r8 q) p: b$ E$ D
- // 连接断开时删除映射
2 i/ i0 v4 L6 ^" a - unset($worker->uidConnections[$connection->uid]); x1 r$ V) i* c
- }( a( Q F& l1 t9 \1 A1 t* Y3 M4 W
- };1 i2 W; j1 d$ p5 k
2 D. U/ S- T f R( d- // 向所有验证的用户推送数据' d. P+ { ^/ P
- function broadcast($message)
1 E2 }6 x% ]( O+ t* `; |6 N - {
; [5 R' D2 C1 V' A+ g5 u - global $worker;
1 J- ~7 F9 i6 y l- y; S - foreach($worker->uidConnections as $connection)
1 B) @7 c: a1 d4 w - {' H# Q6 M* a, _2 |- w* b+ M
- $connection->send($message);# h/ ~5 O. u, x: w5 \4 Q
- }/ N' n: E' W; O* v% o5 |
- }
1 w: }! i' n$ R3 { - 4 r6 ~2 P4 r$ z
- // 针对uid推送数据8 `; B0 x1 ~/ i% Q" x# V
- function sendMessageByUid($uid, $message)/ a9 V0 ]& K/ M
- {
3 ^: u5 e. q2 O! S6 u# K - global $worker;( y. t" [/ V5 P$ l
- if(isset($worker->uidConnections[$uid]))9 ]8 T0 N+ _) ?# Q8 H, Q
- {
3 Q I7 l: |( d' Y! M0 s# x - $connection = $worker->uidConnections[$uid];
" q0 k. A7 _6 `# Q - $connection->send($message);
5 A* r5 L* _* R! C! g p - return true;! A- H: i- m5 S8 r
- }
# v1 R1 x+ ~9 G - return false;4 G+ o0 X9 {9 i
- }/ u2 b* S# _% L5 h# U
! X, U8 ^4 b& B) y3 p- a2 q- // 运行所有的worker
! x2 t$ V( J9 w9 p - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
0 R; Q/ ?0 m o& Z* H - ws.onopen = function(){" L4 F0 d1 w( C* p# t$ g) ^* A
- var uid = 'uid1';: ]2 W, M" p+ w1 ?8 _
- ws.send(uid);4 L9 z; Z# D5 s; j% ?
- };9 L8 E# w; M& H1 t) V
- ws.onmessage = function(e){
, ]9 B2 _+ P8 }3 J% u' l# @ - alert(e.data);
8 p! G8 E! y% n* F- C - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口/ r4 \5 [$ A8 `" g( D% J
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
$ n* w1 W0 ~# \ - // 推送的数据,包含uid字段,表示是给这个uid推送. H$ q* {. [- R0 k g- a
- $data = array('uid'=>'uid1', 'percent'=>'88%');
$ l4 c% D$ D4 R" E U9 c; P5 [" o - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符 @. K5 B3 X, V
- fwrite($client, json_encode($data)."\n");
4 ]9 X, x, b+ m# h; E, s - // 读取推送结果
$ c9 i% d: u, x - echo fread($client, 8192);
复制代码
# H; `" Q7 {2 z. f+ {
5 J2 Y& ~6 d0 W6 c" h |