您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14799|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;) P0 F1 u+ G. `5 L* o
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    5 Z1 E4 {  ?$ d( E
  3. . c( l( ]4 |8 Z& t! F
  4. $worker = new Worker();# {4 d2 q& E5 d: v1 i, M# M# d
  5. // 4个进程. q  {! Y# @" I+ |  }6 C: U7 M; o' }
  6. $worker->count = 4;9 a$ ?) E6 v8 k. h9 ^  P! u2 F
  7. // 每个进程启动后在当前进程新增一个Worker监听
    , r( n5 `+ X& x5 b" x- h% ?
  8. $worker->onWorkerStart = function($worker)
    7 c: ~, Y7 C. z& Y
  9. {
    9 b( O; E+ ^) `6 b2 S* }) M- g
  10.     /**- o$ B8 S' w7 p; D0 p. e% U5 R; \
  11.      * 4个进程启动的时候都创建2016端口的Worker) d+ E- ~0 d( f8 k+ m, X
  12.      * 当执行到worker->listen()时会报Address already in use错误+ u" R4 h: M% h* Q6 W6 ~1 V1 v! A( E
  13.      * 如果worker->count=1则不会报错/ t+ _5 ~: l2 M! c: s
  14.      */, V! X; j4 n) R
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');8 m& }. r6 F0 v2 a' y  }2 E
  16.     $inner_worker->onMessage = 'on_message';, P2 Q4 p8 c2 i2 p& O/ Z6 D3 w" H9 q4 ]
  17.     // 执行监听。这里会报Address already in use错误
    3 {  j) e( L8 L3 u3 b) i
  18.     $inner_worker->listen();
    * J" u! u' k" |. W2 r
  19. };
    5 P$ j* u5 F. W5 J1 k8 b

  20. ! e/ R) E1 o5 H
  21. $worker->onMessage = 'on_message';
    ; ^" `; L; ^! r

  22. 3 v3 O9 `- n; v) {9 B* T: n
  23. function on_message($connection, $data)% ]! I" }% j4 L6 f* J! j' E  u
  24. {5 C  O( r, e7 |0 f
  25.     $connection->send("hello\n");8 D- U6 P2 K5 Q  c$ I) V/ B
  26. }
    - t' ]# C  q$ x, X) a9 @& x; B

  27. # O, M0 p, J7 E$ V- N( T
  28. // 运行worker% s. e7 m: t3 E
  29. Worker::runAll();# u4 ~$ W) U0 e* K
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    . [2 o7 `* }1 N9 C" F
  31. : }1 i6 D$ [$ R8 M- F* w6 g
  32. use Workerman\Worker;
    / d+ ^% |! Z4 `% `9 A
  33. require_once './Workerman/Autoloader.php';
    : }& {2 c  K' q9 |5 u

  34. 8 _& p& M% B$ C  ~' L
  35. $worker = new Worker('text://0.0.0.0:2015');
    + G) L- y9 B0 i7 E5 ]
  36. // 4个进程  t) l* y0 U3 t# e. V
  37. $worker->count = 4;
    ' c9 D) F$ W1 j6 K
  38. // 每个进程启动后在当前进程新增一个Worker监听
    4 `( J; J% _: T5 Q3 e
  39. $worker->onWorkerStart = function($worker)
    9 V1 j) H) ^% ]! G
  40. {
    4 H* }/ a$ `/ _
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');7 [( M3 t; Z( R4 N
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ; I1 N1 q% K4 f* Z- W) b
  43.     $inner_worker->reusePort = true;+ A4 j6 B! m, X
  44.     $inner_worker->onMessage = 'on_message';
    & w( O3 l# u  i1 n5 ]
  45.     // 执行监听。正常监听不会报错
    % n+ {9 C: m" l) H2 G
  46.     $inner_worker->listen();
    % |/ a% f& [  Y, z; I. @
  47. };, P! A$ Q; P" T# v6 q/ Z
  48. 8 C* ]+ I' ~% l
  49. $worker->onMessage = 'on_message';8 _5 X- ~2 [, d/ X- E! }& w# i

  50. 1 o6 q2 h7 j' G9 j9 D3 E8 X
  51. function on_message($connection, $data)5 Q2 |4 C! ?: r
  52. {
    2 s3 F, F6 [8 I* ?' k+ v: f: G, }
  53.     $connection->send("hello\n");
    * x- u" g7 r4 U) t) d9 b
  54. }
    + ~5 g, \' r' Z( n( d8 a% \& y+ F, J

  55. # H: \+ B8 `" @# g# ?
  56. // 运行worker9 P" Z8 w5 t( G6 h9 [' n* ]
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    " e* s0 s: L4 Y. c9 M5 X4 X
  2. use Workerman\Worker;
    $ t% o. T- s4 o+ i8 N3 `
  3. require_once './Workerman/Autoloader.php';4 L. k5 ^+ u; o, l" ?3 P, E# @
  4. // 初始化一个worker容器,监听1234端口
    * p0 V5 }3 v! B" V
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    - k! Y/ Y! m, z, p) a+ q. A/ `
  6. ! ^4 A& s) v0 t( D
  7. /*+ u; X. g) |5 F  o
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    ' v5 D$ o2 ?' {  v
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    9 }( U- P7 z- a8 m; i. N5 `
  10. */
    & T6 E4 ?' }7 N' D7 x
  11. $worker->count = 1;
    8 U0 J. o6 y6 a
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口! }' [7 @3 \& t+ p0 H
  13. $worker->onWorkerStart = function($worker)# s! d9 C. q/ @  ]
  14. {7 V* G" o, j# P! q
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ! {  [3 K4 J( y. ]& Q2 c
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');; q- E8 p$ R! I
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    % z9 u! n2 H) K2 K/ N! ?
  18.     {6 E3 Y  Z: V8 H4 f
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据3 s6 n9 Y' z" P& Z
  20.         $data = json_decode($buffer, true);
    + c. b7 D( J% X& T9 M: b% J2 z2 ]# R
  21.         $uid = $data['uid'];9 D# [6 z/ M, f$ {
  22.         // 通过workerman,向uid的页面推送数据
    0 I' L* u3 Z  ]) p) ^' `
  23.         $ret = sendMessageByUid($uid, $buffer);- t4 O/ s) C3 ~) A# M: _9 t4 m- q
  24.         // 返回推送结果. s+ z' z$ }! d  l+ j0 U
  25.         $connection->send($ret ? 'ok' : 'fail');
    : U7 j1 ]! _7 T* `) U, x2 _8 Y3 }
  26.     };0 Z5 Q( |( q( S! {3 b4 R
  27.     // ## 执行监听 ##5 C" _* _) H6 F' \% t7 Q: r. [
  28.     $inner_text_worker->listen();
    ) _5 U8 G/ b( u
  29. };
    & t- j9 j5 s! H8 g
  30. // 新增加一个属性,用来保存uid到connection的映射$ P, }/ \1 P( o* k+ f' n! O
  31. $worker->uidConnections = array();2 r7 @1 ]/ ~% H$ x" k$ K
  32. // 当有客户端发来消息时执行的回调函数: z7 ^2 Y5 Z4 D) a+ B  l, H
  33. $worker->onMessage = function($connection, $data)5 Z: r9 o/ C) ?: y6 Z6 I# i
  34. {
    & o/ P  p; x9 {) W* g; h
  35.     global $worker;
    $ G  t9 p) ^" T, c' H  k( q
  36.     // 判断当前客户端是否已经验证,既是否设置了uid! V* \2 _% N+ D- s
  37.     if(!isset($connection->uid))7 f% s. w# j1 K& O) ~
  38.     {8 {. n' M% o. O) Y
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ! N6 ]# [1 p" W) |' S  Q+ a
  40.        $connection->uid = $data;
    9 _$ I2 M5 a" N, r& F# W
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,& V. w" y  m8 w: h
  42.         * 实现针对特定uid推送数据) c6 A0 v% \$ d3 D# m: V* l; l
  43.         */1 J" y3 E1 ~3 p7 H' ~
  44.        $worker->uidConnections[$connection->uid] = $connection;6 I+ k8 A1 Y! t& ?2 w
  45.        return;
    - b/ N+ f; c! O
  46.     }
    : `/ m0 D+ Y& b. C" L
  47. };# s* J7 m! L2 v, u7 @/ l7 N

  48. + T. J8 ?5 v( A3 N: v9 n; W+ a+ D
  49. // 当有客户端连接断开时
    4 S& A$ s5 d+ {0 I' n  p
  50. $worker->onClose = function($connection)
    ' H5 _: d( ~0 D0 V9 l3 X
  51. {
    ! `$ W8 @6 H  D2 s* Y4 ^
  52.     global $worker;4 C2 K0 i( j% o, X) `9 z- E
  53.     if(isset($connection->uid))) S/ v2 W; }. g8 c1 E& j4 p
  54.     {8 S0 x( `/ S' f: a5 U9 i
  55.         // 连接断开时删除映射
    & J+ A  f( L4 `2 a
  56.         unset($worker->uidConnections[$connection->uid]);6 P: X3 E! _' j" R: \' W
  57.     }
    + }7 k' f" {) N" v
  58. };
    3 \7 }0 p5 F; N7 S
  59. ! R; D$ W! L8 h7 \& Y% `
  60. // 向所有验证的用户推送数据
    1 }- }2 K, Z3 |
  61. function broadcast($message): `5 J: d" q7 W2 O, ~
  62. {) y. P0 G$ D& G0 T" j
  63.    global $worker;
    . v$ s* O2 o: m* E' \% H" y( e
  64.    foreach($worker->uidConnections as $connection)  D. _+ _6 |( b. d, }5 P
  65.    {+ Y6 T" B: T5 ]+ z! p% [- w
  66.         $connection->send($message);* i+ k: l* N. [/ h$ M% c
  67.    }
    4 ^% e5 P4 t* {5 l& B
  68. }
      N  }7 R/ C" m$ z. Z
  69. 4 T% V0 {$ |2 x! J
  70. // 针对uid推送数据$ v7 a( M9 [6 A% t2 [
  71. function sendMessageByUid($uid, $message)
    # X, w2 X7 `, ?; _: J
  72. {! f! R2 ^0 m; s- g1 `* V
  73.     global $worker;0 P6 A) x; H9 Z) ^$ o: X
  74.     if(isset($worker->uidConnections[$uid]))
    ! `5 _' q5 m5 F( S
  75.     {# S: E1 B# t7 [! W7 i
  76.         $connection = $worker->uidConnections[$uid];
    8 B# F7 _' T# B! r3 g
  77.         $connection->send($message);. i' d! A  L3 i* }; j, _( {  J( l
  78.         return true;
    ; Q- C9 s7 ?3 z) G& U# C. t& z
  79.     }
    ; m' P& l( b6 D0 c
  80.     return false;
      V5 l3 p. ~1 r0 A# R) G
  81. }8 u! Y  o* ~$ Y' C' \  l$ Y# {

  82. ! z! w# p7 K1 W: ]$ ?3 G% e  N4 Y+ u
  83. // 运行所有的worker
    4 U- q, G' W& F/ N# ?
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    $ Y6 X% f+ y3 W9 \/ _* d( Q1 p# Z
  2. ws.onopen = function(){, c/ g! H' y; J8 C( K
  3.     var uid = 'uid1';* G3 P2 h( _6 }5 i/ r- `# b
  4.     ws.send(uid);
    / \3 d3 T7 P% o. H# V
  5. };! B6 }. y6 e& N4 }; i5 \
  6. ws.onmessage = function(e){
      l6 d3 [* v/ b
  7.     alert(e.data);
    ) {& S) L9 g" |3 W+ d
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口7 \. t" a3 O# v* ?% _; H
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    5 j9 k. V+ p0 E$ P1 S9 ]7 U
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    . k* C; V1 t1 [. @
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');0 B8 J+ [! ~% x& p# e
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符( h7 M0 x1 k- ?; ]4 Z  z% h$ j
  6. fwrite($client, json_encode($data)."\n");- R( P( x2 Z+ \6 B& C/ T
  7. // 读取推送结果
    , N8 p6 z1 G6 `+ L) M
  8. echo fread($client, 8192);
复制代码

& Z- M' b2 H/ i+ z4 q9 o; L# A6 g* K3 ^7 ?  f3 I
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 17:22 , Processed in 0.064006 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!