- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;2 i: u0 n) P. d( Y8 P1 k! W
- require_once __DIR__ . '/Workerman/Autoloader.php';
) V6 K9 Z# Z, r, G9 \ ~$ O0 u6 \4 R
8 I3 F4 b! w; L$ Q/ o3 o3 M: h- $worker = new Worker();# y+ x8 K' O7 x: C9 g P, C0 R
- // 4个进程
# S# X, z9 u, @; I+ }! u - $worker->count = 4;" u2 J% e1 k F Z
- // 每个进程启动后在当前进程新增一个Worker监听# ~4 j- p' Q. L
- $worker->onWorkerStart = function($worker)+ [* r4 l6 p7 O9 x X; S3 l
- {
6 f0 I ^ Z# [# s$ P4 ^ - /**
$ H% K v$ a5 e8 W! `" | - * 4个进程启动的时候都创建2016端口的Worker
- \' g* a m8 D5 R* \+ I* Q - * 当执行到worker->listen()时会报Address already in use错误/ \& J- H" Q( _3 I
- * 如果worker->count=1则不会报错; }# R4 r4 @ r; F6 b: y- G7 C1 r
- */
" ]. J1 p& C3 X) T: P; Z1 d1 H - $inner_worker = new Worker('http://0.0.0.0:2016');
: q( W7 r5 ?9 ^" @ - $inner_worker->onMessage = 'on_message';
% j# M" V4 D) F" ]2 ^5 U& O - // 执行监听。这里会报Address already in use错误! a% Q3 k+ p" T7 N# f
- $inner_worker->listen();9 _) q2 |# T- f# v& j0 e! ~; v
- };
1 C; s* z4 H: O' G6 U/ r" l - * c. ~5 I; |$ s1 ?" A8 n1 Q6 H
- $worker->onMessage = 'on_message';) l! a) S, b$ |
- 5 G. u9 W3 W5 e- P) S
- function on_message($connection, $data)
8 o! s4 W1 R8 F4 R' o6 l! {2 t+ z - {
* A1 X# ]5 Q7 A4 p+ N - $connection->send("hello\n");
5 z1 f1 m, N- _3 @# p, \. h - }
# ^0 l- v) b/ x( @3 f
+ V( R% y6 d$ ^) V" `- // 运行worker9 ^1 {! T7 |: o- S6 G, h- N
- Worker::runAll();7 p* a; I1 s: l, r- _
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 a# k. f2 y6 U0 n& p) }0 l
, g M" t4 G0 C) K- use Workerman\Worker;; R8 G$ w6 a! D0 u5 e2 f' P
- require_once './Workerman/Autoloader.php';
% A" J( z9 O# g/ L& u$ A$ t
( {4 e2 R( K% w; n* I; T, E1 e- $worker = new Worker('text://0.0.0.0:2015');: b) ]1 T3 ]5 Q
- // 4个进程9 {% t- x( l; _4 G( z/ `
- $worker->count = 4;
# A4 @1 u3 S3 q1 E$ G - // 每个进程启动后在当前进程新增一个Worker监听# G1 y4 H7 I9 W
- $worker->onWorkerStart = function($worker)
# t( ?; E0 o! y9 D, S( T" S% k - {
' x# Y- w' Z% v - $inner_worker = new Worker('http://0.0.0.0:2016');
' {* ~( j; i( A. w' Z - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& i$ \1 J- ]: c8 U% F* ]& f" ~
- $inner_worker->reusePort = true;
( U9 E. j+ _9 ~+ h - $inner_worker->onMessage = 'on_message';
- I( y/ G& D6 ]/ W9 V; c - // 执行监听。正常监听不会报错' m6 B8 B7 @% ^7 T
- $inner_worker->listen();9 t2 R3 ~2 h( @9 o9 X
- };
& V* }' ~* Z/ \& l
2 I' h! s, d3 {( S- ^6 M+ v: ~- $worker->onMessage = 'on_message';
4 H" B; I: v) f5 b+ w - / g, Y- ]# M# p$ I2 ^! w
- function on_message($connection, $data)/ f. q: _' i1 @* t
- {& S( v3 r0 C; u) H
- $connection->send("hello\n");2 z: J$ c; l0 m6 Y8 S6 h { h
- }) y& @' U+ ]& m/ ^* s, D
4 z0 G9 q5 D/ g M0 K; g- // 运行worker
/ ^2 \, w: W2 n% n& q4 g - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
, R3 d, Z, F8 G+ Y5 |1 z# z# w - use Workerman\Worker;: ]4 |4 y# F p: f# b7 I
- require_once './Workerman/Autoloader.php'; K! }& e, a5 ^2 W+ g4 v
- // 初始化一个worker容器,监听1234端口- Y" n5 T+ ]7 b* J& K* M
- $worker = new Worker('websocket://0.0.0.0:1234');) j/ X+ ~8 C9 M" ]
- 5 C9 t; E5 n7 h0 b' y
- /*
6 s5 R6 l' A% R' l* e3 b6 T: g - * 注意这里进程数必须设置为1,否则会报端口占用错误
. S; g' T4 m9 S1 p X - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)' @+ H3 ~* N* j H4 t8 K) d
- */: u4 n h1 x/ @5 g- G
- $worker->count = 1;
) }1 @, ?' n$ i - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口( f- E) Z2 G4 j) R, ]
- $worker->onWorkerStart = function($worker)
3 D+ T( I- h; {+ g% y ~ - {6 m. K9 Y( @+ r
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
R1 L5 i' {8 T& ?0 { - $inner_text_worker = new Worker('text://0.0.0.0:5678');
( \+ R+ j) W1 J- }4 _ - $inner_text_worker->onMessage = function($connection, $buffer)
4 k0 ?9 n H, p$ [# } - {
, O) f- Y9 `+ ]; a7 x% i- V - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
9 x' U7 I5 s' P: j: P! p! | - $data = json_decode($buffer, true);
- r' [. h/ E, \9 f4 | - $uid = $data['uid'];
4 c4 E+ |- g+ |* @* V - // 通过workerman,向uid的页面推送数据
3 x6 [( o% M; k2 { - $ret = sendMessageByUid($uid, $buffer);
) ?2 f, u& [1 i- D: i( h- f - // 返回推送结果
1 X1 E- r" N/ z+ ~& | - $connection->send($ret ? 'ok' : 'fail');
: w3 N1 q) B5 [- U& z$ q& o - };
% r3 [( q5 B) c" o/ g+ ` - // ## 执行监听 ##0 [' `; W' {$ Q5 `( o
- $inner_text_worker->listen();
% n3 C" x: U, d, v2 k4 J6 t( O! `1 B2 i - };
. S2 j9 J& D/ `& ?4 b6 {, j% O - // 新增加一个属性,用来保存uid到connection的映射
7 F( J& L5 x" A$ } - $worker->uidConnections = array();; e1 m" F& O6 f$ T" N, B8 `( v
- // 当有客户端发来消息时执行的回调函数& k8 S9 ^2 g0 b" F/ P+ d
- $worker->onMessage = function($connection, $data)
" l- D1 v: Q5 b# }& f" I4 B) }. ?0 m - {
% C; t: f( V/ n0 H3 B! E - global $worker;- \( _5 |5 B( m2 V
- // 判断当前客户端是否已经验证,既是否设置了uid& B M. o9 A3 b' b
- if(!isset($connection->uid))
. y1 e+ i# B- a. u( u2 ^& p - {
. \% q5 K( X, m ^6 j, s3 q - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)) _ Z! e5 A, v4 _' [$ J
- $connection->uid = $data;
3 q2 N. f2 ^7 ^4 Q: r3 U3 Q3 } - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
/ t6 S2 d, f r5 ^6 V - * 实现针对特定uid推送数据
" y# x1 {. F7 ~/ E9 G9 ~+ T - */6 b8 S( X( D& n$ y
- $worker->uidConnections[$connection->uid] = $connection;
- O9 p( g7 F! H, {0 n7 @/ F - return;, M1 B' M' C7 y8 x/ b: `
- }. i+ R( U" M7 \/ z
- };1 a8 S6 Z1 U. ^- I( f2 D
- 4 i- g. \$ O5 m; W& D$ V
- // 当有客户端连接断开时
( ^7 I9 Z, S5 D) y - $worker->onClose = function($connection)) X4 ^" F% G9 d. |: C. @
- {
1 \( ?. ^, m0 w& W7 m( ^ - global $worker;4 |6 e( k/ V* X
- if(isset($connection->uid))4 \+ s" x9 Q" l( b1 @- \; d
- {8 s' I, @4 K f, p3 S
- // 连接断开时删除映射/ \2 W: q V& @6 a$ A
- unset($worker->uidConnections[$connection->uid]);
6 `1 ~, {% l) Q/ q0 i3 } - }
e+ m u; a F* A% G9 Y - };& l3 {- g5 s0 [
- * t9 H! `& W. R5 F: R
- // 向所有验证的用户推送数据( W+ {' ~1 Y5 x
- function broadcast($message)5 d4 V) \+ `$ |9 i3 @4 ?
- {
, j" O z& u9 c, X \! L# i8 e4 L - global $worker;, S0 I* T8 N: D$ G3 T; K. u
- foreach($worker->uidConnections as $connection)
) I2 d A5 N3 G Z - {
( ^( A, n9 E- T& z - $connection->send($message);
& r$ n% w8 H9 t: \ - }: O3 V& L+ S4 S0 |# I: s* f
- }8 d0 {/ G+ J* V$ N* I
- * `; W; Y! T" b- M9 B% o5 B
- // 针对uid推送数据% \ q7 e6 K7 [3 u' Y" l
- function sendMessageByUid($uid, $message)+ H2 R! j" ^ x0 W, Z( {
- {# _5 E u7 a: j! P _# c I
- global $worker;! }; E7 T9 S5 Y$ O
- if(isset($worker->uidConnections[$uid]))8 h) I$ f* F6 g, u6 w$ r
- {! E3 u; j7 x, y8 A0 a
- $connection = $worker->uidConnections[$uid];, H2 p/ H d/ w$ T9 |+ m
- $connection->send($message);
& _5 Q' \3 X. o& k5 K - return true;5 \; Q) L; {' Z1 z$ m
- }
6 k; Q0 _" f' ~ - return false;! i; a1 R7 q6 n* Z8 g
- }
- G1 H5 y( N) o2 A
1 S P% b) [6 U- // 运行所有的worker# @5 a: t4 u" t4 Q9 I
- Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');1 A; H7 h" A3 K b/ [
- ws.onopen = function(){
; R5 c) a! w. s, Q9 u" d/ J - var uid = 'uid1';
4 O" o* d. S5 s, e4 d* X - ws.send(uid);8 Q8 n0 e# i- \2 S( V$ e
- };
; n5 L. U' T% p$ A- {# z - ws.onmessage = function(e){
! d$ i& y) I, b- _$ F# B" Y6 k* ~ - alert(e.data);
9 ^* G& a( e! ]; m U5 H1 ] - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口# ^; W+ V, C7 C" w
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
9 D1 p& R! I6 f1 H7 G8 E - // 推送的数据,包含uid字段,表示是给这个uid推送5 Y" R+ j2 {: }4 T. c
- $data = array('uid'=>'uid1', 'percent'=>'88%');+ o9 P) |. H8 c+ |3 f
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
( m* p$ j& J6 ~9 W9 j - fwrite($client, json_encode($data)."\n");
/ v3 m" o- L& a0 E" O - // 读取推送结果
: A1 \" l c& N2 n1 L7 Y8 p0 ~, E - echo fread($client, 8192);
复制代码 0 I y+ K2 d, a. X; z
; k$ F- L. {& W# ^0 T |