cncml手绘网

标题: 用于实例化Worker后执行监听 [打印本页]

作者: admin    时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;% q8 K/ ~, U( M& A; ]$ j) n
  2. require_once __DIR__ . '/Workerman/Autoloader.php';* y9 H, J8 n2 V* R' W7 v5 X6 U$ f
  3. * G5 v4 w; e6 j# O' U8 V) ]
  4. $worker = new Worker();- |8 G' S* l7 ]4 b( c$ M% p* Z, I
  5. // 4个进程
    & S- u3 x+ w- w& B4 ?* m
  6. $worker->count = 4;2 n2 y5 S4 _  N" f( r
  7. // 每个进程启动后在当前进程新增一个Worker监听4 A" N5 b* ]( e3 t+ [4 f8 p& K
  8. $worker->onWorkerStart = function($worker)
    . D4 n' c, H6 E  ?2 }
  9. {9 q2 e$ ]2 P# M  a
  10.     /**: {( z) {) r: ]" a) `0 i0 {/ E$ Z
  11.      * 4个进程启动的时候都创建2016端口的Worker
    . X5 c) F/ g5 L! s
  12.      * 当执行到worker->listen()时会报Address already in use错误
    . _1 d2 l' x* g8 L  Y" \3 T7 F
  13.      * 如果worker->count=1则不会报错
    + L! W) ^4 n9 V$ I2 y9 O8 x
  14.      */
    . J+ U% t. n! t+ h
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    5 {# r9 `4 Q3 j  t# W  H0 o8 `
  16.     $inner_worker->onMessage = 'on_message';' {) @6 Y& O( @( R' r5 r
  17.     // 执行监听。这里会报Address already in use错误. \$ h1 S  Z6 j) \. _4 ~4 S+ E
  18.     $inner_worker->listen();
    8 d( e/ ^  d9 D: K. H  w) }
  19. };8 U5 d# T( q: u5 \3 m0 Z. n1 H

  20. ( N& s5 E1 [. U: y
  21. $worker->onMessage = 'on_message';
    7 t+ v/ @8 i9 O# ?+ H9 [, Z# G
  22. : x' [  [1 j% m1 X, D6 [
  23. function on_message($connection, $data)
    . t/ o# g0 `1 {3 Q
  24. {
    ) B" [, C- l6 u9 g+ {  O1 ?
  25.     $connection->send("hello\n");
    - Y5 I4 m( S) z
  26. }, K* V: Y2 }& |% q

  27. 9 }0 n) B  l4 E0 w4 r
  28. // 运行worker& B8 L; B+ O' m
  29. Worker::runAll();" j2 L" p7 y7 d
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    8 j& B, n9 c$ H' T4 i
  31. & A$ M$ Y+ M+ ]
  32. use Workerman\Worker;, F3 R5 L* P% B' {- F$ P. N
  33. require_once './Workerman/Autoloader.php';
    , E  v, u4 J6 A# s! c

  34. 7 x2 O8 {6 a+ `# r* k0 O
  35. $worker = new Worker('text://0.0.0.0:2015');$ p6 q! e% H7 O) c
  36. // 4个进程
    $ Q. u3 q  t: w
  37. $worker->count = 4;
    ; v' O- h+ ?3 z- M$ V
  38. // 每个进程启动后在当前进程新增一个Worker监听
    + x' E8 S# [, W
  39. $worker->onWorkerStart = function($worker)5 Y; m9 c1 w0 ?9 _# v  }
  40. {
    * J: s# `/ y4 u3 G
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    # `: A5 x7 W8 x
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    3 A: q; H8 T4 b; v. f3 ?+ f
  43.     $inner_worker->reusePort = true;
    5 ^) m$ y( Y5 Y- l' F
  44.     $inner_worker->onMessage = 'on_message';( W: u7 X( m! p. L
  45.     // 执行监听。正常监听不会报错* |/ o4 m, r! Y( L
  46.     $inner_worker->listen();
    ' N7 J/ d8 Z; [& L; ~* q, \( a
  47. };
    * R# O3 u' @& D- s' w

  48. 7 I' w7 f: u4 O
  49. $worker->onMessage = 'on_message';8 M* ^; t: E5 P
  50. $ z  P$ z4 S/ D* O" @( ^
  51. function on_message($connection, $data)
    3 }0 O, D$ D3 }' v
  52. {6 Q: `& c! C/ i, o& L
  53.     $connection->send("hello\n");4 p( d9 y# D; }+ a: e9 M8 E
  54. }
    * d) I1 G9 p7 B$ f

  55. ' Y' }; S# g. K! z
  56. // 运行worker
    % A% @  T- M  E: s( Y# n& r3 h
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    / \. t% K# G8 `6 B8 q
  2. use Workerman\Worker;% C8 ~! P: Y% Y% n4 F$ i
  3. require_once './Workerman/Autoloader.php';
    6 O5 n8 W. U# Y( B9 i6 X$ E( k# A
  4. // 初始化一个worker容器,监听1234端口1 |, w* T! f" a2 }  Z0 |
  5. $worker = new Worker('websocket://0.0.0.0:1234');* x/ v' |9 [, I" I0 i
  6. 5 c) U0 L$ C" j5 S& [  f0 O- ~7 D: Y
  7. /*
    . y, L7 r8 }0 y$ D  J  Z# _
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    1 ~6 Q; I! e; C$ G
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    * D% [; p- B" l. o$ t) T& v
  10. */
    $ ]3 R; h0 l4 T- ]
  11. $worker->count = 1;" j! ?4 q, i* Y
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    9 a; v: `5 T! \/ h) w/ w$ Y1 x
  13. $worker->onWorkerStart = function($worker)7 _- y% w% e$ E3 f! \: V. D
  14. {
    1 q6 b6 Z; f# X& V) I$ _
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    9 O# Q* e: X, S+ O  V3 Q
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');: V& p# c4 y+ e/ B+ B; f/ I
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    2 j- D' z' [8 z0 `! e
  18.     {
    , ~" n  [) U' H- J+ L1 F- \
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    , Y. Z& x! c, s& [2 b& j
  20.         $data = json_decode($buffer, true);
    4 d8 e- ?% A( j/ w0 Z+ V* [
  21.         $uid = $data['uid'];0 n4 Z7 ]' J( J- R1 @
  22.         // 通过workerman,向uid的页面推送数据/ T1 G6 K" W2 A9 k6 D% ~  {2 _1 g5 B
  23.         $ret = sendMessageByUid($uid, $buffer);; q; u! u4 j  h( z$ Z
  24.         // 返回推送结果: _" @6 n# K) b* d1 ~
  25.         $connection->send($ret ? 'ok' : 'fail');# I& ?" N8 i0 K; X
  26.     };
    4 `4 m2 L0 u# H. h, f
  27.     // ## 执行监听 ##
    % I/ k6 U+ N; k/ G
  28.     $inner_text_worker->listen();
    7 S) x% |: w( Y" X5 Q* g
  29. };2 t! W4 Q% k5 o$ E; _9 ]3 n
  30. // 新增加一个属性,用来保存uid到connection的映射4 ~2 b# h" d+ C3 q+ \
  31. $worker->uidConnections = array();
    & T0 \2 f0 }6 T: A% x  r
  32. // 当有客户端发来消息时执行的回调函数
    ) L. B  U; O# j
  33. $worker->onMessage = function($connection, $data)' w# n7 L1 B- D) r: A
  34. {
    # Z: _9 H. V6 E, @! j
  35.     global $worker;# p2 I( C0 ~( U# F% |" O
  36.     // 判断当前客户端是否已经验证,既是否设置了uid3 [3 V7 e% ]1 s6 _
  37.     if(!isset($connection->uid))5 c. c; [; i. G
  38.     {
      F; h& f0 l$ |& S
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证). X& E2 s7 U* y
  40.        $connection->uid = $data;) a% F9 G* ?' [/ ~! D# x
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
      g. J1 }* R' Z# s+ V9 n8 ~0 l
  42.         * 实现针对特定uid推送数据, s/ o: |$ v6 q6 B4 I1 }
  43.         */6 r* q/ h, W$ W9 j1 @- e5 s
  44.        $worker->uidConnections[$connection->uid] = $connection;, n. w: h, I+ |( @4 F0 N$ ]2 p
  45.        return;
    ( X- O6 c% k4 `1 [, |, K
  46.     }: i' R1 ?5 S. Z0 [7 w+ n7 U
  47. };
      w( u' j: R8 T4 v0 Z* F

  48.   k, y7 Q6 D' m! Q
  49. // 当有客户端连接断开时
    ) @- i% Z4 g+ C4 l% w* l" v
  50. $worker->onClose = function($connection)
    - Q4 ?6 |! L' b2 [7 Z+ ]( ~( ]. p0 \
  51. {
    ' U& O$ S% u9 x( d# S4 d
  52.     global $worker;0 F$ ~4 N6 e( D3 `
  53.     if(isset($connection->uid))
    2 P) S1 U7 N$ ]7 |' [8 k  N
  54.     {
    / d. d6 A! K8 T! ^( T5 }5 `" {6 o
  55.         // 连接断开时删除映射
    , x& x4 w- B0 @( I* j+ F
  56.         unset($worker->uidConnections[$connection->uid]);
    ) B' l. _. ~7 Q' R$ d
  57.     }) A2 |- A9 ^. b* W, e# w
  58. };( O" r+ v  g/ l. ^2 C6 j' Y
  59. 1 }% C8 v8 o- c
  60. // 向所有验证的用户推送数据
    / E( Y& F: ?* g9 r
  61. function broadcast($message)
    ( u' Z9 M1 s  c5 I0 S8 H6 [* S
  62. {# B& m* b$ E+ j5 v* B  T7 @* z8 b
  63.    global $worker;1 _, q# c( u* c/ S: R( P# }
  64.    foreach($worker->uidConnections as $connection)3 P/ u( \: @; d
  65.    {" F# ~3 {/ ~( m! c. z) W) \
  66.         $connection->send($message);$ X/ F9 J4 F% J! ?1 V. K6 K& D
  67.    }
    1 y2 l: b3 Z) f2 c
  68. }- X* g4 l: `6 g
  69. 7 w( ]8 w! P5 ]
  70. // 针对uid推送数据
    1 l6 m( H7 b! {
  71. function sendMessageByUid($uid, $message)# g& E$ j* J4 r6 L
  72. {
    - Z" C( n; w  K; E4 A
  73.     global $worker;
    4 j! C; v+ [6 A+ X% ]
  74.     if(isset($worker->uidConnections[$uid]))# Z, F" c2 [( D0 @( w8 F( ?
  75.     {5 B. q  S0 {! x3 u3 _" V: V9 H. h
  76.         $connection = $worker->uidConnections[$uid];
    * X, b8 E* k4 C9 l, y( d
  77.         $connection->send($message);" R- P: {$ r% a7 N9 M3 u3 G
  78.         return true;
    ! g. M4 O  }% u+ W0 y
  79.     }5 C. c: f  F9 Y; x
  80.     return false;
    5 t2 D, A: T) V
  81. }% r6 b6 M8 Q+ }0 C
  82. 1 W* }8 q# ~5 q) p: M; @
  83. // 运行所有的worker
    , `' k- L. W2 b& P5 N
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ( X/ A  I4 P( O3 x5 q1 m
  2. ws.onopen = function(){
    ( q2 g; s. [( P2 _
  3.     var uid = 'uid1';
    0 o# _4 m$ u$ r& W0 M& f) k
  4.     ws.send(uid);
    $ A5 t& W( L6 d0 ^6 i% g
  5. };. m" {; r+ N; |  a5 I( [9 @
  6. ws.onmessage = function(e){
    ( y, q6 J! ?! j1 L! w
  7.     alert(e.data);" k+ Q# o' j' y. ?, W
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口( Q+ W- B" [% ?/ O- L
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    $ _  J" H$ s/ F! K# ]
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    7 M& f) ~3 Y: b: R; G3 F
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');2 L4 B: o; D% `4 _$ `" e
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% H+ M( f( j2 c
  6. fwrite($client, json_encode($data)."\n");
    " B& I5 A8 B" q1 p
  7. // 读取推送结果
    4 o9 _/ b  i" {" I3 r* \, F: G
  8. echo fread($client, 8192);
复制代码

1 w1 @- i2 w( P% y
( N5 }/ Q6 b; {1 t: x




欢迎光临 cncml手绘网 (http://www.cncml.com/) Powered by Discuz! X3.2