您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15224|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    7 I. |1 J0 t% W2 m4 z0 \
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    5 @: F' l9 o  D6 s1 a# |7 u
  3. ( i; |: t; S$ k3 J
  4. $worker = new Worker();; E: N) G* P" C; r
  5. // 4个进程2 |' K! P" V6 @: G- ]) Q
  6. $worker->count = 4;$ N, c6 B4 |/ h
  7. // 每个进程启动后在当前进程新增一个Worker监听
    + n& K! _; d+ C
  8. $worker->onWorkerStart = function($worker)
    ! b: J- }; J" S, L1 R' W0 V! {
  9. {. H5 h3 Z+ p3 T% @
  10.     /**5 _# G2 x7 L  C) m: b
  11.      * 4个进程启动的时候都创建2016端口的Worker3 g& A/ D2 K1 H: p2 J' S
  12.      * 当执行到worker->listen()时会报Address already in use错误3 s# a8 s" `8 s8 U  _0 Q
  13.      * 如果worker->count=1则不会报错
    5 Q9 P/ D" c1 ]
  14.      */
    : a$ i) i2 V* w/ @, g5 k, Y5 a
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ' T+ q8 Q0 ^6 V1 e7 _8 \" Z
  16.     $inner_worker->onMessage = 'on_message';
    * |; `0 V, l8 |
  17.     // 执行监听。这里会报Address already in use错误2 Z: x( H3 r  }9 R# w* ]- F# I! b: i
  18.     $inner_worker->listen();# R) P" n  V0 g0 B8 e' @
  19. };
    & {3 U6 S- v9 s/ f1 e; k
  20. ' h2 T3 {  T5 h- {
  21. $worker->onMessage = 'on_message';( n3 L: y6 M4 a9 S
  22. 0 M! K% G! p% K% |1 V- S1 j7 s4 t
  23. function on_message($connection, $data); c0 k+ z5 a3 n" @
  24. {
    " h0 h  r% K6 A4 _& p
  25.     $connection->send("hello\n");( N* c* u, Y: `
  26. }) Y# C5 P! M7 c8 \9 c5 r& o

  27. " j5 T. a8 f" r$ T* v, t
  28. // 运行worker
    1 L& P9 S' Q" d  s* I/ J
  29. Worker::runAll();
    7 v  F3 F. s9 g# p' y
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    " \2 d# H: y- z

  31. 3 n  F* u6 ]( I& V* S. _! H
  32. use Workerman\Worker;  c/ Z' M: D9 J6 z
  33. require_once './Workerman/Autoloader.php';& l  q, c0 d* E9 V8 j  x* u9 E
  34. 5 e- r2 R  u' s1 v# `1 D7 r
  35. $worker = new Worker('text://0.0.0.0:2015');9 e  P' W. ^" n8 F  R3 T3 t4 B
  36. // 4个进程
    ( ~" D1 c5 U$ I" z
  37. $worker->count = 4;1 [+ z5 ~' N* E/ ]( g
  38. // 每个进程启动后在当前进程新增一个Worker监听
    7 w- Z1 V8 b4 s( _5 q% p3 Q
  39. $worker->onWorkerStart = function($worker)
    " Z& @; X! n3 c1 }+ C, {1 `
  40. {) u; G. T3 \+ w: {: y' A
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    + ~/ x- ^- Z. J, a
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)0 N4 U* x; y/ E' i0 N! M$ @
  43.     $inner_worker->reusePort = true;
    7 V. Y6 D! p/ T& |7 u8 f3 V
  44.     $inner_worker->onMessage = 'on_message';
    ' a: z( u) V2 T! B
  45.     // 执行监听。正常监听不会报错
    $ O) d; \; ?. ^, F& ]# ]; q
  46.     $inner_worker->listen();
    ' M/ A5 U. ^; B" W
  47. };( E' A# Y; M4 T8 `& c% i! w8 P' w

  48. + d. ^; x1 \5 T
  49. $worker->onMessage = 'on_message';5 G7 r0 m" Q3 P; v

  50. 5 z# A0 w3 @- _* [' S5 u; i
  51. function on_message($connection, $data), r$ T4 n4 i# a# d% I! J
  52. {! W( m( e- |: A  g# `/ X2 w
  53.     $connection->send("hello\n");
    - w* T5 U1 i# |6 |+ P+ V
  54. }" d: ^/ h9 {: I6 Q# j% c8 s

  55. ' y/ W" ?" O. ?% G+ z8 {2 T
  56. // 运行worker
    0 O8 ~6 a, C7 Z; x! x% E) @
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php/ E1 _: k. r% B& y# X4 O6 B
  2. use Workerman\Worker;  j& T3 h2 w  ^. g1 L# u  E( U
  3. require_once './Workerman/Autoloader.php';4 J/ @0 K$ \9 f  s) E
  4. // 初始化一个worker容器,监听1234端口
    / l9 l1 C# q' O% P  |" f0 Z- s
  5. $worker = new Worker('websocket://0.0.0.0:1234');' T* o9 i) E: o" z1 o/ H# E% V0 |

  6. ( J% b' |9 f% s- X% G; J
  7. /*% r: n7 L2 K8 B, z
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误0 F1 ^- c! U. z4 O3 z
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true); N! E; S  r, q. }, m+ H" j
  10. */
    % M) _5 w% M: L% O1 g% U, v5 R
  11. $worker->count = 1;' c% B+ G- M% K1 [" g
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    3 Y3 x0 K% k: J& {
  13. $worker->onWorkerStart = function($worker)
    " {. k; j) a$ _+ R1 B' H
  14. {3 b. V- f9 W9 M
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    7 |; k- Z  k% M; Q8 V! d
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');( m7 }! K. D$ y" m# w# B
  17.     $inner_text_worker->onMessage = function($connection, $buffer)9 V1 f' t7 U% m  X: y
  18.     {
    8 V* l( l; F1 M0 ?1 o6 o
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据% B1 T+ C- ^& p7 F8 N
  20.         $data = json_decode($buffer, true);
    2 o% b% c! F4 X8 k
  21.         $uid = $data['uid'];
    % y. Y& `9 u# g6 G$ `
  22.         // 通过workerman,向uid的页面推送数据0 l, P8 P3 ?; q
  23.         $ret = sendMessageByUid($uid, $buffer);
    ' p. t& n3 h/ L  }0 y# ~
  24.         // 返回推送结果
      z; W) w. Y' m% l
  25.         $connection->send($ret ? 'ok' : 'fail');
    2 A3 x& h% @$ x; g
  26.     };
    " B5 R9 c8 v& X5 d. z' \" B: `, S+ _. X
  27.     // ## 执行监听 ##
    3 }& W2 R; q; Y5 G2 T
  28.     $inner_text_worker->listen();( k6 d. v3 X9 d! S/ g3 ~
  29. };
    . H& t& `) X1 k$ O) z2 D6 D6 h/ h
  30. // 新增加一个属性,用来保存uid到connection的映射  I' P8 T+ l, b1 i6 U8 @
  31. $worker->uidConnections = array();
    % l! a& d/ [* d1 L* ?( k! Y# s
  32. // 当有客户端发来消息时执行的回调函数
    , ~( O$ S- x- F: Z5 ]
  33. $worker->onMessage = function($connection, $data)& ?8 P4 v5 }. T/ U, v* }
  34. {
    * `/ W! }9 G. j7 N. L' q4 a. O
  35.     global $worker;; j* E3 N6 V! P9 V. V
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    8 `( q" N1 O* `0 f9 Q$ A
  37.     if(!isset($connection->uid))
    , t2 S* Q2 }) N% P+ m- q
  38.     {$ `# k' V4 p& f4 i
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
      ]7 L5 M7 t2 ~! r, k% Q, m$ S5 F1 m  F; J
  40.        $connection->uid = $data;8 H% A: f( x& p$ K% D
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    / ?7 ^/ `/ V6 {( Z/ X
  42.         * 实现针对特定uid推送数据
    , E9 i9 d$ u+ H9 ~' R" w: u$ l
  43.         */) x- T3 |  Q6 O, Q9 v% W; n
  44.        $worker->uidConnections[$connection->uid] = $connection;  u3 w$ J4 h9 N8 J
  45.        return;
    * [! M  j# S+ P+ r
  46.     }
    8 J2 Y& ^( B+ J
  47. };
    3 a1 H3 M; q# r
  48. + V3 m( ~1 Z( a4 W2 n
  49. // 当有客户端连接断开时( y  b* v* C, ^& y5 w. I5 [* G
  50. $worker->onClose = function($connection)
    # F. y+ J% m  w# K$ y
  51. {7 R) N. S1 _3 R- |3 d
  52.     global $worker;( X- G2 J6 g& C( h1 U
  53.     if(isset($connection->uid))6 u6 y2 y2 B' q5 G( b, N
  54.     {! c( A5 G7 q! \/ d/ [
  55.         // 连接断开时删除映射
    ) @% t  E  i' c
  56.         unset($worker->uidConnections[$connection->uid]);8 X5 ?4 U% q) \7 t% a1 s" U
  57.     }
    $ _8 z0 p. n/ Z+ a9 k
  58. };5 S& [. x9 p- @" F' u3 J! i+ ]

  59. $ D. `' F/ t# W$ P- R
  60. // 向所有验证的用户推送数据
    % Y& ]5 J) T8 U9 {* \
  61. function broadcast($message)" ~4 a8 n5 w; f. s; m% r
  62. {
    % s) `/ _* l- x/ V
  63.    global $worker;: Y& N/ g, m4 l/ s- g7 ^
  64.    foreach($worker->uidConnections as $connection)
    1 v3 B6 r( o# ]/ N  U6 P3 h7 k
  65.    {
    & h6 U: P8 L, j, n" Y, s
  66.         $connection->send($message);, P5 _! g/ |9 B
  67.    }
    - P# @$ G) U  t0 F& u8 G
  68. }
    ) J8 t6 W1 Y# @4 F2 Y+ W
  69. ! p0 C- d, G  D- F: A2 X" V
  70. // 针对uid推送数据8 D0 a9 s1 @2 m% Z0 _- k, ?$ K- L
  71. function sendMessageByUid($uid, $message)0 n3 D( w; w9 o# m
  72. {$ L8 M* M& T- r( n9 M. j6 k
  73.     global $worker;: m: J. |* e  o3 b* j% J& J& ]
  74.     if(isset($worker->uidConnections[$uid]))5 Y% |  _- E" \' k$ ?3 R
  75.     {. w0 V# X2 k, [3 {. j
  76.         $connection = $worker->uidConnections[$uid];
    ; u. W, D8 o+ T* l+ z; U# i1 G) S  \
  77.         $connection->send($message);0 a3 V, Z) x: l, ~9 K$ `
  78.         return true;
    * p6 s8 k$ u7 H- T/ N3 ]
  79.     }
    3 e- {+ }% M# @- i
  80.     return false;
    # `( j/ Y# o3 h+ q
  81. }8 T6 Y. g, k. k, d( n1 @1 D

  82. , g$ V' K8 f) K# y
  83. // 运行所有的worker
    8 h9 J" a/ k* I; n, m, j3 T
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');' Y% I5 g3 M0 V' t
  2. ws.onopen = function(){0 ?$ J* H# X# n
  3.     var uid = 'uid1';
    . C+ h& ~1 q: K) H- [
  4.     ws.send(uid);6 R  ?, s# @7 e* M  Y! X
  5. };' \8 K1 o# P% y4 W# ~2 ]" K, f
  6. ws.onmessage = function(e){
    ' |# c" C$ w+ J- Y+ o9 v& s" t# Q: X
  7.     alert(e.data);( F9 t5 `- i4 `  S  f  ^( P4 G
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    - i' v$ X/ [# l& c8 K1 J
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    2 _9 C' T% t/ Z
  3. // 推送的数据,包含uid字段,表示是给这个uid推送9 M+ X2 a# L- [' E
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');, N3 p2 R1 g# w0 Z: ^
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ( X& q7 N* G" I; T3 x! z* q
  6. fwrite($client, json_encode($data)."\n");
    ! m) J" M/ X% F2 }) u8 D2 p; Z- a+ d
  7. // 读取推送结果1 a* N5 W  G( n8 l% S( y
  8. echo fread($client, 8192);
复制代码

! G1 D7 U$ ?+ h2 q
5 I( L# X2 B0 X2 A1 @
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-6-19 22:43 , Processed in 0.052148 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!