cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
      Q/ b( o# D1 v) t5 ~1 w% F
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    6 O% V3 n7 Y0 H. a, E% w  V0 M

  3. 8 @7 {3 T1 T" q8 B) X
  4. $worker = new Worker();; D0 `6 c' Q; w% L% b
  5. // 4个进程
    ; K' o9 ~5 w. A) F
  6. $worker->count = 4;
    - g7 o$ X7 o- |
  7. // 每个进程启动后在当前进程新增一个Worker监听
    - M8 ^9 |" W1 g) F# _
  8. $worker->onWorkerStart = function($worker)
    ' E" y! ?6 _/ J" _
  9. {
    $ S! m" f3 W' F: N. L, J
  10.     /**/ B. E7 N7 A: z& |. a  N
  11.      * 4个进程启动的时候都创建2016端口的Worker% G! ~9 X, v) w
  12.      * 当执行到worker->listen()时会报Address already in use错误
    & U4 @  C% |3 g( ?1 P$ @
  13.      * 如果worker->count=1则不会报错2 C4 u7 B3 e$ T6 ]$ f, F8 u
  14.      */+ w5 B7 m4 E. Z; @+ F  S3 a
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    % h& ^, `+ O3 M) F" m
  16.     $inner_worker->onMessage = 'on_message';
    $ L! _) i( @/ M
  17.     // 执行监听。这里会报Address already in use错误  T0 a% b/ D# g5 B
  18.     $inner_worker->listen();: m, C* Z- v& ~. e2 }( j. [
  19. };/ O/ E. h; ]3 n9 b
  20. 6 p1 e- i, w4 l; J- z6 B+ n/ }
  21. $worker->onMessage = 'on_message';
      V3 K$ u; R4 z, y4 W+ l( c4 V. n

  22. & M2 @) @4 G  X0 C
  23. function on_message($connection, $data)  l6 X6 M: N2 b, Q4 S# j3 r" q' j  |
  24. {
    % Y* w, ^, D8 b
  25.     $connection->send("hello\n");, c$ N0 v% V6 X7 I6 E
  26. }. z1 K$ Y3 ^! F+ Y) j( W: E
  27. ; j7 [' i' O+ M% j
  28. // 运行worker
    $ I% j. ]9 [* Q, x
  29. Worker::runAll();
    7 v2 A' R# C4 d" Q
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:2 o! r# d# d( ^
  31. 1 a/ {; S) I  V. U
  32. use Workerman\Worker;
    . {$ p3 @( A4 |, E$ k; k9 q2 h3 ^
  33. require_once './Workerman/Autoloader.php';
    * u) A! U/ ^! \% I6 S, v

  34. ' t, ~0 E4 P8 K% z( G6 Y5 f
  35. $worker = new Worker('text://0.0.0.0:2015');
    ! G" o, h. l8 u& T2 I7 U& {& K" B
  36. // 4个进程2 n* D- L) x" m9 Q
  37. $worker->count = 4;  b! m8 O; }" B# ?
  38. // 每个进程启动后在当前进程新增一个Worker监听
    6 S. \7 |6 a- J$ p2 K8 |
  39. $worker->onWorkerStart = function($worker)5 @. w4 V& I4 p% u  j, K
  40. {' d" d2 \) @* F& P. M" G5 t
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    1 p4 j* v/ A) ^" }7 q
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ) n- X( P# G# L% b+ P8 G8 I3 |
  43.     $inner_worker->reusePort = true;
    . ^( e5 Y! U2 ^; }# S
  44.     $inner_worker->onMessage = 'on_message';
    ; b0 M: X' n, [6 |4 a
  45.     // 执行监听。正常监听不会报错
    3 y" W2 O4 z/ j) ^. S. V2 j- o
  46.     $inner_worker->listen();
    9 Z" a8 d4 {) F5 z
  47. };
    % B$ ?' q, z2 s' w' }( g
  48. 2 ]: ^# L" j7 I0 t. j- D0 g# A
  49. $worker->onMessage = 'on_message';& B' J# L$ x6 [+ w  @
  50. 9 A; j$ W1 T) M7 s4 l" G
  51. function on_message($connection, $data)6 U( w. }, @% K1 c3 r# D
  52. {; [2 V0 X' U7 T5 X+ c
  53.     $connection->send("hello\n");
    ! T% ~2 ]9 y$ A, N' S8 |, m% Y* m
  54. }4 e3 M: w- c  G* O. U  r

  55.   B: l# n( T1 N+ n- m( q  ^$ M
  56. // 运行worker
    9 ^+ [/ ?- s  _& L. ?& u
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php0 C9 F! ]+ O' W) ]' s0 \: ?
  2. use Workerman\Worker;
    + J5 Y: V9 Q1 w# ^. m, n+ j" C; s
  3. require_once './Workerman/Autoloader.php';
    " W+ Y, I1 _1 y/ n9 s* s
  4. // 初始化一个worker容器,监听1234端口
    ( Q2 d. C2 Q3 p2 P
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    9 X( C, T+ U1 z" N' m) p$ i
  6. / x6 [; }* j* @, Q: l
  7. /*# @1 G1 C  s& ~2 U4 V
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    * p5 W: a! x9 F6 a4 U$ ?* z+ D
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    " q# l1 u2 F  ?
  10. */
    4 ~+ N& E- |% x0 [/ S( X
  11. $worker->count = 1;
    / z# l* p& ?$ ^2 l: B- P
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, g- H$ Y2 F) E! N' p- t2 {0 T/ L* ]
  13. $worker->onWorkerStart = function($worker), ?" l' c6 I( j5 s9 S3 @* L5 A
  14. {6 z& p" o$ T8 g/ Y
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    9 G5 j: W$ }' ~0 g5 F6 o+ P/ `
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    4 z3 @; g" k- p& L5 J2 d3 w
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    2 W5 F0 C2 Y0 j& g% ~9 W4 a
  18.     {$ g, X# v4 ?( i, J5 ]3 x3 P4 Z8 ^
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    8 ~% e' M9 P( J  a9 o
  20.         $data = json_decode($buffer, true);
    ( e: B1 Z' V- ]5 r+ I& _2 L6 s
  21.         $uid = $data['uid'];/ {7 G/ n3 Z1 w
  22.         // 通过workerman,向uid的页面推送数据
    # D* a- c- j8 b7 _# \
  23.         $ret = sendMessageByUid($uid, $buffer);4 X# S. U' F# |3 v/ M+ T( x
  24.         // 返回推送结果
    ' ]$ z$ |' ], m9 t: Y- {# Q  K
  25.         $connection->send($ret ? 'ok' : 'fail');2 h* P$ Y; n5 m/ b# f1 y
  26.     };5 m& u, U6 a# m/ l) ^0 k
  27.     // ## 执行监听 ##
    7 R- c, Y) {" F2 W3 V5 l9 \; W( \
  28.     $inner_text_worker->listen();
    * ]/ I( i% U( T" t
  29. };
    6 g' V' g1 Y; o, F, K- L: w( i
  30. // 新增加一个属性,用来保存uid到connection的映射
    . a4 H! k: j  F0 {. |
  31. $worker->uidConnections = array();
    7 H' f1 ^! X( S1 Z, ^
  32. // 当有客户端发来消息时执行的回调函数, N3 h( h$ x- @' G3 Y9 i$ I. m5 l' c
  33. $worker->onMessage = function($connection, $data), C% U. e- ~. ?7 L% j  U
  34. {
    $ p4 i5 ~- Z6 W" x0 I
  35.     global $worker;
    ) |' X0 o) j( i/ A' z  @6 |+ ?  T3 U
  36.     // 判断当前客户端是否已经验证,既是否设置了uid/ b' j* `6 D: [" X2 S
  37.     if(!isset($connection->uid))# A3 Q  }0 J0 z
  38.     {
    ( X" {/ _# p2 X6 E; J  V/ h5 X1 s4 z
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    # E  h( y: R' V5 Y8 W: S. v
  40.        $connection->uid = $data;+ N. H/ ^3 D$ h- j0 R& c; A5 I- e
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    4 d: V) _! u4 v
  42.         * 实现针对特定uid推送数据0 s( y' O5 A3 i9 v: k0 t
  43.         */
    6 B& K, O6 E& Y) e' Y7 V% e4 {) }9 ?
  44.        $worker->uidConnections[$connection->uid] = $connection;
    0 v: t/ P) n2 c- B8 m: E
  45.        return;  f& m, A0 U+ V, Z- I& W3 g, n
  46.     }
    9 y1 F5 N0 J5 B& C* R) H+ o
  47. };
    7 y, c* H* ]4 k3 R  q- ~- |, U9 J) A/ x
  48. " Y1 R* v! r* ~* Z7 y( c1 j# }
  49. // 当有客户端连接断开时& G9 L+ `9 n+ a: ^8 v. a
  50. $worker->onClose = function($connection)
    ; m$ o( k" v0 u
  51. {" `, _9 O0 n# x. t0 m/ \% j7 w2 u
  52.     global $worker;
    $ v% Z% Y# T4 Q
  53.     if(isset($connection->uid))
    ; m! }* P% u+ d& ?0 Z$ i
  54.     {
    6 F2 C' [4 S% w) F: d4 [& q
  55.         // 连接断开时删除映射# w% G8 s( j1 f+ ]/ C
  56.         unset($worker->uidConnections[$connection->uid]);
    ( A( Z# z2 `& ]  p, G
  57.     }
    ' }- l% J) a/ }, a* z
  58. };
    & i7 Y8 K5 @2 m( N! G* H

  59. ; g/ k  m2 G0 N3 v' T) ~
  60. // 向所有验证的用户推送数据
    : G6 {7 y+ _2 A- v
  61. function broadcast($message)
    8 U/ z7 |1 D% k1 l# G
  62. {! C/ C! n$ f$ l- g
  63.    global $worker;. J% l3 s1 R, a2 L# e! B/ m
  64.    foreach($worker->uidConnections as $connection)
    $ [; R0 N# r; s; g
  65.    {
    + e3 o2 S7 r  C
  66.         $connection->send($message);
    6 O+ \! D3 D5 ~3 M4 L, H
  67.    }
    : ~# [/ G8 l" t
  68. }
      N* O0 l& I3 t' K

  69. 8 `/ n, y& ]  j7 ^9 B# Z5 z; z
  70. // 针对uid推送数据: Y4 m( V4 ^: ~* @  a
  71. function sendMessageByUid($uid, $message)
    $ P# V) W. V7 e7 W6 ?
  72. {
    - f, L! Z" Z& U8 X' ]
  73.     global $worker;
    $ a, d2 l# n$ H/ ]7 \# l
  74.     if(isset($worker->uidConnections[$uid]))
    " P" `, {& W0 N  o
  75.     {% a- l9 Z! g" V) _/ H
  76.         $connection = $worker->uidConnections[$uid];
    . Y% L. N9 Q! j
  77.         $connection->send($message);7 ~' f' t; d. D" n- ?# U/ T0 L
  78.         return true;
    5 ]* F* j/ n$ ?  ]
  79.     }
    2 X: a# u# \. j0 m7 r* J: m8 c
  80.     return false;
    # u* `4 F) t/ V* n) y( M
  81. }
    0 v; |0 Z* U0 h' }) g

  82. 5 m, ^/ @2 C! x5 D
  83. // 运行所有的worker' x6 `- d5 _5 T: x' Y
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');6 Y3 t- l% I  Y$ z
  2. ws.onopen = function(){
    5 y1 _' e0 g/ G6 W1 r3 D
  3.     var uid = 'uid1';
    $ i% z( j  h2 {( Q& b# _
  4.     ws.send(uid);& X! Z0 `3 F5 R6 Y
  5. };9 @( [1 B9 B& {% x2 H; |6 g
  6. ws.onmessage = function(e){1 x0 L6 m+ Y. K5 M
  7.     alert(e.data);1 ]( p% r8 _* A* B
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口$ s3 K% y( \6 Z1 A' ]' L
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    " i0 o5 c+ e1 O& l0 {" J
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    / p* |3 ~" @0 H4 `
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ; P8 U# A7 U+ A$ M6 s
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    7 ?- v5 j4 a9 i- K3 M
  6. fwrite($client, json_encode($data)."\n");! q5 y6 V. D* C
  7. // 读取推送结果9 ]$ |; U8 f: ?' a
  8. echo fread($client, 8192);
复制代码

: E. g3 ]4 f0 C7 z# A0 c- m9 P$ Z; v





欢迎光临 cncml手绘网 (http://www.cncml.com/) Powered by Discuz! X3.2