您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14812|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;8 u- i% T- P" u/ U; d( o
  2. require_once __DIR__ . '/Workerman/Autoloader.php';4 R! |/ n. `. A# @% S2 j
  3. 3 ]. C* ?6 f- |/ [0 }; D4 c
  4. $worker = new Worker();
    + X- S8 A, C7 ]5 I# O' @! I& x0 q
  5. // 4个进程
      j) N( w4 [4 o* ?7 T- Q) U) U
  6. $worker->count = 4;
    ! w$ j1 t# f& [
  7. // 每个进程启动后在当前进程新增一个Worker监听
    7 A/ |3 Q' d/ S3 j: C
  8. $worker->onWorkerStart = function($worker)
    + L! I- b2 Z, F: X
  9. {5 k  t3 @2 e. C/ ]  R7 \/ ^
  10.     /**) G5 T) K# V) b. ?- M7 z
  11.      * 4个进程启动的时候都创建2016端口的Worker
    % h: ?& p0 |7 b' k
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ; u$ y% M7 R+ r6 Y$ y( S4 l( p+ X, N
  13.      * 如果worker->count=1则不会报错/ r$ D+ s7 z9 {9 t: ]  N
  14.      */
    $ s$ g9 o7 m7 F* S  z( n( g7 Z3 d8 H' k
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    3 _2 ]. o) P4 u& Q$ Q# t6 d9 W
  16.     $inner_worker->onMessage = 'on_message';
    * J% o- ]. G( {  F6 Z
  17.     // 执行监听。这里会报Address already in use错误4 O4 P) O: u3 M; [8 q
  18.     $inner_worker->listen();; y9 h& _- m$ W. r4 x0 d
  19. };
      ]. f) ~) s0 U

  20. " a! T3 y8 s: d1 `) ^# u" h
  21. $worker->onMessage = 'on_message';- o9 E& V9 c( B; a  b2 L

  22. $ K  A6 t: {- Y5 y3 }6 k% [' g
  23. function on_message($connection, $data)& Z: K2 L- y/ z, ], c& _  N8 ?/ O% I
  24. {
    % e1 v( n8 s% l6 P9 @* T
  25.     $connection->send("hello\n");
    : R5 n. x6 s3 R# h9 i6 j9 l; _5 f1 _* c
  26. }- J5 U4 z# t3 i2 Z) r' l
  27. 3 g% a4 V$ Z; T1 V5 h: M' `+ _$ d
  28. // 运行worker
    3 Y4 d* r( t# T  a% D. p8 @
  29. Worker::runAll();1 Z" ]  h0 Q. L' n8 |' Y
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    ! S( {2 j+ ?2 P: b5 ?* J; p. }7 v) W
  31. 5 q/ ]( \& k9 F' W
  32. use Workerman\Worker;
    $ `: A' d- G% F4 y$ ?+ T
  33. require_once './Workerman/Autoloader.php';- g* B7 b* W1 R* c4 r

  34.   x( d9 T. l/ L) R. A2 J" s) Q
  35. $worker = new Worker('text://0.0.0.0:2015');
      i% h( P5 |% @7 s, f$ Q
  36. // 4个进程
      E8 i; j5 w) E1 n* p- o4 a
  37. $worker->count = 4;
    # V6 B3 Q- K) T9 y- ^( y  a! J
  38. // 每个进程启动后在当前进程新增一个Worker监听2 c0 w% T+ X" a8 P
  39. $worker->onWorkerStart = function($worker)
    4 E, X5 C- |" k9 [6 i  s2 [& d# \8 x
  40. {$ h3 u0 c& g5 h7 C6 L9 }6 ^( s
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');, D% l( ^( b: X  u6 P- r
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0); V) G, H, L( O
  43.     $inner_worker->reusePort = true;
    / t+ K6 V2 x) U7 H& b
  44.     $inner_worker->onMessage = 'on_message';
    ( D; ?4 U0 F& w. j2 D; R
  45.     // 执行监听。正常监听不会报错0 ]+ z* t. g% O2 G
  46.     $inner_worker->listen();) m, n- Q. o0 c. g
  47. };  D2 z4 K& k, A5 D7 f, ]" u

  48. 9 r1 j9 {6 s4 e' c5 Q: z+ C
  49. $worker->onMessage = 'on_message';
    ) M2 |6 P* K2 U

  50. - P" _7 J% M+ C: _5 k
  51. function on_message($connection, $data): X! o& X: a& ]3 u
  52. {
    ! d7 F2 |  ]3 c# f
  53.     $connection->send("hello\n");
    0 m: G1 E+ |$ K
  54. }  n6 H/ x3 F7 h7 G3 c
  55. ( P" L0 F/ q3 e; }$ y( p
  56. // 运行worker: i+ `' E' ~7 A, r6 `# f4 j
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    $ ^# P7 Z1 r1 `" f" s2 e8 h* f
  2. use Workerman\Worker;
    3 O" D# L: k) H8 F* S
  3. require_once './Workerman/Autoloader.php';2 H+ E  y& z! w$ n. y
  4. // 初始化一个worker容器,监听1234端口
    / w2 r; w3 E- o) I, o. \( e
  5. $worker = new Worker('websocket://0.0.0.0:1234');- y7 p, F$ k% E9 f2 H9 e5 m3 ?. E
  6. 7 e7 S: A4 K- N- R! R3 K+ t
  7. /*
    " U6 _* F  R1 b; Q6 v
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误5 o- ]+ }& v' w( u2 s0 e; e
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    * m7 X3 {$ p( }/ C$ ?
  10. */
    4 V6 j3 y6 `5 E* _1 x
  11. $worker->count = 1;5 J: j7 _- W. z" t
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# G* p, h" ]& W, C/ V
  13. $worker->onWorkerStart = function($worker)
    9 o& |. }) s% Q; b) K
  14. {9 ~7 D; G* a  U" H% Z/ [3 X
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    - V3 i) b* L5 N
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');$ |( ?% O* e8 {
  17.     $inner_text_worker->onMessage = function($connection, $buffer)/ V3 F4 @! Y# J- i
  18.     {1 w0 g" ^7 R6 P
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据- z  z3 y) e2 D& Q- ]0 p# C$ o2 p
  20.         $data = json_decode($buffer, true);6 X8 `/ M3 v7 H# U" X( q! o$ \
  21.         $uid = $data['uid'];4 q; {0 _8 q2 B& k7 e: U
  22.         // 通过workerman,向uid的页面推送数据
    . L7 \( U3 a9 N4 w) g+ Q4 W
  23.         $ret = sendMessageByUid($uid, $buffer);3 W" d5 O3 y: ^8 v) W6 ^5 m6 P
  24.         // 返回推送结果
    6 ^/ t' l4 s3 x4 I0 v
  25.         $connection->send($ret ? 'ok' : 'fail');
    1 ~5 v9 Y* D# Q2 ]# l1 U
  26.     };: z0 Z  \9 _2 ]& W- ~
  27.     // ## 执行监听 ##
    4 ~5 V. Q' z5 W" H% `# E; g5 e
  28.     $inner_text_worker->listen();
    1 }3 z9 J* h( r. S. ~
  29. };9 v5 H- H. s3 F2 l5 i
  30. // 新增加一个属性,用来保存uid到connection的映射+ L7 d8 n7 ^* f4 \$ F8 b/ k
  31. $worker->uidConnections = array();
    & `% Q; W/ m7 Y1 {6 \
  32. // 当有客户端发来消息时执行的回调函数6 ?3 Y+ W  |9 S. w
  33. $worker->onMessage = function($connection, $data)
    % j; H$ P2 E: a# x1 {8 [
  34. {
    $ Y& t* K* {: P
  35.     global $worker;
    ! Y5 a+ r6 x" f7 n3 N# F, D4 P$ j
  36.     // 判断当前客户端是否已经验证,既是否设置了uid& D6 @: k2 f* J! S3 G% H" k
  37.     if(!isset($connection->uid))$ T$ r  w; `6 P8 n, @
  38.     {/ w1 {1 `9 |+ T1 k* S; P( f
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)% F) S! T4 E2 P) n: d" Y0 R" i
  40.        $connection->uid = $data;  n: i) @6 A+ _, Q
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    4 J4 S2 t( p0 U, `# V* y
  42.         * 实现针对特定uid推送数据
    & Q0 {( S) j+ A
  43.         */7 {& {7 z% V0 k, ^$ y
  44.        $worker->uidConnections[$connection->uid] = $connection;- f: V  D  v$ s4 X# ]1 ]
  45.        return;
    / g. ?. p0 R, r  g+ J& ^
  46.     }
    ! |- _  Y# S, n1 p: Y
  47. };
    / v0 }# J1 N6 B( h( Q7 L. c% L9 r( ?) l

  48. , j+ t9 i+ U$ d0 h' ^( d
  49. // 当有客户端连接断开时$ G2 T% |; C! ^# H
  50. $worker->onClose = function($connection)
    3 E; X0 D2 |" R4 q  S# F1 u, V! E
  51. {
      D4 m- n: P+ V, q
  52.     global $worker;& T( s$ T, w' n9 o
  53.     if(isset($connection->uid))' ~) K5 ~. _) m$ N
  54.     {
    * C: ]+ U2 J4 `# {' e" H0 i
  55.         // 连接断开时删除映射8 e1 l# ^2 i" C- @1 o
  56.         unset($worker->uidConnections[$connection->uid]);
    & h9 ^5 Y7 u3 a: F
  57.     }% m6 t8 Z& q) u+ `7 V
  58. };
    $ ^1 M- F; \2 y" L/ c
  59. ! h8 Q  ?4 @7 W: Q5 m4 }6 U6 {
  60. // 向所有验证的用户推送数据1 ]  u% a9 ?6 T
  61. function broadcast($message)' |+ B. b; s+ z# K
  62. {
    4 e, |6 S/ T0 |: s: Y
  63.    global $worker;
    7 _2 U  _& }; k, m2 ^# l
  64.    foreach($worker->uidConnections as $connection)( G$ r3 _, a, ?0 t- h
  65.    {
    $ N9 Y# k% q% p
  66.         $connection->send($message);& a. b3 r6 Z/ r( A* O# L
  67.    }
    4 N! n9 K+ \: _( z- y
  68. }- j2 \9 z# Z" r' y! Q

  69. * C1 z) N" R/ @2 ^; C3 h
  70. // 针对uid推送数据
      G7 z$ B9 R$ H. ^5 D2 R7 d
  71. function sendMessageByUid($uid, $message)/ w3 e' G2 U/ D$ {
  72. {
    " E2 ^+ ^& o! ^2 ^
  73.     global $worker;; ^) I2 x% T4 F2 B  y6 ]( A7 w
  74.     if(isset($worker->uidConnections[$uid]))
    5 q7 L6 ~( u# X! j; C( T2 R
  75.     {6 Y+ D/ v7 ?* c
  76.         $connection = $worker->uidConnections[$uid];
    % \, b/ [+ a7 {6 c# S
  77.         $connection->send($message);
    ) B  y" a" i' P5 U1 Z% Z  F9 M! G3 a
  78.         return true;
    3 C, E3 I  Q+ \. M+ o1 ]
  79.     }. T6 Y1 w2 a1 h/ j% C% S4 |
  80.     return false;
    : ]$ b7 L+ i% P! ~  {' t
  81. }/ F9 `8 A0 e- Q$ ]: L

  82. 7 C' i/ E8 h; P7 g1 z
  83. // 运行所有的worker
      Q# l- ~! p! E; l; N& r  o
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');/ m- ^& X9 t& V2 C
  2. ws.onopen = function(){
    2 j+ e1 P5 W% ^
  3.     var uid = 'uid1';
    5 _: P; N9 U) Y! ?3 S9 M
  4.     ws.send(uid);5 `3 G- E% r0 B1 f! q
  5. };
      d. J; \7 L. }! f( N2 F- M0 S
  6. ws.onmessage = function(e){
    4 q# h/ }2 G9 |& Q
  7.     alert(e.data);
    7 N# U; ]  N' p% y) V. k/ A
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口# s  X) D( G0 g& p# x
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    7 }* _, n/ \) I% c; M4 y+ }
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    9 R" J8 h, ?7 p, b( @9 P5 x
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    : c, e  r/ u' L0 C$ ^. f
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ( q8 g& S  e2 t; ~8 L
  6. fwrite($client, json_encode($data)."\n");
    / J8 J  ?; n! o) n
  7. // 读取推送结果& ^% x  U, T, _' k7 p9 p
  8. echo fread($client, 8192);
复制代码
2 T8 t. i. B3 {% X# n) e# a. W. J7 U

4 L9 j% o5 D& |6 f; L* a9 o
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 21:42 , Processed in 0.054319 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!