您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14800|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    6 ^, ^8 F! N+ F4 V5 D
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    5 _4 i* O- {5 J) m& B

  3. ! I, [* h# x. e3 j0 f: u
  4. $worker = new Worker();
    9 P/ b' g5 q( r: J; c& Z
  5. // 4个进程
    + p4 |* r' f" T# l3 D, p
  6. $worker->count = 4;
    - o, w& {" j) h9 o( L: q
  7. // 每个进程启动后在当前进程新增一个Worker监听1 q2 d5 |/ Z. C, x: P0 x& h
  8. $worker->onWorkerStart = function($worker)$ ^$ V1 A: n8 E% p* c! @8 U6 ^  W
  9. {
    4 p" k; a) M( K6 Q$ B6 e+ {
  10.     /**1 U- c1 N2 m6 k" v0 D. K
  11.      * 4个进程启动的时候都创建2016端口的Worker$ ?$ s. `$ f6 H& L8 [" O
  12.      * 当执行到worker->listen()时会报Address already in use错误8 w' B- Y  M, D
  13.      * 如果worker->count=1则不会报错
    8 r" u, ^: L1 b7 D3 f, }  a/ H( E
  14.      */# h' Q( b9 S4 e3 Q0 J! H( h0 I
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');; \2 g0 [# w3 R" J- S- w
  16.     $inner_worker->onMessage = 'on_message';
    $ G) l6 {0 R, t8 {: M/ l: Z, v9 n6 w8 s, R
  17.     // 执行监听。这里会报Address already in use错误
    + J3 |+ s" k4 Z+ L1 {
  18.     $inner_worker->listen();
    6 F# D' G  y+ `9 ?" ?9 e, m* n
  19. };
    ! @; z0 h# ]$ N+ m" Z

  20. 0 j( F" a! B& [/ p! ]: C1 s% O
  21. $worker->onMessage = 'on_message';, _0 [0 Y& d- q( E

  22. 6 s2 k5 ~7 S9 F# C; e6 {
  23. function on_message($connection, $data). P6 r5 P" G# c: l7 E2 @& K" i  P
  24. {
    3 i* j# e9 t, \& B4 I
  25.     $connection->send("hello\n");6 `0 O8 _  x$ Q& Y3 ~; L$ i, c
  26. }
    ' _- Z' T, N; u) M
  27. * I! M5 w9 ^2 h4 q: }: d& ^! j
  28. // 运行worker( N2 n2 K$ ]0 d+ E9 Q7 C
  29. Worker::runAll();
    ' e! B1 n+ L" C* s
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 F) ~5 |: o2 a0 F7 [9 W3 |! M$ g
  31. + R9 t/ \! s5 L7 {5 ]' m
  32. use Workerman\Worker;3 o* C  y; ~, s# r* Y0 E- U* V, l" z
  33. require_once './Workerman/Autoloader.php';
    ' F+ r! |" U% m9 r2 z) r

  34. ) Z& z' T7 ]) r
  35. $worker = new Worker('text://0.0.0.0:2015');  ]. ~: E. ?% c& B5 e0 K# p
  36. // 4个进程
    ( q# m. v  \7 D* q* ~" b# B
  37. $worker->count = 4;
    3 a4 r3 ^3 L0 @
  38. // 每个进程启动后在当前进程新增一个Worker监听
    6 E9 i7 I" ?" Q  q0 l8 z
  39. $worker->onWorkerStart = function($worker)
    ! a0 [6 R6 j! X0 q5 Z( A% ]
  40. {
    8 ^2 s3 y6 y" x! q, _6 R+ v, s, Z8 g
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ( q" o; S7 e; n5 z8 {) I' R
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    + _8 u) Q4 R% V& d; r/ A& y. @1 M2 Y
  43.     $inner_worker->reusePort = true;1 i* |, J$ ~9 o, J$ \, P- @6 O
  44.     $inner_worker->onMessage = 'on_message';$ [* _4 n- J6 ]; v# }( Y
  45.     // 执行监听。正常监听不会报错
    : [& Q9 ]. z* [1 o
  46.     $inner_worker->listen();
    * t# \! B$ F% A9 p8 G. B( l
  47. };
    # @* O2 P. ^% R  {) [+ i7 X/ w

  48. " o5 w# L. O% R8 }3 x* ?4 l. f. V
  49. $worker->onMessage = 'on_message';7 u) r, W: a& K& Z( p; ^

  50. * b0 I( p( W7 D
  51. function on_message($connection, $data)- m7 J* r& G0 p: f
  52. {) ~6 V# r; s- v
  53.     $connection->send("hello\n");7 n6 R) x% c+ {) `
  54. }
    % F2 V7 ]+ g  R% B( B" K7 a, d& i) M

  55. ) M$ `6 i. ^" }# b) d  x
  56. // 运行worker( k; Y) |% G9 \4 `2 |4 C
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    * A! f3 P6 i6 a  y8 L) `2 F+ w
  2. use Workerman\Worker;* J, H, g# V1 B7 `7 W, B9 d
  3. require_once './Workerman/Autoloader.php';; b, O+ ]6 h! C
  4. // 初始化一个worker容器,监听1234端口8 G$ T" G1 T8 |
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    0 ]" C$ D5 d1 L7 h1 ]

  6. 6 s; @2 E% z6 L9 F- J3 \/ A
  7. /*
    8 G' ~6 ?6 h2 y7 w' W1 S/ R
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误. v% n" w0 R. @  E: I
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    , M& e4 n. k0 H1 X! p  U# P) c
  10. */% z* T" A% R+ g
  11. $worker->count = 1;
    3 J9 F6 I1 Y( g4 _( y- h; j  y
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    2 Z  ?- r2 U! C0 Y- n# e+ e; M
  13. $worker->onWorkerStart = function($worker); X, u2 d$ S+ A( i1 W' P
  14. {
    & U( j5 B8 z- _5 m) p6 V- e
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    % A" Z  \; ?& M9 Q: w8 w: f
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');; G, K1 W0 m- z2 n) n& |
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    1 h! c: [$ r1 K
  18.     {8 K" w2 |& D5 p, M
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据* R1 x5 H# h. V! D/ ]' L
  20.         $data = json_decode($buffer, true);" q. e: \/ o+ S8 D' o. J* B( w1 m* g
  21.         $uid = $data['uid'];( E8 C- Z' J/ s9 T; Z+ n
  22.         // 通过workerman,向uid的页面推送数据
    8 W+ ?& Q- ~" }; ]# O1 X
  23.         $ret = sendMessageByUid($uid, $buffer);5 p1 h* C* P, M' u
  24.         // 返回推送结果) a" u& B! Q% j, F  l# `. M% |. X! v, j+ j
  25.         $connection->send($ret ? 'ok' : 'fail');
    + |2 w& t) f, B- U8 r
  26.     };
    ! t- t! u& D8 c" V  \: H  c( C/ P
  27.     // ## 执行监听 ##. ?+ W2 Q4 S! ^& k# A
  28.     $inner_text_worker->listen();3 A  h* O8 _1 `& J8 J  M3 ^
  29. };3 U; y$ D  y2 V2 T) }% Z/ ?3 `
  30. // 新增加一个属性,用来保存uid到connection的映射
    3 f: n6 v+ K9 z, W% K- `
  31. $worker->uidConnections = array();4 ]$ d6 y, Y" ?: O/ b0 a
  32. // 当有客户端发来消息时执行的回调函数" y; X6 `' l/ e3 X1 f; a
  33. $worker->onMessage = function($connection, $data)
    . \$ {! o( G# J( `6 }, P
  34. {
    . C5 z2 L  F# @7 F  q) f) ]$ s
  35.     global $worker;
    3 K, l& L8 i* A4 Z1 x& k2 K3 m
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    . g( B3 V) ]7 n  r1 d. u: o
  37.     if(!isset($connection->uid)), U" X+ y  ^, Q) F
  38.     {2 e, `% T1 M( Z; G% |: h$ z
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    5 e" l) S6 L" G, |' w. C
  40.        $connection->uid = $data;- R# l5 Q" v7 U% j
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    . Y& b4 a6 c) o1 l; l* V
  42.         * 实现针对特定uid推送数据8 {( R- l* x: }1 `
  43.         */
    0 B5 g" u8 F: ^7 f9 y" `
  44.        $worker->uidConnections[$connection->uid] = $connection;
    2 l! @' }' g% I  Q/ `3 T3 n
  45.        return;
    : v$ g* `5 x# B! S3 f% m2 @
  46.     }
    8 ~6 b( I/ L; K2 Q
  47. };( t5 E+ u+ q1 B1 L
  48. 2 {& |$ H: Q. [& T; r
  49. // 当有客户端连接断开时* t* q. ?; u. ?% i  I- i5 ]. W
  50. $worker->onClose = function($connection)
    & x( ^. k' v" A* {
  51. {, y9 V) w2 c# ~5 g) X: D
  52.     global $worker;
      U9 j& o3 i" l  B
  53.     if(isset($connection->uid))6 f8 l. c. [5 Y7 ]: O
  54.     {  M  P! Q: K  a- p
  55.         // 连接断开时删除映射8 {( G* O/ ?' k+ M, }1 ]! I
  56.         unset($worker->uidConnections[$connection->uid]);
    $ M' n* W3 O9 y9 r& t# u4 g3 ]- @9 W
  57.     }/ Q, o1 Z4 I3 ]* _/ L$ b
  58. };" o+ e: S( m, Q. T# I

  59. & j! J, ~4 u8 W, p/ u0 Q
  60. // 向所有验证的用户推送数据
    ( a! u( {; M, V1 u2 L1 Y& H
  61. function broadcast($message)9 b$ P8 d/ m0 F8 R% w
  62. {
    4 Z0 K0 c8 ^) C- N" _! m
  63.    global $worker;
    2 b% c$ f3 q* T2 ]- n/ o8 l
  64.    foreach($worker->uidConnections as $connection)
    4 w! t9 R& ?, y! |9 Z
  65.    {
    ! P2 n* y& Z6 W) ^. S8 s3 w. A+ a
  66.         $connection->send($message);/ D! e2 i9 A; c. `- m9 `6 w( l
  67.    }" w3 h0 M0 ^/ I0 g2 j
  68. }
    1 O0 X& g( q+ e; B" O

  69. 0 b* _) J" V! W
  70. // 针对uid推送数据
    0 z# p, {" N( _. O
  71. function sendMessageByUid($uid, $message)
    " D. w' C3 [0 i- a
  72. {7 R/ F: T8 G. ^0 a! ^; W
  73.     global $worker;
    , c( r/ Q8 B4 `" P+ b- G8 ]
  74.     if(isset($worker->uidConnections[$uid]))7 Z2 W, F" K9 o
  75.     {
    ( F5 [  H* S" h2 a# ]
  76.         $connection = $worker->uidConnections[$uid];  s) @* q, l9 L1 I! _) Y
  77.         $connection->send($message);" p6 b+ ?0 c8 V0 Y
  78.         return true;5 I7 o- N0 j! n: O! m
  79.     }( Q) y! a/ k9 j4 H3 I3 l
  80.     return false;8 `# A$ b2 L$ n( x' P
  81. }; v: P' y. a8 W+ r4 L. s
  82. & L5 ~5 b5 }8 c0 w* ?0 z
  83. // 运行所有的worker
    % E0 R" L) o1 o) J5 h# `
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');1 v: X* U9 f& i6 B: f0 |
  2. ws.onopen = function(){
      s" e4 {: U+ n& i# N: P
  3.     var uid = 'uid1';2 z9 v2 a: `; a$ i% e
  4.     ws.send(uid);' _1 g. R1 `; l% h! Y$ R, }! j
  5. };+ C- k+ ]0 v* ]
  6. ws.onmessage = function(e){$ W( {% N! {6 ]$ G4 ^+ f) }: e, h9 k
  7.     alert(e.data);
    ' s# v, C  `: ?# B7 _/ o' H
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    # }- o6 F9 S- s, Y1 ]' e, o  I& P
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);& l1 j/ M" d: o  G* s" [
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    # B% p5 o: \# w3 j" K
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');& N! L) q# O! h1 m" p9 k% p0 C/ S4 z; ^' E
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    6 c0 d: }2 C$ [; K. X+ o
  6. fwrite($client, json_encode($data)."\n");
    " i( j& y- K- F" S8 c  R
  7. // 读取推送结果
    2 x$ z9 {: j5 ^5 D! f* U" F
  8. echo fread($client, 8192);
复制代码
/ g" `( Y+ Q4 p3 K
$ S) ]! ~2 I+ l  X  [0 E& E" E0 d9 ]& H
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 17:25 , Processed in 0.058671 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!