- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;( Z* F( m5 C) f2 }3 g1 t
- require_once __DIR__ . '/Workerman/Autoloader.php';% u2 O# g- E) N7 Q1 @
- & r. `- ~6 K& j" i+ t* l# S4 U5 z
- $worker = new Worker();
) U/ _2 T/ ?$ G+ u$ e3 v - // 4个进程! C' B1 J( {: I' W' { _& O
- $worker->count = 4;: C8 ~1 b" ~; W) O- ]: q: d9 ? q8 J: B# e
- // 每个进程启动后在当前进程新增一个Worker监听7 C6 N ^, w4 k$ S* T
- $worker->onWorkerStart = function($worker)
. J; b4 z) W1 k: p k7 W9 W9 D - {
3 X4 ^9 |& y5 L6 b- }. L - /**8 ~- b5 \/ x5 |9 P9 D: Q
- * 4个进程启动的时候都创建2016端口的Worker
h9 q1 F( J& f' q6 G - * 当执行到worker->listen()时会报Address already in use错误
! Q' _$ L9 R* I - * 如果worker->count=1则不会报错# R! F1 _* M. Z3 D1 Y' V/ a+ }
- */1 z" P/ X# Q9 F' w6 P8 G
- $inner_worker = new Worker('http://0.0.0.0:2016');
) f1 f$ ^. `; u+ z1 | - $inner_worker->onMessage = 'on_message';- ?3 T/ g1 R4 s6 l+ E& L
- // 执行监听。这里会报Address already in use错误
' a" r+ v- U7 `( e- x4 e$ ` - $inner_worker->listen();( q. @# w, B7 ~- L# N1 y, e' R
- };- I1 T* ?: |8 D4 n- a( d6 j0 @% L
5 x/ g1 s x& R( ~* \, F- $worker->onMessage = 'on_message';0 B- l" k6 P1 D7 f
- : f8 Y, C$ G9 `- i
- function on_message($connection, $data)2 I4 b% ~& K7 k! E9 Y% |8 I
- {
) _0 m7 E0 S G& H: w - $connection->send("hello\n");% ]1 K9 j; C( A! S7 F; m( L
- }5 k8 K! L4 ~2 n0 B1 Q. B4 O) a: N# c
9 t+ e! Z; P! z- // 运行worker
! Z3 m' L* @* r: u - Worker::runAll();! H4 H$ B# A" C' y4 [! _6 O9 M
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
2 {$ q) t+ d" ]& m# s* @ - 8 h! L. {2 b" j! \, d, i: l
- use Workerman\Worker;. w. F+ ?8 V4 H$ [4 Y
- require_once './Workerman/Autoloader.php';( S* C1 X6 t [. S
- 2 l( W7 O. r" s% d8 n
- $worker = new Worker('text://0.0.0.0:2015'); G# E. ]+ @; ~) I
- // 4个进程
' {- U8 a; i6 }. C7 B - $worker->count = 4;
6 Q+ V/ H" S( b+ z' |# L - // 每个进程启动后在当前进程新增一个Worker监听
' u( N& D+ e/ T7 d ^" M - $worker->onWorkerStart = function($worker)
$ o/ e: a2 Z4 p* D# x2 H. ]/ k# X" r4 h - {' {+ \$ y* c. n) w) y3 p
- $inner_worker = new Worker('http://0.0.0.0:2016');& u# G1 F* D; m4 ?
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
% f- a, k/ t/ o' Y( \" | - $inner_worker->reusePort = true;
; k# x& X! K W/ z5 c$ P \ - $inner_worker->onMessage = 'on_message';
' g" R2 u1 n$ ?. g: l6 m( @. g - // 执行监听。正常监听不会报错' L7 q) A5 ?# g1 W$ G/ o2 i) B
- $inner_worker->listen();
) J& n. ?1 L8 R) U! p0 S - };
# b7 J2 _) ^0 b0 v' \- n
& B& c+ W: h B: R* |) b) H- $worker->onMessage = 'on_message';3 m: ~ H% Z4 ~+ w
) J; R" k/ N8 {- function on_message($connection, $data)
9 m0 s L9 k2 ~; _2 @/ ~ - {' V, y# o0 f/ w j" n3 v
- $connection->send("hello\n");1 ] [4 k4 p( b! h$ Q' g0 f0 N
- }" Q) F0 o/ f6 s5 j& X2 ~
4 D. m* F5 U& m- // 运行worker
/ _. x d) d& B7 n. F - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
2 ^) R! e% q' ]) Y2 q* N R/ l3 G - use Workerman\Worker;% n; y4 T, s. D: I# G2 _
- require_once './Workerman/Autoloader.php';
8 ]2 E* g+ x0 e$ q - // 初始化一个worker容器,监听1234端口
s1 ?" y7 f$ F$ Q6 x; N - $worker = new Worker('websocket://0.0.0.0:1234');1 v' X8 d) y2 @' g* Y% L9 [' O
- 8 D2 S4 w% I9 z
- /*
4 q; z- b0 g- l# K3 J - * 注意这里进程数必须设置为1,否则会报端口占用错误. v8 t; N H& d. x; Y' h
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)$ F, W" T5 s$ I+ |( o
- */( S# ]* _/ B% B
- $worker->count = 1;
]: L/ c% d! R# ^) Z - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口6 G9 N. ^+ i- i+ Q* I5 ?9 `) v) q
- $worker->onWorkerStart = function($worker)
/ J% W) G7 t6 C2 M3 s- I - {
/ f; d5 p6 P" i! Z! V( a - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 A$ ?& H) n% w2 u0 W. `2 P5 }, t6 P
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
+ Z& X$ l$ r2 h* r: m - $inner_text_worker->onMessage = function($connection, $buffer)" j$ Q! f# [$ q, U
- {
8 z1 f" U. A1 V; q4 p9 O$ z - // $data数组格式,里面有uid,表示向那个uid的页面推送数据. v! A- j5 p! j$ X* g C* b
- $data = json_decode($buffer, true);
' @( _' |! k) b2 o - $uid = $data['uid'];
! h x# f2 ^9 w6 i5 } m* n - // 通过workerman,向uid的页面推送数据
6 M0 m7 g6 |, v3 }% [ - $ret = sendMessageByUid($uid, $buffer);$ X# [$ x7 ]$ o" d8 m
- // 返回推送结果
0 v1 i8 N; f6 B0 ? - $connection->send($ret ? 'ok' : 'fail');
9 r+ V; B7 M9 [ - };
# |9 m0 N( o e5 { - // ## 执行监听 ##
- T" u6 }; c" \& A( R2 M - $inner_text_worker->listen();# I: Q% W6 T/ G \' I3 U
- };
) F4 V6 z7 P) D( F7 r - // 新增加一个属性,用来保存uid到connection的映射3 d/ J( k7 W* u; W
- $worker->uidConnections = array();
: J! P8 y* E E; X# i/ O" {8 {! k - // 当有客户端发来消息时执行的回调函数
$ e2 J% k5 U" f, A- P; ~ r% F - $worker->onMessage = function($connection, $data)0 x9 K0 m) u6 r7 s3 d$ n
- {/ A7 d) z- g+ V& k2 ~5 ]
- global $worker;; |6 k6 k4 [- w0 q. k
- // 判断当前客户端是否已经验证,既是否设置了uid
* q l% d( s( ~7 j0 r, I* L* q - if(!isset($connection->uid))
; x( M4 o: A% v( J3 V6 o - {
( R L; R. O$ w! F2 e5 n - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& V3 T D6 X/ P4 ?3 c
- $connection->uid = $data;/ ^ g' L; \1 w+ L9 I: ~
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,0 i* Z5 }) A( }9 |$ K
- * 实现针对特定uid推送数据
, {* M9 h" Z' Y/ A1 G8 N( ~3 a - */
: ^) [0 _8 \5 K8 r - $worker->uidConnections[$connection->uid] = $connection;
) u1 K4 c6 S7 D. p) i# b7 o - return;
$ G5 Z3 n+ W; [" A: _ R - }
; ]: b f* I7 O0 z# _ - };
0 Y7 C: y8 g4 [! E6 F - ) t1 h/ I7 ^0 }: C1 q B3 N8 m* |
- // 当有客户端连接断开时+ D5 [( A, y# g: i3 T$ U: { Y
- $worker->onClose = function($connection)
7 b- U! _$ U: A9 m: {- q$ q - {8 y- {; O. O& [) i; [9 L
- global $worker;
4 H0 V. x' F0 c - if(isset($connection->uid))4 W+ g2 R& m( V' H5 n) B3 W4 Z
- {
7 d% [6 I% b3 s" A; g - // 连接断开时删除映射
# k; G6 U8 }* K0 N( m: D - unset($worker->uidConnections[$connection->uid]); n0 k* t0 Z' L
- }! P6 k& t& a5 Q: p/ p e, d
- };
- g. F1 E* N% _; L3 i. ~ - ! Z+ H% a# v+ {9 u
- // 向所有验证的用户推送数据* s+ m/ `9 w) L2 |
- function broadcast($message)
`5 t) T8 c9 P% n5 m' l - {) y" k: g7 j$ o/ Z
- global $worker;, R7 o. s! K9 @
- foreach($worker->uidConnections as $connection)
) v- a5 t8 E( x6 Y W- A0 H8 Y - {9 i; I4 H e( K* q( g
- $connection->send($message);
6 h+ U# h9 U$ R% B- ]8 ^! k - } {' ^1 m. D6 \7 w
- }
- r9 X0 P9 M+ C/ M3 x& F - 4 S$ M; L$ d& d& B( { V: d# b
- // 针对uid推送数据* V! ?* ]8 x: h* e- j
- function sendMessageByUid($uid, $message)
+ Z& A9 v& q4 ~) } k z& ^ - {) g# a9 R( _4 L6 Y0 q
- global $worker;: e2 F: N' P) k: s
- if(isset($worker->uidConnections[$uid]))
+ _ Y* j, {! g/ B, z& Y2 i - {
. h2 k2 X) E ^) T$ o - $connection = $worker->uidConnections[$uid];1 N$ v. R3 E3 o- [+ y
- $connection->send($message); F0 `2 H" L0 U: p! W
- return true;' X& O! q! ^! ?; p3 Q/ l5 [
- }
/ X/ q# _5 C5 O6 \ - return false;
5 _- r9 v: U) U/ V- t - }
$ M0 ?: K9 h) d* b
4 F$ F, `" d) G2 e% j2 f- // 运行所有的worker
% j" f5 ^ e2 X: J - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');3 s# u2 M+ ^2 N, z4 V& [0 d
- ws.onopen = function(){
+ B9 ~1 o8 x8 ]9 N5 [( M5 ~ - var uid = 'uid1';1 W" A s3 T1 K" _5 K1 Q+ x
- ws.send(uid);
0 d+ P+ Z2 g) V8 f# N& D9 S - };
( v$ F: ~/ e7 y& { ?" l - ws.onmessage = function(e){5 |1 }# v/ B+ x6 A1 ^) g* p
- alert(e.data);
, P+ G. O* A2 N6 G9 @ - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口8 D- ?. g7 p3 C9 \2 C) ~* p" [
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
9 V) Y$ B2 M2 x! k; F; W3 B& w - // 推送的数据,包含uid字段,表示是给这个uid推送
2 T. v' h+ I/ E$ X0 k& A4 u# H' ` - $data = array('uid'=>'uid1', 'percent'=>'88%');, g. v( x: i$ J* ]
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% u- b$ e5 T5 Y' L) V( q% H
- fwrite($client, json_encode($data)."\n");1 l& v8 X/ o: O) e2 @
- // 读取推送结果
M+ J2 I1 t0 L5 a1 r, x: u - echo fread($client, 8192);
复制代码
7 `, h. w# Z& l7 M; s
% P6 S- n* \8 V |