您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10559|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    * y' f3 H# J/ J
  2. require_once __DIR__ . '/Workerman/Autoloader.php';9 F, `' }  c* Z0 p
  3. # q# u2 T1 B, G9 z3 f/ U
  4. $worker = new Worker();' |0 \$ l; `7 _( o* s/ d
  5. // 4个进程
    . m, }, n( B) M
  6. $worker->count = 4;
    2 V' P# x# z( q# N3 ~3 R" r
  7. // 每个进程启动后在当前进程新增一个Worker监听
    5 M% r* ~9 b) w3 \+ `, r5 d. ^" X9 M
  8. $worker->onWorkerStart = function($worker)- h0 ~& }( N6 |, K
  9. {8 V( S' @! Z6 D9 i+ v
  10.     /**- R/ z: p; Y0 ~. s) g1 c
  11.      * 4个进程启动的时候都创建2016端口的Worker' x; Q8 i0 \1 x
  12.      * 当执行到worker->listen()时会报Address already in use错误  o) i3 z9 m; R/ }
  13.      * 如果worker->count=1则不会报错
    % `! u9 L+ i) K2 U. p( P9 J) g  o6 h
  14.      */" _1 h& s7 ~: X) t) }, ~# P3 T) J
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ' u2 G% N0 e/ j" n+ b! Q  b
  16.     $inner_worker->onMessage = 'on_message';
    9 p# A( u. K% S( l
  17.     // 执行监听。这里会报Address already in use错误
    5 U" b7 A" }  x6 v2 \# U
  18.     $inner_worker->listen();
    8 U( B* t* P& U1 s
  19. };
    ; b, g: y& j5 z  w+ W/ R, P5 ]
  20. 3 C0 N- ~" V6 l' E/ x- y& L) L
  21. $worker->onMessage = 'on_message';2 j; M) A$ g- x- \2 }

  22. , J& O: L& G* c5 t! v  w- z% C0 l7 z
  23. function on_message($connection, $data)- k9 u& w3 z# Y# \5 {1 }3 {" E
  24. {
    & d7 k( ~- X" K% i1 d7 I- f
  25.     $connection->send("hello\n");
    ; y% x) H# P5 o6 h& u5 `9 B
  26. }6 i8 G  b, G' ~: J- W/ h: h  W: D
  27. , O, q3 u$ t$ u( ^
  28. // 运行worker
    , c1 }1 F6 O, ?3 V
  29. Worker::runAll();8 W5 M* I& _4 e( {. F
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( f; A' n9 y/ F$ Y8 D

  31. 6 h( {! `4 f8 b$ d
  32. use Workerman\Worker;/ N7 l/ [) D/ g  p5 |7 o
  33. require_once './Workerman/Autoloader.php';  D; g+ S$ S7 @

  34. 8 S6 `7 S* x" N! H/ M, w
  35. $worker = new Worker('text://0.0.0.0:2015');& a* n/ F9 A2 l
  36. // 4个进程
    . D. z* Z) V$ X& |
  37. $worker->count = 4;: ~5 c+ j) z8 P( h. C
  38. // 每个进程启动后在当前进程新增一个Worker监听1 `& c2 ?9 {/ A
  39. $worker->onWorkerStart = function($worker)
    * D! T' `. c; t  P: e0 M0 G
  40. {0 ~& b6 Y0 Y. e/ w, a+ H. l1 T7 u+ T
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ) D9 H1 ^9 n; j6 _0 R
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    + c: K; F: t9 [: V7 T* Q# q
  43.     $inner_worker->reusePort = true;
    $ E9 ?4 B0 H# C) M! x4 J6 d9 }
  44.     $inner_worker->onMessage = 'on_message';
    # P. V/ M3 f0 V9 g% e6 r& k
  45.     // 执行监听。正常监听不会报错
    # x, q; D% ]3 c! L. [4 G$ _
  46.     $inner_worker->listen();
    2 V* R; r4 G# K& \& j
  47. };
    6 ]" c# [& P0 t* v

  48. # b7 T. `/ B7 |' o
  49. $worker->onMessage = 'on_message';
    9 I8 r2 W6 C) q% j

  50. / i  S0 n7 j2 D; f
  51. function on_message($connection, $data)
    : V) s* w" d1 X7 S
  52. {) Z7 v, R2 N& L: a, S  \
  53.     $connection->send("hello\n");
    7 j0 N) D0 I$ H; H% ]5 ?" L' M% E2 h
  54. }
    , O3 }. L( w, t

  55. ' s1 f, C8 I  B* ^
  56. // 运行worker# q9 m  S) b) I# P! m$ y
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    , {' m3 R# E. k: }" o1 @5 C
  2. use Workerman\Worker;  b8 L, P7 ^/ i& h
  3. require_once './Workerman/Autoloader.php';) m1 [- o# M0 j+ w$ ?
  4. // 初始化一个worker容器,监听1234端口
    2 p1 F% B8 c* G4 Y
  5. $worker = new Worker('websocket://0.0.0.0:1234');, u/ U. Q8 b$ O% d9 |
  6. , z1 C* `( X: `4 E
  7. /*& E0 o; d+ N, W8 ]8 Q8 v  K- J3 J
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误% R4 H1 m" [' w: ]. i, ~! Y6 F
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ' m9 V9 E6 b" V' D2 u4 s0 ~! D4 O
  10. */1 ?7 Z. H& L& _- J2 `3 p
  11. $worker->count = 1;
    1 A/ z0 R& Q* S% c5 x
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口7 M. \* A, z0 Z8 w& x
  13. $worker->onWorkerStart = function($worker)
    4 g6 m8 I# ?* c; U( k5 `
  14. {
    * r; \; i- l$ _/ P' I2 ^3 m7 \
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符8 G; j" O. L$ T& P
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');) _( z$ G  Z7 j; L+ g4 B& h
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    3 q6 G  R( U, N8 q
  18.     {
    9 b3 f' V4 m$ }1 p/ O& x6 g
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据$ o6 s% p6 b: T
  20.         $data = json_decode($buffer, true);
    - j5 P' \( j' H9 D9 p: U
  21.         $uid = $data['uid'];
    + F  f( E4 |4 I1 R! `, X7 u
  22.         // 通过workerman,向uid的页面推送数据
    * B- Y/ y- ?! F
  23.         $ret = sendMessageByUid($uid, $buffer);
    1 t3 H6 j. ~$ Q* F: K3 M% a
  24.         // 返回推送结果
    8 U- s2 I6 n5 v5 a( X
  25.         $connection->send($ret ? 'ok' : 'fail');
    ( z, o- l' P1 @% e* r
  26.     };. Q) k2 A2 i' [5 x- [5 A
  27.     // ## 执行监听 ##6 s/ v- @% h: J0 \1 v
  28.     $inner_text_worker->listen();
    8 @# L  y+ d: @, c. v
  29. };% B% K6 ^- [, t" m  d5 F
  30. // 新增加一个属性,用来保存uid到connection的映射2 `6 v- x! A3 t, k, L
  31. $worker->uidConnections = array();
    , _2 }' n( j6 O; Y- @: n" @
  32. // 当有客户端发来消息时执行的回调函数) L$ ^2 d& a( C) {2 s+ K& p
  33. $worker->onMessage = function($connection, $data)3 V: x% _  V, j2 Y5 o) |/ u/ P" Y
  34. {
    9 L% ~9 Y0 @1 N* y9 L4 Q
  35.     global $worker;
    3 j+ ~$ T8 C. T7 }1 i3 v) H8 X- q) z
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
      B' T7 T) A" n" r3 M5 z6 l7 d
  37.     if(!isset($connection->uid))1 \0 j6 u: U" d: }' |9 F
  38.     {
    9 m+ F" B. i, k3 c8 M# W
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ; L# A" r* \' e  Y$ w; O7 q3 O
  40.        $connection->uid = $data;: `( N. v  j! N* q
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,. G( Z8 ?% L; T0 l0 D  _+ d6 z
  42.         * 实现针对特定uid推送数据5 Y$ C/ B; [& W. r* @$ c& K- u
  43.         */0 n$ g# J: |/ b6 @. N9 M
  44.        $worker->uidConnections[$connection->uid] = $connection;
    $ j. S- l: Z! r" a1 \+ p& \, L
  45.        return;) }; `# e8 g/ n& G/ N* U: h8 z
  46.     }# ^- X& f+ U/ Z1 u# W; w, g
  47. };
    6 U3 A7 [' H0 d0 [0 n

  48. & Y, I9 `2 n- P! W  B; f
  49. // 当有客户端连接断开时
    - y7 T9 K0 E" S3 F, ]
  50. $worker->onClose = function($connection)  o" ]( Z' m5 c  }' ?( G
  51. {- g' X2 f8 @2 x5 j! e0 M3 R5 _
  52.     global $worker;4 ?$ U" m' k# ], U+ C
  53.     if(isset($connection->uid))" g$ H5 j5 H0 p" |, N0 N
  54.     {, D6 r+ j, K' q7 A, \+ }9 ]
  55.         // 连接断开时删除映射) U# W/ j/ S# s+ \
  56.         unset($worker->uidConnections[$connection->uid]);3 j4 r. n) l5 I7 ]0 g7 `0 }+ X
  57.     }
    - [5 g* n( l2 g4 O
  58. };
    # k- s% }/ t/ l$ Z# Q/ i

  59. 2 a# h% M3 h, U
  60. // 向所有验证的用户推送数据
    ) b1 T. B) R. H
  61. function broadcast($message)4 l+ D/ ]) R% |+ J
  62. {
    ; O8 ~& @; u9 @, w' ?4 F; M
  63.    global $worker;' E0 v+ }( p) ~5 W
  64.    foreach($worker->uidConnections as $connection)3 T. ~9 y/ k1 U& U& M0 N1 F
  65.    {
    ! }  F3 h7 Q- w* l  q( I# S* X! E; N5 c
  66.         $connection->send($message);" y; S4 X( K/ _# e3 |' q% o
  67.    }2 y4 i/ `2 s4 y  C7 X' L& w4 e
  68. }8 L: y/ ]6 _6 v  |! m# t0 Q5 ?
  69. # a# Q- R  J- p; r/ I, {7 `
  70. // 针对uid推送数据
    ; F0 ?- Q1 Y' _- P( A' v$ g' g
  71. function sendMessageByUid($uid, $message)9 \  o6 I7 t; u  K$ M
  72. {
    * m2 c" L9 |- d$ v! [6 {' E
  73.     global $worker;
    + I4 x3 ?- I: I' [& r: D6 k
  74.     if(isset($worker->uidConnections[$uid]))5 t: b$ y7 r. k% z8 U
  75.     {4 K; E4 P9 p8 J& O
  76.         $connection = $worker->uidConnections[$uid];
    6 W' s; K% `" x& ]/ p3 B6 U
  77.         $connection->send($message);
    7 @; F  A) L! ^& v9 {
  78.         return true;9 _$ K2 K# @! m% V# Z. H/ _7 ]
  79.     }0 L1 _; S5 a+ y( J
  80.     return false;
    0 ^7 x9 ^. i) [) n! n: I
  81. }
    5 J& h! J# W* T% g

  82. 6 ?0 N3 x0 \: ~; k. ^0 Z
  83. // 运行所有的worker
    2 p2 Q( M1 E/ [. H2 p5 Z5 O% \
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');( q/ s" w/ Q. N2 Q
  2. ws.onopen = function(){+ R8 W* C: g+ L8 r9 @( V
  3.     var uid = 'uid1';
      {; J# _* h+ C; p" X
  4.     ws.send(uid);
    - M2 I) ^. a5 K4 O1 t- \
  5. };
    % O: q" Z7 `2 {  z+ k4 m8 u& E* V
  6. ws.onmessage = function(e){
    : k9 Z/ F  Q; p' ]
  7.     alert(e.data);
    : I: _, T6 S( N1 ~* b: c
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口/ k1 W$ v8 R( @$ U0 v9 x. ]6 U
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);% _7 B2 J8 U. |& T6 K0 T1 F+ J
  3. // 推送的数据,包含uid字段,表示是给这个uid推送5 O8 n: [7 z# d9 a5 B1 f3 @
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');4 N. }' V0 Z. K! |6 z5 C+ X* T9 k
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符' k' `+ J' T6 S. g
  6. fwrite($client, json_encode($data)."\n");' C9 L+ X. J3 R4 ?8 `  i9 M
  7. // 读取推送结果1 M3 l. w' W5 M
  8. echo fread($client, 8192);
复制代码

" |" a$ L0 L# h$ P) W' T
/ v% f' _* R4 K2 }! ], Y& g! p
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-19 22:39 , Processed in 0.116926 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!