cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;% q8 K/ ~, U( M& A; ]$ j) n
- require_once __DIR__ . '/Workerman/Autoloader.php';* y9 H, J8 n2 V* R' W7 v5 X6 U$ f
- * G5 v4 w; e6 j# O' U8 V) ]
- $worker = new Worker();- |8 G' S* l7 ]4 b( c$ M% p* Z, I
- // 4个进程
& S- u3 x+ w- w& B4 ?* m - $worker->count = 4;2 n2 y5 S4 _ N" f( r
- // 每个进程启动后在当前进程新增一个Worker监听4 A" N5 b* ]( e3 t+ [4 f8 p& K
- $worker->onWorkerStart = function($worker)
. D4 n' c, H6 E ?2 } - {9 q2 e$ ]2 P# M a
- /**: {( z) {) r: ]" a) `0 i0 {/ E$ Z
- * 4个进程启动的时候都创建2016端口的Worker
. X5 c) F/ g5 L! s - * 当执行到worker->listen()时会报Address already in use错误
. _1 d2 l' x* g8 L Y" \3 T7 F - * 如果worker->count=1则不会报错
+ L! W) ^4 n9 V$ I2 y9 O8 x - */
. J+ U% t. n! t+ h - $inner_worker = new Worker('http://0.0.0.0:2016');
5 {# r9 `4 Q3 j t# W H0 o8 ` - $inner_worker->onMessage = 'on_message';' {) @6 Y& O( @( R' r5 r
- // 执行监听。这里会报Address already in use错误. \$ h1 S Z6 j) \. _4 ~4 S+ E
- $inner_worker->listen();
8 d( e/ ^ d9 D: K. H w) } - };8 U5 d# T( q: u5 \3 m0 Z. n1 H
( N& s5 E1 [. U: y- $worker->onMessage = 'on_message';
7 t+ v/ @8 i9 O# ?+ H9 [, Z# G - : x' [ [1 j% m1 X, D6 [
- function on_message($connection, $data)
. t/ o# g0 `1 {3 Q - {
) B" [, C- l6 u9 g+ { O1 ? - $connection->send("hello\n");
- Y5 I4 m( S) z - }, K* V: Y2 }& |% q
9 }0 n) B l4 E0 w4 r- // 运行worker& B8 L; B+ O' m
- Worker::runAll();" j2 L" p7 y7 d
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
8 j& B, n9 c$ H' T4 i - & A$ M$ Y+ M+ ]
- use Workerman\Worker;, F3 R5 L* P% B' {- F$ P. N
- require_once './Workerman/Autoloader.php';
, E v, u4 J6 A# s! c
7 x2 O8 {6 a+ `# r* k0 O- $worker = new Worker('text://0.0.0.0:2015');$ p6 q! e% H7 O) c
- // 4个进程
$ Q. u3 q t: w - $worker->count = 4;
; v' O- h+ ?3 z- M$ V - // 每个进程启动后在当前进程新增一个Worker监听
+ x' E8 S# [, W - $worker->onWorkerStart = function($worker)5 Y; m9 c1 w0 ?9 _# v }
- {
* J: s# `/ y4 u3 G - $inner_worker = new Worker('http://0.0.0.0:2016');
# `: A5 x7 W8 x - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
3 A: q; H8 T4 b; v. f3 ?+ f - $inner_worker->reusePort = true;
5 ^) m$ y( Y5 Y- l' F - $inner_worker->onMessage = 'on_message';( W: u7 X( m! p. L
- // 执行监听。正常监听不会报错* |/ o4 m, r! Y( L
- $inner_worker->listen();
' N7 J/ d8 Z; [& L; ~* q, \( a - };
* R# O3 u' @& D- s' w
7 I' w7 f: u4 O- $worker->onMessage = 'on_message';8 M* ^; t: E5 P
- $ z P$ z4 S/ D* O" @( ^
- function on_message($connection, $data)
3 }0 O, D$ D3 }' v - {6 Q: `& c! C/ i, o& L
- $connection->send("hello\n");4 p( d9 y# D; }+ a: e9 M8 E
- }
* d) I1 G9 p7 B$ f
' Y' }; S# g. K! z- // 运行worker
% A% @ T- M E: s( Y# n& r3 h - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php
/ \. t% K# G8 `6 B8 q - use Workerman\Worker;% C8 ~! P: Y% Y% n4 F$ i
- require_once './Workerman/Autoloader.php';
6 O5 n8 W. U# Y( B9 i6 X$ E( k# A - // 初始化一个worker容器,监听1234端口1 |, w* T! f" a2 } Z0 |
- $worker = new Worker('websocket://0.0.0.0:1234');* x/ v' |9 [, I" I0 i
- 5 c) U0 L$ C" j5 S& [ f0 O- ~7 D: Y
- /*
. y, L7 r8 }0 y$ D J Z# _ - * 注意这里进程数必须设置为1,否则会报端口占用错误
1 ~6 Q; I! e; C$ G - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
* D% [; p- B" l. o$ t) T& v - */
$ ]3 R; h0 l4 T- ] - $worker->count = 1;" j! ?4 q, i* Y
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
9 a; v: `5 T! \/ h) w/ w$ Y1 x - $worker->onWorkerStart = function($worker)7 _- y% w% e$ E3 f! \: V. D
- {
1 q6 b6 Z; f# X& V) I$ _ - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
9 O# Q* e: X, S+ O V3 Q - $inner_text_worker = new Worker('text://0.0.0.0:5678');: V& p# c4 y+ e/ B+ B; f/ I
- $inner_text_worker->onMessage = function($connection, $buffer)
2 j- D' z' [8 z0 `! e - {
, ~" n [) U' H- J+ L1 F- \ - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
, Y. Z& x! c, s& [2 b& j - $data = json_decode($buffer, true);
4 d8 e- ?% A( j/ w0 Z+ V* [ - $uid = $data['uid'];0 n4 Z7 ]' J( J- R1 @
- // 通过workerman,向uid的页面推送数据/ T1 G6 K" W2 A9 k6 D% ~ {2 _1 g5 B
- $ret = sendMessageByUid($uid, $buffer);; q; u! u4 j h( z$ Z
- // 返回推送结果: _" @6 n# K) b* d1 ~
- $connection->send($ret ? 'ok' : 'fail');# I& ?" N8 i0 K; X
- };
4 `4 m2 L0 u# H. h, f - // ## 执行监听 ##
% I/ k6 U+ N; k/ G - $inner_text_worker->listen();
7 S) x% |: w( Y" X5 Q* g - };2 t! W4 Q% k5 o$ E; _9 ]3 n
- // 新增加一个属性,用来保存uid到connection的映射4 ~2 b# h" d+ C3 q+ \
- $worker->uidConnections = array();
& T0 \2 f0 }6 T: A% x r - // 当有客户端发来消息时执行的回调函数
) L. B U; O# j - $worker->onMessage = function($connection, $data)' w# n7 L1 B- D) r: A
- {
# Z: _9 H. V6 E, @! j - global $worker;# p2 I( C0 ~( U# F% |" O
- // 判断当前客户端是否已经验证,既是否设置了uid3 [3 V7 e% ]1 s6 _
- if(!isset($connection->uid))5 c. c; [; i. G
- {
F; h& f0 l$ |& S - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证). X& E2 s7 U* y
- $connection->uid = $data;) a% F9 G* ?' [/ ~! D# x
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
g. J1 }* R' Z# s+ V9 n8 ~0 l - * 实现针对特定uid推送数据, s/ o: |$ v6 q6 B4 I1 }
- */6 r* q/ h, W$ W9 j1 @- e5 s
- $worker->uidConnections[$connection->uid] = $connection;, n. w: h, I+ |( @4 F0 N$ ]2 p
- return;
( X- O6 c% k4 `1 [, |, K - }: i' R1 ?5 S. Z0 [7 w+ n7 U
- };
w( u' j: R8 T4 v0 Z* F
k, y7 Q6 D' m! Q- // 当有客户端连接断开时
) @- i% Z4 g+ C4 l% w* l" v - $worker->onClose = function($connection)
- Q4 ?6 |! L' b2 [7 Z+ ]( ~( ]. p0 \ - {
' U& O$ S% u9 x( d# S4 d - global $worker;0 F$ ~4 N6 e( D3 `
- if(isset($connection->uid))
2 P) S1 U7 N$ ]7 |' [8 k N - {
/ d. d6 A! K8 T! ^( T5 }5 `" {6 o - // 连接断开时删除映射
, x& x4 w- B0 @( I* j+ F - unset($worker->uidConnections[$connection->uid]);
) B' l. _. ~7 Q' R$ d - }) A2 |- A9 ^. b* W, e# w
- };( O" r+ v g/ l. ^2 C6 j' Y
- 1 }% C8 v8 o- c
- // 向所有验证的用户推送数据
/ E( Y& F: ?* g9 r - function broadcast($message)
( u' Z9 M1 s c5 I0 S8 H6 [* S - {# B& m* b$ E+ j5 v* B T7 @* z8 b
- global $worker;1 _, q# c( u* c/ S: R( P# }
- foreach($worker->uidConnections as $connection)3 P/ u( \: @; d
- {" F# ~3 {/ ~( m! c. z) W) \
- $connection->send($message);$ X/ F9 J4 F% J! ?1 V. K6 K& D
- }
1 y2 l: b3 Z) f2 c - }- X* g4 l: `6 g
- 7 w( ]8 w! P5 ]
- // 针对uid推送数据
1 l6 m( H7 b! { - function sendMessageByUid($uid, $message)# g& E$ j* J4 r6 L
- {
- Z" C( n; w K; E4 A - global $worker;
4 j! C; v+ [6 A+ X% ] - if(isset($worker->uidConnections[$uid]))# Z, F" c2 [( D0 @( w8 F( ?
- {5 B. q S0 {! x3 u3 _" V: V9 H. h
- $connection = $worker->uidConnections[$uid];
* X, b8 E* k4 C9 l, y( d - $connection->send($message);" R- P: {$ r% a7 N9 M3 u3 G
- return true;
! g. M4 O }% u+ W0 y - }5 C. c: f F9 Y; x
- return false;
5 t2 D, A: T) V - }% r6 b6 M8 Q+ }0 C
- 1 W* }8 q# ~5 q) p: M; @
- // 运行所有的worker
, `' k- L. W2 b& P5 N - Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');
( X/ A I4 P( O3 x5 q1 m - ws.onopen = function(){
( q2 g; s. [( P2 _ - var uid = 'uid1';
0 o# _4 m$ u$ r& W0 M& f) k - ws.send(uid);
$ A5 t& W( L6 d0 ^6 i% g - };. m" {; r+ N; | a5 I( [9 @
- ws.onmessage = function(e){
( y, q6 J! ?! j1 L! w - alert(e.data);" k+ Q# o' j' y. ?, W
- };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口( Q+ W- B" [% ?/ O- L
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
$ _ J" H$ s/ F! K# ] - // 推送的数据,包含uid字段,表示是给这个uid推送
7 M& f) ~3 Y: b: R; G3 F - $data = array('uid'=>'uid1', 'percent'=>'88%');2 L4 B: o; D% `4 _$ `" e
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% H+ M( f( j2 c
- fwrite($client, json_encode($data)."\n");
" B& I5 A8 B" q1 p - // 读取推送结果
4 o9 _/ b i" {" I3 r* \, F: G - echo fread($client, 8192);
复制代码
1 w1 @- i2 w( P% y
( N5 }/ Q6 b; {1 t: x
| 欢迎光临 cncml手绘网 (http://www.cncml.com/) |
Powered by Discuz! X3.2 |