cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;
Q/ b( o# D1 v) t5 ~1 w% F - require_once __DIR__ . '/Workerman/Autoloader.php';
6 O% V3 n7 Y0 H. a, E% w V0 M
8 @7 {3 T1 T" q8 B) X- $worker = new Worker();; D0 `6 c' Q; w% L% b
- // 4个进程
; K' o9 ~5 w. A) F - $worker->count = 4;
- g7 o$ X7 o- | - // 每个进程启动后在当前进程新增一个Worker监听
- M8 ^9 |" W1 g) F# _ - $worker->onWorkerStart = function($worker)
' E" y! ?6 _/ J" _ - {
$ S! m" f3 W' F: N. L, J - /**/ B. E7 N7 A: z& |. a N
- * 4个进程启动的时候都创建2016端口的Worker% G! ~9 X, v) w
- * 当执行到worker->listen()时会报Address already in use错误
& U4 @ C% |3 g( ?1 P$ @ - * 如果worker->count=1则不会报错2 C4 u7 B3 e$ T6 ]$ f, F8 u
- */+ w5 B7 m4 E. Z; @+ F S3 a
- $inner_worker = new Worker('http://0.0.0.0:2016');
% h& ^, `+ O3 M) F" m - $inner_worker->onMessage = 'on_message';
$ L! _) i( @/ M - // 执行监听。这里会报Address already in use错误 T0 a% b/ D# g5 B
- $inner_worker->listen();: m, C* Z- v& ~. e2 }( j. [
- };/ O/ E. h; ]3 n9 b
- 6 p1 e- i, w4 l; J- z6 B+ n/ }
- $worker->onMessage = 'on_message';
V3 K$ u; R4 z, y4 W+ l( c4 V. n
& M2 @) @4 G X0 C- function on_message($connection, $data) l6 X6 M: N2 b, Q4 S# j3 r" q' j |
- {
% Y* w, ^, D8 b - $connection->send("hello\n");, c$ N0 v% V6 X7 I6 E
- }. z1 K$ Y3 ^! F+ Y) j( W: E
- ; j7 [' i' O+ M% j
- // 运行worker
$ I% j. ]9 [* Q, x - Worker::runAll();
7 v2 A' R# C4 d" Q - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:2 o! r# d# d( ^
- 1 a/ {; S) I V. U
- use Workerman\Worker;
. {$ p3 @( A4 |, E$ k; k9 q2 h3 ^ - require_once './Workerman/Autoloader.php';
* u) A! U/ ^! \% I6 S, v
' t, ~0 E4 P8 K% z( G6 Y5 f- $worker = new Worker('text://0.0.0.0:2015');
! G" o, h. l8 u& T2 I7 U& {& K" B - // 4个进程2 n* D- L) x" m9 Q
- $worker->count = 4; b! m8 O; }" B# ?
- // 每个进程启动后在当前进程新增一个Worker监听
6 S. \7 |6 a- J$ p2 K8 | - $worker->onWorkerStart = function($worker)5 @. w4 V& I4 p% u j, K
- {' d" d2 \) @* F& P. M" G5 t
- $inner_worker = new Worker('http://0.0.0.0:2016');
1 p4 j* v/ A) ^" }7 q - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
) n- X( P# G# L% b+ P8 G8 I3 | - $inner_worker->reusePort = true;
. ^( e5 Y! U2 ^; }# S - $inner_worker->onMessage = 'on_message';
; b0 M: X' n, [6 |4 a - // 执行监听。正常监听不会报错
3 y" W2 O4 z/ j) ^. S. V2 j- o - $inner_worker->listen();
9 Z" a8 d4 {) F5 z - };
% B$ ?' q, z2 s' w' }( g - 2 ]: ^# L" j7 I0 t. j- D0 g# A
- $worker->onMessage = 'on_message';& B' J# L$ x6 [+ w @
- 9 A; j$ W1 T) M7 s4 l" G
- function on_message($connection, $data)6 U( w. }, @% K1 c3 r# D
- {; [2 V0 X' U7 T5 X+ c
- $connection->send("hello\n");
! T% ~2 ]9 y$ A, N' S8 |, m% Y* m - }4 e3 M: w- c G* O. U r
B: l# n( T1 N+ n- m( q ^$ M- // 运行worker
9 ^+ [/ ?- s _& L. ?& u - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php0 C9 F! ]+ O' W) ]' s0 \: ?
- use Workerman\Worker;
+ J5 Y: V9 Q1 w# ^. m, n+ j" C; s - require_once './Workerman/Autoloader.php';
" W+ Y, I1 _1 y/ n9 s* s - // 初始化一个worker容器,监听1234端口
( Q2 d. C2 Q3 p2 P - $worker = new Worker('websocket://0.0.0.0:1234');
9 X( C, T+ U1 z" N' m) p$ i - / x6 [; }* j* @, Q: l
- /*# @1 G1 C s& ~2 U4 V
- * 注意这里进程数必须设置为1,否则会报端口占用错误
* p5 W: a! x9 F6 a4 U$ ?* z+ D - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
" q# l1 u2 F ? - */
4 ~+ N& E- |% x0 [/ S( X - $worker->count = 1;
/ z# l* p& ?$ ^2 l: B- P - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, g- H$ Y2 F) E! N' p- t2 {0 T/ L* ]
- $worker->onWorkerStart = function($worker), ?" l' c6 I( j5 s9 S3 @* L5 A
- {6 z& p" o$ T8 g/ Y
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
9 G5 j: W$ }' ~0 g5 F6 o+ P/ ` - $inner_text_worker = new Worker('text://0.0.0.0:5678');
4 z3 @; g" k- p& L5 J2 d3 w - $inner_text_worker->onMessage = function($connection, $buffer)
2 W5 F0 C2 Y0 j& g% ~9 W4 a - {$ g, X# v4 ?( i, J5 ]3 x3 P4 Z8 ^
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据
8 ~% e' M9 P( J a9 o - $data = json_decode($buffer, true);
( e: B1 Z' V- ]5 r+ I& _2 L6 s - $uid = $data['uid'];/ {7 G/ n3 Z1 w
- // 通过workerman,向uid的页面推送数据
# D* a- c- j8 b7 _# \ - $ret = sendMessageByUid($uid, $buffer);4 X# S. U' F# |3 v/ M+ T( x
- // 返回推送结果
' ]$ z$ |' ], m9 t: Y- {# Q K - $connection->send($ret ? 'ok' : 'fail');2 h* P$ Y; n5 m/ b# f1 y
- };5 m& u, U6 a# m/ l) ^0 k
- // ## 执行监听 ##
7 R- c, Y) {" F2 W3 V5 l9 \; W( \ - $inner_text_worker->listen();
* ]/ I( i% U( T" t - };
6 g' V' g1 Y; o, F, K- L: w( i - // 新增加一个属性,用来保存uid到connection的映射
. a4 H! k: j F0 {. | - $worker->uidConnections = array();
7 H' f1 ^! X( S1 Z, ^ - // 当有客户端发来消息时执行的回调函数, N3 h( h$ x- @' G3 Y9 i$ I. m5 l' c
- $worker->onMessage = function($connection, $data), C% U. e- ~. ?7 L% j U
- {
$ p4 i5 ~- Z6 W" x0 I - global $worker;
) |' X0 o) j( i/ A' z @6 |+ ? T3 U - // 判断当前客户端是否已经验证,既是否设置了uid/ b' j* `6 D: [" X2 S
- if(!isset($connection->uid))# A3 Q }0 J0 z
- {
( X" {/ _# p2 X6 E; J V/ h5 X1 s4 z - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
# E h( y: R' V5 Y8 W: S. v - $connection->uid = $data;+ N. H/ ^3 D$ h- j0 R& c; A5 I- e
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
4 d: V) _! u4 v - * 实现针对特定uid推送数据0 s( y' O5 A3 i9 v: k0 t
- */
6 B& K, O6 E& Y) e' Y7 V% e4 {) }9 ? - $worker->uidConnections[$connection->uid] = $connection;
0 v: t/ P) n2 c- B8 m: E - return; f& m, A0 U+ V, Z- I& W3 g, n
- }
9 y1 F5 N0 J5 B& C* R) H+ o - };
7 y, c* H* ]4 k3 R q- ~- |, U9 J) A/ x - " Y1 R* v! r* ~* Z7 y( c1 j# }
- // 当有客户端连接断开时& G9 L+ `9 n+ a: ^8 v. a
- $worker->onClose = function($connection)
; m$ o( k" v0 u - {" `, _9 O0 n# x. t0 m/ \% j7 w2 u
- global $worker;
$ v% Z% Y# T4 Q - if(isset($connection->uid))
; m! }* P% u+ d& ?0 Z$ i - {
6 F2 C' [4 S% w) F: d4 [& q - // 连接断开时删除映射# w% G8 s( j1 f+ ]/ C
- unset($worker->uidConnections[$connection->uid]);
( A( Z# z2 `& ] p, G - }
' }- l% J) a/ }, a* z - };
& i7 Y8 K5 @2 m( N! G* H
; g/ k m2 G0 N3 v' T) ~- // 向所有验证的用户推送数据
: G6 {7 y+ _2 A- v - function broadcast($message)
8 U/ z7 |1 D% k1 l# G - {! C/ C! n$ f$ l- g
- global $worker;. J% l3 s1 R, a2 L# e! B/ m
- foreach($worker->uidConnections as $connection)
$ [; R0 N# r; s; g - {
+ e3 o2 S7 r C - $connection->send($message);
6 O+ \! D3 D5 ~3 M4 L, H - }
: ~# [/ G8 l" t - }
N* O0 l& I3 t' K
8 `/ n, y& ] j7 ^9 B# Z5 z; z- // 针对uid推送数据: Y4 m( V4 ^: ~* @ a
- function sendMessageByUid($uid, $message)
$ P# V) W. V7 e7 W6 ? - {
- f, L! Z" Z& U8 X' ] - global $worker;
$ a, d2 l# n$ H/ ]7 \# l - if(isset($worker->uidConnections[$uid]))
" P" `, {& W0 N o - {% a- l9 Z! g" V) _/ H
- $connection = $worker->uidConnections[$uid];
. Y% L. N9 Q! j - $connection->send($message);7 ~' f' t; d. D" n- ?# U/ T0 L
- return true;
5 ]* F* j/ n$ ? ] - }
2 X: a# u# \. j0 m7 r* J: m8 c - return false;
# u* `4 F) t/ V* n) y( M - }
0 v; |0 Z* U0 h' }) g
5 m, ^/ @2 C! x5 D- // 运行所有的worker' x6 `- d5 _5 T: x' Y
- Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');6 Y3 t- l% I Y$ z
- ws.onopen = function(){
5 y1 _' e0 g/ G6 W1 r3 D - var uid = 'uid1';
$ i% z( j h2 {( Q& b# _ - ws.send(uid);& X! Z0 `3 F5 R6 Y
- };9 @( [1 B9 B& {% x2 H; |6 g
- ws.onmessage = function(e){1 x0 L6 m+ Y. K5 M
- alert(e.data);1 ]( p% r8 _* A* B
- };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口$ s3 K% y( \6 Z1 A' ]' L
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
" i0 o5 c+ e1 O& l0 {" J - // 推送的数据,包含uid字段,表示是给这个uid推送
/ p* |3 ~" @0 H4 ` - $data = array('uid'=>'uid1', 'percent'=>'88%');
; P8 U# A7 U+ A$ M6 s - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
7 ?- v5 j4 a9 i- K3 M - fwrite($client, json_encode($data)."\n");! q5 y6 V. D* C
- // 读取推送结果9 ]$ |; U8 f: ?' a
- echo fread($client, 8192);
复制代码
: E. g3 ]4 f0 C7 z# A0 c- m9 P$ Z; v
欢迎光临 cncml手绘网 (http://www.cncml.com/) |
Powered by Discuz! X3.2 |