您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15069|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;7 a# h" W5 P6 ^
  2. require_once __DIR__ . '/Workerman/Autoloader.php';* J: U; {0 B' j( l; Z1 R3 `
  3. ( h' J7 z2 `2 I
  4. $worker = new Worker();
    3 x- d; [/ g, y
  5. // 4个进程
    # E( G/ {* J& t
  6. $worker->count = 4;2 J- y' b$ [) E& c
  7. // 每个进程启动后在当前进程新增一个Worker监听
    7 c( e" K5 z" n/ |6 O* |
  8. $worker->onWorkerStart = function($worker)) O* n  o) L9 M" O3 x
  9. {  ^  R9 n" J5 O& ?
  10.     /**
    6 t" d* {* R) G/ X* Z
  11.      * 4个进程启动的时候都创建2016端口的Worker
    5 o& ~: x, Y% M# U- |
  12.      * 当执行到worker->listen()时会报Address already in use错误
    0 a; j2 i& E1 n( _, U
  13.      * 如果worker->count=1则不会报错
    8 P8 j/ I" R# G6 i
  14.      */! H( \. \; f. F% l: M- P' e
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');( ?: g8 z1 @7 E
  16.     $inner_worker->onMessage = 'on_message';& K5 d: [3 {8 n$ i) }# Q! W& l
  17.     // 执行监听。这里会报Address already in use错误
    8 L  T$ c* p8 u/ l( k! D
  18.     $inner_worker->listen();  E" B" [" e8 N  t( |( y( V2 Y
  19. };, [0 v, g+ {9 w) V" K+ q7 S* G

  20. 7 O) U* ?( b# f. Z- @$ J2 q
  21. $worker->onMessage = 'on_message';
    9 K; f. r: f$ v/ u7 H
  22. 2 Q2 r+ a# o+ A& D- T' E3 G" d; z
  23. function on_message($connection, $data)
    - D4 |' u- n4 P! m9 t
  24. {7 m$ D" b+ N8 y* P1 V9 `4 u
  25.     $connection->send("hello\n");
    $ z3 m+ L5 ^) n9 B; Y2 L) w+ G
  26. }% s" v6 \3 ]3 g: m7 _; Y/ @' s. D
  27. 4 c1 q2 E2 o" e1 B; M
  28. // 运行worker& T/ o2 W% E) I+ I3 N
  29. Worker::runAll();
    ) `8 P8 H" v) h9 g# a/ m
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    2 x  m8 a$ e5 v4 k+ K
  31.   X8 D9 a$ i, v5 |  B; c
  32. use Workerman\Worker;  ?: j( [( S8 ~. w
  33. require_once './Workerman/Autoloader.php';3 X! R( R/ \9 _& s% D
  34. 6 }7 s9 w" g  N
  35. $worker = new Worker('text://0.0.0.0:2015');* m# N) S8 t5 d+ [
  36. // 4个进程
    4 m' b' U4 j9 o( ]1 C% N$ u$ b$ N
  37. $worker->count = 4;" t% X0 i1 n' d3 K* j& b# F/ N/ ^
  38. // 每个进程启动后在当前进程新增一个Worker监听
    3 h6 l5 v  _; n; |$ V; q
  39. $worker->onWorkerStart = function($worker), T8 f2 p+ ~1 Y
  40. {
    5 C( [% s9 v; v/ x
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    9 G# \; _% u) P0 V4 F% X
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)) W0 s; W5 H+ i% P3 P  b
  43.     $inner_worker->reusePort = true;
    1 f$ k0 r% k1 D$ h% \4 A$ J: l) `
  44.     $inner_worker->onMessage = 'on_message';, {' W$ F6 h) O2 A9 A. [( r
  45.     // 执行监听。正常监听不会报错- b  D2 R2 p6 }' U/ g) b
  46.     $inner_worker->listen();" i4 N2 D: w* U% ?5 s9 ~) V$ X
  47. };
    6 I( M5 J/ @, J0 e( X
  48. 8 e, ]! `0 O! X5 f$ L/ u
  49. $worker->onMessage = 'on_message';/ ~8 v1 P- Q, c- ?9 D& E( l
  50. / r2 k# I' @0 e  E+ R2 w
  51. function on_message($connection, $data)
    9 N5 K' M5 @1 k. w( R5 `0 h; e
  52. {' ^3 j/ w1 T1 `& `
  53.     $connection->send("hello\n");
    : S. g* a5 j9 R
  54. }
    - v5 p8 A5 Y( d

  55. 7 J0 V% Z8 u4 C9 U0 T1 G
  56. // 运行worker
    : b/ R! C" ^7 F5 }, [+ ]# ^
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php) A8 m0 y* B2 s1 |5 J, n
  2. use Workerman\Worker;
    4 f" ]3 }) o# R- G& h
  3. require_once './Workerman/Autoloader.php';
    ' h8 ^! f# C  S3 b/ |" |4 j
  4. // 初始化一个worker容器,监听1234端口
    2 ]% E2 W  V# {7 n7 \/ ]  e
  5. $worker = new Worker('websocket://0.0.0.0:1234');2 J% q7 F! o7 ~3 ?6 p. h  b$ Q

  6. * w+ u4 }3 l' O; b; P
  7. /*
    3 E! j; R+ c- k: k
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    " k+ U% `* Z) w7 t
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)6 n( d- {7 Z# f, y- n
  10. */
    * y% I# k* j2 ?, u) ?! z
  11. $worker->count = 1;
    $ \3 I9 W. y# b: W6 p6 {; D3 C! B
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口3 q5 m3 C, k) O7 |# E( r6 H
  13. $worker->onWorkerStart = function($worker)2 v: ~. c* O- d1 V# W
  14. {
    & c3 V! p& `9 ?
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    9 K' k! q+ h9 A5 ~8 E
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');2 F& j( s5 U! m. ]
  17.     $inner_text_worker->onMessage = function($connection, $buffer)7 ]$ L; [6 F2 a8 b9 v/ B# q) ]9 I
  18.     {
    ) I, L5 C, O  b. }
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据' N. b) K3 i# t. u  M
  20.         $data = json_decode($buffer, true);2 ^! T: }1 a3 K, k1 S
  21.         $uid = $data['uid'];
    , V" Z# R( T. m- Z& K: X- `
  22.         // 通过workerman,向uid的页面推送数据& v4 r, a! I; M* \& j! \+ W' g
  23.         $ret = sendMessageByUid($uid, $buffer);
    $ k! ^5 K9 M6 W4 ]( O: q& r
  24.         // 返回推送结果
    . j) _; S6 r( M, n3 z
  25.         $connection->send($ret ? 'ok' : 'fail');. [; h$ ?' _7 ~* }, N+ \: n
  26.     };/ i2 L1 N+ ^2 X+ l: o( I; m) W
  27.     // ## 执行监听 ##
      N! R* G7 E9 q# Y8 q3 ^6 j" J
  28.     $inner_text_worker->listen();3 w2 c( w  N: b5 ?' E6 Y2 h
  29. };
    ( s+ X: K& k1 w# D! H$ r4 |( w& [1 c
  30. // 新增加一个属性,用来保存uid到connection的映射- @) n1 W. x8 p  e2 x
  31. $worker->uidConnections = array();
    2 ?% r( r+ j7 B' L$ Y
  32. // 当有客户端发来消息时执行的回调函数
    ( T; v6 b% e& m8 g0 o2 x' S
  33. $worker->onMessage = function($connection, $data)
    5 }4 I3 w& S! N% d' r
  34. {( Y0 ^4 E7 j' m/ ]0 f6 w
  35.     global $worker;
    . t( u: m) y  h2 F1 m" x$ O5 F( D
  36.     // 判断当前客户端是否已经验证,既是否设置了uid: P& t4 w0 |9 r2 i1 j. D: R# R
  37.     if(!isset($connection->uid))
    , l4 Y2 ~  a0 u/ R; R4 t# @, W
  38.     {9 \* M& u# V/ f3 N( ?
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)" m+ C9 _2 o* A
  40.        $connection->uid = $data;
    + K5 _0 E* Y) q9 B
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,: e- y  i: \8 J9 J
  42.         * 实现针对特定uid推送数据
    . k3 E/ o8 V8 D+ m5 z% s
  43.         */; {& X- i+ Y$ X5 |! v! r
  44.        $worker->uidConnections[$connection->uid] = $connection;
    # E9 M9 s% U( v: R  \' W7 i# k
  45.        return;
    2 L( e: B- g6 L+ Y) s
  46.     }  }# ?; F& s. b2 S/ z
  47. };$ |& W$ C  `7 p$ {6 B& |% u

  48. ( \/ g/ J! C; ]' I3 K
  49. // 当有客户端连接断开时: }- U: B* f5 X9 t) H
  50. $worker->onClose = function($connection)
    ( D& D" q5 g! \. L7 X3 v# `" p
  51. {$ x* p  l' L' n, q$ r. ?
  52.     global $worker;
    ( v) m9 F& f; w/ L8 z5 Y
  53.     if(isset($connection->uid))+ s/ l2 g0 j" B# n
  54.     {3 d! ~0 j( g5 e- n& [
  55.         // 连接断开时删除映射
    7 [% F% G$ t' h, b  N
  56.         unset($worker->uidConnections[$connection->uid]);
    9 z- @& H, W# L
  57.     }
    6 O6 }) E+ e0 Q9 y) X
  58. };7 v! n" V# W8 `) j' j

  59. / g9 k1 `- t: g. c- W6 {( d
  60. // 向所有验证的用户推送数据$ ~) x" d' f. F: X- [! l0 K% r' J
  61. function broadcast($message)/ Z6 R& ^' D( H5 U* Q; G
  62. {% q6 T6 J5 h$ v) k7 x
  63.    global $worker;. e% O. `0 O* e4 D% B6 ~5 b
  64.    foreach($worker->uidConnections as $connection)5 s% r2 |7 j' S9 t
  65.    {
    # t9 J1 f: q% S: P- ?- o) \+ y
  66.         $connection->send($message);
    # H! W: B% X) ^
  67.    }
    0 r, w: G6 x4 X2 d, N1 h2 U( `
  68. }
    0 `! [3 e+ n8 ~1 X" x
  69. 4 O% I5 l2 g8 n- K
  70. // 针对uid推送数据
    9 Z* q' h3 m* g
  71. function sendMessageByUid($uid, $message)
    0 N% E  k  a- ?
  72. {  `; Q1 L# i, b# U- H
  73.     global $worker;
    + t8 R' D- G; l/ V
  74.     if(isset($worker->uidConnections[$uid]))
    ' b: C' f; B/ w3 E/ g
  75.     {2 l: P7 J* l1 `1 r) {) a  q
  76.         $connection = $worker->uidConnections[$uid];
    ! V9 D! \( Y: Q1 J
  77.         $connection->send($message);+ _; ?$ V6 N, K! Y& a7 M" A7 s4 k1 T
  78.         return true;  B1 w1 S/ P4 b% c  \5 s4 i
  79.     }( q3 ^6 m1 x/ z" W& m3 `1 r
  80.     return false;
    & O& x$ U' J& |* y# c, R* C
  81. }
    / {5 Y: O6 T# W% r* t

  82. " \7 i0 X; i5 W! p
  83. // 运行所有的worker; G1 B/ {  {" L' G
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');- Y. {! n4 _% h" u/ O7 ]3 U3 z
  2. ws.onopen = function(){
    . F4 V# h+ U1 l2 v* D
  3.     var uid = 'uid1';  S$ A3 r% Q; x' g8 Y
  4.     ws.send(uid);, C$ E7 s' u! O7 G6 X
  5. };
      |- }$ ~7 b7 {& }- k7 x
  6. ws.onmessage = function(e){- f* ]0 U/ g/ l% V1 y3 @2 F1 O
  7.     alert(e.data);
    . [6 L' ?/ `5 c( @6 G# J
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    . X$ e5 z  p& n0 E$ n  b0 i
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    ' H7 m) A: H7 n# Y- M( K1 B( M
  3. // 推送的数据,包含uid字段,表示是给这个uid推送# J5 R1 q/ I4 q! l* `0 M( Y
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');" V- x( m# m0 f
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符  D, Y; v  M9 w+ n) a: Z* d
  6. fwrite($client, json_encode($data)."\n");$ ?; Z) v8 @' l9 g
  7. // 读取推送结果) _9 q# V7 u! ~7 }2 e* x+ W1 t
  8. echo fread($client, 8192);
复制代码

) u5 }6 z6 g6 ~, f) c
0 {; Y; D$ i  d- P3 _, a' H% p, p! N
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-4-30 20:52 , Processed in 0.064669 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!