您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14661|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    7 c  |. J6 c7 @& ~$ w
  2. require_once __DIR__ . '/Workerman/Autoloader.php';* s* j3 {+ n! C4 n0 R
  3. " e5 T  ~' `, L
  4. $worker = new Worker();* E  E( ]) z: D2 a$ }( x, x% `' U
  5. // 4个进程
    . F1 B& j7 s6 f. B
  6. $worker->count = 4;
    & n5 b' Z8 ?  B) @. i9 |- y
  7. // 每个进程启动后在当前进程新增一个Worker监听
    & [) g0 \/ C% X1 e1 {6 L5 q) x
  8. $worker->onWorkerStart = function($worker)
    7 r8 f9 ]; e( }/ R
  9. {( x/ s% k& ]0 p  ^& {5 z4 g
  10.     /**
    - k3 v- w/ Q8 L% X9 w
  11.      * 4个进程启动的时候都创建2016端口的Worker
      o9 F. Z  P& H) I5 g( b; T* X
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ; I, y: a8 S1 U: n
  13.      * 如果worker->count=1则不会报错! P4 U6 l( D. [" E
  14.      */: `9 M7 V: D  D; p3 C! R: H6 V
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');: M$ W  d0 ~2 B! G
  16.     $inner_worker->onMessage = 'on_message';
    , B* t3 I: @% r7 v" w) o- Z5 a
  17.     // 执行监听。这里会报Address already in use错误+ _2 i2 `' w$ p$ l2 S4 f% {
  18.     $inner_worker->listen();
    1 W6 |( U! L. e3 A* b* v5 i
  19. };9 E( P0 G. O- S" W! ~
  20. 3 P* T2 J5 l7 Z8 _# g7 O/ H* X
  21. $worker->onMessage = 'on_message';/ L) r2 C7 y# @6 F$ [' z

  22. 2 r$ ?) `5 B% D5 p- k9 [$ G, o! v
  23. function on_message($connection, $data)
    0 M: m* N% n6 E% m* w0 K- o
  24. {7 U7 v# l( C2 E8 ~' I
  25.     $connection->send("hello\n");: I% c% ^, {. e, l. N* S8 |
  26. }( D+ {7 R9 l0 v, W

  27. 9 u# N0 f% x: [  u
  28. // 运行worker. u" X6 {' g3 E+ u" X
  29. Worker::runAll();4 C: e' h* M4 [% N
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    * {; f9 P* p6 p
  31. - P" E; f) L! m
  32. use Workerman\Worker;! e. J$ K3 ?( x0 w0 h2 D7 \+ a& ~
  33. require_once './Workerman/Autoloader.php';
    7 c+ @' L) \* x+ k9 z0 e- w
  34. , \1 @; w; O. l# w) {, \; g
  35. $worker = new Worker('text://0.0.0.0:2015');
    1 i' Y; Q0 v7 H' h' o
  36. // 4个进程
    - `) E7 {3 g3 B: w) \% X: u" q1 a4 |
  37. $worker->count = 4;
    ' h" |5 q* ?( G0 |
  38. // 每个进程启动后在当前进程新增一个Worker监听% {0 Q+ i. y0 J: M/ ], v
  39. $worker->onWorkerStart = function($worker)' k$ N& G8 b6 V! H+ T5 x3 A
  40. {3 y! I0 ]  G1 U. P4 h: f3 Q
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    + |. y0 Y! N, r' _8 H3 C
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)+ ]5 g, a! e  |; r/ U
  43.     $inner_worker->reusePort = true;
    * V9 {; g$ x$ P! N( d
  44.     $inner_worker->onMessage = 'on_message';. b9 F+ v. q* S# G6 i
  45.     // 执行监听。正常监听不会报错
    % i/ c: `& P" R  y5 Z; A) @9 j
  46.     $inner_worker->listen();' A: j" _/ ?, ]$ Y1 n4 {
  47. };
    ; Y+ @* j& u8 ?6 Z. P* n
  48. 9 U' X2 D  T! {- b7 {. Q
  49. $worker->onMessage = 'on_message';
    ! l3 @2 R  b) @. q1 j  P: O

  50. - }6 m' D4 e( e: v0 ~  U+ h+ U$ V; D
  51. function on_message($connection, $data)
    * }' ?) p2 p4 Z$ k$ N
  52. {4 [6 N! }7 O3 E
  53.     $connection->send("hello\n");
    - p8 n$ `; G: E. K$ e/ _( A2 m/ ^& \
  54. }
    ' K* I7 m" r0 v5 i4 t8 T

  55. 9 `0 @4 G/ ]  [- a6 M4 s! j1 I
  56. // 运行worker
    " f7 J% ^, r4 ]
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php8 `) N6 C+ R8 W, b9 K) J; c# y+ p
  2. use Workerman\Worker;" a, J1 }( N' E4 Z$ A
  3. require_once './Workerman/Autoloader.php';7 W$ w! \- A; B  g6 r9 C
  4. // 初始化一个worker容器,监听1234端口
    1 L! x- a' B6 V5 \8 _) E0 ~
  5. $worker = new Worker('websocket://0.0.0.0:1234');1 \# F# Y3 f0 k# L
  6. ( Q6 x9 ]' b2 h" B/ ^9 f$ j
  7. /*
    % p+ B& d( k# m% g' g/ J# _4 E
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    - J( O7 `, G' k7 L: A. w
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    & x& W: O8 D' G% w, A8 s) G% X0 J
  10. */
    & n) D6 Y$ Z" E: p1 A
  11. $worker->count = 1;8 X! X" I/ |5 u* ?
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# S* {6 m- D7 a, [% D4 c1 ]
  13. $worker->onWorkerStart = function($worker)
    4 }. M) t( r1 a
  14. {8 H6 L# ~6 R6 X; @9 H9 O3 G
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ g7 X: ]: O$ e2 Z8 n& t
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');9 q- @! F1 h1 V$ h) |
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( i. v) I+ F6 ?/ F  ?% S. }% Y
  18.     {* g* C" k1 r5 v- W
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据4 s  F! q6 j2 j/ A" X# F
  20.         $data = json_decode($buffer, true);% M4 x  m3 D4 I3 e+ ^
  21.         $uid = $data['uid'];6 S2 I3 p2 a: P8 e
  22.         // 通过workerman,向uid的页面推送数据: k% V2 X. h- v( S2 ^
  23.         $ret = sendMessageByUid($uid, $buffer);, J! Q" X6 d% |9 N
  24.         // 返回推送结果
    7 `% P7 K& p  S6 x
  25.         $connection->send($ret ? 'ok' : 'fail');3 z, [0 A+ Y% P8 v; N/ d: I+ ?
  26.     };
    7 a6 j% R/ ?3 k- K- x- k
  27.     // ## 执行监听 ##
    8 v# ]' z9 W6 n+ Q" H6 p
  28.     $inner_text_worker->listen();1 N2 X+ R/ c1 q& y
  29. };
    % e; z& u2 D% H6 B
  30. // 新增加一个属性,用来保存uid到connection的映射! x5 r" T# ]( N- I6 D
  31. $worker->uidConnections = array();* b2 W) H. D" ^/ L4 A- w0 W# ~
  32. // 当有客户端发来消息时执行的回调函数
    + j, O$ o4 J2 i. b: |8 B( Z9 ~
  33. $worker->onMessage = function($connection, $data)- V2 G( l+ y  J' ?7 n: h0 m
  34. {
    8 @0 d0 b& I3 M3 I: E
  35.     global $worker;5 a  q" F8 g' Z4 P
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    1 F( t! a( P2 a- E( P& W' ~
  37.     if(!isset($connection->uid))
    # M5 s2 ~4 G! c+ e
  38.     {
    . ?3 t5 o% G2 y; Q( A' T0 E; K
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& H, I0 k; W* m3 Q4 q4 ~: g& Y
  40.        $connection->uid = $data;; {! H. _! H9 [/ j- E* ?. |
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,3 _+ r7 c1 T8 b0 V9 k6 @8 C
  42.         * 实现针对特定uid推送数据- Z; Z0 J0 F6 A( s8 t, h! a( g
  43.         */
    " u! }# \5 Q- V
  44.        $worker->uidConnections[$connection->uid] = $connection;
    1 w# H; r" a: n" e- G* c+ B& R3 W
  45.        return;( S; X0 G9 i  o6 s
  46.     }- Q, k+ l# X0 O1 g+ u" c$ C
  47. };
    1 W3 D4 m2 S& @2 l2 e6 a3 e4 C- P5 G
  48. ; q! k1 y+ G3 t1 [4 D) y
  49. // 当有客户端连接断开时
    2 M# v6 W# d# ^: a$ U+ O: o, N/ A
  50. $worker->onClose = function($connection)
    % Z) c! F$ q! X! U" ~
  51. {
    7 N  C3 h; J; }& j& ~
  52.     global $worker;, a7 t' t; f: c4 `
  53.     if(isset($connection->uid))
    9 f% B5 M! v+ S2 E1 n9 x, X
  54.     {
    $ q2 [* j" b0 W
  55.         // 连接断开时删除映射1 l3 y" E( V& C9 u
  56.         unset($worker->uidConnections[$connection->uid]);
    4 F: O* E$ n$ W: p- F1 f
  57.     }
    0 e5 j3 s3 T1 ]: I' F* {5 N5 I/ i
  58. };
    * |6 {% m6 w9 j, y

  59. * w" ^9 A' X+ ]/ O
  60. // 向所有验证的用户推送数据; ?6 l4 p* _. j& R0 O
  61. function broadcast($message)
    3 `5 V. E/ \; Z# J9 {9 m4 {, l
  62. {. I( J3 c! C( J2 F; P
  63.    global $worker;! a" o- n( s5 }* b0 O  G
  64.    foreach($worker->uidConnections as $connection)
    3 W6 o) }8 e$ E/ n- S$ f
  65.    {: s! h3 L+ J' c2 I/ t- S( ^6 Y
  66.         $connection->send($message);1 A6 V4 X7 F* J, r: g; X
  67.    }# {7 c2 {. y4 I) s5 V
  68. }: _. _! n9 M& @/ @, @
  69. 1 ?: i/ s1 w, V& k% C; Z) W
  70. // 针对uid推送数据
      e  N; L4 x% i; l1 c& n
  71. function sendMessageByUid($uid, $message)3 D; A# Q/ T9 x* C
  72. {& C6 ~% N( `. }4 L5 e) Z
  73.     global $worker;
    ; |, m5 t8 E' G6 J: D
  74.     if(isset($worker->uidConnections[$uid]))$ q3 @- n* }; O/ c8 c* L
  75.     {
    6 r) H1 C/ f& d! S. E" Z
  76.         $connection = $worker->uidConnections[$uid];
      k9 c- g; R. ~) F% X2 l$ i/ o- M
  77.         $connection->send($message);
    & [& {! y- w) ~- D
  78.         return true;
    6 [7 E, F& c# T: k: y
  79.     }* n% e3 n4 M& t$ i6 e2 h
  80.     return false;" C; W, G, l% S1 k$ G: E5 b( L
  81. }
    - K& d5 R. ^# F1 w  w1 b

  82. 6 R. h# V) {) Z0 f
  83. // 运行所有的worker$ Z9 F: k7 o9 I8 ~: a$ T- B
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');3 Q& A- L# F! Y. w8 s- e
  2. ws.onopen = function(){  X6 c* q( k8 O3 W: l' K
  3.     var uid = 'uid1';! Q; H/ O# z! a% O+ K  S. O8 l
  4.     ws.send(uid);
    ' @( p4 o. Q* i( X  b- p
  5. };  [5 Z( b7 U: Q* X: A/ `
  6. ws.onmessage = function(e){
    * P# ?( n) K& d$ u4 T
  7.     alert(e.data);
    4 @$ U. y: E6 [! B6 }7 C/ ^* b
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    & G/ i  [9 I: A! O" F1 i" f' o3 u
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);/ x. u  R2 n2 g2 }# u0 m9 Q" ]
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
      y# Q, x' b+ y4 @
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');7 k% \9 B( a) p% A" N8 s6 l% A. t
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    , W. r9 ?7 `( i2 T# r
  6. fwrite($client, json_encode($data)."\n");
    + I8 M/ z6 e( \" [4 Y% }
  7. // 读取推送结果
    6 `# ]. d* s0 O! e5 |" ^
  8. echo fread($client, 8192);
复制代码

: v  n0 Z+ ?/ X5 c$ C% d; y/ M" f6 K& b$ [
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 14:45 , Processed in 0.061079 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!