cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;9 f/ n+ u6 a% y  n- m7 }
  2. require_once __DIR__ . '/Workerman/Autoloader.php';) s( t9 @  U0 p$ F' Q0 z/ ]  c; F* {

  3. $ W" S4 Z" U+ u" v
  4. $worker = new Worker();
    9 N: w- Y9 U) L: o
  5. // 4个进程
    $ e! _5 h- G1 g, C; u
  6. $worker->count = 4;2 w0 K) O' l% q: A: r) s
  7. // 每个进程启动后在当前进程新增一个Worker监听1 h3 e/ w+ a9 t) q  Y2 }
  8. $worker->onWorkerStart = function($worker)" u/ [1 Q( f3 u7 p! O: ^  E  y9 U4 L
  9. {
    # [/ y7 k. V, m7 O6 V% y
  10.     /**% L" v# X/ \/ d$ i
  11.      * 4个进程启动的时候都创建2016端口的Worker
    0 i6 l7 ]9 o9 f
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ' w2 {# ?" ?# e) ]- d
  13.      * 如果worker->count=1则不会报错
    # o2 x3 \3 u; F& @8 z) A2 j$ ^
  14.      */( q, }9 ]" R) y
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    * S) M8 @0 g3 U; H5 ^2 a
  16.     $inner_worker->onMessage = 'on_message';
    $ x5 o! s  N/ c, @3 |! n0 C% a
  17.     // 执行监听。这里会报Address already in use错误7 D% l  p" U& e! Y1 O& s7 R( U" w
  18.     $inner_worker->listen();
    4 h* {& j5 B% w2 D) Y" g
  19. };
    , m7 M3 U4 L- x0 l; H! m7 v

  20. ( r  S2 D. J& ^6 D) _5 W2 p5 O
  21. $worker->onMessage = 'on_message';
    5 R% b2 {2 r% l9 y% e+ ]

  22. ' Q7 b: C% Y  U. |% i  Q
  23. function on_message($connection, $data)
    5 f3 ~+ J2 i3 ?& l- F& a$ `5 o
  24. {5 d1 M/ k6 m: S2 A
  25.     $connection->send("hello\n");6 n- l4 L0 d- _- g# Z
  26. }
    / Z2 z  F' E7 Q4 t+ t1 Y0 k

  27. 2 B. w8 \4 A" P  ?/ J% u. g( I6 v; Z% ~
  28. // 运行worker
      ^) J! B% S! Z5 R" F: C  _4 f0 w
  29. Worker::runAll();+ N% k( k8 W( H  X
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    : E6 a2 F, l3 z: i+ S1 M$ M0 a! y0 q

  31. ; [4 M. l& g. n
  32. use Workerman\Worker;
    ; W# J3 M1 d) ?; ~3 ~% C% p2 o' K
  33. require_once './Workerman/Autoloader.php';
    3 f: }) I' ~" g$ u: Z" G! R
  34. ) _, z; _% c3 A5 Z, |
  35. $worker = new Worker('text://0.0.0.0:2015');
    ! U. m. Z: \; n( c" C7 Z6 R  I
  36. // 4个进程6 F$ u8 Y! M* d9 f5 a
  37. $worker->count = 4;
    ) d7 a, n, N; w5 _; N0 k- y! ~
  38. // 每个进程启动后在当前进程新增一个Worker监听" j5 S( N! R1 S: W, Z! A  u2 O
  39. $worker->onWorkerStart = function($worker)
    ) `2 Y& g3 H4 g7 a/ i8 t9 y
  40. {
    " k: Q' E8 m; K, G
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');2 I6 r4 Z% u, L$ x2 v6 }0 d
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    1 N5 E, s3 w5 e7 }8 m& p8 Q. C$ s
  43.     $inner_worker->reusePort = true;
    $ H; R2 J" S" {7 K+ j0 P$ F
  44.     $inner_worker->onMessage = 'on_message';
      b5 C4 c4 r3 h" o2 m
  45.     // 执行监听。正常监听不会报错' b7 k5 a. \! G7 H5 E; b1 R
  46.     $inner_worker->listen();
    8 f: p" j, y2 a. J4 Q5 X& a2 f4 ~, x
  47. };0 H- y4 a7 r- x; v! e2 R2 Z1 \

  48. ) f* F+ f( O; C& k- y
  49. $worker->onMessage = 'on_message';8 h, ]3 l2 A2 Z
  50. 8 |. I9 x/ Y2 l7 l
  51. function on_message($connection, $data)
    ( x) N- _4 g$ y4 g; i
  52. {: X+ q( a  C5 ~) O+ x" W- {5 D: Y: X- h
  53.     $connection->send("hello\n");
    ! v! Y9 F* V! Q! H  t9 x
  54. }
    6 A% x: C+ K6 D

  55. 2 K5 P4 G# S' @; J) x. X) {
  56. // 运行worker0 V. `0 w: o* }+ O3 y- G7 J
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php1 n; p' l, C: H$ a1 z8 @, q
  2. use Workerman\Worker;
    0 v; `: I% a6 o. ^8 D3 d
  3. require_once './Workerman/Autoloader.php';
    / \& A: p5 T0 b! U) W
  4. // 初始化一个worker容器,监听1234端口. j; V# {1 W( t' o* I, V
  5. $worker = new Worker('websocket://0.0.0.0:1234');  N4 q) p9 @# X+ m$ p  y
  6. 3 N$ g  |* k4 S0 s" O
  7. /** K$ f: h' A& @' c4 A5 U" M
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    & p( m# K/ F# W
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)" B) u% v3 E! @, E) i
  10. */4 X3 v6 Y- Z! G5 ?/ @( \: i, g
  11. $worker->count = 1;
    7 Q- R5 S/ C5 P  R
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    8 g, q) l+ s: ^, r7 m
  13. $worker->onWorkerStart = function($worker)2 G+ u# {2 ^$ @6 \# v' a5 Q/ ~! Y
  14. {  u! K. p' Z$ F  s" `3 d' z
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符' c$ H* n: d/ a! ]4 H- n3 u6 m
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');9 K4 l8 B8 O" w& u
  17.     $inner_text_worker->onMessage = function($connection, $buffer)! J" o4 N8 T9 P2 T  @: E
  18.     {
    . Z% z" n" ]3 n  O; Y
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据7 Z* _9 X' P0 j: v3 k
  20.         $data = json_decode($buffer, true);
    5 O# }7 s0 l  \" b- N: h7 D% G
  21.         $uid = $data['uid'];
    & x, u+ ]3 E! i# k" _
  22.         // 通过workerman,向uid的页面推送数据. [, Y6 }5 r* X. K* }4 g3 d: s
  23.         $ret = sendMessageByUid($uid, $buffer);
    ( ]/ d5 b( N5 H
  24.         // 返回推送结果
    ) x* A# H8 I+ J2 J5 T
  25.         $connection->send($ret ? 'ok' : 'fail');
    + u4 _3 q4 e9 r8 I3 i
  26.     };4 ?# ~( [  d7 `0 `7 l
  27.     // ## 执行监听 ##+ x/ _3 Q2 Q: \: E9 I
  28.     $inner_text_worker->listen();% b2 [* l% {* T. g" B. k1 X
  29. };* k  \/ L. N. D$ e& K. f
  30. // 新增加一个属性,用来保存uid到connection的映射/ O2 q2 C' e. J7 M9 q- y8 V
  31. $worker->uidConnections = array();0 Q' P. T( k! Z% _3 X; q2 b
  32. // 当有客户端发来消息时执行的回调函数
    - u7 x% q2 m" H2 c
  33. $worker->onMessage = function($connection, $data)  L# B; A# z* e
  34. {9 L& b! o* M+ D0 ^# R/ O0 D* ?
  35.     global $worker;( h5 o* O4 U4 B( d" s% u! S) X# F
  36.     // 判断当前客户端是否已经验证,既是否设置了uid% ?9 Y" m4 I4 R3 E/ \) i, {& h
  37.     if(!isset($connection->uid))% y# }8 z5 b0 M" G
  38.     {) t# Y& J2 k2 y1 ?) c# B. f
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    * D8 n1 m7 S8 u- ]6 O/ G" w
  40.        $connection->uid = $data;
    + O6 ^. p! e9 S3 T9 E+ r0 {
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,0 k* h% C, ?+ s
  42.         * 实现针对特定uid推送数据6 Q1 N! E) O; l0 D! u
  43.         */
    2 ]0 d' s9 u- A" c- I1 p
  44.        $worker->uidConnections[$connection->uid] = $connection;
    5 ?, P5 u$ S7 m% H; K  W2 m$ }2 Q* C# X8 ~
  45.        return;5 l  Z# K8 ~+ S$ v+ u: Y6 O3 _& h
  46.     }6 v. E3 L' ^! e, g
  47. };9 d; o4 o8 [) ~( ]
  48. 6 D+ i! M3 b! {( H2 ]
  49. // 当有客户端连接断开时: m5 S/ V' R  o' I9 G5 d* ]
  50. $worker->onClose = function($connection)+ D& G1 k( J: B% f) D# j
  51. {
    # F# b# Z- B4 {- J6 u6 Q4 g
  52.     global $worker;
    * c7 K9 Z, f: K) [) r( ~
  53.     if(isset($connection->uid)). X! \* v4 h' s: @
  54.     {
    + T% P' C. ?7 o7 k9 F, {
  55.         // 连接断开时删除映射: p2 x* m8 R0 ]- @
  56.         unset($worker->uidConnections[$connection->uid]);, U- w( ?: m; y( O2 P
  57.     }; a2 L, S2 G4 H
  58. };
    # g- h( T9 _7 |8 F
  59. 7 @3 J# k& \# s$ n
  60. // 向所有验证的用户推送数据: t* K* }0 ~3 R% {6 u
  61. function broadcast($message)
    9 @& _  A5 P% S# h* \
  62. {; `4 t! w6 ~3 Q+ _0 [/ A
  63.    global $worker;' ]- [: _: p& p
  64.    foreach($worker->uidConnections as $connection)" p% C$ i2 [) O$ u
  65.    {
    8 [* @: P% Q* z. g
  66.         $connection->send($message);" S  p  o! \5 e
  67.    }2 K' Z: c. J5 J$ t
  68. }
    " P3 v2 l  }( I, `% T- u
  69. & S) k# M# E; f7 J
  70. // 针对uid推送数据6 t7 W3 `5 t1 A. R
  71. function sendMessageByUid($uid, $message)
    9 h. W$ l* A% D2 U0 o
  72. {6 b- N5 B: v9 F; S' l) f% }+ ?
  73.     global $worker;
    1 H/ l: g* p3 m/ ]
  74.     if(isset($worker->uidConnections[$uid]))
    4 q# t5 B4 Z) k7 ]  j6 L
  75.     {/ m  M; c. @, e% r8 S- E8 y
  76.         $connection = $worker->uidConnections[$uid];- e* L* K+ ~! G$ E3 L1 ^3 {- v
  77.         $connection->send($message);6 \6 `8 y4 b2 K$ F% n
  78.         return true;& r1 o% ^- I' I$ G  \
  79.     }
    8 y( G# g4 W7 x8 G8 W  ?( U: w* Z
  80.     return false;
    0 j9 O7 ~. E4 M1 r+ y
  81. }
    6 {% r* W' ]8 C

  82. # U. L: q6 e; H5 k, T
  83. // 运行所有的worker
    ( g% b9 U* [# `9 N
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
      L$ d' t0 _; J
  2. ws.onopen = function(){
    , z8 S5 d' Z/ X* T
  3.     var uid = 'uid1';
    : n+ t, f7 U" x. C1 y- R4 R
  4.     ws.send(uid);
    & q. r' c( n( ~. a% i
  5. };
    2 m9 q( B' J% [! s* B, U( _
  6. ws.onmessage = function(e){3 h0 ?' z/ A0 J8 m
  7.     alert(e.data);5 x9 f! ~- J' q. g0 g7 `( I
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
      t2 v5 i& V) M+ s. H! _  G
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);7 I2 s! K" S2 K! A7 [) X
  3. // 推送的数据,包含uid字段,表示是给这个uid推送$ N) z/ Z# g9 T9 D4 ?& K* I7 J  m0 K
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');8 ]. O) i" X2 x+ z7 z
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 d+ J. {7 G2 ]' b: m
  6. fwrite($client, json_encode($data)."\n");
    - n9 R* l# q4 t. J4 `
  7. // 读取推送结果. m' L- V4 B, ?' s
  8. echo fread($client, 8192);
复制代码

1 o! D9 c0 P" r% Y
9 V: t7 H% t8 B* A% [




欢迎光临 cncml手绘网 (http://www.cncml.com/) Powered by Discuz! X3.2