您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14806|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    - F: ^+ h* \& R. n
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    6 |) L" z  j: ^; D2 G" v6 c
  3. ( a7 |2 Z: u2 X0 z3 b8 N
  4. $worker = new Worker();6 F0 A& a# B; s  Q. V
  5. // 4个进程. }- z& y4 F, b" W  ]1 L2 @
  6. $worker->count = 4;/ ^" ^: W# M% B" Q- c
  7. // 每个进程启动后在当前进程新增一个Worker监听4 ]: P+ D& |& C( @7 Y/ M
  8. $worker->onWorkerStart = function($worker)
    4 N4 b; G9 T; N, L. l, P
  9. {' b/ E% y: {" V9 V2 D2 ]
  10.     /**
    , |' ^# d7 T/ l0 B  O
  11.      * 4个进程启动的时候都创建2016端口的Worker0 o8 B7 Q  H! Z2 `5 |2 W
  12.      * 当执行到worker->listen()时会报Address already in use错误
    . v/ {7 _" W" J; Q- t" G
  13.      * 如果worker->count=1则不会报错
    1 S9 N% M' }# \8 l
  14.      */6 F2 p% _5 o- H6 f9 P* Q5 K: ]
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');3 F6 |  J# r3 p9 `! ~! y
  16.     $inner_worker->onMessage = 'on_message';' M' {. a) q2 H$ M$ b
  17.     // 执行监听。这里会报Address already in use错误9 W. Z9 {4 a% `; `9 {/ B$ @. c
  18.     $inner_worker->listen();# d  p* L  e9 s1 H" u+ G
  19. };' T2 U, O- g+ ?" H2 G  u! C  I
  20. " r" _% u9 K3 L4 C
  21. $worker->onMessage = 'on_message';
    % c( J- Y/ F) ^$ H) g( S+ E9 N
  22. ) Q( Y5 |6 l" x9 ^
  23. function on_message($connection, $data)2 l6 ]: a* R* N# w- {2 q4 C. A- D
  24. {
    7 t# m/ q5 }8 O, f+ R4 w4 {
  25.     $connection->send("hello\n");
    3 S" x. R: [* [  A- B
  26. }. X1 b' Q$ c- Z3 A1 u3 R: J- _7 Z
  27. - e4 t  G& U' P7 F! @/ L7 x
  28. // 运行worker
    & L) C) I8 V8 `# w4 k
  29. Worker::runAll();
    7 M! Y- q2 P- v; H( @; X
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    6 i, n3 J) K  M

  31. 2 Y' r  a0 |% Z( a
  32. use Workerman\Worker;5 j' }- V5 `, f. F, `
  33. require_once './Workerman/Autoloader.php';
    3 D6 _* I  r6 s! ^

  34. , K: V5 |$ ~4 u% Z) l
  35. $worker = new Worker('text://0.0.0.0:2015');
    5 F$ {# {9 B* Z. r# a
  36. // 4个进程
    # b  Y" u3 _! ~- K
  37. $worker->count = 4;
    + X( M0 x; ^) {5 j2 c6 T" ?. m
  38. // 每个进程启动后在当前进程新增一个Worker监听
    % G; u9 V" N4 B: L4 h# b
  39. $worker->onWorkerStart = function($worker)$ V& e' G  [' V
  40. {
    4 @7 H# x( s% ~! c" `
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');4 E. P+ H$ ~- i6 f# L
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    " O' [! O6 V$ _8 o  M) r# t9 M
  43.     $inner_worker->reusePort = true;/ w  w0 O/ Q2 u# o! f
  44.     $inner_worker->onMessage = 'on_message';
    9 y3 h. Q. @( P9 G; H  [
  45.     // 执行监听。正常监听不会报错( k! f  w& P$ f+ R4 Z
  46.     $inner_worker->listen();( z' P  H4 L; n; m# d
  47. };
    ( C3 g! K6 D, I% o+ c$ n
  48. & `; v2 A0 u  E+ m
  49. $worker->onMessage = 'on_message';
    3 O9 l5 X  [. d2 e# u7 \

  50. 8 k5 j, O- @" {: g0 I
  51. function on_message($connection, $data)
    2 v/ q0 e8 h! }, z
  52. {
      D. E4 D- |4 ~
  53.     $connection->send("hello\n");
    8 y% p6 z8 a' P7 @: I
  54. }
    8 g8 Q) ^! e3 i6 B8 C8 L
  55. $ n, h9 X9 F# ]3 }4 T" |! l% d
  56. // 运行worker
    . ?& Y9 D2 X6 y/ g
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php/ ~1 {* |8 b- Q0 u
  2. use Workerman\Worker;: D! |6 s& g2 g, x7 t+ U
  3. require_once './Workerman/Autoloader.php';
    & J0 P% ^$ \% {; b0 x) B9 b6 t
  4. // 初始化一个worker容器,监听1234端口, Y* z3 w; C' Z2 L0 k+ D
  5. $worker = new Worker('websocket://0.0.0.0:1234');/ W; D0 J* y8 x% A: A) K) p$ ~) ^

  6. , j& n3 O" F- B9 m, o6 Z. O3 h
  7. /*
    & G; s# B4 z  n; r' O+ Z0 @
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    & ^6 q* ?& V  d6 R
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    * S$ f. U* D5 m0 E3 H
  10. */
    1 g# K2 x; |9 z3 u  b) D
  11. $worker->count = 1;* h: ?, D8 D' `$ t) L
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
      q7 D+ |, V  H  ~& n! ^
  13. $worker->onWorkerStart = function($worker)
    1 J7 D4 ]6 d. i+ `4 ~, h
  14. {+ }" V" L1 L+ a1 c
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符) D5 R) T- Z, q7 V4 k: @
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    8 Z/ @9 [( E6 B* P, t
  17.     $inner_text_worker->onMessage = function($connection, $buffer): T9 g* l9 M6 o7 S$ E
  18.     {( z# ]% j- P- T) g) Q7 T
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据' X2 q/ k$ n# E0 y8 Z! U4 X
  20.         $data = json_decode($buffer, true);
    ) @: m& \; T2 `- C' s/ S0 I
  21.         $uid = $data['uid'];5 \- O1 l5 }( f! X( x( `
  22.         // 通过workerman,向uid的页面推送数据
    ; B6 l1 B; w& B" |; d; v
  23.         $ret = sendMessageByUid($uid, $buffer);
    9 q$ q7 n3 L6 Z; C
  24.         // 返回推送结果
    - Z0 N) _  [! U  u) H6 \
  25.         $connection->send($ret ? 'ok' : 'fail');$ }( p  ?; \4 C( d8 W, _% y8 ]
  26.     };
    5 ]/ |: I! A. i
  27.     // ## 执行监听 ##! O: Z; T4 q; r  z3 n
  28.     $inner_text_worker->listen();, N5 d7 a; b  }+ O* h! o  r9 P
  29. };
    ) a7 x; x& _, `8 f# g3 R) g
  30. // 新增加一个属性,用来保存uid到connection的映射  D: Q- A: `" x; C3 J) i$ s
  31. $worker->uidConnections = array();% o$ K. k! p  }( v0 O  a
  32. // 当有客户端发来消息时执行的回调函数1 {4 J+ ~8 c& H8 r8 m( b
  33. $worker->onMessage = function($connection, $data)7 e3 L& E) L, r4 i* Y5 K' m
  34. {6 L; d: \6 W* d( T
  35.     global $worker;: w3 U7 f4 Q: Y. _( E7 ]
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    ; W9 [; |4 h# c7 `4 t, Y
  37.     if(!isset($connection->uid))
    ; _' P& W/ o. a; F1 T' S
  38.     {0 y# J, R7 @9 y; ~7 U8 c. T
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)8 ^3 C. @+ }6 t1 r, J
  40.        $connection->uid = $data;1 k+ \* \' c" u% l4 k1 P7 c
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,( w( Y" u0 \2 G
  42.         * 实现针对特定uid推送数据4 d# D1 A$ ?' ?# C7 E
  43.         */
    % s' i7 p$ o# @/ B7 y. G9 u( K
  44.        $worker->uidConnections[$connection->uid] = $connection;
    $ y( i; |4 H) K, ]) z/ x
  45.        return;
    . G! w' M: F" u$ f7 A/ [& {
  46.     }9 N- s+ r& k( s6 C- U
  47. };2 F6 x, a7 N. t6 T
  48. ) g5 _& \/ S+ q) T
  49. // 当有客户端连接断开时
    - c, X2 G+ e$ I' c
  50. $worker->onClose = function($connection)
    ( _. g% }. B: Q# d; Q4 \
  51. {4 @7 n. s" a  O5 T0 ?- B
  52.     global $worker;
    % F' q. L% o# V8 ~% Y7 {+ u
  53.     if(isset($connection->uid))  V; N; l0 N. i) d& X5 q& G
  54.     {
    4 ~6 C3 ~6 G: r, i; Z: H
  55.         // 连接断开时删除映射
    % O: k7 I" w6 W% c2 @& S$ c
  56.         unset($worker->uidConnections[$connection->uid]);" V: \1 u/ ~: Y1 e7 r- }+ h/ E% |
  57.     }: N' B7 T5 M" E" w8 Z5 A/ M
  58. };
    ; s3 W8 ~3 Z  Y9 m6 c. Z7 W
  59. $ J2 [9 D/ c" [7 n6 w
  60. // 向所有验证的用户推送数据- u( d; b* n. x' M0 O) Q  [4 S
  61. function broadcast($message)) Y; }* a7 j% E- v9 P
  62. {# K7 N$ F: j$ j
  63.    global $worker;0 K' k2 o- W/ \9 w4 R8 X) a  m
  64.    foreach($worker->uidConnections as $connection)( x& x9 T$ `) |5 |- i) c; s% M
  65.    {
    + F! K% I( I8 [; z
  66.         $connection->send($message);
    5 L5 K$ v" ~! _. W* r3 N( i; k
  67.    }) p) c7 }* C2 T- ?/ F  `
  68. }
    & }2 k7 n& P/ [; d

  69. 0 S2 Y6 `1 d  W$ h1 p+ z
  70. // 针对uid推送数据  q5 s. A$ N$ u5 v4 _$ D
  71. function sendMessageByUid($uid, $message)3 @1 i7 K: i' o) l1 f( F
  72. {
    , ^9 C5 |5 @6 ?1 m: x
  73.     global $worker;
    & U8 m0 \5 j0 K# ]
  74.     if(isset($worker->uidConnections[$uid]))
    6 M2 T+ n6 p/ A& I
  75.     {
    ' E5 k2 G/ K/ M* w! \: l& [
  76.         $connection = $worker->uidConnections[$uid];
    0 U; N! u, T/ a& ^$ K1 a: o% C
  77.         $connection->send($message);
    ( u  b. z; r" [( H: \/ y
  78.         return true;
    / H, C4 j8 Y8 B. s2 z+ a7 h" V+ ]. q
  79.     }
    . f% j5 G( c, f, e9 \, W: [
  80.     return false;
    4 s* S7 I4 d* l( A( ?
  81. }
    # ]4 }7 s9 S  }# U# p
  82. $ D" s+ a3 A1 C4 t' F* V; `( p
  83. // 运行所有的worker  Y+ U( f" Q' {4 ]8 Y
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    / @+ W) y( R5 m
  2. ws.onopen = function(){  K. n  Z$ V9 B4 d  u' l
  3.     var uid = 'uid1';
    6 v6 G9 w+ h8 B2 F0 O6 D
  4.     ws.send(uid);
    , |! p& H$ k& g$ F; [- \0 a
  5. };/ N, `- o* _# I3 I, r4 s8 _7 U1 j9 A
  6. ws.onmessage = function(e){; A4 {9 z+ ~' i" D; {6 t) w
  7.     alert(e.data);
    # _+ h* Y. L- n  ^- ^4 i5 T! i
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    ( a' d+ S& {% O
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    ) m$ M0 _' S  u2 P
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    ' ]( I: h& u( `) D. c
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    $ J* f5 L& o- N1 n
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    % _6 S0 `( `1 ]( k2 r8 q5 \) f
  6. fwrite($client, json_encode($data)."\n");. M4 X$ W/ J* X) r, b
  7. // 读取推送结果
    8 O; V1 R0 {4 |! ?& k$ x* n
  8. echo fread($client, 8192);
复制代码
1 L! u. ~6 t" ^2 T& E% m

0 c  h$ B' B8 p+ A" Y. q3 N
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 18:51 , Processed in 0.063851 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!