您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15070|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    " ^# i0 D9 |9 a+ j
  2. require_once __DIR__ . '/Workerman/Autoloader.php';3 z  M+ W: p) H' A7 z$ U; I
  3. 5 C  c$ J& x+ j; k( H" t
  4. $worker = new Worker();. R6 u4 a" t: ]& p* j7 L& ^# E
  5. // 4个进程
    4 A! |& R% v! q5 i$ L/ T
  6. $worker->count = 4;
    - D7 l/ l2 M7 |+ z% c
  7. // 每个进程启动后在当前进程新增一个Worker监听
    1 [$ l6 ?, E2 }1 O; f3 B
  8. $worker->onWorkerStart = function($worker)
    ; O1 X% u; C5 R
  9. {9 W- h8 x7 b9 j5 R, h0 y" b
  10.     /**& }, h, Q" C* a9 n
  11.      * 4个进程启动的时候都创建2016端口的Worker
    , Y2 s+ |, K* |
  12.      * 当执行到worker->listen()时会报Address already in use错误
    . m4 e$ ~. X5 S* X
  13.      * 如果worker->count=1则不会报错
    0 d: M8 Y. S* M# ~" p1 ~
  14.      */) X) X' `7 Z+ n3 F9 e! J) v
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');7 M% r8 k$ J4 t7 J4 m
  16.     $inner_worker->onMessage = 'on_message';
    : f+ c* G2 l' Q- B: q  ]
  17.     // 执行监听。这里会报Address already in use错误
    : V* _7 f! k4 i( ]8 g* S6 ^0 ]0 D
  18.     $inner_worker->listen();- V. \! S$ j! V/ u. F) x( ]1 k% |4 w
  19. };
    6 j! C& E) s! m; t$ U% Y- l

  20. 7 e" j& b, ^2 K- q+ a# A/ L
  21. $worker->onMessage = 'on_message';
    * x2 ?* H2 ~- u4 g

  22. % l# a& K; e9 i6 m
  23. function on_message($connection, $data)
    8 ~( ]" D3 L6 P% ~: d# {
  24. {% T  a3 u5 Z- I/ ]1 [) G
  25.     $connection->send("hello\n");9 z$ R( N: e, t% f. s% D7 v
  26. }
    ' r3 L0 V% m1 n+ ^, V

  27. 1 S7 e8 E! ~# l
  28. // 运行worker+ _) P: f$ e& J7 \
  29. Worker::runAll();
    , C# H- n; {- E9 Z. ^! G9 w
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 ?3 K5 t; X2 E% b
  31. ' h# v1 l1 S/ H) n9 s4 U
  32. use Workerman\Worker;
    0 r: _4 Q2 x3 N0 @4 B- z. j
  33. require_once './Workerman/Autoloader.php';# R+ A7 d0 d4 }' i
  34. ' b- z2 |1 f+ D; M: ~/ h
  35. $worker = new Worker('text://0.0.0.0:2015');
    * T' V3 S8 Y! N
  36. // 4个进程
    , w* }  |0 J$ ~: P+ j- i% t
  37. $worker->count = 4;
    8 l' U; u; n3 E) S* p/ Q
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ! D% o8 C0 w0 m* l* k8 n. `
  39. $worker->onWorkerStart = function($worker)4 N3 H* l: |% X5 Y4 \5 I: V0 v
  40. {
    3 b- J7 o# s& z( \
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    : v' u6 Q5 j6 u+ n& ~0 J
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    $ G% M- X( m* g) f7 _0 R7 z
  43.     $inner_worker->reusePort = true;7 p' H, P9 ^# e" s5 Y# ]3 J! D
  44.     $inner_worker->onMessage = 'on_message';
    5 v. d7 B/ ]& Y' S/ W
  45.     // 执行监听。正常监听不会报错; v( J4 J1 ~/ y1 M
  46.     $inner_worker->listen();
    - t# c( n# C6 M; l' ?) O6 \, I! X
  47. };* H; ?3 O3 J( j  S" M

  48. : O  e0 n2 ?- b( I3 u' V
  49. $worker->onMessage = 'on_message';
    $ X( Y9 L0 Y. C: r

  50.   s# E; d0 ?& f* P0 N! V  [
  51. function on_message($connection, $data)
    ' e1 m) u9 z1 B$ P* p4 A4 r
  52. {& Z! u# K0 R5 G$ g4 [! \4 N) O
  53.     $connection->send("hello\n");
      `: v$ H9 F8 D5 R5 T
  54. }' Z+ T8 q% B% J$ {# u
  55. # b" F, x" S: q4 O
  56. // 运行worker  l$ B$ `3 U. R7 a/ x
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    : C8 A' B" S- @; p+ t: F7 u3 X
  2. use Workerman\Worker;
    : l  r+ E* Q  ~  P0 ~6 l
  3. require_once './Workerman/Autoloader.php';% i8 ]+ l8 o1 c# L4 O
  4. // 初始化一个worker容器,监听1234端口4 a+ D# R+ _" b  \% d2 W
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ! L; q- r* m! K- u1 V

  6. ; w2 c, M8 L/ Q5 @0 c
  7. /*0 k* r7 w+ z. p2 [# l$ q6 s2 e
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误* U# O" P# U7 |. X
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    + u8 e, }1 }* p$ ]  m
  10. */; i- K! |. l8 z" Y/ a8 D# M
  11. $worker->count = 1;" t& L' O& d9 D2 c3 p: ]
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
      W  s5 _; Q$ d: v5 M2 x5 w  n
  13. $worker->onWorkerStart = function($worker)
    0 B( a% X* e: T" u, u+ [
  14. {6 |" ~! j/ M8 z8 e; R" U
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符; A5 E  ]! D& G' W
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    ! A& S. G+ C2 ]' n) k: a
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    8 y: J! @4 V; S: a; P
  18.     {
    $ V0 E' L6 B# ~
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据- S' k$ n! ~8 ~! ^, G
  20.         $data = json_decode($buffer, true);, b0 I4 R' \- q" {8 O. B/ R" t6 W
  21.         $uid = $data['uid'];  E$ _4 t  u; O0 b( x
  22.         // 通过workerman,向uid的页面推送数据
    5 t$ i8 ~6 X1 Z5 n5 c* g, n3 m8 s# w
  23.         $ret = sendMessageByUid($uid, $buffer);
    ' u# {( A0 S$ }& Z$ ?: S
  24.         // 返回推送结果8 |4 @9 t) k1 a& P1 V
  25.         $connection->send($ret ? 'ok' : 'fail');8 e8 Y: M/ h, f! F6 `8 ^, q
  26.     };
    / r  [/ o, K9 D3 L# r' ]2 y
  27.     // ## 执行监听 ##
    0 F, J) M' o& ]5 ?. f
  28.     $inner_text_worker->listen();
    % A" B( m5 S6 ]6 l+ v! |% N
  29. };; u" f" b: G1 r
  30. // 新增加一个属性,用来保存uid到connection的映射
    " E( {5 X) s  b  ]
  31. $worker->uidConnections = array();8 L& B8 ~1 h" X5 ^5 e9 [( |
  32. // 当有客户端发来消息时执行的回调函数. F3 U  C. M2 }0 q3 Y7 H, b
  33. $worker->onMessage = function($connection, $data)0 Y6 ]; e! r9 Y5 S
  34. {
    2 W7 W& c& P6 g4 g5 E
  35.     global $worker;( x1 Q0 i7 ~; ?, y9 f) T/ @9 r  z
  36.     // 判断当前客户端是否已经验证,既是否设置了uid+ n3 |2 ^: f4 _6 e5 S, [
  37.     if(!isset($connection->uid))5 \! t- [- Q( X
  38.     {
    - n/ A0 i+ C% O# O
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)4 ^% v9 z. |7 r6 @
  40.        $connection->uid = $data;! l" E: m) D% P5 ~
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    ) ^) K9 `/ m, }, P: v& J; L/ ^1 u
  42.         * 实现针对特定uid推送数据
    4 e  Q1 I! u7 k" ?6 ^
  43.         */
    5 a0 E5 F7 B3 N; p! v# j
  44.        $worker->uidConnections[$connection->uid] = $connection;
    1 l6 L6 z- e  ~" `
  45.        return;) V! D* b. Y5 j9 g" K" n
  46.     }
    0 w$ B3 H; i! e( L1 j
  47. };
    ( d: L- ]: a, o/ n5 c% M9 T7 n8 Y; \

  48. 0 Y4 m5 ^+ x- t( d$ o1 k2 ?
  49. // 当有客户端连接断开时* J7 t- k% F' o5 u
  50. $worker->onClose = function($connection)
    8 p" {  n; T8 r. T/ Z' M
  51. {' }" }/ ~$ r! H+ q+ u8 N) d8 c$ {
  52.     global $worker;  `% q( m4 Y) u  t7 b( A
  53.     if(isset($connection->uid))
      o* o8 y0 x# P3 w
  54.     {
    % W4 |' n% i  @. m& x  A
  55.         // 连接断开时删除映射4 n. J+ m: X( u8 y
  56.         unset($worker->uidConnections[$connection->uid]);* M6 ]5 z$ x" }  \
  57.     }1 [6 l' ?' L! B2 N
  58. };
    ( P8 U9 B$ d) R. b
  59. ( z. o, z3 S; |& ?. j# @+ ]' d6 Y
  60. // 向所有验证的用户推送数据
    + c* Y; T+ e3 M3 f+ V; \
  61. function broadcast($message)! F0 {3 A" C8 j  P; b! c! `3 g
  62. {2 T5 Y' \/ ^* c9 H, m7 B& n" \. P8 K
  63.    global $worker;. |+ v$ N0 V  B9 w# l) v1 {% O
  64.    foreach($worker->uidConnections as $connection)' p4 w; Q! F* u% ]7 j
  65.    {
    1 h0 r* d/ [9 ^
  66.         $connection->send($message);8 r5 i- _+ L+ F. M$ O
  67.    }0 o3 o: @5 o8 c" R& ~
  68. }
    ; J& q. D) M& A
  69. - g: v3 T0 p1 u! u% j
  70. // 针对uid推送数据: C; x0 x, W. W$ k, G
  71. function sendMessageByUid($uid, $message)
    2 C: Y4 N5 @# A1 x- b- r
  72. {
    * m; T( U# i& r& ~9 ]" b8 {& X
  73.     global $worker;
    : {, x! n$ m1 r/ b
  74.     if(isset($worker->uidConnections[$uid])). a& J3 S8 U' g% T+ b' i/ R
  75.     {
    3 V  F; S) h3 d5 j) Y
  76.         $connection = $worker->uidConnections[$uid];% E: C& H- `) Q8 A& [0 C
  77.         $connection->send($message);& W& F  H' u" w  M+ l7 @
  78.         return true;
      K# ^5 `" ?8 l# X
  79.     }
    * g4 J. k+ {* @6 }. R! v5 O0 P
  80.     return false;
    5 k, }) I* V0 E1 U
  81. }
    # `6 U" G: N6 j- {
  82. 1 B* o) o0 T9 B$ Y& T4 s9 ^9 q% i
  83. // 运行所有的worker6 I4 w8 `7 D' N9 P. t* [
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');# M6 n2 b# k8 ?, p2 P6 ]* k) c
  2. ws.onopen = function(){
    $ j4 u7 w4 q/ |: D6 ]! A- z
  3.     var uid = 'uid1';
    7 D$ F2 t' ^) r5 b
  4.     ws.send(uid);6 _, c* o/ _5 N2 P
  5. };( c; A0 H8 C, G8 G6 N
  6. ws.onmessage = function(e){
    " J5 o+ ?8 N. {7 e. b1 G3 `0 w& [8 k
  7.     alert(e.data);( x, v4 i) D0 W# w- J- ?9 [
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口3 s6 r& ^0 q" H& \8 D# C
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);  b/ \/ o4 l# }* L
  3. // 推送的数据,包含uid字段,表示是给这个uid推送4 o3 h% a1 S, x; c; Y! ?. s
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');9 p5 ]! g. j) z/ Q5 p! o! q6 j
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符- `  j$ F5 _6 _. q+ K
  6. fwrite($client, json_encode($data)."\n");% n! K* q! A. M$ X
  7. // 读取推送结果
    ' ~" @8 I  F. a; h
  8. echo fread($client, 8192);
复制代码

, i9 f6 I( d1 v; ~
7 H8 H# o9 b0 |* H3 ]
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-4-30 20:52 , Processed in 0.059306 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!