- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
0 s5 g% F8 ^, w; z8 w - require_once __DIR__ . '/Workerman/Autoloader.php';
' K: f9 t: d7 |9 B" J" d) U - / `8 Y( @& g4 A% {3 k
- $worker = new Worker();
& n; v1 t( }/ f5 h$ F - // 4个进程3 k5 T5 m$ ^" I% R8 Z
- $worker->count = 4;; y! S J8 l: J1 K# \4 K0 B
- // 每个进程启动后在当前进程新增一个Worker监听/ O$ y: a) w. G4 F1 _! D
- $worker->onWorkerStart = function($worker)6 }# U, g( V2 M
- {
3 s/ F& ?% p& Z7 \/ B: C - /**
1 G! J! i0 F# g# ] - * 4个进程启动的时候都创建2016端口的Worker
5 m# }& n. n# }/ W7 e - * 当执行到worker->listen()时会报Address already in use错误
5 }3 Z r: u" Z; f - * 如果worker->count=1则不会报错5 M& \) S G. ?! D9 P C- B7 ?4 V
- */
?7 q$ t/ r: t1 A( A3 O4 F/ P - $inner_worker = new Worker('http://0.0.0.0:2016');
" {2 `; D2 Z+ k% N - $inner_worker->onMessage = 'on_message';, n! d) _- b V& b6 {- u- ^# h6 _% C
- // 执行监听。这里会报Address already in use错误
- \0 v8 l5 r6 K! j* ]% p - $inner_worker->listen();
& K R- j) O2 ?2 u - };3 t6 i; v" x! |4 a- {- z4 m
; H" Q" _( i% y7 l3 b- $worker->onMessage = 'on_message';% w+ Y2 @) C$ K) U& N
+ V" T. |5 x5 N @4 v" x& }- function on_message($connection, $data)$ e+ K* k, p0 X5 a: }
- {6 U" e# X- d' K! \& @" E( Z
- $connection->send("hello\n");
0 E3 S# h! K; P. Z2 j4 V9 M8 k - }4 }0 a2 Q9 R/ A
- 4 N8 O3 K K, t9 p3 ~9 C
- // 运行worker$ `0 B; u- ?+ ?2 g4 f: {9 Y9 B
- Worker::runAll();
n2 R1 Y# w/ v" i+ Z+ C - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
! \+ ?# e2 @) x& q8 L C - : ^- }6 m( s" f, ^8 H' q
- use Workerman\Worker;: u* n7 g- S7 L
- require_once './Workerman/Autoloader.php';
S" N% t6 W8 U, o W( L8 s* ]% K* k
: ? g! m. _, o; z9 s! M- $worker = new Worker('text://0.0.0.0:2015');
! e6 B7 V1 E, `" a$ }/ v - // 4个进程; y i/ t4 K& K: j1 [! A- S
- $worker->count = 4;: a) |- t3 Y9 ?3 J
- // 每个进程启动后在当前进程新增一个Worker监听
% u2 X7 b2 D; w4 d1 n( {- c - $worker->onWorkerStart = function($worker) I/ D0 H' @3 K) k* j: @! g& c
- {
. e H9 Z( a. z7 P$ [ C8 x - $inner_worker = new Worker('http://0.0.0.0:2016');
, u- a' m2 V- f' b" l6 y - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& h) |1 s# Q( l/ D, L5 K
- $inner_worker->reusePort = true;
9 d1 v: s( ^4 c9 ]4 w- q4 n - $inner_worker->onMessage = 'on_message';- [0 I; C4 q/ @& m
- // 执行监听。正常监听不会报错1 \# h8 I+ h; j X2 o
- $inner_worker->listen();8 O) n6 B1 M4 ?" U' t
- }; i5 E6 W! G* X) D
8 a5 N2 S) X5 J# z) G, s0 T5 `- $worker->onMessage = 'on_message';8 }6 C, Y4 o9 i8 x
- 0 E; Q' ]$ f) j [
- function on_message($connection, $data)* ~( @ F5 w3 ~' j/ V
- {" b$ E& K) v9 E9 e
- $connection->send("hello\n");1 e7 {2 G. n( w; p* ^
- }5 Y. \# K9 I- k8 R! ~! A
- & z1 D7 |" z, V( r3 L3 {
- // 运行worker" N1 b3 Z$ q6 I5 B6 G
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php: p Y% U1 z7 k( t$ Z+ o6 i" A
- use Workerman\Worker;5 w) v( M; E: r. w% \: P7 x0 A. f5 \
- require_once './Workerman/Autoloader.php';0 g8 ]" h# I2 d, x" Z7 a- y
- // 初始化一个worker容器,监听1234端口
9 \ P3 W' E/ f7 _; ~" D% m - $worker = new Worker('websocket://0.0.0.0:1234');
8 l9 W0 G7 }0 w# O
7 f" E; q: o& b" b/ p- /*8 F1 w# n; u9 S# O3 D D* d
- * 注意这里进程数必须设置为1,否则会报端口占用错误
& q7 Y2 M9 C& \; l - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
/ D# A& \) z6 G7 g1 i# W* k - */
( q2 S( h: F' g7 `+ G, M - $worker->count = 1;8 X) l8 L" A* r! E
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
1 ]5 K9 h) n% Z c9 [+ I( v - $worker->onWorkerStart = function($worker)
8 \9 z' x- p2 s, f - {
+ o" k( P; L; E - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 J$ J. b, A! ^3 x+ J0 u
- $inner_text_worker = new Worker('text://0.0.0.0:5678'); U! g% x5 e- `$ L: u
- $inner_text_worker->onMessage = function($connection, $buffer)9 c8 l N) G$ W' r6 Y
- {7 U9 ?- J3 F4 b" D0 |6 N. n) b, ?
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
$ c; E# o3 d4 [' h% d - $data = json_decode($buffer, true);/ @9 d p0 \! `" K+ |+ a# n
- $uid = $data['uid'];
0 {% R q! ]& S - // 通过workerman,向uid的页面推送数据) ^" @' k1 u+ I1 o
- $ret = sendMessageByUid($uid, $buffer);
" a7 e9 c7 c. N0 W5 Q$ I% S - // 返回推送结果7 V( n1 j$ ?+ e. A" O
- $connection->send($ret ? 'ok' : 'fail');
- [3 B( a: P) S( {+ j9 G$ @$ |" V - };# W$ C. E" y5 o# q: A
- // ## 执行监听 ##* \+ r8 L4 u6 H8 o
- $inner_text_worker->listen();+ s! N( ?, R* _' n4 O, u
- };
( X4 t5 ^5 H8 f! | - // 新增加一个属性,用来保存uid到connection的映射5 T, z. d( z2 f/ }" d2 {
- $worker->uidConnections = array();" X/ N5 {& C' g* S* W9 w
- // 当有客户端发来消息时执行的回调函数; E" h7 k z5 L0 M* l, Y- ?
- $worker->onMessage = function($connection, $data). }* N2 x4 E8 l! T
- {; P3 l; R3 Q9 C( ^
- global $worker;
7 i6 |2 y/ H& V% v7 [( ~' Q& ?7 C/ i8 C - // 判断当前客户端是否已经验证,既是否设置了uid
( I& m' A- f D( C - if(!isset($connection->uid)); z/ Q: x9 R' {3 V
- {( y7 W5 v2 q+ U; L$ } Y1 ~7 B
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
% h1 r/ m8 X9 j, j" P& y - $connection->uid = $data;/ l6 Y# q, V1 w' X7 R. P* u6 d
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,& ]3 C% z/ C4 ]
- * 实现针对特定uid推送数据$ `" K# e+ ?* @' ]; E
- */
+ u5 h9 v5 K+ h( H) i/ Y( a - $worker->uidConnections[$connection->uid] = $connection;
" E; L4 @$ ~6 Q; O) j - return;
- H' L( Y7 K8 U1 ~3 c6 s% n - }
5 L7 M9 m, N/ m# c) `8 E P - };4 v0 `; ?/ j5 _: A# s
" {# c( g" }( T: U- // 当有客户端连接断开时% x% F! K2 s. P8 M9 P$ k2 m+ L
- $worker->onClose = function($connection)+ {0 Z, }8 S% g/ \9 x' Y" x( A
- {6 Y, o4 J4 f0 A) [* b. t" l
- global $worker;2 x# o. M, u6 G/ k" w
- if(isset($connection->uid))
/ B+ d) C2 {# A+ H - {7 C2 Q9 B0 N2 @; O% k) z0 ?
- // 连接断开时删除映射
1 ^; |# W: J6 m* B; B% p- T9 \ - unset($worker->uidConnections[$connection->uid]);! F% t: V& A" Z7 g. _& u: a
- }) `+ Y% S* B) ^' [% Z
- };" I: J0 o% _. K! h
- 5 b# w w% ]+ [2 i5 t' i3 g
- // 向所有验证的用户推送数据+ X2 N8 }* ^1 a! j" S
- function broadcast($message)
, U/ w" o# ]& |8 N+ o - {5 K4 X, E$ N$ z7 e) W& D
- global $worker;
/ F* V6 s. F' } - foreach($worker->uidConnections as $connection)" S& Y3 o$ s. c8 o4 F2 ~! x
- {1 P% [3 E! B7 a8 k8 L- j# [
- $connection->send($message);
3 q8 A" O8 k1 b" N: X; u0 v+ L+ | - }3 Z/ M3 r. Q+ x
- }
" q& Q! { u6 m) z
7 x2 q4 F% j, `: T& u( D- \- // 针对uid推送数据! I7 W: P; f3 Q+ G7 f
- function sendMessageByUid($uid, $message)
% S# A5 Z% t* g/ D - {, V( @4 X, K0 r- O( T1 B7 X0 T2 G! i
- global $worker;
- ]& ~ e& C% o; f) T" ^3 S - if(isset($worker->uidConnections[$uid]))
; v( O# N0 e) i9 Y0 X2 y' \% s. Z - {
4 _" {4 W% e2 d) @ - $connection = $worker->uidConnections[$uid];) |( m; r( r8 K; s+ o7 @
- $connection->send($message);
% J1 t! e% ~' C- P- u8 u& f; { - return true;+ E6 ]1 `6 e) X7 w$ }( J
- }
8 \+ t' m, L9 R5 V0 ]# f; b9 i) W - return false;; ^5 L" d `/ ]1 S& w
- }
! u& o6 A" G2 d: s - 6 k6 r; w0 F/ U$ T9 g; _% C: O I! A* E
- // 运行所有的worker
- v4 s& V% n; h - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');9 W4 Z; b8 P1 n) v6 e3 k
- ws.onopen = function(){3 p' U* v, e) @; n
- var uid = 'uid1';
. r v5 q- N3 ~* S! O - ws.send(uid);
! ^* F+ w( j! I6 _) V% ^1 _# P9 [ - };9 S f2 g' P+ F
- ws.onmessage = function(e){
" h% ]1 D2 H7 r. T, o - alert(e.data);9 P: K0 c e, B+ e# j
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口8 X1 d% a0 @+ l) d9 |" ~1 I
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
, A4 R% P5 Q4 p3 d6 b - // 推送的数据,包含uid字段,表示是给这个uid推送; e3 I7 m' Z9 K$ }) L
- $data = array('uid'=>'uid1', 'percent'=>'88%');
8 @8 ^" P' }: |+ x1 Z! v - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符9 S ]* J! V1 G7 Q5 u m5 b
- fwrite($client, json_encode($data)."\n");
5 m3 @$ a. w* _; g3 I1 r% E4 s - // 读取推送结果
: `, h6 w+ p% ~- P; ^ - echo fread($client, 8192);
复制代码
$ J! r& r3 Z3 F5 u5 _8 b. \7 j# o! m" Y% ] D
|