cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;9 f/ n+ u6 a% y n- m7 }
- require_once __DIR__ . '/Workerman/Autoloader.php';) s( t9 @ U0 p$ F' Q0 z/ ] c; F* {
$ W" S4 Z" U+ u" v- $worker = new Worker();
9 N: w- Y9 U) L: o - // 4个进程
$ e! _5 h- G1 g, C; u - $worker->count = 4;2 w0 K) O' l% q: A: r) s
- // 每个进程启动后在当前进程新增一个Worker监听1 h3 e/ w+ a9 t) q Y2 }
- $worker->onWorkerStart = function($worker)" u/ [1 Q( f3 u7 p! O: ^ E y9 U4 L
- {
# [/ y7 k. V, m7 O6 V% y - /**% L" v# X/ \/ d$ i
- * 4个进程启动的时候都创建2016端口的Worker
0 i6 l7 ]9 o9 f - * 当执行到worker->listen()时会报Address already in use错误
' w2 {# ?" ?# e) ]- d - * 如果worker->count=1则不会报错
# o2 x3 \3 u; F& @8 z) A2 j$ ^ - */( q, }9 ]" R) y
- $inner_worker = new Worker('http://0.0.0.0:2016');
* S) M8 @0 g3 U; H5 ^2 a - $inner_worker->onMessage = 'on_message';
$ x5 o! s N/ c, @3 |! n0 C% a - // 执行监听。这里会报Address already in use错误7 D% l p" U& e! Y1 O& s7 R( U" w
- $inner_worker->listen();
4 h* {& j5 B% w2 D) Y" g - };
, m7 M3 U4 L- x0 l; H! m7 v
( r S2 D. J& ^6 D) _5 W2 p5 O- $worker->onMessage = 'on_message';
5 R% b2 {2 r% l9 y% e+ ]
' Q7 b: C% Y U. |% i Q- function on_message($connection, $data)
5 f3 ~+ J2 i3 ?& l- F& a$ `5 o - {5 d1 M/ k6 m: S2 A
- $connection->send("hello\n");6 n- l4 L0 d- _- g# Z
- }
/ Z2 z F' E7 Q4 t+ t1 Y0 k
2 B. w8 \4 A" P ?/ J% u. g( I6 v; Z% ~- // 运行worker
^) J! B% S! Z5 R" F: C _4 f0 w - Worker::runAll();+ N% k( k8 W( H X
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
: E6 a2 F, l3 z: i+ S1 M$ M0 a! y0 q
; [4 M. l& g. n- use Workerman\Worker;
; W# J3 M1 d) ?; ~3 ~% C% p2 o' K - require_once './Workerman/Autoloader.php';
3 f: }) I' ~" g$ u: Z" G! R - ) _, z; _% c3 A5 Z, |
- $worker = new Worker('text://0.0.0.0:2015');
! U. m. Z: \; n( c" C7 Z6 R I - // 4个进程6 F$ u8 Y! M* d9 f5 a
- $worker->count = 4;
) d7 a, n, N; w5 _; N0 k- y! ~ - // 每个进程启动后在当前进程新增一个Worker监听" j5 S( N! R1 S: W, Z! A u2 O
- $worker->onWorkerStart = function($worker)
) `2 Y& g3 H4 g7 a/ i8 t9 y - {
" k: Q' E8 m; K, G - $inner_worker = new Worker('http://0.0.0.0:2016');2 I6 r4 Z% u, L$ x2 v6 }0 d
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
1 N5 E, s3 w5 e7 }8 m& p8 Q. C$ s - $inner_worker->reusePort = true;
$ H; R2 J" S" {7 K+ j0 P$ F - $inner_worker->onMessage = 'on_message';
b5 C4 c4 r3 h" o2 m - // 执行监听。正常监听不会报错' b7 k5 a. \! G7 H5 E; b1 R
- $inner_worker->listen();
8 f: p" j, y2 a. J4 Q5 X& a2 f4 ~, x - };0 H- y4 a7 r- x; v! e2 R2 Z1 \
) f* F+ f( O; C& k- y- $worker->onMessage = 'on_message';8 h, ]3 l2 A2 Z
- 8 |. I9 x/ Y2 l7 l
- function on_message($connection, $data)
( x) N- _4 g$ y4 g; i - {: X+ q( a C5 ~) O+ x" W- {5 D: Y: X- h
- $connection->send("hello\n");
! v! Y9 F* V! Q! H t9 x - }
6 A% x: C+ K6 D
2 K5 P4 G# S' @; J) x. X) {- // 运行worker0 V. `0 w: o* }+ O3 y- G7 J
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php1 n; p' l, C: H$ a1 z8 @, q
- use Workerman\Worker;
0 v; `: I% a6 o. ^8 D3 d - require_once './Workerman/Autoloader.php';
/ \& A: p5 T0 b! U) W - // 初始化一个worker容器,监听1234端口. j; V# {1 W( t' o* I, V
- $worker = new Worker('websocket://0.0.0.0:1234'); N4 q) p9 @# X+ m$ p y
- 3 N$ g |* k4 S0 s" O
- /** K$ f: h' A& @' c4 A5 U" M
- * 注意这里进程数必须设置为1,否则会报端口占用错误
& p( m# K/ F# W - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)" B) u% v3 E! @, E) i
- */4 X3 v6 Y- Z! G5 ?/ @( \: i, g
- $worker->count = 1;
7 Q- R5 S/ C5 P R - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
8 g, q) l+ s: ^, r7 m - $worker->onWorkerStart = function($worker)2 G+ u# {2 ^$ @6 \# v' a5 Q/ ~! Y
- { u! K. p' Z$ F s" `3 d' z
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符' c$ H* n: d/ a! ]4 H- n3 u6 m
- $inner_text_worker = new Worker('text://0.0.0.0:5678');9 K4 l8 B8 O" w& u
- $inner_text_worker->onMessage = function($connection, $buffer)! J" o4 N8 T9 P2 T @: E
- {
. Z% z" n" ]3 n O; Y - // $data数组格式,里面有uid,表示向那个uid的页面推送数据7 Z* _9 X' P0 j: v3 k
- $data = json_decode($buffer, true);
5 O# }7 s0 l \" b- N: h7 D% G - $uid = $data['uid'];
& x, u+ ]3 E! i# k" _ - // 通过workerman,向uid的页面推送数据. [, Y6 }5 r* X. K* }4 g3 d: s
- $ret = sendMessageByUid($uid, $buffer);
( ]/ d5 b( N5 H - // 返回推送结果
) x* A# H8 I+ J2 J5 T - $connection->send($ret ? 'ok' : 'fail');
+ u4 _3 q4 e9 r8 I3 i - };4 ?# ~( [ d7 `0 `7 l
- // ## 执行监听 ##+ x/ _3 Q2 Q: \: E9 I
- $inner_text_worker->listen();% b2 [* l% {* T. g" B. k1 X
- };* k \/ L. N. D$ e& K. f
- // 新增加一个属性,用来保存uid到connection的映射/ O2 q2 C' e. J7 M9 q- y8 V
- $worker->uidConnections = array();0 Q' P. T( k! Z% _3 X; q2 b
- // 当有客户端发来消息时执行的回调函数
- u7 x% q2 m" H2 c - $worker->onMessage = function($connection, $data) L# B; A# z* e
- {9 L& b! o* M+ D0 ^# R/ O0 D* ?
- global $worker;( h5 o* O4 U4 B( d" s% u! S) X# F
- // 判断当前客户端是否已经验证,既是否设置了uid% ?9 Y" m4 I4 R3 E/ \) i, {& h
- if(!isset($connection->uid))% y# }8 z5 b0 M" G
- {) t# Y& J2 k2 y1 ?) c# B. f
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
* D8 n1 m7 S8 u- ]6 O/ G" w - $connection->uid = $data;
+ O6 ^. p! e9 S3 T9 E+ r0 { - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,0 k* h% C, ?+ s
- * 实现针对特定uid推送数据6 Q1 N! E) O; l0 D! u
- */
2 ]0 d' s9 u- A" c- I1 p - $worker->uidConnections[$connection->uid] = $connection;
5 ?, P5 u$ S7 m% H; K W2 m$ }2 Q* C# X8 ~ - return;5 l Z# K8 ~+ S$ v+ u: Y6 O3 _& h
- }6 v. E3 L' ^! e, g
- };9 d; o4 o8 [) ~( ]
- 6 D+ i! M3 b! {( H2 ]
- // 当有客户端连接断开时: m5 S/ V' R o' I9 G5 d* ]
- $worker->onClose = function($connection)+ D& G1 k( J: B% f) D# j
- {
# F# b# Z- B4 {- J6 u6 Q4 g - global $worker;
* c7 K9 Z, f: K) [) r( ~ - if(isset($connection->uid)). X! \* v4 h' s: @
- {
+ T% P' C. ?7 o7 k9 F, { - // 连接断开时删除映射: p2 x* m8 R0 ]- @
- unset($worker->uidConnections[$connection->uid]);, U- w( ?: m; y( O2 P
- }; a2 L, S2 G4 H
- };
# g- h( T9 _7 |8 F - 7 @3 J# k& \# s$ n
- // 向所有验证的用户推送数据: t* K* }0 ~3 R% {6 u
- function broadcast($message)
9 @& _ A5 P% S# h* \ - {; `4 t! w6 ~3 Q+ _0 [/ A
- global $worker;' ]- [: _: p& p
- foreach($worker->uidConnections as $connection)" p% C$ i2 [) O$ u
- {
8 [* @: P% Q* z. g - $connection->send($message);" S p o! \5 e
- }2 K' Z: c. J5 J$ t
- }
" P3 v2 l }( I, `% T- u - & S) k# M# E; f7 J
- // 针对uid推送数据6 t7 W3 `5 t1 A. R
- function sendMessageByUid($uid, $message)
9 h. W$ l* A% D2 U0 o - {6 b- N5 B: v9 F; S' l) f% }+ ?
- global $worker;
1 H/ l: g* p3 m/ ] - if(isset($worker->uidConnections[$uid]))
4 q# t5 B4 Z) k7 ] j6 L - {/ m M; c. @, e% r8 S- E8 y
- $connection = $worker->uidConnections[$uid];- e* L* K+ ~! G$ E3 L1 ^3 {- v
- $connection->send($message);6 \6 `8 y4 b2 K$ F% n
- return true;& r1 o% ^- I' I$ G \
- }
8 y( G# g4 W7 x8 G8 W ?( U: w* Z - return false;
0 j9 O7 ~. E4 M1 r+ y - }
6 {% r* W' ]8 C
# U. L: q6 e; H5 k, T- // 运行所有的worker
( g% b9 U* [# `9 N - Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');
L$ d' t0 _; J - ws.onopen = function(){
, z8 S5 d' Z/ X* T - var uid = 'uid1';
: n+ t, f7 U" x. C1 y- R4 R - ws.send(uid);
& q. r' c( n( ~. a% i - };
2 m9 q( B' J% [! s* B, U( _ - ws.onmessage = function(e){3 h0 ?' z/ A0 J8 m
- alert(e.data);5 x9 f! ~- J' q. g0 g7 `( I
- };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口
t2 v5 i& V) M+ s. H! _ G - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);7 I2 s! K" S2 K! A7 [) X
- // 推送的数据,包含uid字段,表示是给这个uid推送$ N) z/ Z# g9 T9 D4 ?& K* I7 J m0 K
- $data = array('uid'=>'uid1', 'percent'=>'88%');8 ]. O) i" X2 x+ z7 z
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 d+ J. {7 G2 ]' b: m
- fwrite($client, json_encode($data)."\n");
- n9 R* l# q4 t. J4 ` - // 读取推送结果. m' L- V4 B, ?' s
- echo fread($client, 8192);
复制代码
1 o! D9 c0 P" r% Y
9 V: t7 H% t8 B* A% [
| 欢迎光临 cncml手绘网 (http://www.cncml.com/) |
Powered by Discuz! X3.2 |