您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10516|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    3 G% b3 O5 d; [1 C4 ?
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ( I( f8 s5 ]- R( F& O; \
  3. 2 k# P/ b) r9 p; \2 b
  4. $worker = new Worker();2 j: B# o$ {( p. }% n) g  V, D% P9 ]
  5. // 4个进程4 A) S2 `+ R* w1 C% Y6 S8 O# T8 y" b6 J
  6. $worker->count = 4;5 n! G6 N0 e! U. A/ N
  7. // 每个进程启动后在当前进程新增一个Worker监听5 J- Z$ z/ j( }4 w4 B
  8. $worker->onWorkerStart = function($worker)3 L* r9 v/ D' v$ K8 f
  9. {; q/ T# o. V- H0 Z8 p# m! m5 s3 t: w
  10.     /**9 ~0 f. C* F: J4 t0 i
  11.      * 4个进程启动的时候都创建2016端口的Worker
    % @2 \2 m. V9 ^0 o
  12.      * 当执行到worker->listen()时会报Address already in use错误9 @( S; x0 r0 I* |) m: t
  13.      * 如果worker->count=1则不会报错
    9 b3 r6 x! L" t
  14.      */
    * a' ?& ?$ g" @) R
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    6 |+ P# }1 L& c' V
  16.     $inner_worker->onMessage = 'on_message';" F# |/ G, q: T& x! L
  17.     // 执行监听。这里会报Address already in use错误
    4 ~2 ^7 P6 f9 ~* [: x) s
  18.     $inner_worker->listen();# I& S. k4 d3 h) O( y% m
  19. };7 d# h  o) {2 V7 k. V3 b0 \

  20. . q3 y8 h, y$ T: W
  21. $worker->onMessage = 'on_message';' I! U; w. j/ i% o

  22. . M2 N2 R( N' x& R; e6 s/ l" O' X
  23. function on_message($connection, $data)
    " d9 ?, a% d0 O8 {2 M# J# F- ~( i# T7 f2 u
  24. {9 A3 \2 T/ x5 u3 k
  25.     $connection->send("hello\n");! \! f# }' h) t- ^
  26. }8 S: ?" g& D+ T; `5 |! c0 I1 Y' {
  27. $ Y1 L" t2 @- ]( v
  28. // 运行worker3 ]% @7 ?+ t) G, `+ w( Z' Q* B% E  {
  29. Worker::runAll();+ n6 G6 K( p; J' L
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:3 u5 i" c" y; k; S/ h1 J) ~: _
  31. * D0 t. G0 n5 D  Q
  32. use Workerman\Worker;
    ' m: K; |$ n# G$ q& ^
  33. require_once './Workerman/Autoloader.php';/ f* G$ c% c. q8 _" [
  34. 1 o2 E, q) a" G9 g  D+ `
  35. $worker = new Worker('text://0.0.0.0:2015');/ N3 F. X3 D7 g' E- Z5 Z( \8 p
  36. // 4个进程
    & j4 @# l: m. K* A- v2 _
  37. $worker->count = 4;1 ]1 A; ]4 f8 U2 r( x
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ( _( D/ H, r0 i# v) u
  39. $worker->onWorkerStart = function($worker)
    5 r0 X2 [" ]( |1 O* P) ?1 a% N
  40. {
    ) X) ~- O! u6 |0 `# }% M
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    $ H" Q2 `4 }' P6 `
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)( ?: s: F  p( B7 r+ a9 j+ v0 T* `
  43.     $inner_worker->reusePort = true;
    $ y: U9 p5 ]+ k- O7 @/ |# d
  44.     $inner_worker->onMessage = 'on_message';9 K5 X' r' [: j7 Q+ @. x; d
  45.     // 执行监听。正常监听不会报错
    ; z/ [0 T9 D& }6 M0 y
  46.     $inner_worker->listen();
    6 W# x, j  }" A. p# T9 `: d
  47. };( P4 Q. ^% {, Q/ f. X0 `: W( e2 ]
  48. ! n" X  Z4 s5 S
  49. $worker->onMessage = 'on_message';8 u8 }8 e# z5 ~$ l

  50. ! a6 H$ e* V$ A1 ?
  51. function on_message($connection, $data)
    / P2 a4 J; h9 v, j" `5 g8 w
  52. {* d9 }) t( ^! y9 E+ z. N0 o
  53.     $connection->send("hello\n");
    . ]4 F% p1 r! _. q: S
  54. }
    % a1 U' ^& Y. c! Q3 z
  55. + p4 {3 Q4 D4 M9 @/ P' j( T! b
  56. // 运行worker" x  s- ?) z- d7 T& \1 |6 F1 w
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    , M  }# ^5 x" V% T, J$ q. A
  2. use Workerman\Worker;
    8 \! n  ^# G$ j, z! G
  3. require_once './Workerman/Autoloader.php';4 a4 x3 C4 t( J  d
  4. // 初始化一个worker容器,监听1234端口2 M7 Z  [( W4 L1 z/ J
  5. $worker = new Worker('websocket://0.0.0.0:1234');( P8 S1 v( R, e% ]
  6. ; Q+ [4 K% S1 O4 K$ W* v. g
  7. /*
    1 z. `- E; |7 {% o! y" a: K
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    " U* f; t  q7 b, p2 ]" }
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)7 h$ K9 M! o. H' l$ @
  10. */: Z  r3 E, a0 S4 g# h
  11. $worker->count = 1;
    0 f0 ^' r' }% h
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    ( H& h, Z$ D6 A0 P+ l3 E6 r
  13. $worker->onWorkerStart = function($worker)" b4 a, M. y  M- M: ~' x
  14. {1 V, h$ w; {, Q8 _. D
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ! V$ W$ U( |! l3 H/ \
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');1 O9 b1 l; r) W, Y. c( P! T
  17.     $inner_text_worker->onMessage = function($connection, $buffer)' N/ A6 s" a8 k  d5 F
  18.     {" ]4 f6 K4 S. x- ?# c4 W
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    ; e8 |- A* Q/ I
  20.         $data = json_decode($buffer, true);; T* I+ y7 a8 W7 ^
  21.         $uid = $data['uid'];
      b4 g7 d0 s4 |' u6 A7 W
  22.         // 通过workerman,向uid的页面推送数据
    ) b6 d' Q4 v0 b5 N& a
  23.         $ret = sendMessageByUid($uid, $buffer);
    7 E) \7 h% u( h0 {, k- d
  24.         // 返回推送结果
      B8 S7 S& y/ }! P
  25.         $connection->send($ret ? 'ok' : 'fail');
    9 H+ q# X  w- \
  26.     };1 [+ p! _. i$ v1 Y9 c4 h/ B% b0 o
  27.     // ## 执行监听 ##4 L& _) c) a/ P) W/ K7 J: Z
  28.     $inner_text_worker->listen();5 L% g0 L5 |0 I
  29. };) o% a2 g2 S# c% G$ x6 Q# q9 d: q5 W
  30. // 新增加一个属性,用来保存uid到connection的映射
    : n  F, s' }. u7 L* d
  31. $worker->uidConnections = array();
    . y, Q' v, I+ e3 O% T- w
  32. // 当有客户端发来消息时执行的回调函数
      N% G9 Z- D$ t2 m
  33. $worker->onMessage = function($connection, $data)
    4 X6 Y2 r7 w8 j% a5 y( m. F
  34. {1 p9 N" _6 m# u
  35.     global $worker;2 |; v- Q; `3 n
  36.     // 判断当前客户端是否已经验证,既是否设置了uid9 j6 x  Y# `& X0 e1 g* N8 p
  37.     if(!isset($connection->uid))
    ! Y9 p( T/ Y7 @
  38.     {
    , m5 L) d+ E8 E/ M
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)( F3 B) `8 r9 @' l% w; X! S: e
  40.        $connection->uid = $data;; i, a; U5 \1 @& t6 `
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    7 ~5 P& R2 P1 O4 t: ]9 z0 e( D; q
  42.         * 实现针对特定uid推送数据/ ]* g, S+ u# A$ y
  43.         */
    & z9 U, S6 d' X& y2 s3 c4 X
  44.        $worker->uidConnections[$connection->uid] = $connection;0 n) B4 n6 {5 J0 L+ d3 k
  45.        return;
    # P( z. l" ~6 i7 ?  t
  46.     }
    7 d2 O. h; f2 n  O) }2 |7 k
  47. };
    ; m+ f3 R6 k' x: f# |
  48. ( M# M- g" `" J( A1 m, r
  49. // 当有客户端连接断开时! b) v; H0 Y/ I$ U. G* q  C5 |
  50. $worker->onClose = function($connection)
    4 V, q9 }+ A& {, \% G9 V! ?" A
  51. {
    2 l/ B) D% [! @
  52.     global $worker;
    8 k3 i% _) {" I; Q
  53.     if(isset($connection->uid))
    ! J7 g" E# X9 {
  54.     {2 f" |, o( }; z9 `3 `3 X- m
  55.         // 连接断开时删除映射/ @8 ^' k/ M9 G& D
  56.         unset($worker->uidConnections[$connection->uid]);" X3 J8 X4 a7 q2 o# r1 T
  57.     }, k6 |! ~: G8 B- N  B, }) P; t" D
  58. };* [9 \  V: o0 s

  59. # z" }) g2 ?8 Z0 c7 r3 B+ V
  60. // 向所有验证的用户推送数据) d7 D# T! W$ f( k/ Z7 e" {7 c
  61. function broadcast($message)2 C1 ?- @: `' H* _4 i- B0 R
  62. {/ r/ s7 \$ G4 I8 ~* a, k2 l
  63.    global $worker;) R2 g3 t# L! `: J; a2 E, M
  64.    foreach($worker->uidConnections as $connection)  u8 l6 l4 D7 X2 J, s5 F3 h
  65.    {
    & l/ \5 g. @2 ?4 d
  66.         $connection->send($message);
    6 L3 c4 N" R3 p, {
  67.    }2 L; t4 d  g* H! V' D6 T, J
  68. }5 y: }& L& p/ A: P! g; \! E

  69. 0 o4 U6 ]  n/ A" W% ?
  70. // 针对uid推送数据  C" Z/ u  [/ ?/ y' |
  71. function sendMessageByUid($uid, $message)/ v6 q- K' Q2 |0 N* z0 Q9 B
  72. {
    # g/ c  y. ?1 @9 q4 F9 M
  73.     global $worker;
    : A  Z2 c# j% B' \. ?
  74.     if(isset($worker->uidConnections[$uid]))
    . s% O. P( e) u
  75.     {
    ; r% C4 H8 u8 z7 G0 R, z0 x8 z
  76.         $connection = $worker->uidConnections[$uid];/ J2 Q% L& O7 S' t  H
  77.         $connection->send($message);# l3 M. G' S, D, p% H
  78.         return true;9 I. _1 j6 E3 o' d
  79.     }
    + E8 W3 F: Y% p+ F1 t: v2 q5 j
  80.     return false;
    ; ^! o3 s% l; v+ ?1 t
  81. }
    8 ?' H# i, }; b, S" P& E- _
  82. - ]5 ?3 n8 ^# t
  83. // 运行所有的worker
    4 X- U& c/ y4 |+ R
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');1 |# ~; |" t( }5 T4 @
  2. ws.onopen = function(){; J3 R7 K0 a6 d
  3.     var uid = 'uid1';
    2 I4 b2 w9 b( H& f3 C
  4.     ws.send(uid);6 K" E. F. Y: J8 c$ L* f* _  o
  5. };( L0 {2 x" o6 g2 i' B1 J
  6. ws.onmessage = function(e){' `3 T1 u' \( R" W4 f! o, V
  7.     alert(e.data);8 j9 C& s8 W1 m2 W' k
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
      w; S2 P1 M, X. X
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);' r: H5 u4 v' |
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    0 O- r( @! u& J$ N
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    , a, e# r3 `- z  n% s
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符7 J7 X. A# ]8 P& m
  6. fwrite($client, json_encode($data)."\n");; h2 P6 P$ ?; q5 u; s# B# x' a1 u
  7. // 读取推送结果% X5 s. _) K8 y: [4 U8 g
  8. echo fread($client, 8192);
复制代码

/ I4 Y7 @  h( X; m# Y; R
7 Z8 P! ?: F3 I$ b" H6 H! y
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-18 23:29 , Processed in 0.110733 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!