您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14660|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    5 z* W, S  O& U4 d( n. a7 ^) z
  2. require_once __DIR__ . '/Workerman/Autoloader.php';2 U2 l- ^0 e7 T1 ]1 s1 ~+ W4 z
  3. - x9 x0 V' J8 Z- P! A
  4. $worker = new Worker();# O8 w! v& M: z" ~5 `8 D- W
  5. // 4个进程1 E- P4 c& Q6 V- `- R& K3 A. ]2 P
  6. $worker->count = 4;
    4 o8 Y6 a' l) z; N, E7 q
  7. // 每个进程启动后在当前进程新增一个Worker监听! @2 n: z9 `$ U, I: |% X6 d, P( y! \  ~
  8. $worker->onWorkerStart = function($worker)
    5 g) d7 u) V. o/ T) p2 U6 k8 i
  9. {7 |1 v7 v! d: |4 _* \
  10.     /**
    3 R3 ~+ E; l$ l
  11.      * 4个进程启动的时候都创建2016端口的Worker
    ( F, J5 \4 X  z3 `5 X; t( a0 C
  12.      * 当执行到worker->listen()时会报Address already in use错误
    , S3 m$ c! Q" W! {) e2 a. K
  13.      * 如果worker->count=1则不会报错
    1 L0 O. y" w0 K/ j
  14.      */% z& M$ t1 S8 B$ p/ X: @
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');5 F: O5 l2 X9 c" B, T$ }
  16.     $inner_worker->onMessage = 'on_message';4 c/ W2 q. E! }6 P" a
  17.     // 执行监听。这里会报Address already in use错误# ~" {. C7 N# p+ b# V
  18.     $inner_worker->listen();# A& U2 y7 n( ?' U0 s9 W
  19. };
    9 i$ P" k8 M: Q, f8 v

  20. 5 W. j3 R: }& r% `8 I
  21. $worker->onMessage = 'on_message';
    , v% Y' e6 e: L: W7 h- _

  22. 3 D% o, k' [* @
  23. function on_message($connection, $data). c, N) W, d( I! x, s
  24. {5 b0 Y5 d+ z- v) s3 x  a
  25.     $connection->send("hello\n");5 z$ B6 L  ~+ w0 H' ~
  26. }* e' t! z2 I9 P3 h
  27. : k3 W9 I9 h: G  S( D
  28. // 运行worker
    0 s* _, s7 K4 |# x5 G1 L
  29. Worker::runAll();5 u( X$ v) `  v) x, q$ Q
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    % z1 j% `4 ]5 D% V6 F
  31. # b4 M- S  V* _5 ^( Z" d% v1 Q  _
  32. use Workerman\Worker;# y4 @( T9 G! L3 R
  33. require_once './Workerman/Autoloader.php';0 `. a/ d& d5 X8 m! M

  34. - V& v* L5 T0 @1 P( @$ k/ V: Y, E
  35. $worker = new Worker('text://0.0.0.0:2015');( M9 c0 ^# _# t8 M; A. _/ H' P
  36. // 4个进程2 b: }* @- H( e* V
  37. $worker->count = 4;
    : _7 P3 a- `4 m9 ?: _8 u; S
  38. // 每个进程启动后在当前进程新增一个Worker监听* k. i" p/ T, z* o" E9 _
  39. $worker->onWorkerStart = function($worker)
    ' ]7 f( c( U6 X) a" r' |
  40. {
    . v& k/ v) E- k
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    . v0 j# `4 ~/ m
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    + h( M! \9 g" j
  43.     $inner_worker->reusePort = true;
    / F+ Z9 g1 |' |7 f
  44.     $inner_worker->onMessage = 'on_message';
    1 A$ B, {' B4 R2 N( X' X
  45.     // 执行监听。正常监听不会报错) {9 E1 G# V9 f1 f* h( d! N
  46.     $inner_worker->listen();9 K; E% U) a! B
  47. };# ]7 {3 a8 A, {1 V1 ~1 h/ \
  48. $ h$ H# a$ y+ p3 d. O# A7 U! l- N
  49. $worker->onMessage = 'on_message';
    # G: n' a" a: N1 p' t% @& g1 d

  50. / _0 e& i1 l$ ^+ h+ \) _5 W7 Q/ Q7 ?
  51. function on_message($connection, $data)
    4 M# ]! K) x* F6 x6 j
  52. {
    / ^: F/ b  ]# H8 F- y/ \
  53.     $connection->send("hello\n");/ M$ }2 X8 C; w' o- C, h" E- t7 E5 K
  54. }
    % I9 G3 T8 j; q8 _

  55. 2 T: k" k- _" y% ]! u2 |
  56. // 运行worker* }' ?  ?4 k: r* M# C
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php* f+ k) C! c( r$ W) h# a3 Q/ G
  2. use Workerman\Worker;( Z  s/ \9 [& o$ U; f; g
  3. require_once './Workerman/Autoloader.php';
    * j; J( q8 U( N2 e; ]3 Y& b2 j4 `
  4. // 初始化一个worker容器,监听1234端口3 o+ ]6 G, h) k, g) M4 D' s
  5. $worker = new Worker('websocket://0.0.0.0:1234');8 U. m2 d4 a, E! F; ~/ b- L+ s6 M
  6. 7 ~3 C# U1 {8 e2 ?& M5 R9 b
  7. /*1 n* q" A- Z1 \$ t
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    ' i9 H/ O. a+ K0 W1 F4 `& L0 }
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    $ ?! f) X1 t* ~! D1 x4 y7 L
  10. */
    . T, Y* }+ c  b) c; Q5 B1 E
  11. $worker->count = 1;
    6 N! ^' B# [3 o- `3 p# Q
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口' _4 ?  O' U6 {% p5 A& W+ _. A
  13. $worker->onWorkerStart = function($worker)
    3 z0 P  L3 L$ x, F$ w
  14. {
    1 R* Q/ T. z6 I+ j! A  i
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    0 W% E6 U1 H. D
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    - v8 ~$ S) Q+ `5 h# H0 t2 {, g
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    6 P# `& v4 U7 R, _; G8 i$ O  k$ ~. e
  18.     {
    ) A7 u9 H  n- N6 F3 |4 g5 i
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    # E/ z; @; l; _% H3 V
  20.         $data = json_decode($buffer, true);" Y$ B% u! Y* w
  21.         $uid = $data['uid'];
    / @8 j* t, X" _- v
  22.         // 通过workerman,向uid的页面推送数据1 L! f+ `' R0 e  I- @6 Z
  23.         $ret = sendMessageByUid($uid, $buffer);+ e- v* Z9 [/ @* f: w8 Q9 B3 `
  24.         // 返回推送结果( l0 X1 ]3 [$ u
  25.         $connection->send($ret ? 'ok' : 'fail');
    2 Y2 [0 u7 ~: f3 e
  26.     };
    * M/ H+ M) `# y$ ^: \6 `) V1 [' l
  27.     // ## 执行监听 ##
    9 M) R1 h& r2 Q5 h4 O
  28.     $inner_text_worker->listen();
    4 Z: Y- i1 L9 A7 r3 G; r! C
  29. };8 j6 i5 f' a$ a1 [; ]. u
  30. // 新增加一个属性,用来保存uid到connection的映射
    * j7 n, S3 ~4 p0 o, P
  31. $worker->uidConnections = array();
    * o- }* I  D' F$ O
  32. // 当有客户端发来消息时执行的回调函数( _0 E0 z1 C. X$ C9 B
  33. $worker->onMessage = function($connection, $data). C# r9 C! ^6 U) a& ^- }* y
  34. {6 U6 s% o# q( C5 g  ^  L
  35.     global $worker;
    / n" Q: T: S+ T1 s4 N1 _2 W
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    / H5 g1 b1 {1 p# W0 |" N8 N6 ^
  37.     if(!isset($connection->uid))
    # h3 V7 k4 W* ^& ~, G2 ~' w" [
  38.     {$ j7 {; ^  c* a  l: l
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)$ _+ L/ m$ H% i) j4 W4 R1 e2 g
  40.        $connection->uid = $data;
    # W* [9 y) f% x8 I& r6 l! Y# J5 |
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    3 c* y3 X0 L/ x2 s( x
  42.         * 实现针对特定uid推送数据
    ( _# f( T: i6 s2 h2 b6 x
  43.         */
    , X. X# a9 x  k
  44.        $worker->uidConnections[$connection->uid] = $connection;
    + K: I' K* ]+ D- j
  45.        return;
    0 b4 }$ x1 A, S" P: P
  46.     }
      c1 e. r) l# u8 T# |/ u
  47. };
    % g9 I$ i6 U, A# i

  48. ' C7 Y* \8 T7 }$ E+ o
  49. // 当有客户端连接断开时
    / p- G2 u# S' Z- q8 p
  50. $worker->onClose = function($connection). ^5 Z  z- S( K4 R7 U9 x
  51. {# G- o  s7 u3 |* F8 y
  52.     global $worker;
    6 \6 F( ^2 R- o' j; m: `9 L
  53.     if(isset($connection->uid))
    ! v, g  x9 ^4 e# G6 t) W0 W
  54.     {
    5 |) j" C# i3 }* V& ?7 Q
  55.         // 连接断开时删除映射
    . V% `  @  Q" R, ^) ]+ C
  56.         unset($worker->uidConnections[$connection->uid]);
    8 C" {, x8 [4 v0 W1 G
  57.     }
    . Z8 S% p8 ~5 V6 J# R4 a/ Z
  58. };
    ; v5 `% m# \4 ~, c; P* b
  59. ; I$ ?3 \& k; [- i. x4 p
  60. // 向所有验证的用户推送数据
    9 c  A' \  j* L/ [/ {, F
  61. function broadcast($message)
    / n' K  d/ K. ]& c
  62. {( }; ]: Y7 W' p9 d
  63.    global $worker;
    ! \9 G* R" \. R" D% O! b
  64.    foreach($worker->uidConnections as $connection). \( J. Q$ |  E( \1 t, }
  65.    {
    1 y/ o4 P& G; [8 Q
  66.         $connection->send($message);2 X+ K1 h5 i0 m6 W3 r( n% v
  67.    }) ], K8 \* P. x
  68. }& [  i6 V& d. ]6 [' Q5 e3 ?7 m! L
  69. . T9 Z9 i  P$ H# u
  70. // 针对uid推送数据4 \0 {- k, k; b- K+ _: M: f
  71. function sendMessageByUid($uid, $message)$ k1 s+ E6 f' r# ]& t  x9 B1 y* i
  72. {3 q6 y3 R" H6 N+ \/ j7 s
  73.     global $worker;3 }- W! |0 [1 b, W; J$ p
  74.     if(isset($worker->uidConnections[$uid]))
    7 c" z' A  r( w9 K7 O# L
  75.     {. H# O% |* F1 F( M
  76.         $connection = $worker->uidConnections[$uid];
    * J, w' \$ v' m* ?
  77.         $connection->send($message);( y  T5 M& h" @, C2 K6 R4 B/ v
  78.         return true;
    & i: f, }# j! O) ~# v
  79.     }1 X7 C" j! ]: N2 A
  80.     return false;, ^8 Q: R  p+ R5 j' }
  81. }1 R6 a& @  u) c* l

  82. ! }' J1 U( H, t$ z, a( r
  83. // 运行所有的worker4 V9 x4 n5 s- y5 v
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    5 k7 n9 i1 O5 ]1 i* W
  2. ws.onopen = function(){1 ?2 b' ^7 ^  K0 P) M+ g
  3.     var uid = 'uid1';
    7 T$ e0 I* r6 _& @
  4.     ws.send(uid);
    $ ?" L: n9 B3 K) j3 L; Q
  5. };- Q$ q$ H% V/ P5 i* K" K
  6. ws.onmessage = function(e){
    4 I$ {7 P' n* U5 x8 p
  7.     alert(e.data);6 O0 e  A) ~0 z7 E! k" r
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口' z5 z8 Z+ B$ T; F
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    9 J- u9 \4 V( f0 O1 q+ [% \
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    - i0 n+ x. q1 D
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');0 x# n0 d2 p% |4 Z
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ) }5 Q" ]0 }  |1 ^0 d0 z( a
  6. fwrite($client, json_encode($data)."\n");: U% u  [2 s% i8 c/ P
  7. // 读取推送结果# x( }! G8 M3 |" {
  8. echo fread($client, 8192);
复制代码
0 m& Y1 V# B2 m! @6 Y7 j

" i% \: t6 G, R5 F9 ]* z& K! }
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 14:44 , Processed in 0.058985 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!