您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10297|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;8 Q; c  u+ K# Y& `
  2. require_once __DIR__ . '/Workerman/Autoloader.php';4 q4 N( I4 z7 ^- ~
  3. ' o1 P/ K, T( M' ]/ A' \
  4. $worker = new Worker();; X1 ~5 i, L; |: P( u7 a# h+ U6 T
  5. // 4个进程5 ~6 ^% C  H& ~/ ]
  6. $worker->count = 4;
    # ?7 U6 t0 d) [4 t2 }
  7. // 每个进程启动后在当前进程新增一个Worker监听
    ) k9 g  \+ I$ @) J
  8. $worker->onWorkerStart = function($worker)* L! k1 E- z: d% W
  9. {
    / L, ^% f% P8 A4 w) x4 m0 `
  10.     /**
    : ~& T! L0 L" Y$ H, p
  11.      * 4个进程启动的时候都创建2016端口的Worker. O. K3 u  c% m
  12.      * 当执行到worker->listen()时会报Address already in use错误+ R1 ]; ~; r4 V, C0 X0 F
  13.      * 如果worker->count=1则不会报错4 c# i4 u5 n1 g$ Q
  14.      */
    9 d  a6 ?. x1 ]$ U) ^
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');* g1 g( c! ^3 D" l5 K& w4 ?
  16.     $inner_worker->onMessage = 'on_message';6 P3 r( M9 ]# Z
  17.     // 执行监听。这里会报Address already in use错误3 y9 j# z+ X2 z
  18.     $inner_worker->listen();
    2 c+ \7 x# m+ w
  19. };$ b. ]! ]( O$ S
  20. + r( A  S( ^5 E+ m; h0 @
  21. $worker->onMessage = 'on_message';2 F, G& {5 i: Y4 B

  22. ) L1 [$ X$ P% h6 @, q6 W
  23. function on_message($connection, $data); K6 ^, z$ u. b/ E
  24. {! q/ y" }) V& ?" S1 w1 _
  25.     $connection->send("hello\n");  S8 D0 @/ Z4 U- f& P& l8 d
  26. }
    " J( t" y+ c/ H7 p! |' B3 N8 U' S

  27. 8 p# k  i' w1 U3 w
  28. // 运行worker
    6 [6 `* `& ^: [
  29. Worker::runAll();) F+ N+ F8 j: w( a4 K
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:+ y+ {+ {2 q7 j& q( O8 `: R

  31. & P8 x, w8 x* w# R5 Y3 V. `
  32. use Workerman\Worker;
    : o) D% F, Y8 e8 ?' F
  33. require_once './Workerman/Autoloader.php';6 E/ F! e7 q& T# x4 j/ s3 i1 a
  34. / W- F8 j3 c  I' l% ]  m8 i' A9 Z
  35. $worker = new Worker('text://0.0.0.0:2015');
    # Z+ t/ I5 B* W' P: n
  36. // 4个进程) f3 w# l0 v. a& I, G- S" N
  37. $worker->count = 4;
    8 i7 P1 b, Z* H( Z, }4 Y7 S
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ! S. J" S0 t9 ~' f% {  ]" I: }
  39. $worker->onWorkerStart = function($worker)1 |) V3 U8 K- y" b0 p# z/ E
  40. {' j% V2 V1 Y+ o( C
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ' c1 v' q( u' v4 i* q/ c
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& q6 ]; i& K0 J2 t- C7 f; C6 T
  43.     $inner_worker->reusePort = true;
    ( V9 Y6 ~% Y4 j% X. O
  44.     $inner_worker->onMessage = 'on_message';" J0 v" P; }  C$ K/ d
  45.     // 执行监听。正常监听不会报错
    9 y' ~3 [1 B% h4 |0 w
  46.     $inner_worker->listen();0 b+ A" t3 o: k; G8 ~6 ?
  47. };  u5 c' R( k, S9 L' J
  48. * [4 `7 m7 g1 ?4 w8 Z* Q6 g" x/ s1 u
  49. $worker->onMessage = 'on_message';
    5 D. O  D5 {3 w7 c/ j$ w7 ^; X2 y

  50. 3 d# z7 h# Y1 Y& v
  51. function on_message($connection, $data)( ~( n3 [0 O4 H
  52. {  r' x' z3 m/ g. M
  53.     $connection->send("hello\n");* ^" O" q7 y0 `. P  T
  54. }! U& L  u- F! M, Q/ K. |' \
  55. ; p: b- i3 A& e
  56. // 运行worker
      p4 m6 Z+ D1 j  I8 x
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php, _! r9 R$ z7 ]
  2. use Workerman\Worker;1 w2 h' O# m& }# [/ P
  3. require_once './Workerman/Autoloader.php';
    " E: c% ^' A# `+ Z
  4. // 初始化一个worker容器,监听1234端口
    5 _- k6 I# j! U. \. `
  5. $worker = new Worker('websocket://0.0.0.0:1234');4 @0 Z! M; f  Q- k
  6. ! R+ l5 O! i9 q" q# k2 h
  7. /*
    0 j  E4 Y% e( A( E' |
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误8 t) p* A+ K# D$ B
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    & S, a5 B% E4 D  K8 z
  10. */# n  ]) T, z  E9 S# t: y
  11. $worker->count = 1;
    , G8 F7 T( `$ R3 |) }& i( j3 K1 [
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    ' S5 Q1 {  T7 k- g  Z
  13. $worker->onWorkerStart = function($worker)
    5 Y2 i: q! j  @+ S
  14. {5 i1 H9 z) K$ B9 I, o4 q
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    " U# S7 y5 o7 H  n7 B
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
      G- j; W# i/ y1 D% u0 ]8 [* s
  17.     $inner_text_worker->onMessage = function($connection, $buffer)* q; Z2 S7 C2 M) R+ P$ o' p. U
  18.     {
    - I) @/ G; V2 K' g
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    1 d" H# P, U' M& z; ^1 q
  20.         $data = json_decode($buffer, true);
    " @7 t# x8 d& ]. H% ~+ c4 D5 {6 _9 }/ A
  21.         $uid = $data['uid'];
    ; |* [! t' e5 }. S3 g- T
  22.         // 通过workerman,向uid的页面推送数据
    / ~. f; w* `; s$ L! b3 Y9 u
  23.         $ret = sendMessageByUid($uid, $buffer);1 e1 l) u7 c1 A* U( i: M+ b0 T
  24.         // 返回推送结果" z4 y" T3 \9 u. Y: O
  25.         $connection->send($ret ? 'ok' : 'fail');2 R6 m. |: x1 }8 U% s# {
  26.     };
    # R. L' c, i7 j: d' R
  27.     // ## 执行监听 ##$ C5 p5 C8 U8 }/ g2 d- G6 u6 W
  28.     $inner_text_worker->listen();+ w$ m5 H: |+ A0 J( @" k8 ^9 z) i
  29. };
    4 v- F% u1 s4 M. s3 N2 P
  30. // 新增加一个属性,用来保存uid到connection的映射) F! z9 k4 Z5 k  L9 U8 J4 n
  31. $worker->uidConnections = array();
    2 P* P% v( h% R3 C( ~
  32. // 当有客户端发来消息时执行的回调函数! p5 U  j) C' z0 B) R6 E
  33. $worker->onMessage = function($connection, $data)
    . O; q8 J2 Z$ B( ~1 r+ D
  34. {$ K( P5 ?7 F! g9 b2 `/ {! n
  35.     global $worker;
    2 e. G# {- Z& O9 Q# j
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    & K' P3 I5 ]# Q1 p! F
  37.     if(!isset($connection->uid))
    7 N  X2 R4 q$ X8 b3 E: f
  38.     {4 C6 C& A1 }- \. g9 }: \
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    % e& e7 G. [# d
  40.        $connection->uid = $data;$ E9 w6 v. V) }% N' o' t4 V
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    + x3 b# d- @8 C+ }. H" ^
  42.         * 实现针对特定uid推送数据
    , H- w9 w; a9 ?1 T0 Y- J& i
  43.         *// \, e) M9 v3 L$ j. x+ l
  44.        $worker->uidConnections[$connection->uid] = $connection;% i$ B; Y1 q. Z$ ^: N% p
  45.        return;
    0 B* D, Q1 y, u0 o
  46.     }$ K7 l. b5 B; J+ W
  47. };* ?4 o$ p# V1 i$ `, f
  48. 2 u5 E1 U3 P7 j6 }5 l6 [* Z
  49. // 当有客户端连接断开时
    ; @* o0 y/ F. d2 G6 i% k
  50. $worker->onClose = function($connection): V# x2 m0 P# e7 w
  51. {
    0 W. j  h. i9 j( Z6 O) }
  52.     global $worker;
    ' U' X% p+ `8 v: s! g; e) l
  53.     if(isset($connection->uid))4 S0 `" ~6 m" P( c! a- M- u' Z
  54.     {" j5 X" I+ A8 _; }$ J
  55.         // 连接断开时删除映射* y8 T6 X' H: e
  56.         unset($worker->uidConnections[$connection->uid]);
    / V: E! a* \! @& x$ C; I
  57.     }
    $ G! s! }2 g& e; v) N# {
  58. };. N5 S1 B) n/ c9 ]8 S9 c8 y% [
  59. - x: ~/ [! `2 P7 p- s% h% R
  60. // 向所有验证的用户推送数据0 X$ X8 X( P, B
  61. function broadcast($message)* @, }- K, f. {# O  X9 W# I; k
  62. {% E& A7 a& I8 E& U6 v) {" Q
  63.    global $worker;: L: @9 C8 R  t! y- \
  64.    foreach($worker->uidConnections as $connection)1 g4 O7 |8 K' o. Y8 \! T0 f5 s
  65.    {
    0 x5 u/ R3 F* F% ^' u) s9 O* i; @
  66.         $connection->send($message);  V8 U& X) w2 n
  67.    }
    ; T' \  C) H0 P
  68. }
    * }* G/ _& I9 X2 j9 h, l/ D

  69. 4 M. e$ G% f3 Z3 Z% S
  70. // 针对uid推送数据
    $ ]& E' a, b! `; I
  71. function sendMessageByUid($uid, $message)
    * g* k! q8 @5 O9 y! T
  72. {2 h; u& ^7 q8 s# ?* F
  73.     global $worker;
    6 k7 f8 Q5 T. ~( f) _, {) s" g
  74.     if(isset($worker->uidConnections[$uid]))+ N% W% n0 M$ B- F2 _6 f' h
  75.     {1 b9 _4 A6 M" r
  76.         $connection = $worker->uidConnections[$uid];
    6 s$ J7 Y* y0 d, O
  77.         $connection->send($message);9 R; a) y0 S4 ~8 q  x% V: m
  78.         return true;$ G' r3 d/ c5 {
  79.     }
    1 P! g9 O- Y; E; o# [& @
  80.     return false;
    5 A; u- r& Q& P; f2 S0 f' F
  81. }
    - R& _- L8 B6 @$ p. w' |

  82. 8 r. [" C8 z* [/ m9 {5 \
  83. // 运行所有的worker
    , L8 E) F9 b0 x7 O, U- e
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    0 A) q- R. r/ I8 M- @
  2. ws.onopen = function(){1 I/ w/ t: n1 J. W/ h
  3.     var uid = 'uid1';, b4 Z( v; k3 n: W, c" B  w8 I
  4.     ws.send(uid);& H" C2 Y/ Y9 E# \+ s
  5. };# J( k9 S; B; L8 ?/ _
  6. ws.onmessage = function(e){! v' B) O4 I1 R# z- v
  7.     alert(e.data);* ?: W4 b$ g% G
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    9 c5 f8 A9 U8 h- U$ r6 L
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    8 n6 T5 W! B3 Q
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
      Q3 a6 s5 x/ K) b
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');2 B# P- p. n+ O
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    * f8 @" |5 a+ F2 B4 P
  6. fwrite($client, json_encode($data)."\n");
    ' s) o6 c" z8 x9 T/ ]" k
  7. // 读取推送结果
    2 o7 b/ I. }+ I/ @
  8. echo fread($client, 8192);
复制代码

6 l# z7 K* m9 i) I4 n  N3 \' m
4 g) e) ?+ r( ^) u
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-5 08:30 , Processed in 0.118221 second(s), 21 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!