您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15223|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    7 {  C( N4 q6 q$ M. `
  2. require_once __DIR__ . '/Workerman/Autoloader.php';5 V" G3 V7 h/ _! B' z! X/ ^
  3. . c, x- X9 I' n) c1 \
  4. $worker = new Worker();; k' Z3 I; |# j5 @1 I
  5. // 4个进程! h9 o* g( n7 r( k/ x- e
  6. $worker->count = 4;
    1 B5 {% F4 w0 L9 z
  7. // 每个进程启动后在当前进程新增一个Worker监听( l5 y  F* j" G" R' I" |
  8. $worker->onWorkerStart = function($worker)
    4 s. T; b: z$ w8 B/ R
  9. {" K* A( y+ O. I( M8 S  b' k3 |
  10.     /**
    ! b* h7 N9 d% }7 x1 f' x
  11.      * 4个进程启动的时候都创建2016端口的Worker8 w" F* \+ H. I/ y0 w
  12.      * 当执行到worker->listen()时会报Address already in use错误+ Q5 R/ T- W8 q3 P
  13.      * 如果worker->count=1则不会报错
    , b; a/ F6 w8 v* l, A( x3 T6 g
  14.      */
    " y0 v3 H( K) p2 e2 x& M! b0 f
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');2 o3 K# p; c0 w, J9 r
  16.     $inner_worker->onMessage = 'on_message';' ^1 n. c  o  O7 h/ Z2 T& l) g, A$ c
  17.     // 执行监听。这里会报Address already in use错误
    4 d' ?' B% R* G4 h1 q
  18.     $inner_worker->listen();0 q4 m4 q& D; s7 c6 f
  19. };% S6 P# g. l9 W, Q

  20. 0 J' p" F# G, i& ^' j( `
  21. $worker->onMessage = 'on_message';
    5 D( J" H$ ~, T8 |
  22. ' \; _# O8 B2 a/ ^; m. W
  23. function on_message($connection, $data)
    . ~. w9 L7 `9 O0 d
  24. {
    4 E) ]: G/ ?% a. n, ?
  25.     $connection->send("hello\n");. Z/ A7 m6 O, Y/ T  i
  26. }- z, O, [8 X) P  R2 Z) ~

  27. % V8 E2 N. N/ z5 ~- w+ k
  28. // 运行worker
    # A7 t3 Y( I2 j" B3 e
  29. Worker::runAll();) u6 Q; _( p  y& d' s
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    1 y4 W4 v2 _. f" g% P, i, J

  31. # H9 t  U/ q, z- I, X6 S
  32. use Workerman\Worker;
    ; w0 a9 M6 i, j8 \  |
  33. require_once './Workerman/Autoloader.php';
    4 r  \; M# w9 ?, G' Q, p
  34. " ?& V: f% _+ P  z
  35. $worker = new Worker('text://0.0.0.0:2015');
    9 B+ f6 _) N& U5 Q. @9 B
  36. // 4个进程
    " l. K6 y7 t6 h6 k& T5 k: W
  37. $worker->count = 4;) j+ {6 n3 ~2 K& h. B
  38. // 每个进程启动后在当前进程新增一个Worker监听
    8 r0 b$ ?" S3 j' ^" w, h2 p
  39. $worker->onWorkerStart = function($worker)
    / g3 m3 J. h& a8 J; ?
  40. {
    0 g& ?" U* A9 {$ }5 a/ Z
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    5 e9 E' x8 U- U! r7 u2 ]
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    4 O+ j0 o8 p1 Z6 P: J! O
  43.     $inner_worker->reusePort = true;
    * p' K; G1 w' y( t
  44.     $inner_worker->onMessage = 'on_message';* p4 G9 C; o7 |$ B3 _! F3 l
  45.     // 执行监听。正常监听不会报错, |% ?1 m, J  c3 p( X7 a6 i$ C1 m8 x
  46.     $inner_worker->listen();- p. R  v4 h& a. P: u: z9 R8 ]
  47. };$ V  M2 Q% {0 p" T9 ~* X: `0 l

  48. 8 u. B+ w- F7 s  h0 V: v2 L( G$ a
  49. $worker->onMessage = 'on_message';0 F. E+ y! e# t/ f* r# H

  50. ( c6 Q- D" T6 D3 Y! r% t
  51. function on_message($connection, $data)& x8 m  U) Q0 C2 c. K
  52. {
    # P8 M3 L9 R6 r2 e% J+ Q  s$ j/ @; y
  53.     $connection->send("hello\n");- B3 C+ n, y" x9 ?
  54. }8 o# u) G3 v1 y& }
  55. + ~+ U6 E6 x) p$ y  }: K
  56. // 运行worker7 Q: I" T9 \7 a6 }  U& Z
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    * [# z, ^/ @/ _: q8 V! ~  ^* P
  2. use Workerman\Worker;7 \7 }: }8 p5 |$ q' G/ S
  3. require_once './Workerman/Autoloader.php';
    ' l5 o; J/ X$ }0 g+ l; r6 |
  4. // 初始化一个worker容器,监听1234端口
    9 q! J) p) E+ ]7 w
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    * d; M! a& j" l

  6. 4 m: q- ~( \" X$ M0 x( p
  7. /*; l. Q& F6 B4 {% i6 m: ?
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    - k9 [- ^5 E( i- `! _
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    4 h3 U9 I' l0 p" a+ o
  10. */  p: e# }# J. j: A  b
  11. $worker->count = 1;
    ( s  `4 u) c* U% o, c. y0 @: z6 v
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    $ S4 h5 ?0 i. x$ @- ~! {
  13. $worker->onWorkerStart = function($worker). _9 m& j) S$ {4 g$ w' @/ A4 n
  14. {$ w+ O( n5 ]$ d
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    " v& q6 X4 N. Y* K+ @, [9 w3 P
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    6 n4 l1 D1 H2 n$ ?- Q6 i
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    % W5 O/ @3 \# g: W6 T% K, t5 t- Y
  18.     {
      d" E1 _3 r/ S  E8 A
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    & V/ K0 L2 R! o+ x0 |: L
  20.         $data = json_decode($buffer, true);' K* ~7 C3 J- D  `  _9 y
  21.         $uid = $data['uid'];+ i. |5 _. s+ j0 |
  22.         // 通过workerman,向uid的页面推送数据* _$ C1 N# Y9 W9 o
  23.         $ret = sendMessageByUid($uid, $buffer);5 m' o8 c1 D5 a
  24.         // 返回推送结果
    2 G! Q! R0 g- P' e% v! ~
  25.         $connection->send($ret ? 'ok' : 'fail');: N9 c' S  v! ^. q
  26.     };
    ; |' o5 o: r( n/ H5 M0 u3 |
  27.     // ## 执行监听 ##
    0 a( L+ ?1 p2 t9 Q- {9 ]
  28.     $inner_text_worker->listen();9 K/ y9 X. J3 u( F. h% X
  29. };& {+ b" m& x0 s. {9 F
  30. // 新增加一个属性,用来保存uid到connection的映射
    " u+ G  e4 s5 b$ d3 S
  31. $worker->uidConnections = array();+ o* n. ]$ [* i% P- p) n3 j) `$ @- o
  32. // 当有客户端发来消息时执行的回调函数
    + }7 \5 D3 _1 O* p* y
  33. $worker->onMessage = function($connection, $data)
    3 V. Y2 R! z+ A% r: j. V0 d
  34. {5 N3 s8 [8 o) P2 M1 W; {% Z: D! e) X
  35.     global $worker;
      |8 Q' m$ |( h# R# |9 i
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    ) \' j/ {+ p0 b. G; R. {
  37.     if(!isset($connection->uid)), F  a8 U' ?: |$ ?
  38.     {& ~1 Z; {, i9 \
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证): [+ j8 i8 d% \) V2 P" Y
  40.        $connection->uid = $data;# s  M, x' E# y* d* q7 ~
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,9 G' v. j. n3 }- I; F5 L& ]" j; o& u0 u
  42.         * 实现针对特定uid推送数据% i6 D. q# I7 _/ ]% z
  43.         */1 o4 N3 H  E) l! U
  44.        $worker->uidConnections[$connection->uid] = $connection;$ E; B' @3 a. _2 z* T
  45.        return;
    " z- {( |, H) [7 z
  46.     }4 ?* Q3 s. W* `) a2 {
  47. };9 Q! ]+ V$ o/ |: q: U0 K' F4 F  @
  48. $ b$ ]' d; R" h- D1 W) c
  49. // 当有客户端连接断开时. e; W9 o3 P2 f/ E
  50. $worker->onClose = function($connection)
    7 W, {0 W# G& _; ]
  51. {
    4 n& T: P8 w! `7 c" L0 E
  52.     global $worker;+ Z+ @) u% p' T% x: f
  53.     if(isset($connection->uid))# G$ e# o+ a0 f, r1 }( p7 }1 z
  54.     {
    ' B2 I, P" P2 k6 ^/ U
  55.         // 连接断开时删除映射
    1 G3 X4 s' e7 X/ O7 @* @7 F2 @! w  n, \5 c/ Z
  56.         unset($worker->uidConnections[$connection->uid]);
    1 b/ {  J2 q3 G4 |, x
  57.     }  ^% Q1 h0 O& y2 D- L
  58. };
    9 G5 }% B! i4 p7 t

  59. 2 L' W% A$ L/ `: `5 O2 `
  60. // 向所有验证的用户推送数据
    * K0 A% F0 a" m9 E* y
  61. function broadcast($message)
    3 m, ~/ S6 T- [) s# ~
  62. {3 d! D8 X9 l# a1 d2 Y
  63.    global $worker;9 g, }2 s4 Y# e9 y* b5 h2 s
  64.    foreach($worker->uidConnections as $connection)1 ^5 x- \. i$ W  m7 l$ x9 V
  65.    {' Z9 _8 z0 f) \2 S
  66.         $connection->send($message);
    + D5 Y1 ?  I, y$ R; }
  67.    }
    . u3 q' V. @& T( B7 \& p* n2 N
  68. }* r- f6 y; s- s  ?6 I

  69. ; D' l; w( Q4 e. I- p0 w  K  Q
  70. // 针对uid推送数据
    ) h  ~- @* r5 i; O2 _
  71. function sendMessageByUid($uid, $message)
    " s7 U9 r# `; s$ T  h; E
  72. {
    / b. _' o% P7 m- y  H5 }( `9 [
  73.     global $worker;
    1 |; S5 W& O/ N5 S
  74.     if(isset($worker->uidConnections[$uid]))
    " v9 K/ _5 l; j) E- O! j4 w5 Y
  75.     {, |7 D7 B* m. {' T+ N7 m
  76.         $connection = $worker->uidConnections[$uid];
    ; m9 O9 @& X  E5 O; W$ y
  77.         $connection->send($message);1 e7 J6 U/ X) Q3 ~& L  @
  78.         return true;
      U& f9 E9 s. t- `( X
  79.     }, [& Z1 @+ C- h! B
  80.     return false;
    ! k) g& _. N; g# ~; R5 ^7 u
  81. }
    5 L0 ]; \+ m/ U' d
  82. 5 R( m# D6 R5 n9 u' `
  83. // 运行所有的worker
    7 B; K) _, e$ a4 j$ a9 v( \- E
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');! w. f7 F/ }4 R
  2. ws.onopen = function(){
      B4 i1 T0 l" A5 s0 R
  3.     var uid = 'uid1';4 r: d/ P2 X8 X" W  n
  4.     ws.send(uid);" H" @8 S% c( M5 L# D
  5. };
    3 z+ v! n! U) [1 a7 y/ {) p& ]5 T0 B
  6. ws.onmessage = function(e){8 m6 u' X2 }+ ], r4 Y  ~& X
  7.     alert(e.data);9 W' ?# Q& I3 ~
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    ( P  ?# x. r) B" o! r% k" o
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 k# H4 N4 X. U' W, [+ Y
  3. // 推送的数据,包含uid字段,表示是给这个uid推送$ P4 Q8 C4 j; B' _! V
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    % Z2 K5 g+ s% I5 I+ `3 C
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    2 D' H% \/ h' R
  6. fwrite($client, json_encode($data)."\n");% F9 l' j4 ~, v  P1 J
  7. // 读取推送结果! }" @: S  V0 R/ G5 j
  8. echo fread($client, 8192);
复制代码

8 `8 \' d% u# h  b0 P
' N- l5 G/ a2 C* m  H1 O
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-19 22:38 , Processed in 0.057714 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!