您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10512|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    3 z, w9 i' m7 i. C7 J- T& C! r
  2. require_once __DIR__ . '/Workerman/Autoloader.php';4 Z, B- q4 h& H8 n9 R( @/ U% n; u& O* ~( h# G

  3. ' ?- j- U8 W, w, d0 p5 a- A
  4. $worker = new Worker();
    0 E+ d; r) W4 `- q' F& L' o' Z: v4 U
  5. // 4个进程
    0 `5 v9 c# D3 I2 p  v3 w
  6. $worker->count = 4;9 Y, ~5 p4 a2 U
  7. // 每个进程启动后在当前进程新增一个Worker监听
    % v; D) |% Q; g
  8. $worker->onWorkerStart = function($worker)
    " G1 Y: p% p, ?( w
  9. {
    $ [6 j, B6 u" e
  10.     /**) U8 n; n2 U' j: Y
  11.      * 4个进程启动的时候都创建2016端口的Worker' U' v; g) O2 S  Q
  12.      * 当执行到worker->listen()时会报Address already in use错误
    6 U; n3 e6 H+ {) v1 x
  13.      * 如果worker->count=1则不会报错
    % P# D. p  Y0 Q) A3 q0 [+ }
  14.      */9 j2 l" {/ ~* a( U9 u) n
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');7 ?! @6 V# }3 w: N0 q$ Z% y7 K# d
  16.     $inner_worker->onMessage = 'on_message';
    3 r: q! B; d3 {: D5 n0 o
  17.     // 执行监听。这里会报Address already in use错误! z1 X; k) x  N
  18.     $inner_worker->listen();' Q; y& B6 M2 i) Z# k( K
  19. };
    1 q$ r2 K# I# V* N6 k' X: Y( z( z
  20. $ G* D/ X0 r! W
  21. $worker->onMessage = 'on_message';
      |8 W0 c2 a) t3 f# D! Z

  22. & c" V0 X$ v) M3 s* d/ x
  23. function on_message($connection, $data)8 x2 U$ s6 H0 [: x
  24. {
    7 |1 y. H5 s( ?& P2 O& }
  25.     $connection->send("hello\n");
    + Q- M% c: p9 C' x
  26. }! q. a% ?# V+ _2 G. U
  27. - A; I9 E7 X1 {$ o- X
  28. // 运行worker
    # `+ ^& o0 _4 n* q# _& \" U4 d
  29. Worker::runAll();- {! u6 G& y3 p! l7 |
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    - v+ t) @- ]+ H3 b& k

  31. / P" f! }$ v1 m+ H" H! a/ t' y
  32. use Workerman\Worker;( v$ j% {$ r2 S1 B, v
  33. require_once './Workerman/Autoloader.php';& O$ z: c; Y5 V9 P" f1 u

  34. 0 Z' E' B$ f5 N0 \- F# Q" g, L
  35. $worker = new Worker('text://0.0.0.0:2015');
    " n( u2 a" {7 u1 I
  36. // 4个进程
    ! Y# W; w9 i4 |) F! v7 @
  37. $worker->count = 4;! P6 i7 y9 T# J; t
  38. // 每个进程启动后在当前进程新增一个Worker监听6 F9 @; U* M6 x* I
  39. $worker->onWorkerStart = function($worker)7 P3 ?; M% H7 u' Z3 ~
  40. {* O" d, A2 N$ Y" t
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    7 F4 e4 A, R  A, D" }: t. H
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    9 u) ?8 n* w# N. e* b- X4 v# m: ]
  43.     $inner_worker->reusePort = true;+ y! J) B* U# V7 E/ Y$ k
  44.     $inner_worker->onMessage = 'on_message';
    2 P+ U2 E: ?  F
  45.     // 执行监听。正常监听不会报错0 v- d$ R- T! X- s6 ]2 \. s* y
  46.     $inner_worker->listen();# Y( u- \% Y0 Y
  47. };
    ' c  e: F1 f9 f, {
  48. & \2 t8 C* |  N: T
  49. $worker->onMessage = 'on_message';
    9 ^1 x1 l8 e( W* V

  50. 5 `( z6 d8 m1 q, C- M) o/ f# V- Y
  51. function on_message($connection, $data)
    " l6 q3 b# E, M
  52. {
    ) A2 W0 [; Z- ^6 I' B2 a
  53.     $connection->send("hello\n");" B9 i: ?* c& M' N/ n# L6 Q
  54. }4 S; Z$ Z. R! u0 G" o5 j
  55. + l+ `/ Z& G: J* ]! y) D2 n
  56. // 运行worker
    / b8 w' n) N4 d4 \! R4 u3 f, A# b
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php: F' h; h7 O' G0 X& n0 u8 v
  2. use Workerman\Worker;
    ) i5 k: m" m" c2 P. f* _
  3. require_once './Workerman/Autoloader.php';* F0 X: x) s3 O" U1 F
  4. // 初始化一个worker容器,监听1234端口
    1 M9 j8 y) @* @5 ]# X( f2 T
  5. $worker = new Worker('websocket://0.0.0.0:1234');/ \4 w9 p. a  E
  6. 1 D' f" u, {* Z
  7. /*
    8 z3 U7 W1 T" t9 R+ U
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    8 C; ~9 R, s' m3 \) v% _
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    4 x+ s' [+ X3 J5 k  T, M- S
  10. */
    ! W7 u% I" A8 _& I% o6 D& d/ t. Z
  11. $worker->count = 1;# G' L) a- H0 E. C, W
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口7 C1 t$ G# g4 X/ Z+ v0 p% l
  13. $worker->onWorkerStart = function($worker)
    : t" i1 X# J7 |0 f- e
  14. {3 r2 G$ y8 o0 J1 G8 |, Q
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ! b- N3 K8 Z, _9 K: o
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');) n- P. M/ L" n$ z0 M2 ~1 ?1 D
  17.     $inner_text_worker->onMessage = function($connection, $buffer)& ]4 ^0 f& f5 Y  V
  18.     {# q9 F0 f. s  v! C
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据  h; n4 F# l2 p& y7 r# p- k
  20.         $data = json_decode($buffer, true);
    & Y7 k) C5 I* p9 J
  21.         $uid = $data['uid'];3 O' _* j! @) j* _
  22.         // 通过workerman,向uid的页面推送数据
    ! B% h. ]7 u3 M" R4 G( f) t1 n
  23.         $ret = sendMessageByUid($uid, $buffer);6 x* e. k+ F* t% H# T
  24.         // 返回推送结果
    " e- p4 W4 I; R& W- w. {. R, u
  25.         $connection->send($ret ? 'ok' : 'fail');# R5 |* I4 L8 @9 }) q
  26.     };2 f# ^' t0 @* }- K0 m/ j
  27.     // ## 执行监听 ##- P+ X7 v/ f( p5 Z& K# ?
  28.     $inner_text_worker->listen();. d0 z; p6 ?; c: F! x2 u% U
  29. };
    3 O9 W# G8 E+ h3 i& S
  30. // 新增加一个属性,用来保存uid到connection的映射
    3 b$ y( M$ q, m; d( f' n
  31. $worker->uidConnections = array();& O6 S0 T" c4 ?% R) f
  32. // 当有客户端发来消息时执行的回调函数
    8 f+ y7 z( q- g) {
  33. $worker->onMessage = function($connection, $data)
    ; I0 z3 Z. q: p
  34. {
    6 q; O* Y' F, q( R& u6 I
  35.     global $worker;
    / O1 {% ]0 w, m: q5 E
  36.     // 判断当前客户端是否已经验证,既是否设置了uid* ]( t6 K0 S2 z2 G: H$ ^+ N1 T
  37.     if(!isset($connection->uid))( \( @, n+ C- y# `, U' G; H, e! x
  38.     {+ ~. ~7 U& D5 \1 O+ F
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
      o# s. t7 \- E3 a
  40.        $connection->uid = $data;
    9 z- j5 U' L3 _: J4 N5 g/ x+ C6 c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# g' q6 c: ]/ e# Z0 T% {( M4 T7 |
  42.         * 实现针对特定uid推送数据+ f$ t5 Y5 z6 Q! C, u' d
  43.         */4 `: Z$ v& V, E) P
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ) w; i% \/ D/ O/ |8 P
  45.        return;
      t0 @/ w% X' V5 Y. j
  46.     }
    ' K& s7 ^: U% O5 w2 s' T
  47. };
    * E) B4 o4 `2 E2 o; i, e; A
  48. 8 a. _+ @! [8 X
  49. // 当有客户端连接断开时) \& Y* g( `( C7 ?
  50. $worker->onClose = function($connection)$ {4 m. Z4 |- j& ^
  51. {7 _: g( B( j$ d2 \2 c: a% _
  52.     global $worker;5 o3 Z) u& X$ {3 y
  53.     if(isset($connection->uid))
    / ^2 m1 J, V/ w' D' b, D" F
  54.     {2 A* t  Q& c4 q! U# |$ u
  55.         // 连接断开时删除映射0 {7 e) h0 d* V% W
  56.         unset($worker->uidConnections[$connection->uid]);! C' O2 s* z7 i. E
  57.     }
    ( R" U( b8 v2 J) M
  58. };
    7 g. C1 r9 A; c. V9 E0 n0 V1 J
  59. 9 w/ G8 m$ N; P. K+ j3 x
  60. // 向所有验证的用户推送数据1 p9 w" p( z: y$ T
  61. function broadcast($message)
    & H$ q: Q# W4 \. }% b
  62. {
    2 j2 ?4 x. o' ^4 T: K1 q0 Y$ s
  63.    global $worker;
    3 A7 p( c, R. S( b
  64.    foreach($worker->uidConnections as $connection)
    ; L' M6 h- e) ~
  65.    {( [4 L! H0 t( c! I% e
  66.         $connection->send($message);% X# D% g  L2 Y
  67.    }# u: \! a7 g9 P& h' u4 e
  68. }1 q) x/ ]2 T/ t. N5 d- C

  69. - t7 \/ Q- ~! @- O
  70. // 针对uid推送数据
    / m9 \+ v( {4 M/ K! A. n
  71. function sendMessageByUid($uid, $message)
    $ v! u( u+ v3 I" w% l
  72. {2 e6 t! J0 V5 x
  73.     global $worker;
    . H* z" \6 _% U0 r6 K- R
  74.     if(isset($worker->uidConnections[$uid]))  L5 q- ]& e8 `. v5 M
  75.     {
    % U& X: H8 g0 ~) W1 Y) ?* d7 o
  76.         $connection = $worker->uidConnections[$uid];3 [1 D& C6 }2 i, c! i; o
  77.         $connection->send($message);
    3 a6 Q8 T" |$ o% `! t$ D# x9 b" T
  78.         return true;1 j) z2 n9 g( h" L$ |, k
  79.     }- I# g  H# N3 S" d( {2 j& _
  80.     return false;
    % u1 A& T# A! u) j  l+ {
  81. }
    0 H3 y" n8 `, ]- P# o, V4 Q$ n7 w
  82. - w) }  X6 H' s4 S0 }
  83. // 运行所有的worker+ H' @8 m# u% P. c/ H7 Q- O
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');  [3 [; Z4 P& n4 `: ]
  2. ws.onopen = function(){  p- f7 M- z0 O% @0 f
  3.     var uid = 'uid1';+ d6 D+ {/ Q6 _/ o8 c
  4.     ws.send(uid);& O( s$ I; B# i  B. D3 _! i
  5. };
    1 p7 l# N; i) @
  6. ws.onmessage = function(e){
    % Q1 l, G; }& D" w& M
  7.     alert(e.data);
    . Z; \* m; K: G
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口& W3 ]7 Y+ N2 u4 O: I; a- X" [2 h2 l4 I
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    9 y4 k  m$ U3 I( L+ |3 T
  3. // 推送的数据,包含uid字段,表示是给这个uid推送# _% O- k) [+ o# o# K4 w* t  l
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');$ G2 ]3 j0 `$ p4 p, t0 W3 E
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ! k' p' p" K9 H7 P
  6. fwrite($client, json_encode($data)."\n");
    8 F$ v9 ^" L0 M% z! [, m; ^! E
  7. // 读取推送结果; m7 y' V) Q+ u1 k0 j/ |. f
  8. echo fread($client, 8192);
复制代码
+ \, T+ L' [1 L: e/ @& w

! i( q# p7 k( D/ C  ]
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-18 22:04 , Processed in 0.106360 second(s), 21 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!