- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;) P0 F1 u+ G. `5 L* o
- require_once __DIR__ . '/Workerman/Autoloader.php';
5 Z1 E4 { ?$ d( E - . c( l( ]4 |8 Z& t! F
- $worker = new Worker();# {4 d2 q& E5 d: v1 i, M# M# d
- // 4个进程. q {! Y# @" I+ | }6 C: U7 M; o' }
- $worker->count = 4;9 a$ ?) E6 v8 k. h9 ^ P! u2 F
- // 每个进程启动后在当前进程新增一个Worker监听
, r( n5 `+ X& x5 b" x- h% ? - $worker->onWorkerStart = function($worker)
7 c: ~, Y7 C. z& Y - {
9 b( O; E+ ^) `6 b2 S* }) M- g - /**- o$ B8 S' w7 p; D0 p. e% U5 R; \
- * 4个进程启动的时候都创建2016端口的Worker) d+ E- ~0 d( f8 k+ m, X
- * 当执行到worker->listen()时会报Address already in use错误+ u" R4 h: M% h* Q6 W6 ~1 V1 v! A( E
- * 如果worker->count=1则不会报错/ t+ _5 ~: l2 M! c: s
- */, V! X; j4 n) R
- $inner_worker = new Worker('http://0.0.0.0:2016');8 m& }. r6 F0 v2 a' y }2 E
- $inner_worker->onMessage = 'on_message';, P2 Q4 p8 c2 i2 p& O/ Z6 D3 w" H9 q4 ]
- // 执行监听。这里会报Address already in use错误
3 { j) e( L8 L3 u3 b) i - $inner_worker->listen();
* J" u! u' k" |. W2 r - };
5 P$ j* u5 F. W5 J1 k8 b
! e/ R) E1 o5 H- $worker->onMessage = 'on_message';
; ^" `; L; ^! r
3 v3 O9 `- n; v) {9 B* T: n- function on_message($connection, $data)% ]! I" }% j4 L6 f* J! j' E u
- {5 C O( r, e7 |0 f
- $connection->send("hello\n");8 D- U6 P2 K5 Q c$ I) V/ B
- }
- t' ]# C q$ x, X) a9 @& x; B
# O, M0 p, J7 E$ V- N( T- // 运行worker% s. e7 m: t3 E
- Worker::runAll();# u4 ~$ W) U0 e* K
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
. [2 o7 `* }1 N9 C" F - : }1 i6 D$ [$ R8 M- F* w6 g
- use Workerman\Worker;
/ d+ ^% |! Z4 `% `9 A - require_once './Workerman/Autoloader.php';
: }& {2 c K' q9 |5 u
8 _& p& M% B$ C ~' L- $worker = new Worker('text://0.0.0.0:2015');
+ G) L- y9 B0 i7 E5 ] - // 4个进程 t) l* y0 U3 t# e. V
- $worker->count = 4;
' c9 D) F$ W1 j6 K - // 每个进程启动后在当前进程新增一个Worker监听
4 `( J; J% _: T5 Q3 e - $worker->onWorkerStart = function($worker)
9 V1 j) H) ^% ]! G - {
4 H* }/ a$ `/ _ - $inner_worker = new Worker('http://0.0.0.0:2016');7 [( M3 t; Z( R4 N
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
; I1 N1 q% K4 f* Z- W) b - $inner_worker->reusePort = true;+ A4 j6 B! m, X
- $inner_worker->onMessage = 'on_message';
& w( O3 l# u i1 n5 ] - // 执行监听。正常监听不会报错
% n+ {9 C: m" l) H2 G - $inner_worker->listen();
% |/ a% f& [ Y, z; I. @ - };, P! A$ Q; P" T# v6 q/ Z
- 8 C* ]+ I' ~% l
- $worker->onMessage = 'on_message';8 _5 X- ~2 [, d/ X- E! }& w# i
1 o6 q2 h7 j' G9 j9 D3 E8 X- function on_message($connection, $data)5 Q2 |4 C! ?: r
- {
2 s3 F, F6 [8 I* ?' k+ v: f: G, } - $connection->send("hello\n");
* x- u" g7 r4 U) t) d9 b - }
+ ~5 g, \' r' Z( n( d8 a% \& y+ F, J
# H: \+ B8 `" @# g# ?- // 运行worker9 P" Z8 w5 t( G6 h9 [' n* ]
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
" e* s0 s: L4 Y. c9 M5 X4 X - use Workerman\Worker;
$ t% o. T- s4 o+ i8 N3 ` - require_once './Workerman/Autoloader.php';4 L. k5 ^+ u; o, l" ?3 P, E# @
- // 初始化一个worker容器,监听1234端口
* p0 V5 }3 v! B" V - $worker = new Worker('websocket://0.0.0.0:1234');
- k! Y/ Y! m, z, p) a+ q. A/ ` - ! ^4 A& s) v0 t( D
- /*+ u; X. g) |5 F o
- * 注意这里进程数必须设置为1,否则会报端口占用错误
' v5 D$ o2 ?' { v - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
9 }( U- P7 z- a8 m; i. N5 ` - */
& T6 E4 ?' }7 N' D7 x - $worker->count = 1;
8 U0 J. o6 y6 a - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口! }' [7 @3 \& t+ p0 H
- $worker->onWorkerStart = function($worker)# s! d9 C. q/ @ ]
- {7 V* G" o, j# P! q
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
! { [3 K4 J( y. ]& Q2 c - $inner_text_worker = new Worker('text://0.0.0.0:5678');; q- E8 p$ R! I
- $inner_text_worker->onMessage = function($connection, $buffer)
% z9 u! n2 H) K2 K/ N! ? - {6 E3 Y Z: V8 H4 f
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据3 s6 n9 Y' z" P& Z
- $data = json_decode($buffer, true);
+ c. b7 D( J% X& T9 M: b% J2 z2 ]# R - $uid = $data['uid'];9 D# [6 z/ M, f$ {
- // 通过workerman,向uid的页面推送数据
0 I' L* u3 Z ]) p) ^' ` - $ret = sendMessageByUid($uid, $buffer);- t4 O/ s) C3 ~) A# M: _9 t4 m- q
- // 返回推送结果. s+ z' z$ }! d l+ j0 U
- $connection->send($ret ? 'ok' : 'fail');
: U7 j1 ]! _7 T* `) U, x2 _8 Y3 } - };0 Z5 Q( |( q( S! {3 b4 R
- // ## 执行监听 ##5 C" _* _) H6 F' \% t7 Q: r. [
- $inner_text_worker->listen();
) _5 U8 G/ b( u - };
& t- j9 j5 s! H8 g - // 新增加一个属性,用来保存uid到connection的映射$ P, }/ \1 P( o* k+ f' n! O
- $worker->uidConnections = array();2 r7 @1 ]/ ~% H$ x" k$ K
- // 当有客户端发来消息时执行的回调函数: z7 ^2 Y5 Z4 D) a+ B l, H
- $worker->onMessage = function($connection, $data)5 Z: r9 o/ C) ?: y6 Z6 I# i
- {
& o/ P p; x9 {) W* g; h - global $worker;
$ G t9 p) ^" T, c' H k( q - // 判断当前客户端是否已经验证,既是否设置了uid! V* \2 _% N+ D- s
- if(!isset($connection->uid))7 f% s. w# j1 K& O) ~
- {8 {. n' M% o. O) Y
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
! N6 ]# [1 p" W) |' S Q+ a - $connection->uid = $data;
9 _$ I2 M5 a" N, r& F# W - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,& V. w" y m8 w: h
- * 实现针对特定uid推送数据) c6 A0 v% \$ d3 D# m: V* l; l
- */1 J" y3 E1 ~3 p7 H' ~
- $worker->uidConnections[$connection->uid] = $connection;6 I+ k8 A1 Y! t& ?2 w
- return;
- b/ N+ f; c! O - }
: `/ m0 D+ Y& b. C" L - };# s* J7 m! L2 v, u7 @/ l7 N
+ T. J8 ?5 v( A3 N: v9 n; W+ a+ D- // 当有客户端连接断开时
4 S& A$ s5 d+ {0 I' n p - $worker->onClose = function($connection)
' H5 _: d( ~0 D0 V9 l3 X - {
! `$ W8 @6 H D2 s* Y4 ^ - global $worker;4 C2 K0 i( j% o, X) `9 z- E
- if(isset($connection->uid))) S/ v2 W; }. g8 c1 E& j4 p
- {8 S0 x( `/ S' f: a5 U9 i
- // 连接断开时删除映射
& J+ A f( L4 `2 a - unset($worker->uidConnections[$connection->uid]);6 P: X3 E! _' j" R: \' W
- }
+ }7 k' f" {) N" v - };
3 \7 }0 p5 F; N7 S - ! R; D$ W! L8 h7 \& Y% `
- // 向所有验证的用户推送数据
1 }- }2 K, Z3 | - function broadcast($message): `5 J: d" q7 W2 O, ~
- {) y. P0 G$ D& G0 T" j
- global $worker;
. v$ s* O2 o: m* E' \% H" y( e - foreach($worker->uidConnections as $connection) D. _+ _6 |( b. d, }5 P
- {+ Y6 T" B: T5 ]+ z! p% [- w
- $connection->send($message);* i+ k: l* N. [/ h$ M% c
- }
4 ^% e5 P4 t* {5 l& B - }
N }7 R/ C" m$ z. Z - 4 T% V0 {$ |2 x! J
- // 针对uid推送数据$ v7 a( M9 [6 A% t2 [
- function sendMessageByUid($uid, $message)
# X, w2 X7 `, ?; _: J - {! f! R2 ^0 m; s- g1 `* V
- global $worker;0 P6 A) x; H9 Z) ^$ o: X
- if(isset($worker->uidConnections[$uid]))
! `5 _' q5 m5 F( S - {# S: E1 B# t7 [! W7 i
- $connection = $worker->uidConnections[$uid];
8 B# F7 _' T# B! r3 g - $connection->send($message);. i' d! A L3 i* }; j, _( { J( l
- return true;
; Q- C9 s7 ?3 z) G& U# C. t& z - }
; m' P& l( b6 D0 c - return false;
V5 l3 p. ~1 r0 A# R) G - }8 u! Y o* ~$ Y' C' \ l$ Y# {
! z! w# p7 K1 W: ]$ ?3 G% e N4 Y+ u- // 运行所有的worker
4 U- q, G' W& F/ N# ? - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
$ Y6 X% f+ y3 W9 \/ _* d( Q1 p# Z - ws.onopen = function(){, c/ g! H' y; J8 C( K
- var uid = 'uid1';* G3 P2 h( _6 }5 i/ r- `# b
- ws.send(uid);
/ \3 d3 T7 P% o. H# V - };! B6 }. y6 e& N4 }; i5 \
- ws.onmessage = function(e){
l6 d3 [* v/ b - alert(e.data);
) {& S) L9 g" |3 W+ d - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口7 \. t" a3 O# v* ?% _; H
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
5 j9 k. V+ p0 E$ P1 S9 ]7 U - // 推送的数据,包含uid字段,表示是给这个uid推送
. k* C; V1 t1 [. @ - $data = array('uid'=>'uid1', 'percent'=>'88%');0 B8 J+ [! ~% x& p# e
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符( h7 M0 x1 k- ?; ]4 Z z% h$ j
- fwrite($client, json_encode($data)."\n");- R( P( x2 Z+ \6 B& C/ T
- // 读取推送结果
, N8 p6 z1 G6 `+ L) M - echo fread($client, 8192);
复制代码
& Z- M' b2 H/ i+ z4 q9 o; L# A6 g* K3 ^7 ? f3 I
|