- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
" ^# i0 D9 |9 a+ j - require_once __DIR__ . '/Workerman/Autoloader.php';3 z M+ W: p) H' A7 z$ U; I
- 5 C c$ J& x+ j; k( H" t
- $worker = new Worker();. R6 u4 a" t: ]& p* j7 L& ^# E
- // 4个进程
4 A! |& R% v! q5 i$ L/ T - $worker->count = 4;
- D7 l/ l2 M7 |+ z% c - // 每个进程启动后在当前进程新增一个Worker监听
1 [$ l6 ?, E2 }1 O; f3 B - $worker->onWorkerStart = function($worker)
; O1 X% u; C5 R - {9 W- h8 x7 b9 j5 R, h0 y" b
- /**& }, h, Q" C* a9 n
- * 4个进程启动的时候都创建2016端口的Worker
, Y2 s+ |, K* | - * 当执行到worker->listen()时会报Address already in use错误
. m4 e$ ~. X5 S* X - * 如果worker->count=1则不会报错
0 d: M8 Y. S* M# ~" p1 ~ - */) X) X' `7 Z+ n3 F9 e! J) v
- $inner_worker = new Worker('http://0.0.0.0:2016');7 M% r8 k$ J4 t7 J4 m
- $inner_worker->onMessage = 'on_message';
: f+ c* G2 l' Q- B: q ] - // 执行监听。这里会报Address already in use错误
: V* _7 f! k4 i( ]8 g* S6 ^0 ]0 D - $inner_worker->listen();- V. \! S$ j! V/ u. F) x( ]1 k% |4 w
- };
6 j! C& E) s! m; t$ U% Y- l
7 e" j& b, ^2 K- q+ a# A/ L- $worker->onMessage = 'on_message';
* x2 ?* H2 ~- u4 g
% l# a& K; e9 i6 m- function on_message($connection, $data)
8 ~( ]" D3 L6 P% ~: d# { - {% T a3 u5 Z- I/ ]1 [) G
- $connection->send("hello\n");9 z$ R( N: e, t% f. s% D7 v
- }
' r3 L0 V% m1 n+ ^, V
1 S7 e8 E! ~# l- // 运行worker+ _) P: f$ e& J7 \
- Worker::runAll();
, C# H- n; {- E9 Z. ^! G9 w - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 ?3 K5 t; X2 E% b
- ' h# v1 l1 S/ H) n9 s4 U
- use Workerman\Worker;
0 r: _4 Q2 x3 N0 @4 B- z. j - require_once './Workerman/Autoloader.php';# R+ A7 d0 d4 }' i
- ' b- z2 |1 f+ D; M: ~/ h
- $worker = new Worker('text://0.0.0.0:2015');
* T' V3 S8 Y! N - // 4个进程
, w* } |0 J$ ~: P+ j- i% t - $worker->count = 4;
8 l' U; u; n3 E) S* p/ Q - // 每个进程启动后在当前进程新增一个Worker监听
! D% o8 C0 w0 m* l* k8 n. ` - $worker->onWorkerStart = function($worker)4 N3 H* l: |% X5 Y4 \5 I: V0 v
- {
3 b- J7 o# s& z( \ - $inner_worker = new Worker('http://0.0.0.0:2016');
: v' u6 Q5 j6 u+ n& ~0 J - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
$ G% M- X( m* g) f7 _0 R7 z - $inner_worker->reusePort = true;7 p' H, P9 ^# e" s5 Y# ]3 J! D
- $inner_worker->onMessage = 'on_message';
5 v. d7 B/ ]& Y' S/ W - // 执行监听。正常监听不会报错; v( J4 J1 ~/ y1 M
- $inner_worker->listen();
- t# c( n# C6 M; l' ?) O6 \, I! X - };* H; ?3 O3 J( j S" M
: O e0 n2 ?- b( I3 u' V- $worker->onMessage = 'on_message';
$ X( Y9 L0 Y. C: r
s# E; d0 ?& f* P0 N! V [- function on_message($connection, $data)
' e1 m) u9 z1 B$ P* p4 A4 r - {& Z! u# K0 R5 G$ g4 [! \4 N) O
- $connection->send("hello\n");
`: v$ H9 F8 D5 R5 T - }' Z+ T8 q% B% J$ {# u
- # b" F, x" S: q4 O
- // 运行worker l$ B$ `3 U. R7 a/ x
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
: C8 A' B" S- @; p+ t: F7 u3 X - use Workerman\Worker;
: l r+ E* Q ~ P0 ~6 l - require_once './Workerman/Autoloader.php';% i8 ]+ l8 o1 c# L4 O
- // 初始化一个worker容器,监听1234端口4 a+ D# R+ _" b \% d2 W
- $worker = new Worker('websocket://0.0.0.0:1234');
! L; q- r* m! K- u1 V
; w2 c, M8 L/ Q5 @0 c- /*0 k* r7 w+ z. p2 [# l$ q6 s2 e
- * 注意这里进程数必须设置为1,否则会报端口占用错误* U# O" P# U7 |. X
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
+ u8 e, }1 }* p$ ] m - */; i- K! |. l8 z" Y/ a8 D# M
- $worker->count = 1;" t& L' O& d9 D2 c3 p: ]
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
W s5 _; Q$ d: v5 M2 x5 w n - $worker->onWorkerStart = function($worker)
0 B( a% X* e: T" u, u+ [ - {6 |" ~! j/ M8 z8 e; R" U
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符; A5 E ]! D& G' W
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
! A& S. G+ C2 ]' n) k: a - $inner_text_worker->onMessage = function($connection, $buffer)
8 y: J! @4 V; S: a; P - {
$ V0 E' L6 B# ~ - // $data数组格式,里面有uid,表示向那个uid的页面推送数据- S' k$ n! ~8 ~! ^, G
- $data = json_decode($buffer, true);, b0 I4 R' \- q" {8 O. B/ R" t6 W
- $uid = $data['uid']; E$ _4 t u; O0 b( x
- // 通过workerman,向uid的页面推送数据
5 t$ i8 ~6 X1 Z5 n5 c* g, n3 m8 s# w - $ret = sendMessageByUid($uid, $buffer);
' u# {( A0 S$ }& Z$ ?: S - // 返回推送结果8 |4 @9 t) k1 a& P1 V
- $connection->send($ret ? 'ok' : 'fail');8 e8 Y: M/ h, f! F6 `8 ^, q
- };
/ r [/ o, K9 D3 L# r' ]2 y - // ## 执行监听 ##
0 F, J) M' o& ]5 ?. f - $inner_text_worker->listen();
% A" B( m5 S6 ]6 l+ v! |% N - };; u" f" b: G1 r
- // 新增加一个属性,用来保存uid到connection的映射
" E( {5 X) s b ] - $worker->uidConnections = array();8 L& B8 ~1 h" X5 ^5 e9 [( |
- // 当有客户端发来消息时执行的回调函数. F3 U C. M2 }0 q3 Y7 H, b
- $worker->onMessage = function($connection, $data)0 Y6 ]; e! r9 Y5 S
- {
2 W7 W& c& P6 g4 g5 E - global $worker;( x1 Q0 i7 ~; ?, y9 f) T/ @9 r z
- // 判断当前客户端是否已经验证,既是否设置了uid+ n3 |2 ^: f4 _6 e5 S, [
- if(!isset($connection->uid))5 \! t- [- Q( X
- {
- n/ A0 i+ C% O# O - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 ^% v9 z. |7 r6 @
- $connection->uid = $data;! l" E: m) D% P5 ~
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
) ^) K9 `/ m, }, P: v& J; L/ ^1 u - * 实现针对特定uid推送数据
4 e Q1 I! u7 k" ?6 ^ - */
5 a0 E5 F7 B3 N; p! v# j - $worker->uidConnections[$connection->uid] = $connection;
1 l6 L6 z- e ~" ` - return;) V! D* b. Y5 j9 g" K" n
- }
0 w$ B3 H; i! e( L1 j - };
( d: L- ]: a, o/ n5 c% M9 T7 n8 Y; \
0 Y4 m5 ^+ x- t( d$ o1 k2 ?- // 当有客户端连接断开时* J7 t- k% F' o5 u
- $worker->onClose = function($connection)
8 p" { n; T8 r. T/ Z' M - {' }" }/ ~$ r! H+ q+ u8 N) d8 c$ {
- global $worker; `% q( m4 Y) u t7 b( A
- if(isset($connection->uid))
o* o8 y0 x# P3 w - {
% W4 |' n% i @. m& x A - // 连接断开时删除映射4 n. J+ m: X( u8 y
- unset($worker->uidConnections[$connection->uid]);* M6 ]5 z$ x" } \
- }1 [6 l' ?' L! B2 N
- };
( P8 U9 B$ d) R. b - ( z. o, z3 S; |& ?. j# @+ ]' d6 Y
- // 向所有验证的用户推送数据
+ c* Y; T+ e3 M3 f+ V; \ - function broadcast($message)! F0 {3 A" C8 j P; b! c! `3 g
- {2 T5 Y' \/ ^* c9 H, m7 B& n" \. P8 K
- global $worker;. |+ v$ N0 V B9 w# l) v1 {% O
- foreach($worker->uidConnections as $connection)' p4 w; Q! F* u% ]7 j
- {
1 h0 r* d/ [9 ^ - $connection->send($message);8 r5 i- _+ L+ F. M$ O
- }0 o3 o: @5 o8 c" R& ~
- }
; J& q. D) M& A - - g: v3 T0 p1 u! u% j
- // 针对uid推送数据: C; x0 x, W. W$ k, G
- function sendMessageByUid($uid, $message)
2 C: Y4 N5 @# A1 x- b- r - {
* m; T( U# i& r& ~9 ]" b8 {& X - global $worker;
: {, x! n$ m1 r/ b - if(isset($worker->uidConnections[$uid])). a& J3 S8 U' g% T+ b' i/ R
- {
3 V F; S) h3 d5 j) Y - $connection = $worker->uidConnections[$uid];% E: C& H- `) Q8 A& [0 C
- $connection->send($message);& W& F H' u" w M+ l7 @
- return true;
K# ^5 `" ?8 l# X - }
* g4 J. k+ {* @6 }. R! v5 O0 P - return false;
5 k, }) I* V0 E1 U - }
# `6 U" G: N6 j- { - 1 B* o) o0 T9 B$ Y& T4 s9 ^9 q% i
- // 运行所有的worker6 I4 w8 `7 D' N9 P. t* [
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');# M6 n2 b# k8 ?, p2 P6 ]* k) c
- ws.onopen = function(){
$ j4 u7 w4 q/ |: D6 ]! A- z - var uid = 'uid1';
7 D$ F2 t' ^) r5 b - ws.send(uid);6 _, c* o/ _5 N2 P
- };( c; A0 H8 C, G8 G6 N
- ws.onmessage = function(e){
" J5 o+ ?8 N. {7 e. b1 G3 `0 w& [8 k - alert(e.data);( x, v4 i) D0 W# w- J- ?9 [
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口3 s6 r& ^0 q" H& \8 D# C
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1); b/ \/ o4 l# }* L
- // 推送的数据,包含uid字段,表示是给这个uid推送4 o3 h% a1 S, x; c; Y! ?. s
- $data = array('uid'=>'uid1', 'percent'=>'88%');9 p5 ]! g. j) z/ Q5 p! o! q6 j
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符- ` j$ F5 _6 _. q+ K
- fwrite($client, json_encode($data)."\n");% n! K* q! A. M$ X
- // 读取推送结果
' ~" @8 I F. a; h - echo fread($client, 8192);
复制代码
, i9 f6 I( d1 v; ~
7 H8 H# o9 b0 |* H3 ] |