- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;/ e* P t% r; k: k1 a! M0 x$ B
- require_once __DIR__ . '/Workerman/Autoloader.php';; N5 M { }7 Y# i3 N( }
0 ]( z+ d$ d/ D# m0 @- $worker = new Worker();
, N+ L9 W% r8 j - // 4个进程8 X8 C) L$ r( |" ^4 A8 u* F
- $worker->count = 4; n* V# f. v2 m$ c& q
- // 每个进程启动后在当前进程新增一个Worker监听& E% G9 @) ]6 R* B* V& E" K
- $worker->onWorkerStart = function($worker)
7 R" D6 N' |% `7 E' G+ g - { `; G5 U- {- v5 D& A2 ?5 O9 {' _
- /**
& r/ h5 b' w; c0 `/ k0 u0 c- d3 A - * 4个进程启动的时候都创建2016端口的Worker
- ?6 d8 ]6 ~$ T! r" h - * 当执行到worker->listen()时会报Address already in use错误$ u5 I% Q- t2 M9 H# _6 u
- * 如果worker->count=1则不会报错6 v& F! Y4 a: g6 R' `
- */6 s. P6 y0 n: q, n3 {8 A
- $inner_worker = new Worker('http://0.0.0.0:2016');( Q; t9 v8 b6 D" }4 \9 `' H
- $inner_worker->onMessage = 'on_message';
1 P) F8 k5 p8 X/ s - // 执行监听。这里会报Address already in use错误
" L( K/ q# e8 `& Q; b - $inner_worker->listen();) w* I Y- O& G
- };, L9 ?/ h# \) X- e* Y3 M A& Y
- $ J* z' S' H5 l1 n( L* s$ y7 ~
- $worker->onMessage = 'on_message';
, X: m! |7 f( f( T! z* \ - , V: ~2 a# q& h$ q6 g/ \# h
- function on_message($connection, $data)
4 Z4 @" J6 g' L% C5 e/ n" T - {( C7 e# s- w" a
- $connection->send("hello\n");$ d9 m% R- K5 i4 p
- }
6 O( U: i) ]' B- ~- ] - 5 Y5 m) F8 Z- c+ K8 D
- // 运行worker) R' n" W% U8 i t7 T( ]
- Worker::runAll();
: `7 ?/ O! A: z& H - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
; z) s/ e. ?( k: p* C
, B" J4 K- {* q! f% r- use Workerman\Worker;: m; o0 Q" p9 Z, c. S
- require_once './Workerman/Autoloader.php';
: ], `' `! t# e$ I3 \- {6 R" { L7 g
( \& f' H0 w2 M- $worker = new Worker('text://0.0.0.0:2015');- f& u7 V. L- ]( R. H9 \
- // 4个进程# G3 H* N8 S" X$ ]# O N! w
- $worker->count = 4;
$ X4 P2 v% }* V9 R: i8 ]2 r - // 每个进程启动后在当前进程新增一个Worker监听+ l# f( [% w2 c- @% X
- $worker->onWorkerStart = function($worker)" @1 B) s0 O& |! ~
- {$ i% m) z7 t" i# E( K
- $inner_worker = new Worker('http://0.0.0.0:2016');
1 U7 _9 N2 {, u* T$ X, q, K( z) c - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
, L0 F% [! b( N! e# ] - $inner_worker->reusePort = true;
/ C2 B& D/ ~& M - $inner_worker->onMessage = 'on_message';* ]; O, \/ w* N% p1 N/ k
- // 执行监听。正常监听不会报错
9 S. U! S: E' } - $inner_worker->listen();
# ?$ B2 O! d; D `0 `! E- a - };: S J' }. y3 `: O" Q. m7 [
- * e% G# S8 T' q' {5 d
- $worker->onMessage = 'on_message';' q9 _; C+ }# M' j! ]
- 4 g2 C g8 I$ t' T, x" P! P: g
- function on_message($connection, $data)
2 q/ N* \* x& b+ k - {
4 D; m) D% i. L+ e - $connection->send("hello\n");
7 S, {4 w# y& C; o - }
1 q- y6 U9 U% K0 e- [
* ~% p9 |6 c' U) r3 e- // 运行worker! j4 i1 J$ {4 M
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php' Q! ?0 K% Y& ?, k
- use Workerman\Worker;
& H$ l* u* d0 S% L2 d3 v: g7 U - require_once './Workerman/Autoloader.php';" N* q5 h n. k9 S
- // 初始化一个worker容器,监听1234端口. g7 o9 R. s; `. S1 ?; H7 J0 E6 i, Y
- $worker = new Worker('websocket://0.0.0.0:1234');
; C" }8 C% c. v2 b
' U$ `" V2 _6 n# I6 D; \- /** ]0 s8 z9 h% ?9 i
- * 注意这里进程数必须设置为1,否则会报端口占用错误4 T" i. A' C1 T8 P# n3 \6 z& w" e3 E+ J
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)/ o9 J2 e. ~+ e, G# X$ O
- */5 A' v% G/ i9 I) C2 ?5 k
- $worker->count = 1;/ L2 c% F4 B2 |" P8 c3 |& r" [/ Z
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口+ _" s( t0 j9 s! U6 d, }5 ?- `
- $worker->onWorkerStart = function($worker)
# q- C7 Z# @4 U% p+ z - {* D( ?6 D% o: F7 B5 U; o3 R, W
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符" y" `: k. d x8 U
- $inner_text_worker = new Worker('text://0.0.0.0:5678');9 ~# i. h5 c( N* e+ J4 J
- $inner_text_worker->onMessage = function($connection, $buffer)7 h: o1 S, O3 O8 {" i( W
- { {" R" Z- G9 x- q" i4 A
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据5 v3 F0 G" x- Y& B1 L. @/ T) K5 P
- $data = json_decode($buffer, true);
$ B+ U: {$ `9 H5 e- l - $uid = $data['uid'];( B. K( Q6 C& d
- // 通过workerman,向uid的页面推送数据
5 K6 ?4 F, z/ C( N% _* h - $ret = sendMessageByUid($uid, $buffer);
7 _* H/ X4 l: J) ^/ y - // 返回推送结果
) N( F& @$ U: X8 k" l - $connection->send($ret ? 'ok' : 'fail'); `5 _+ E4 x' U8 s: V
- };
' P, h2 [6 a) K" d - // ## 执行监听 ##
9 N; j* C* {! {' J - $inner_text_worker->listen();
4 `$ X) R; A: I5 Q; a, l - };# I4 i) {, o! i8 [
- // 新增加一个属性,用来保存uid到connection的映射) C$ v* @8 G; R8 b8 a2 e' {
- $worker->uidConnections = array();
( |% R; Q( c4 m( R, \7 q3 ?; F6 z0 a - // 当有客户端发来消息时执行的回调函数- u$ r5 d# Z0 v4 r
- $worker->onMessage = function($connection, $data)
. t" L7 s6 x4 s! a4 e( h5 L - {
1 M% ~8 B/ X9 a! Q) ~6 Y8 H4 P - global $worker;4 N0 S) R8 ] t
- // 判断当前客户端是否已经验证,既是否设置了uid2 R! ^# q, ?: K9 r
- if(!isset($connection->uid))
0 U; j. B% Y7 x* Y0 F; X1 ^* T - {
, Q; C, x. n9 ]# U) H - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证) T: q: V8 k# `. }
- $connection->uid = $data;2 _* m) G$ M- o% w" H% I$ W
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
9 c* Q: R0 [$ D$ {$ w - * 实现针对特定uid推送数据
$ c/ N! j8 l+ J7 E' b - */
/ k1 b- _! G/ u( U; V/ e k2 f - $worker->uidConnections[$connection->uid] = $connection;" u2 d+ V- R1 P' J5 R, d7 c$ G
- return;
! Y% M$ w# B1 T+ p - }" U& |( _ X/ X$ W( b
- };. G/ |1 X8 O v- z
1 m* C7 K0 |5 t! [# a" E4 I, N3 }5 e- // 当有客户端连接断开时
( l2 b& O& `' c9 L$ N3 \/ H - $worker->onClose = function($connection)
/ ^) [( J) d" K: e( X - {' s: [3 S2 `& z) Z$ O1 F# g
- global $worker;
$ p4 w6 p/ g, l& X _/ d - if(isset($connection->uid))8 q/ w0 g( F- g# Z* ?0 Z0 `. Z
- {6 g* D; `/ T4 g* k( B' E
- // 连接断开时删除映射4 ]/ x' y- N, z0 r) k4 T
- unset($worker->uidConnections[$connection->uid]);
, P& [. Z) a# }$ j1 W- z/ L - }
5 d/ ^4 x$ a$ b% i, Q - };
! O1 H( l0 f; a# W
B4 D F! l& S. x* n- // 向所有验证的用户推送数据5 M O( {8 G3 w3 j9 f, P q
- function broadcast($message)* i& Z; R( h" C+ a5 f5 [2 d
- {# e# a+ \4 ?% i& X* T3 o! X
- global $worker;* }# b+ r: u, W d2 o! G& U8 k
- foreach($worker->uidConnections as $connection)& B- o3 F, @$ k) z' Y
- {
3 j6 J$ _' q# a! c e! v& j! g+ u; s - $connection->send($message);
3 a. V0 t% F0 J$ C) ?: Q1 ]! L - }9 Z/ ?4 ^) O& t1 M2 j
- }! ^$ Z% ]' b" H; q
- $ P0 {+ Y$ W+ z4 s
- // 针对uid推送数据! ^( r& q- K/ M6 _1 k
- function sendMessageByUid($uid, $message)
1 `$ X1 ?8 \9 L8 p9 H& {5 p( D - {
( F2 N1 A) ^. M n+ ]) m: N6 [: o - global $worker;
6 r: y5 {5 C9 a9 j; F5 n - if(isset($worker->uidConnections[$uid]))- B! I9 r/ M8 {' Y4 i. c9 j% U
- {
; h8 N- |1 F3 ^7 z+ `" Y5 j: ]! s - $connection = $worker->uidConnections[$uid];
$ O, [9 h* b! S - $connection->send($message);) v, ]) r8 }% Q& P; \
- return true;# G- p: j. |, T" J& R, e
- }4 n1 h# N" T. v# }5 c4 _1 P3 e
- return false;
( L7 K& w: X/ U2 Y* ]& u ]1 W7 R - }
+ A0 J a( l7 E5 z/ [
: y" ]+ b8 |9 a% a0 e ]- // 运行所有的worker
! x0 y# W8 R% i( i+ X' E - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
5 x2 @3 @ w# c* ^8 A) s - ws.onopen = function(){
& H5 y/ h; D+ |* P6 d+ L - var uid = 'uid1';
- v+ L7 r& j2 e3 B - ws.send(uid);* f8 G3 q6 L0 Q6 c. D9 q
- };
, L, p1 g. h2 Z% P - ws.onmessage = function(e){0 w% S+ V9 [. ]7 S5 O
- alert(e.data);
. `, A/ p/ Y; s, X' G - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
: ~# D% R& z# K: h: t+ R x# N. ~ - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
) g. ]4 j/ ~- m- L7 a+ I - // 推送的数据,包含uid字段,表示是给这个uid推送
; W' u; f5 p, j+ p$ J J - $data = array('uid'=>'uid1', 'percent'=>'88%');
3 u( B1 A I+ c2 Q8 C& L! T - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符+ L7 r3 z; A, s) ]4 P) V
- fwrite($client, json_encode($data)."\n");
' L' h# i+ w9 }3 @( k* ^1 ], u - // 读取推送结果
+ W2 T7 P) M+ x2 i1 z' W4 L - echo fread($client, 8192);
复制代码 0 e7 D( [0 j; J
5 p9 T; v9 j! J0 J) k. r
|