您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10513|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    3 L- e+ t) j# N5 f9 j
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    6 p0 H8 k9 O4 o
  3. , j% v* q- g7 a: s. x
  4. $worker = new Worker();
    * q9 l5 V: l1 M! Y
  5. // 4个进程
    : ^# j, Q! ^6 o7 O, J
  6. $worker->count = 4;
    % ^) u4 C1 N7 u
  7. // 每个进程启动后在当前进程新增一个Worker监听0 J! G4 B* T* r
  8. $worker->onWorkerStart = function($worker)' |4 P) M' h  `5 `( d+ H
  9. {( C  f9 k$ Q0 y# K3 [* N
  10.     /**
    ! h: I9 y' C2 ^, t: U% t
  11.      * 4个进程启动的时候都创建2016端口的Worker! d: j4 b1 |- Y) V" f9 {- [% o
  12.      * 当执行到worker->listen()时会报Address already in use错误# }0 v6 m4 w; }, @% F
  13.      * 如果worker->count=1则不会报错- }! n2 v9 r. l7 W' l) `
  14.      */0 A) Z- J" t) E$ e8 [: |! D+ j8 |
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');0 m- k+ ?" K2 {/ l: s
  16.     $inner_worker->onMessage = 'on_message';
    ! }' [9 t0 o$ N. h
  17.     // 执行监听。这里会报Address already in use错误: t- d4 ?. t9 ~; X3 c! t
  18.     $inner_worker->listen();# v* K0 m" {5 l' J
  19. };6 I2 U" D6 ~& v

  20. - q! x% Y7 G, F0 f2 ~' c
  21. $worker->onMessage = 'on_message';& k2 A! U1 ]1 q$ x/ a6 k

  22. ! [% O: y6 c) |
  23. function on_message($connection, $data)
    5 a2 ^  t9 K9 g- U% ~
  24. {# B; w" Y4 r1 T3 x
  25.     $connection->send("hello\n");% }, A. t; L6 J) C
  26. }& G4 \! h; S3 u. t

  27. $ }0 B; k0 n4 k% ^' p5 e( b
  28. // 运行worker& U9 b/ s" D* J2 N/ R4 }
  29. Worker::runAll();
    + H3 f1 o( D: b! c! H0 ^
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    $ d/ g* H% ]  i4 k& W5 p' E. G

  31. * Z( p0 a( q6 M& P+ i! ]
  32. use Workerman\Worker;
    1 h' s0 U7 Z2 ^8 }$ z6 N+ D
  33. require_once './Workerman/Autoloader.php';5 V7 z5 t+ ?' [! U* s9 _/ k  a3 U. s
  34. " `; }4 {( i1 B9 Q8 n
  35. $worker = new Worker('text://0.0.0.0:2015');9 p2 z" i' I& \9 s& k
  36. // 4个进程  L% P9 ^! _; ]  f+ _$ W
  37. $worker->count = 4;( n7 ?* g: i  N9 d! D! b, G
  38. // 每个进程启动后在当前进程新增一个Worker监听7 X! W2 Z9 ^9 {% w. _2 e- X( g! H
  39. $worker->onWorkerStart = function($worker)
    - z) q. k, l+ V+ r! I
  40. {
    ( [: D: v# V4 |- [. P0 `  I# T5 g
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');* Z, A" P1 P- G
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)' O" G, O+ h! g
  43.     $inner_worker->reusePort = true;
    ) k, S9 K' k; }9 O& E9 t5 C% J3 x
  44.     $inner_worker->onMessage = 'on_message';
    7 R4 ^5 E1 g& V! Z% s6 @
  45.     // 执行监听。正常监听不会报错
    + O. R1 o+ V/ O# X1 g; ]* H
  46.     $inner_worker->listen();& L0 c3 r: C% A9 v
  47. };
    8 ~+ W# E4 b  k. g0 Z

  48. 1 F3 p; v& E2 b9 [
  49. $worker->onMessage = 'on_message';
      a9 \/ n  V$ E1 O' J- |
  50. / n6 _( b6 q+ {& \! ^
  51. function on_message($connection, $data)! h: q  Q- V! F3 q4 G3 ^4 @3 [& q  c
  52. {
    : I& E2 T/ H' D% |/ H- \
  53.     $connection->send("hello\n");
    . W' h6 n+ {! }- [; J; F
  54. }
    8 H" r1 u8 b% _- {4 c
  55. % Y% N! V# u5 Q0 [
  56. // 运行worker
    ; E( [) T1 U( c. u$ \# K: I
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php# Q0 H4 v/ G8 Y2 b. X
  2. use Workerman\Worker;
    " \) P$ U9 o" U2 x, {
  3. require_once './Workerman/Autoloader.php';6 o  H9 X. B+ u
  4. // 初始化一个worker容器,监听1234端口
    ; x% P7 j3 Z- S" ?+ z# J6 U, Q5 V
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    3 \( y6 I/ N, f1 r& d, S
  6. 2 z; |  a5 P4 J( H: w
  7. /*2 R3 I7 f; B6 a2 Q4 v( E
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误6 {6 W' b$ r, \$ @# @/ ?9 S1 Q+ g9 v
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    ' J/ D1 r* Q- G( ]. U
  10. */
    7 H1 J# U& R* {2 ~5 n  H
  11. $worker->count = 1;. a5 z9 k$ M4 o: d
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    8 I" Q1 F( S, {. Y
  13. $worker->onWorkerStart = function($worker)- f: Q9 N1 p# O% k* H
  14. {
    * @/ d- l/ a8 U) D$ ]7 S
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符& m5 R$ V: k" t2 o) P0 W
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');  n/ d8 t% |" o( m. _8 g9 G: L
  17.     $inner_text_worker->onMessage = function($connection, $buffer)- L! F" F3 ^7 I) @0 K
  18.     {. O. z' ]1 k) q: X7 b; j
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    . k: i0 ?! ^  b6 k
  20.         $data = json_decode($buffer, true);
    % i% `/ T* }* T, s5 s- q6 x
  21.         $uid = $data['uid'];
    ; Y) ?5 C1 t  ]+ g& n" d
  22.         // 通过workerman,向uid的页面推送数据
    3 m2 `7 G1 [8 l2 Y: _
  23.         $ret = sendMessageByUid($uid, $buffer);
    - x+ \  E0 p) T2 J' s  W. s& l7 ~' A6 A% W
  24.         // 返回推送结果
    , C4 q. ]3 H! v/ E! s, Q
  25.         $connection->send($ret ? 'ok' : 'fail');
    # i# ^: \# o- G5 n! H
  26.     };
    7 x- H/ y# x  v# d
  27.     // ## 执行监听 ##
    + w" E) \) U" A0 n5 K" x) B& g
  28.     $inner_text_worker->listen();( P; _7 O  j9 H, o8 _
  29. };: \& }4 R: k- B( U  K  i  P; F6 _$ L
  30. // 新增加一个属性,用来保存uid到connection的映射) d+ ~6 P' @$ P' S
  31. $worker->uidConnections = array();8 N* r, U0 P, K' _2 {' S1 T
  32. // 当有客户端发来消息时执行的回调函数; _+ G% j& a! ?, K$ m1 I1 z0 f
  33. $worker->onMessage = function($connection, $data)+ K- }" c" Z6 X9 c
  34. {
    ' N! z  ~) N; g: a& ^$ R
  35.     global $worker;
    0 h% @: O1 W! R$ X# L9 T  R
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    * |/ A0 d  L6 D% A0 R! v2 J
  37.     if(!isset($connection->uid))" Z  z( g9 A9 i2 g8 e/ ~
  38.     {
    , p; k% G6 _1 ?( J  f: n0 e
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    - E: G; J6 [0 U8 Y5 N6 S$ j% c
  40.        $connection->uid = $data;
    7 T+ l* u0 y, P+ D
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,. ]  q$ c% C0 `, D
  42.         * 实现针对特定uid推送数据9 I* g$ n/ v1 K1 k; M
  43.         */, ?5 u6 h6 n+ ]" |) N$ i- a2 i0 c( A
  44.        $worker->uidConnections[$connection->uid] = $connection;* i5 H7 F; g. ]3 I# t8 ?8 F$ M
  45.        return;9 m8 Q# z* ^0 C# ~! R, c
  46.     }! q# O  v  O+ Z5 l& D
  47. };! q3 l$ A7 z% \1 N+ O

  48. / O0 S2 i: {2 n- V
  49. // 当有客户端连接断开时
    % e5 I7 t( ]6 K6 @( j" H) N
  50. $worker->onClose = function($connection); T* ^3 N3 b* l3 |# ~
  51. {
    - Z! B9 ]+ {. [% {4 Z* F
  52.     global $worker;
    * Q+ k3 k6 K9 C1 t6 v/ Z: f4 u
  53.     if(isset($connection->uid)). w, W9 t% Z5 f" E9 |
  54.     {
    8 W8 J' `1 _- o% W6 ^
  55.         // 连接断开时删除映射. y( y% I! M4 a3 k% h8 v8 Q
  56.         unset($worker->uidConnections[$connection->uid]);/ }: a& g3 @  {4 R
  57.     }5 W0 p/ _) X5 N
  58. };9 t2 s6 a! C3 W6 ?5 q: t

  59. ( `6 p" Y( Z7 J- m( g$ |! q
  60. // 向所有验证的用户推送数据% h' f! f3 }- R$ M2 |$ {& f# Z  @
  61. function broadcast($message)
    7 S+ R; H1 m) X! w
  62. {
    : O6 {* p) K  u# e5 r$ h8 @
  63.    global $worker;
    ) h! Y+ I. V8 {5 M! m
  64.    foreach($worker->uidConnections as $connection)
    ' B/ K% J" G4 u# Q  g- S( J* \# p9 e
  65.    {
    & C' w0 y8 T+ U; \9 u3 R# @- F
  66.         $connection->send($message);
    5 I. i7 L7 J0 K  D
  67.    }
    8 I! i8 z& x# L9 b5 x
  68. }
    5 Y' ?% ]& i8 p- b' ?2 k  J8 N

  69. . o' W3 F1 x; T4 Z9 P
  70. // 针对uid推送数据* V' q9 k8 [; D+ c% y! ~
  71. function sendMessageByUid($uid, $message)' ]' V% c' l) N$ p8 U; `
  72. {8 K3 V* B& G2 Y6 W- I
  73.     global $worker;
    " t( h  d! V" s
  74.     if(isset($worker->uidConnections[$uid]))* A6 `( ?- R7 y6 K
  75.     {- O, W8 w' W0 h6 ~* Q( ]0 o* O  w
  76.         $connection = $worker->uidConnections[$uid];
    ) ?3 \2 Y; o& `0 {$ B- I5 Z
  77.         $connection->send($message);+ F8 V  P/ `) O
  78.         return true;; U8 n* D  `* `2 H) x7 P* x) H
  79.     }
    ( q0 K& s; ^4 u9 T# D
  80.     return false;- U; v/ F1 I4 C: [
  81. }
    " m* S) v8 z$ |/ t
  82. / `( ^3 J4 ^: P3 G3 s' k
  83. // 运行所有的worker
    , d# o1 L, c  O9 D7 C0 t( W; r% D
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');( B9 d# C+ q8 ~" P6 g) K
  2. ws.onopen = function(){
    2 q, s% W9 X8 \/ w! t% G; L( \
  3.     var uid = 'uid1';0 E$ Q0 g2 J1 K
  4.     ws.send(uid);& S; A! ?0 b$ B1 F  j
  5. };
    / e) F1 y6 ?' n# |1 _4 C' w
  6. ws.onmessage = function(e){
    9 w- j  j& f2 p2 L# {. h& V* V2 a* `
  7.     alert(e.data);
    # ~0 A8 r6 L$ M, b3 h  C( c6 r
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口- N, Y6 x# A) J3 z, v$ w) W# K
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);7 p8 T, J7 j+ |, Z8 f/ o7 i
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    6 _1 {1 c- g4 M0 P3 i
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    9 r% h1 O" b' i! E+ w
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 x9 U+ G, z+ O* U' x& n1 H
  6. fwrite($client, json_encode($data)."\n");; L( O) e6 Z* }6 I- l- R6 x2 x
  7. // 读取推送结果
    4 n1 q+ H- m$ W2 ~
  8. echo fread($client, 8192);
复制代码
8 b4 `/ F4 J( Z
/ u/ O7 D4 n' b3 l, V$ ]
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-18 22:25 , Processed in 0.126292 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!