您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10563|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    ( f% C. }! U, k! S( M, ]
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    3 K: M7 s2 o- W4 m8 c, q

  3. 5 n5 R0 o8 j6 O5 b- z* y( W
  4. $worker = new Worker();
    ; R; C# t# K8 l, h
  5. // 4个进程
    + X" X" u# v& V6 t5 C! T8 h
  6. $worker->count = 4;
    - G& \; E$ q  _5 q2 w
  7. // 每个进程启动后在当前进程新增一个Worker监听* I/ g& n5 g% D4 _0 V
  8. $worker->onWorkerStart = function($worker)1 T8 E7 L: ]8 A& E
  9. {
    $ h1 O8 t2 s1 n
  10.     /**' y8 F  w3 C3 z2 `# Y1 L) C9 D) q
  11.      * 4个进程启动的时候都创建2016端口的Worker
    9 h, N& K5 y9 I1 i
  12.      * 当执行到worker->listen()时会报Address already in use错误
    & H' S6 Q. Z+ H  R" H6 ]% U
  13.      * 如果worker->count=1则不会报错4 J+ p8 R: Q' o8 ]  ~; s" H
  14.      */: t0 Y* H. t3 u: X
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');9 |8 A7 q, A( a+ I' Q
  16.     $inner_worker->onMessage = 'on_message';
    ( b8 J5 I4 e9 g' {
  17.     // 执行监听。这里会报Address already in use错误; _- x/ ^. v6 F( H2 R1 c
  18.     $inner_worker->listen();8 z$ _5 X3 g! l9 `- }
  19. };
    0 r( n: E+ Y0 J' ^  q

  20. + v$ @8 c3 `; l/ d/ W, i0 x# c
  21. $worker->onMessage = 'on_message';# k- R2 s% X6 R. t) [

  22.   X: A  x' M% _" K' E1 M# E. U& Z. j5 d
  23. function on_message($connection, $data)
    ! g6 J3 K6 _- @, l5 T
  24. {
    3 s$ g2 g5 L; [8 @
  25.     $connection->send("hello\n");
    : }0 d9 ]! `# ~  ~& g( i' W
  26. }
    ; M; E2 t; x5 o4 O7 ~) c
  27.   s9 y6 u, l8 U/ |
  28. // 运行worker% J) z% `7 @1 k4 M' k
  29. Worker::runAll();. {( ~5 q' Z# [: K0 Q
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:% f% p) R$ R- m/ d

  31. # N2 `8 h+ J3 T0 M+ ]0 c9 [* k9 o
  32. use Workerman\Worker;
    * z! c; {, T4 X1 O" C% @% O. @
  33. require_once './Workerman/Autoloader.php';
      c# g$ i% S! O; s1 a: J
  34. 9 M/ _  M0 H% w& x/ T8 x" e& I8 q
  35. $worker = new Worker('text://0.0.0.0:2015');
      b# t9 v3 b* n) I( R( _9 e
  36. // 4个进程
    7 ]. ^/ b* X+ ]* K7 n
  37. $worker->count = 4;1 L2 D8 f6 ]4 d1 N5 d+ F3 G
  38. // 每个进程启动后在当前进程新增一个Worker监听
    3 K. I* f2 Q+ R/ m
  39. $worker->onWorkerStart = function($worker)3 P) Y  n5 f4 J' ~3 m9 a1 }
  40. {
    - q$ c$ p/ c/ X1 z# n
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    0 s7 W5 u# v, `# v# U' O
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0). B9 y9 I2 c! r$ Z% t0 h3 Q
  43.     $inner_worker->reusePort = true;! h) S  `( o$ o7 [
  44.     $inner_worker->onMessage = 'on_message';9 O. S# c5 k( W2 Q, C0 H
  45.     // 执行监听。正常监听不会报错$ t; O0 l2 ^8 R% k+ E
  46.     $inner_worker->listen();
    % \9 k  y& n# {& T7 x0 ~
  47. };
    # u  b7 N% R0 \- v, J& S" a
  48. : N5 o4 O! k$ S& R/ G6 l
  49. $worker->onMessage = 'on_message';9 {  C; N  m$ O' L

  50. 6 E6 C$ N5 @8 B* z" a+ X" f
  51. function on_message($connection, $data)# ?2 J0 Z$ n; g, O
  52. {
    , Y3 Q& {- R7 y6 V$ N) S& ]. r
  53.     $connection->send("hello\n");
    7 \- t# D$ o/ B
  54. }
    6 K4 _9 o8 J4 p! E4 K2 f3 i) A/ j9 y
  55. 2 e" A- ^& W; L$ i1 a+ w! _
  56. // 运行worker( p, _% h  x5 R# i
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php0 q: U# e8 F; f0 f6 Z% g
  2. use Workerman\Worker;
    : O1 L& G/ U  k: H
  3. require_once './Workerman/Autoloader.php';! r6 }/ z. l5 o2 j2 g, f7 W6 h! p
  4. // 初始化一个worker容器,监听1234端口
    % n" T/ |0 D; m+ H6 v6 P
  5. $worker = new Worker('websocket://0.0.0.0:1234');) B. I& m. p) p) u$ |& H
  6. 0 P# l& k3 v$ A7 i! h' z1 \+ V
  7. /*
    / n4 q% b; V, ]
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误3 R5 c6 j  p' j: e) q# J2 h
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    " V  }# ?7 ?: }9 I
  10. */) J+ w- a4 M+ y. Z3 K3 u1 J, D2 R
  11. $worker->count = 1;/ A# e; ?1 A7 q: A
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口" G. e5 _3 f% r, I: n0 m
  13. $worker->onWorkerStart = function($worker)
    8 V& u2 U* D( z- V; ]2 p5 m
  14. {
    - B% i: N# j  f* k
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符$ u. J  A9 a# ^' t, r6 s
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');' h# L1 u% |0 Q0 T" r' x8 J# q
  17.     $inner_text_worker->onMessage = function($connection, $buffer)- @# m1 \: b3 z
  18.     {
    & _  b% R" ^% ]( K. F8 y
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据% x! G/ ]& U9 R3 t0 u
  20.         $data = json_decode($buffer, true);/ H' A5 k, b+ p% E- T
  21.         $uid = $data['uid'];* D2 d, |% u" P; G
  22.         // 通过workerman,向uid的页面推送数据
    2 Q- }4 j" W  z) F9 n
  23.         $ret = sendMessageByUid($uid, $buffer);
      D+ d! C, o: x# v6 Q
  24.         // 返回推送结果
    + V2 V3 ?( l3 l9 U: [
  25.         $connection->send($ret ? 'ok' : 'fail');6 h3 ~# h: A4 j" e, r
  26.     };4 c9 t+ C, |# y( u- i: R* l" Q
  27.     // ## 执行监听 ##
    4 ^4 B7 Q9 y8 F7 @
  28.     $inner_text_worker->listen();2 B1 ]+ \! c; G" b2 b
  29. };3 y/ r+ I, L- @$ h& C1 u8 o! q( i
  30. // 新增加一个属性,用来保存uid到connection的映射6 L3 h7 k2 Q7 ~
  31. $worker->uidConnections = array();
    - d/ W" ?' u  w2 T4 v
  32. // 当有客户端发来消息时执行的回调函数
    9 ^  U8 [8 M6 X2 V8 X4 H3 U
  33. $worker->onMessage = function($connection, $data)' h! Z; t8 n6 D- K, i  l5 n% @
  34. {
    : l+ @4 i" o3 H+ ~
  35.     global $worker;
    & e. t) w0 n* c9 E% J1 f
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    % V  l9 T! b  I5 _
  37.     if(!isset($connection->uid))2 `8 C3 |# i1 ~1 E
  38.     {
    " J/ d& Q! [  u
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    $ E3 J$ O5 }$ L6 V' @, @
  40.        $connection->uid = $data;1 N( |4 n1 h7 b  L7 A: t5 l
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    & e9 l3 h4 ^, v' @+ r
  42.         * 实现针对特定uid推送数据
    4 Y+ w0 ?$ P  S6 Y2 C0 r
  43.         */0 L: \0 f) w: J- y& M9 k
  44.        $worker->uidConnections[$connection->uid] = $connection;  y5 m# U5 W9 t' ^7 d
  45.        return;
    ; O  C( J8 s2 [$ `2 E5 o
  46.     }
    $ ]4 B- t9 B" N
  47. };* `$ K6 A' {( P# m. ~

  48. 1 _  W  U$ _% l3 R  _# I( U2 W
  49. // 当有客户端连接断开时
    3 V6 t2 M, i3 [  m% n  N
  50. $worker->onClose = function($connection)
    & q' k/ ]( R4 m
  51. {
    ( e) F) M! d1 d6 L% [; I# P# x
  52.     global $worker;
    9 o) _) |/ h' l: R5 k3 T1 n7 f- A
  53.     if(isset($connection->uid))
    * L9 u$ S4 ]( @2 T, v  E+ `: P
  54.     {- K8 z: d0 ?4 r8 q) p: b$ E$ D
  55.         // 连接断开时删除映射
    2 i/ i0 v4 L6 ^" a
  56.         unset($worker->uidConnections[$connection->uid]);  x1 r$ V) i* c
  57.     }( a( Q  F& l1 t9 \1 A1 t* Y3 M4 W
  58. };1 i2 W; j1 d$ p5 k

  59. 2 D. U/ S- T  f  R( d
  60. // 向所有验证的用户推送数据' d. P+ {  ^/ P
  61. function broadcast($message)
    1 E2 }6 x% ]( O+ t* `; |6 N
  62. {
    ; [5 R' D2 C1 V' A+ g5 u
  63.    global $worker;
    1 J- ~7 F9 i6 y  l- y; S
  64.    foreach($worker->uidConnections as $connection)
    1 B) @7 c: a1 d4 w
  65.    {' H# Q6 M* a, _2 |- w* b+ M
  66.         $connection->send($message);# h/ ~5 O. u, x: w5 \4 Q
  67.    }/ N' n: E' W; O* v% o5 |
  68. }
    1 w: }! i' n$ R3 {
  69. 4 r6 ~2 P4 r$ z
  70. // 针对uid推送数据8 `; B0 x1 ~/ i% Q" x# V
  71. function sendMessageByUid($uid, $message)/ a9 V0 ]& K/ M
  72. {
    3 ^: u5 e. q2 O! S6 u# K
  73.     global $worker;( y. t" [/ V5 P$ l
  74.     if(isset($worker->uidConnections[$uid]))9 ]8 T0 N+ _) ?# Q8 H, Q
  75.     {
    3 Q  I7 l: |( d' Y! M0 s# x
  76.         $connection = $worker->uidConnections[$uid];
    " q0 k. A7 _6 `# Q
  77.         $connection->send($message);
    5 A* r5 L* _* R! C! g  p
  78.         return true;! A- H: i- m5 S8 r
  79.     }
    # v1 R1 x+ ~9 G
  80.     return false;4 G+ o0 X9 {9 i
  81. }/ u2 b* S# _% L5 h# U

  82. ! X, U8 ^4 b& B) y3 p- a2 q
  83. // 运行所有的worker
    ! x2 t$ V( J9 w9 p
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    0 R; Q/ ?0 m  o& Z* H
  2. ws.onopen = function(){" L4 F0 d1 w( C* p# t$ g) ^* A
  3.     var uid = 'uid1';: ]2 W, M" p+ w1 ?8 _
  4.     ws.send(uid);4 L9 z; Z# D5 s; j% ?
  5. };9 L8 E# w; M& H1 t) V
  6. ws.onmessage = function(e){
    , ]9 B2 _+ P8 }3 J% u' l# @
  7.     alert(e.data);
    8 p! G8 E! y% n* F- C
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口/ r4 \5 [$ A8 `" g( D% J
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    $ n* w1 W0 ~# \
  3. // 推送的数据,包含uid字段,表示是给这个uid推送. H$ q* {. [- R0 k  g- a
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    $ l4 c% D$ D4 R" E  U9 c; P5 [" o
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符  @. K5 B3 X, V
  6. fwrite($client, json_encode($data)."\n");
    4 ]9 X, x, b+ m# h; E, s
  7. // 读取推送结果
    $ c9 i% d: u, x
  8. echo fread($client, 8192);
复制代码

# H; `" Q7 {2 z. f+ {
5 J2 Y& ~6 d0 W6 c" h
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-20 01:33 , Processed in 0.133737 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!