您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15072|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;# C0 X$ |* H3 Z
  2. require_once __DIR__ . '/Workerman/Autoloader.php';& J0 T$ ~) @7 o5 C& G" M

  3. ; U; |3 e5 O& n: o$ n: F
  4. $worker = new Worker();
    / F( {% L2 @+ L$ s3 q
  5. // 4个进程1 k$ l  I) F4 ]: Y
  6. $worker->count = 4;
    8 w- e% c  L: e6 ]
  7. // 每个进程启动后在当前进程新增一个Worker监听7 @- _) o0 ?; ]5 G6 t  f
  8. $worker->onWorkerStart = function($worker)) J/ k0 H  i  g4 `; K9 W
  9. {
    ; g) V6 i- @! {
  10.     /**
    ) M; O6 p+ H9 D6 f! |. J1 H' U
  11.      * 4个进程启动的时候都创建2016端口的Worker8 Z. Z6 M1 g- }- h3 C
  12.      * 当执行到worker->listen()时会报Address already in use错误
    9 y1 E4 S4 L# P& i8 V- v* H9 P
  13.      * 如果worker->count=1则不会报错6 @, k$ }0 v$ G3 u6 r
  14.      */
    - w4 W! A; g! C6 W7 m3 _, v2 \  S+ S$ r
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    3 L" c' x- k# P" J# [0 ]
  16.     $inner_worker->onMessage = 'on_message';
    9 F3 L- W2 P$ m/ O
  17.     // 执行监听。这里会报Address already in use错误
    1 q6 N( F- j; [2 Y6 I
  18.     $inner_worker->listen();
    2 R- a8 E5 W+ a* Y% t
  19. };
    8 A: f3 F" a4 n
  20. , O& v4 i' g1 C; {' T" V
  21. $worker->onMessage = 'on_message';' [$ }9 `3 [$ y* h1 B0 A) c
  22. / V2 m+ H; c  _0 ~
  23. function on_message($connection, $data)
    0 m& z, ^" I2 e6 u$ S/ T
  24. {0 i" x4 o: [; T# ?8 ]% |6 l/ k
  25.     $connection->send("hello\n");2 w8 R+ f- M% |+ D5 f5 u
  26. }
    0 Q% H# [$ r$ H% D! d

  27. 8 Z) I) `! m! b( m9 H
  28. // 运行worker
    : z2 W. [3 f, Y* K2 G
  29. Worker::runAll();
    0 ]' I$ l) @3 E4 _9 w2 F
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 }: U  f" B* n! b% q* w
  31. & I) o9 R& p' x1 d: t5 U6 m
  32. use Workerman\Worker;# l2 a* @: U  N$ d( J
  33. require_once './Workerman/Autoloader.php';" I: i2 X: i% t/ k& B! V

  34.   P; M+ Y* n/ I( ^& E7 T
  35. $worker = new Worker('text://0.0.0.0:2015');5 K8 K3 m" _% ^2 u! I
  36. // 4个进程! ~8 u, B. w3 c8 c3 I
  37. $worker->count = 4;
    + _( ?% D" d  K! ]3 q* v
  38. // 每个进程启动后在当前进程新增一个Worker监听$ X6 H0 K5 ?' z% w
  39. $worker->onWorkerStart = function($worker)
    ( U! |: `% x( X; h
  40. {5 A& K) l+ v4 |6 N& B1 b
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    + x/ Y+ H, N0 x2 ?# `- s
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    % G" w/ B" m# N. `2 n  t( r
  43.     $inner_worker->reusePort = true;
    - H& D) |6 W# m3 i2 y; |/ l
  44.     $inner_worker->onMessage = 'on_message';' Z, _5 v6 W. f. C5 ^( O
  45.     // 执行监听。正常监听不会报错
    : _1 A6 I) c7 o1 y# |/ y
  46.     $inner_worker->listen();$ P! y: q. J$ L1 q. c1 R) o
  47. };7 A. R* r2 }1 [  S  V- i
  48. , r% ~% I- u9 {2 a
  49. $worker->onMessage = 'on_message';
    * W. e+ f+ E, }# Z  L! I
  50. 7 E6 w; y% ]  h
  51. function on_message($connection, $data)0 _7 {3 |/ R" s. O
  52. {" ?" g8 [5 N7 s! A9 h+ J
  53.     $connection->send("hello\n");. H. g0 {+ o0 n0 S0 i* ]
  54. }
    4 c! I- M( F" s( R

  55. 8 C, I; k% |1 A
  56. // 运行worker* |, J8 w: U% R' \+ {- p
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php! w8 {; g% R- R) C; `$ Z  l
  2. use Workerman\Worker;
    5 ?( c' D" D$ o# `& L0 v* B
  3. require_once './Workerman/Autoloader.php';
    6 W6 @! U9 k4 Y5 \  J* A* ^+ B
  4. // 初始化一个worker容器,监听1234端口
    + _2 W( U$ z8 K" s1 ~+ T
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    3 s" ?3 o; H% v2 D: g
  6. * K; f6 K4 t& {! n1 W' E) F; n% D
  7. /*- A7 d* I/ Y: X$ h9 }7 L3 K/ e/ f
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    6 @. R7 i( A' P# B4 Q- R- e
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    1 }; O# N( q4 t0 r  W! x
  10. */8 `4 J  U4 x; m+ Q% J$ \- {" I! v+ w
  11. $worker->count = 1;
    % T2 C. P/ V  c  \) h2 r. L1 p
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, d  J3 x! Q. o
  13. $worker->onWorkerStart = function($worker)
    " U& a: }! y  n+ N& G" Y( Q6 \
  14. {( B: G! g; Y, z
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符: M2 f) ?2 A0 j/ U* a
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    8 h: D! [% t# R  O3 F. m3 {* i0 e
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( |$ \7 s! R) ]- _
  18.     {5 [* _  m7 c* \: U+ }
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据# l8 E9 K& V. {# r
  20.         $data = json_decode($buffer, true);+ D/ G) H, M" q* Z: O
  21.         $uid = $data['uid'];
    5 b/ ^; L+ J0 R) o( }5 B8 s  h* Z7 P
  22.         // 通过workerman,向uid的页面推送数据' }. j, I! \0 k+ J% E* Q
  23.         $ret = sendMessageByUid($uid, $buffer);
    6 G7 J& x$ c1 z; a$ V! E* A5 G) k! Z. ^5 h
  24.         // 返回推送结果
    5 k0 B2 E7 ?' F4 ?
  25.         $connection->send($ret ? 'ok' : 'fail');' t/ j: l: ^% c% J, O  }- s
  26.     };
    ; ]6 M( e; r) c7 b8 J
  27.     // ## 执行监听 ##) Q4 `8 [- l' s, T
  28.     $inner_text_worker->listen();
    % }' `* N9 e7 i! E" J: u) z- x
  29. };6 {2 o) a6 _9 A9 W
  30. // 新增加一个属性,用来保存uid到connection的映射
    " I! h, f# M/ j3 ]
  31. $worker->uidConnections = array();
    9 J) k- Z5 P$ M& S
  32. // 当有客户端发来消息时执行的回调函数
    2 c5 B- f* \+ b% \* \- o) R
  33. $worker->onMessage = function($connection, $data)- |6 \* |! x7 v: ~. A) S
  34. {- Z1 D3 v1 n8 ~; S+ p( k
  35.     global $worker;$ S5 t: b2 ?; N& p$ T4 P) S: g7 P
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    1 P" ?; W* p/ p) t* ]6 H
  37.     if(!isset($connection->uid))" b( j9 v( g5 D) H/ y, c4 T8 r
  38.     {( @$ ?' R2 A6 |4 k" G# L
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)! I" V$ j9 H- H$ F" F% [
  40.        $connection->uid = $data;* x# ?1 r: C% N# {, Q7 `1 [
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    + u: v7 m* F7 _! V) ^: o% A
  42.         * 实现针对特定uid推送数据/ q/ P9 P2 j6 K
  43.         */
    # O# \0 \5 M9 Q" L- [7 R' w
  44.        $worker->uidConnections[$connection->uid] = $connection;8 o  J+ q' B) q0 [- d, g  X
  45.        return;
      \9 j% T* u1 L) H) M
  46.     }
    0 q, B0 o# [( d
  47. };
    / y8 K9 @  H( F& h- v

  48. & [! ^- u7 c& N  ~# R
  49. // 当有客户端连接断开时( Z9 Z9 ?2 i6 ^) `3 n+ t4 H) m+ X
  50. $worker->onClose = function($connection)+ k$ `6 W9 Y+ j) P- j
  51. {- c8 T: v$ U# l- m
  52.     global $worker;" a2 e4 e  @* v1 X
  53.     if(isset($connection->uid))# D, F( a8 _! `
  54.     {
    - g9 V3 R1 a7 I% c
  55.         // 连接断开时删除映射" _6 P& @* o  l% Y, L
  56.         unset($worker->uidConnections[$connection->uid]);/ Y; T) U; u; E2 l$ S
  57.     }
    * g7 R# Y, h5 l+ U8 {
  58. };; H% U, {) M6 S9 h7 \8 p
  59. " A, u; w% D$ H
  60. // 向所有验证的用户推送数据8 A# [- d/ `+ _) N. h( I5 `
  61. function broadcast($message)
    * [- k. z# e: C# L3 a$ H& n5 s
  62. {! `# ?2 k2 I; c, v0 Q
  63.    global $worker;
    # o$ |4 P: }" `, x) l
  64.    foreach($worker->uidConnections as $connection)
    4 I) ]# Q0 g6 U$ Q0 ?
  65.    {
    " m7 }- T. i2 |& d. r
  66.         $connection->send($message);/ m( v/ A, w% P: {3 E1 z5 d
  67.    }7 a0 s! d( H8 B7 y, `
  68. }0 v2 l/ C' H6 P  l$ i8 Q( j
  69. ' k1 i. [& X, _6 e) s* A
  70. // 针对uid推送数据. c: F; v% V- i9 N1 r
  71. function sendMessageByUid($uid, $message)- y4 q8 F: n- i- o' ^) n
  72. {
    8 f+ a# S7 X+ R9 b
  73.     global $worker;8 h7 w9 s. x' N, B2 j! o
  74.     if(isset($worker->uidConnections[$uid])), Z3 `/ f) o/ U8 H& \
  75.     {
    ! S  _# P2 i" Y8 @, {- g- }/ u
  76.         $connection = $worker->uidConnections[$uid];
    ; P( ~8 q$ r) `" a, B1 D
  77.         $connection->send($message);
    7 s9 c$ {1 {& n7 v( ~" |/ P
  78.         return true;$ A" j: C* o* P2 S1 `' J# i; M
  79.     }
    # H$ z7 c0 s4 s+ k1 g7 {& Z
  80.     return false;. c3 k; f- h% e% L: U( V
  81. }
    / @; O: g; Q  h; Z! @

  82. % |! Z) ]9 O% c$ C  ~
  83. // 运行所有的worker
    9 Y+ w% j2 x6 c' s  X
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    * m: H/ s# T2 ~" ?
  2. ws.onopen = function(){
    9 T$ T1 h+ \3 d- A* ^0 S2 p
  3.     var uid = 'uid1';6 O$ u% |7 |0 Y9 r
  4.     ws.send(uid);
    9 ^: R$ t+ ?/ W2 s/ D' O5 q* G+ A
  5. };1 x' X; K. L" p5 X4 t2 _
  6. ws.onmessage = function(e){( H9 h, y% z2 q' q5 P2 H) I
  7.     alert(e.data);
    ' f5 E9 H, H8 N- h) h( k
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口7 o! C% E7 J: H
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    / }& Q' D/ X3 [: |
  3. // 推送的数据,包含uid字段,表示是给这个uid推送% o: R9 `% j# o5 M+ E
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ! d' O) u- U+ N; ~- Z
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ) h9 x1 ~  Z9 H) r% t  f% t" M$ D
  6. fwrite($client, json_encode($data)."\n");) `% L. P- n/ E4 r
  7. // 读取推送结果" [7 L% \/ Q/ Y7 I3 c8 N9 u) _" \
  8. echo fread($client, 8192);
复制代码

9 v2 C6 r& }9 k% Y- i$ y& T+ d7 [$ b  _7 w3 P8 C# \+ ^
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-4-30 21:45 , Processed in 0.064848 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!