- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;/ m5 c7 }# j4 X6 ]/ N* h
- require_once __DIR__ . '/Workerman/Autoloader.php';
* ?" v9 y) F& G! Y3 l' X - - [) Z0 V6 ]6 J! l4 S& j
- $worker = new Worker();
* v* z& v, \/ k7 M6 F - // 4个进程
, M* \8 }8 i" h5 m' S - $worker->count = 4;' e( a9 ~. {4 p/ j; O
- // 每个进程启动后在当前进程新增一个Worker监听' S# c; l1 [4 c+ \0 `. T
- $worker->onWorkerStart = function($worker)( _7 U" K. m& V V3 `# H' E$ j
- {: j" S, B/ a4 f' O) @
- /**
0 ~) ~/ L H: v, a1 b - * 4个进程启动的时候都创建2016端口的Worker! {6 d3 Z6 Y6 h* I, X
- * 当执行到worker->listen()时会报Address already in use错误
9 J# T$ D; C1 T1 m) S, ?, h% s c - * 如果worker->count=1则不会报错; V. Y7 r5 X' D: K. t8 O7 C
- */
. d- g; @7 ^4 O - $inner_worker = new Worker('http://0.0.0.0:2016');. [, g2 }9 @9 V
- $inner_worker->onMessage = 'on_message'; C) f2 Q5 R6 _9 t" F3 J" J
- // 执行监听。这里会报Address already in use错误- o. F, f0 u) s% Q
- $inner_worker->listen();
9 G! N- }3 I4 M& m2 k" b6 u/ s4 t" D - };% \/ I0 o" z" A
- & [0 w7 o: q* k0 A% H
- $worker->onMessage = 'on_message';3 X* c, |1 N: @4 ^1 w4 [1 m& F- c
( e- ]( X6 I5 B3 C: r1 ~7 I( @- function on_message($connection, $data): G( y& C7 S" m1 A
- {, s1 R9 q" W6 f0 e, i. |
- $connection->send("hello\n");
8 \, Y, p; o: e* m7 C& B q4 ^! R - }7 f9 Z0 p8 @# {2 j& e+ ~' R
) @: ]# E% L* E- // 运行worker. |2 m8 u8 j% [0 v. B; J
- Worker::runAll();, I9 J- \+ B, T
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
" f( L8 C1 @. U7 ?9 ^1 \ - 8 ^ \3 @# b2 Q8 `+ ?4 o
- use Workerman\Worker;
7 W* S: [* H. |& t% h- r - require_once './Workerman/Autoloader.php';; D! q! @8 [$ x! [+ [
" T9 s! t* d2 y) s# L% }- $worker = new Worker('text://0.0.0.0:2015');2 |" q/ q" y8 m8 P4 {, {5 c
- // 4个进程
$ j& Q4 X3 g9 Y - $worker->count = 4;& \# e9 E8 R' W; D5 P
- // 每个进程启动后在当前进程新增一个Worker监听
3 X5 G' K7 S/ P9 i1 j( r - $worker->onWorkerStart = function($worker)! v& p% }& k5 H7 i3 R, {
- {% \ a& A( P. M+ `% C
- $inner_worker = new Worker('http://0.0.0.0:2016');
- s5 K0 l6 v$ P, A4 \ - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
( g3 O0 D: F/ w K# ~5 C) j- i - $inner_worker->reusePort = true;
$ h7 f6 z7 _& M O1 _( ? - $inner_worker->onMessage = 'on_message';
# [5 d; ?" f+ L# W" x - // 执行监听。正常监听不会报错/ X9 l' W: d' i% _3 x; d
- $inner_worker->listen();0 v1 E" N% L& C; E' G
- };
: s" s# O% ?7 L7 w - " [0 k$ p. a) o- e* _+ {
- $worker->onMessage = 'on_message';' H6 l7 x7 ^; ]4 g# J; P
- / X! o [; \2 K: {3 m9 `
- function on_message($connection, $data)
* F \/ \7 Z# B# {' p" K5 I* t - {
( p7 P* L- l5 y, ^2 p% x+ [ - $connection->send("hello\n");
) N7 k* _8 k2 ]! ~) x - }, L& r; Q. b- ~7 r9 i3 s0 y
- 8 F8 q# W% D3 }$ ~& ~( o
- // 运行worker
0 t; g6 \6 u* z& t. x6 z - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php& P8 {. T+ H/ s& x6 j& Z& y; l
- use Workerman\Worker; b9 G* B3 Z: E7 c* ]
- require_once './Workerman/Autoloader.php';" F$ z" k) {: L
- // 初始化一个worker容器,监听1234端口
1 A" Y) {, W" r& B - $worker = new Worker('websocket://0.0.0.0:1234');
- R7 E$ f/ K# T0 Z, S. ^& ~
) O' u1 I9 [2 T) x4 m2 E- /*
- w1 i( W- t, S3 t+ F - * 注意这里进程数必须设置为1,否则会报端口占用错误( j5 V C5 E: F) n) i) T5 b; |# v1 e
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
\7 Y# Y9 ]( d - */
U: P2 y7 A" {1 A& Y - $worker->count = 1;: v* G6 r8 `" {! `
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
. `/ m6 Y$ i* e, @$ e$ G3 n6 W - $worker->onWorkerStart = function($worker)
/ V' f( x8 p3 u% M8 o& @! Q: o - {- t3 U5 K A3 C2 I- T1 z/ _
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
- U7 R. Y" Z- @ - $inner_text_worker = new Worker('text://0.0.0.0:5678');6 v( v. `/ g% ]& s
- $inner_text_worker->onMessage = function($connection, $buffer)) S5 I" [$ B! ], v" A
- {
( a, C$ ^. _4 b" y) Z - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
0 X5 e+ T* H B3 w! k5 E - $data = json_decode($buffer, true);/ P! x! i. ^& n3 ~' l
- $uid = $data['uid'];% d( o- S$ c: K; H, `6 {
- // 通过workerman,向uid的页面推送数据5 S1 ~7 F! Q1 m
- $ret = sendMessageByUid($uid, $buffer);) o4 z8 _7 ?0 V0 {% g
- // 返回推送结果
2 b" u3 M! r |! ` - $connection->send($ret ? 'ok' : 'fail');1 ~# K- u( U9 }0 x: x
- };
9 p: V( G% V3 e3 c2 y4 q3 Y - // ## 执行监听 ##" {/ z' m; Z3 v5 @7 e
- $inner_text_worker->listen();
2 n0 g# d p5 K/ T3 e0 f5 L - };3 F9 c( Z! d! a8 B+ o: f
- // 新增加一个属性,用来保存uid到connection的映射
5 P# U# k0 n# u1 s; R; u - $worker->uidConnections = array();" D$ h7 b- w* ]2 M) o
- // 当有客户端发来消息时执行的回调函数) x8 f" y2 j# N, S1 ~
- $worker->onMessage = function($connection, $data)
7 E% a4 ]7 L1 f n4 I! u - {
6 G7 ^3 Z6 [8 d( v7 C+ @' y { - global $worker;9 }# N2 l! }5 {
- // 判断当前客户端是否已经验证,既是否设置了uid
1 ?# o1 e$ |0 N, Z - if(!isset($connection->uid))6 i; l# @8 b1 o" E
- {, H5 i, E) k6 s/ |+ m
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
+ h! M( m' X5 v+ U+ a9 [ - $connection->uid = $data;
4 e2 ^7 o: O3 X9 y* W0 Y - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# l* b! Z0 R* V9 h/ K! Q- P
- * 实现针对特定uid推送数据; G b. e$ S8 i9 e4 d2 P- q& Q
- */- d& r" I4 l/ |# }' Z4 g1 g
- $worker->uidConnections[$connection->uid] = $connection;
; \0 _7 p2 r' U1 f - return;
5 f+ ?" J% k/ Q5 l0 i9 R U - }
7 N3 v! Y3 E5 ?4 n' ] - };
/ u/ T: ?9 B, r! f+ e6 X R2 j
4 T8 g2 L5 p! h) S+ ~- // 当有客户端连接断开时8 I% P; R' f' Q
- $worker->onClose = function($connection)0 V( V9 ]. B8 ]) K/ \$ x8 i% U
- {
" a t1 g0 K8 k9 e& c - global $worker;! n* |& R- j, @* f
- if(isset($connection->uid))
6 Q: G9 ~7 _: H% u/ p* V - {+ u" x: }) q1 l1 ?0 X6 r/ g. P) w3 ?
- // 连接断开时删除映射) y: `7 Q8 O0 |/ g1 r4 S+ f( j
- unset($worker->uidConnections[$connection->uid]);6 z: i% p# l: V. _5 s
- }$ r9 h: U% c+ y5 q
- };
. @0 a1 _% S4 [* N
' u0 {4 @5 H2 d- // 向所有验证的用户推送数据. R7 r. G7 D+ o8 f( h
- function broadcast($message)% T$ r/ n# C6 g! ~0 M2 e/ k
- {
" a- R) N. s. C+ e( [ x8 E - global $worker;
& _* k% W) H7 @$ K - foreach($worker->uidConnections as $connection)
8 h, a' {8 Z: i2 C - {2 X: r/ c2 M8 f4 f r& R
- $connection->send($message);: ^- h7 @ ]) [
- }/ b+ j: ~3 E Q* q. C6 p
- }$ T0 M) o7 u7 I' c
- 8 O8 h. b. U! s! e3 _7 r% h
- // 针对uid推送数据
8 A. H' h% ]( F- D; _ - function sendMessageByUid($uid, $message)
: j# |) ~" i& X$ g% ]6 d( J - {4 O: {6 Q+ ~4 g) w; L
- global $worker;- V% N# l ~# S, `) {
- if(isset($worker->uidConnections[$uid]))
L9 P: j9 C+ Y: ~ - {# O/ v% t6 ]' h; c, V W# H6 b
- $connection = $worker->uidConnections[$uid];! K* T4 S0 I! @1 C
- $connection->send($message);. v4 g$ s8 ]0 _9 Q
- return true;3 c+ q/ D, d' W4 ?/ d
- }
1 O" f/ w2 H+ u8 O' M7 Y - return false;8 f( p2 N) K C8 }0 u: ?. n4 y
- }+ v1 [: g# V1 M+ k
0 d% O* t+ t6 @! v- // 运行所有的worker
/ Q! I5 s6 [5 _/ h8 l - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
! g& Q2 l4 {& z3 ? - ws.onopen = function(){
$ X5 s ~7 A7 e+ S1 v - var uid = 'uid1';- c+ @; ?* Y1 a {+ Q; P
- ws.send(uid);: y$ M! X# R0 C& V
- };+ \+ k9 X) ~3 W4 X" @/ j
- ws.onmessage = function(e){) Q& c' e( W1 _9 `' J
- alert(e.data);5 l# Y0 f% L3 x. Q' I5 E/ ? X
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
: `; w7 @9 P: ~7 M2 |, t - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
2 N$ t4 j( H5 Y3 a/ }# N7 O0 x7 ` - // 推送的数据,包含uid字段,表示是给这个uid推送
3 Y* ]; N2 f/ X$ C" ` - $data = array('uid'=>'uid1', 'percent'=>'88%');/ o3 J: p& J! _1 m7 L3 a( V3 m4 \. ~
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ s$ P3 @9 x3 @/ w5 U$ w( g p
- fwrite($client, json_encode($data)."\n");# c4 H6 } u" c0 q) [# j
- // 读取推送结果' ^* X( ]' p( \ o. j
- echo fread($client, 8192);
复制代码
" p! J7 r7 {: q- v/ A0 d8 M' J% A! _
|