您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15068|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;7 Z* m" V0 T2 A$ K
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    - o( ?8 U3 X+ {3 d1 B0 ]- H
  3. 6 I4 l; i" h% Y$ H; B+ y3 D
  4. $worker = new Worker();/ t( q) a; E4 P( |2 n% P
  5. // 4个进程
    . d. q& ?% y9 `9 V
  6. $worker->count = 4;
    2 P4 M& Z, Z& b/ ?
  7. // 每个进程启动后在当前进程新增一个Worker监听; R& {& ^" P% \) w- z
  8. $worker->onWorkerStart = function($worker)
    $ z/ ~& q2 d, B: n# |' l  N
  9. {
    - J  o% T$ `/ w" k
  10.     /**# I7 G9 l2 e, o
  11.      * 4个进程启动的时候都创建2016端口的Worker: x3 m- U! C4 j& U* ]2 D
  12.      * 当执行到worker->listen()时会报Address already in use错误0 u4 i" j- y, s  g$ N* s3 J
  13.      * 如果worker->count=1则不会报错! q% N3 U% D9 D' U# G
  14.      */
    8 b0 e/ Z6 K) `: s$ \
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    8 i9 C3 J( A* {, x
  16.     $inner_worker->onMessage = 'on_message';
    # [' C: Z# Y6 ]7 @8 H  ?
  17.     // 执行监听。这里会报Address already in use错误
    " u+ m5 I; e2 Z( V1 U
  18.     $inner_worker->listen();
    7 G2 F, X( Y- P9 Z% \' \7 N
  19. };
      |2 s- Z& n, @+ |1 A4 z' O- ?

  20. ( T  p: X9 o, q9 R
  21. $worker->onMessage = 'on_message';
    : @% @- ~  `0 o8 [5 k
  22. , t. v; k8 `& ^! z
  23. function on_message($connection, $data)
    # m$ M$ B/ ^" }4 {# \- |! h8 z
  24. {
    ( `( K# t/ [" S) E
  25.     $connection->send("hello\n");
    - i+ P! I3 E+ s; \  b
  26. }8 Q" l; y8 H" j3 S# b
  27. ! h. m. ?" Y3 G% l
  28. // 运行worker. q  W% a6 e' Q- ?2 w: {6 J6 c  ^
  29. Worker::runAll();
    9 b6 G0 \$ E# v1 w2 {5 d* K1 M5 E
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    - C# Q5 |/ @- }: X

  31. + U+ d- Z* x2 q; O4 H
  32. use Workerman\Worker;
    : j3 q( B' S9 d7 a, L
  33. require_once './Workerman/Autoloader.php';
    9 P# P4 }5 S/ Q1 x3 I6 W! d5 D6 _
  34. ) b7 R1 U* {  N( ~4 }4 i& B5 k8 x5 G* o
  35. $worker = new Worker('text://0.0.0.0:2015');( B3 x8 N7 e7 B& Q6 H# v5 S$ T
  36. // 4个进程
    / o0 }9 x4 p6 ~0 T
  37. $worker->count = 4;
    + o: @9 K7 M4 l( G
  38. // 每个进程启动后在当前进程新增一个Worker监听9 o+ g6 ]& Z+ i* ?. m
  39. $worker->onWorkerStart = function($worker), |% A$ t% [$ r
  40. {
    3 ?( d7 r; V, y( }
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    & r5 n& j& v, Z$ `8 R
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    6 y) P" f* [% `% f9 j/ e/ C
  43.     $inner_worker->reusePort = true;- ?7 T4 C7 Q; @5 L
  44.     $inner_worker->onMessage = 'on_message';
    5 y% u6 k8 h9 g3 @& k; u! ]
  45.     // 执行监听。正常监听不会报错
    5 S, _7 H; w8 z6 ?9 d, L" D7 M7 [
  46.     $inner_worker->listen();( S  `! ~9 _/ x# T/ s+ |+ G! ]: p" S
  47. };
    5 @5 ^4 N9 Z% {
  48. : Y1 ^; H0 M2 R
  49. $worker->onMessage = 'on_message';
    , ~0 C- L) z, y  A* q& q
  50. 9 J) J# @+ T; v/ A4 [* W: c
  51. function on_message($connection, $data)/ c* \. e% r1 P! e9 v- e
  52. {
    5 h3 T# y6 k9 A/ A
  53.     $connection->send("hello\n");1 `% I" O4 g7 B+ u. v) X+ n$ g: M6 V$ Q
  54. }! w2 N% y: ]. Y: q& v5 v1 @7 [

  55. $ Q7 ^- E. t7 Q/ k( J; ~0 a* h/ u
  56. // 运行worker
      T* x$ x# d9 Z8 H( V5 W) G
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php& M( u9 J: J) m4 k7 x: y5 b/ n
  2. use Workerman\Worker;& V4 N+ e! |7 o0 x2 [3 ^0 J
  3. require_once './Workerman/Autoloader.php';1 t1 ?# B! I- i
  4. // 初始化一个worker容器,监听1234端口! C# L8 H$ h* f/ m* ~) I
  5. $worker = new Worker('websocket://0.0.0.0:1234');3 ?$ _$ I/ j& x$ C) h
  6. 2 j% F- U+ X% m  R8 M* x4 F
  7. /*+ w3 \) J2 e0 {- _( ~, l! C0 m
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误' Q5 Q. R" l% K8 `
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    8 n  n. B  y- a
  10. */% s& d; x6 O! Z- n3 D  E# E* F
  11. $worker->count = 1;
    3 F9 `. x! d( J0 }* E1 `6 F( [
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    # a: N) V: A& D, O
  13. $worker->onWorkerStart = function($worker)
    - X" t4 Y# g" r* m6 f6 |$ g
  14. {5 p' ?7 d( R" Z2 j7 w; W3 L6 k
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    2 J# g; E: e; t9 F' \0 l  m# s
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    2 H, g! [3 B; `0 M
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    # E6 l. T; R/ R5 a2 T# Q- d
  18.     {
    9 y' U  i+ x1 b9 V( }
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    ( d- Z7 f4 u7 J) A$ n
  20.         $data = json_decode($buffer, true);9 I  j( n" q# R+ p
  21.         $uid = $data['uid'];
    ) H) L# v/ X9 u& H7 s- X- g# H1 h
  22.         // 通过workerman,向uid的页面推送数据' h3 Z$ d3 P1 x6 d! ~' C
  23.         $ret = sendMessageByUid($uid, $buffer);! m# I$ }9 {9 ]9 _2 {0 A& _# o
  24.         // 返回推送结果1 V3 H) {7 C% V* a7 ^
  25.         $connection->send($ret ? 'ok' : 'fail');
    ! w6 X% W+ W: n+ v+ T
  26.     };# u; e# c0 s- }: B+ X
  27.     // ## 执行监听 ##3 C1 z1 x- i% T
  28.     $inner_text_worker->listen();( K( O- z3 V& i. U, h2 O
  29. };
    * d5 }1 |' x; `1 j
  30. // 新增加一个属性,用来保存uid到connection的映射; n* |# ^6 E0 W! d' {
  31. $worker->uidConnections = array();
    ! ^/ v* w- c. e/ u; D$ E0 p
  32. // 当有客户端发来消息时执行的回调函数
    8 d! S% u, f+ b
  33. $worker->onMessage = function($connection, $data)
    1 u4 e. n1 w3 i$ c
  34. {" |9 |2 Q; ^) l+ x8 P4 k* l
  35.     global $worker;; K# x( y  G, [2 |2 E/ ?! K
  36.     // 判断当前客户端是否已经验证,既是否设置了uid! l( g6 s( d+ E' ]
  37.     if(!isset($connection->uid))
      K# F) U2 [/ \9 V1 N
  38.     {
    * g  R3 p$ z: o4 Q8 y$ i
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)  {7 @$ Q9 h3 [1 |0 h' b
  40.        $connection->uid = $data;
    0 I& g* W% m8 O& v* o. P
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,1 N- u9 g# S) w# P( b5 P
  42.         * 实现针对特定uid推送数据4 W/ a8 Q$ O0 _. Z" g5 ^
  43.         */
    , k* K2 \, w/ G" C- X$ X
  44.        $worker->uidConnections[$connection->uid] = $connection;
    # t0 Z, e1 r! V* S
  45.        return;
    / `: ^4 v! X8 @( @2 \9 W% y' B
  46.     }
    3 ~: v+ O+ f6 l& I& V) T( [) j
  47. };
    ! ]8 p6 S( X5 _4 h, q) ]# n
  48. 4 W% o4 z3 X0 J1 s# B8 g+ `, |
  49. // 当有客户端连接断开时/ D9 ^. q' q5 |* H+ W4 G- U
  50. $worker->onClose = function($connection)
    : X! f6 O) I# |7 y0 n& M
  51. {, Z/ s+ S! K  T/ E% }. P7 J
  52.     global $worker;2 j2 q. P  Z! {8 o+ k
  53.     if(isset($connection->uid))
    # j0 i9 D* A- j
  54.     {, p# `: y, K- c' l
  55.         // 连接断开时删除映射
    : X& W  O! O( \2 d
  56.         unset($worker->uidConnections[$connection->uid]);
    6 e: K1 V0 C& W- z
  57.     }: T2 P" ~$ {7 ^0 ~4 Z0 P
  58. };
    " t, L  a; s/ V0 X- k/ Y

  59. 7 L9 P  A/ v9 p
  60. // 向所有验证的用户推送数据0 n% m( y9 F: j4 W
  61. function broadcast($message)
    + W" W% L: ?" K* c
  62. {5 {1 u& H9 m+ D- p+ ?( E0 P: P/ `
  63.    global $worker;7 S# x0 j7 W6 v% A+ k$ F* ~% ^
  64.    foreach($worker->uidConnections as $connection)& [" m7 G! K. {5 A) @  C
  65.    {
    5 B  m. U; ?, N2 g! c" ~$ j; H
  66.         $connection->send($message);; o! H6 d. t1 f0 y. Z+ m
  67.    }: w. F: M0 b) f" f8 ^
  68. }/ }1 ]; x$ n. y& ]) h
  69. 5 T' J) y) K! I* ?. r( _0 B
  70. // 针对uid推送数据3 N* J0 L+ F( K' [! F; t! F
  71. function sendMessageByUid($uid, $message)/ O* G1 w' q! Z! A; z
  72. {
    $ x3 J0 a2 \6 H" C
  73.     global $worker;! D9 T9 G' c* n/ w! h* V/ s
  74.     if(isset($worker->uidConnections[$uid]))9 l8 b! Q4 S4 W. J2 P6 b" t6 G$ \. q
  75.     {' z% N' n9 v* C3 I5 P" D" M, G
  76.         $connection = $worker->uidConnections[$uid];
    2 }/ }5 W+ N; v& b
  77.         $connection->send($message);
    7 W% I+ Q3 D$ L1 s5 T  r* v
  78.         return true;
    ( z9 C/ b, K2 c4 k5 o; _: ]
  79.     }
    ' k' Z1 M. }0 @! A
  80.     return false;+ [) H- u2 B( y1 A, b' f  q
  81. }
    # T# R. m: R  }& z- a
  82. 7 K& r' V) _, `7 L) u
  83. // 运行所有的worker$ [/ Q# W( U4 v( ^- I
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    $ J) k$ b, @% E' \) ?2 T: Y
  2. ws.onopen = function(){/ t; U% v& I7 ?
  3.     var uid = 'uid1';
    ; t3 F5 R/ L* E, g2 V
  4.     ws.send(uid);- d4 H1 Z/ k7 @0 v2 i7 |' t8 l
  5. };
    3 L0 I( V0 A# f
  6. ws.onmessage = function(e){
    1 R$ G+ j; @; V
  7.     alert(e.data);
      a* n8 L: p# B( R% j
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    & M8 r" v  K# t  ]" T
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    7 Q  h. g/ O4 D9 Z" c/ Z& z1 O8 w( H
  3. // 推送的数据,包含uid字段,表示是给这个uid推送, n4 @$ H6 u0 e5 \% V
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');# P9 Y  }: g+ _9 P- x
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ _# p% h% W2 o) o3 F2 ~6 Z
  6. fwrite($client, json_encode($data)."\n");# y: n$ v$ S$ H  D
  7. // 读取推送结果
    % P5 F4 v+ V+ U+ R* }
  8. echo fread($client, 8192);
复制代码
0 i7 g2 J. z( x* A

. `! Z& j6 G' v% ^* F+ d: w
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-4-30 20:51 , Processed in 0.079008 second(s), 22 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!