您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10511|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;2 i: u0 n) P. d( Y8 P1 k! W
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ) V6 K9 Z# Z, r, G9 \  ~$ O0 u6 \4 R

  3. 8 I3 F4 b! w; L$ Q/ o3 o3 M: h
  4. $worker = new Worker();# y+ x8 K' O7 x: C9 g  P, C0 R
  5. // 4个进程
    # S# X, z9 u, @; I+ }! u
  6. $worker->count = 4;" u2 J% e1 k  F  Z
  7. // 每个进程启动后在当前进程新增一个Worker监听# ~4 j- p' Q. L
  8. $worker->onWorkerStart = function($worker)+ [* r4 l6 p7 O9 x  X; S3 l
  9. {
    6 f0 I  ^  Z# [# s$ P4 ^
  10.     /**
    $ H% K  v$ a5 e8 W! `" |
  11.      * 4个进程启动的时候都创建2016端口的Worker
    - \' g* a  m8 D5 R* \+ I* Q
  12.      * 当执行到worker->listen()时会报Address already in use错误/ \& J- H" Q( _3 I
  13.      * 如果worker->count=1则不会报错; }# R4 r4 @  r; F6 b: y- G7 C1 r
  14.      */
    " ]. J1 p& C3 X) T: P; Z1 d1 H
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    : q( W7 r5 ?9 ^" @
  16.     $inner_worker->onMessage = 'on_message';
    % j# M" V4 D) F" ]2 ^5 U& O
  17.     // 执行监听。这里会报Address already in use错误! a% Q3 k+ p" T7 N# f
  18.     $inner_worker->listen();9 _) q2 |# T- f# v& j0 e! ~; v
  19. };
    1 C; s* z4 H: O' G6 U/ r" l
  20. * c. ~5 I; |$ s1 ?" A8 n1 Q6 H
  21. $worker->onMessage = 'on_message';) l! a) S, b$ |
  22. 5 G. u9 W3 W5 e- P) S
  23. function on_message($connection, $data)
    8 o! s4 W1 R8 F4 R' o6 l! {2 t+ z
  24. {
    * A1 X# ]5 Q7 A4 p+ N
  25.     $connection->send("hello\n");
    5 z1 f1 m, N- _3 @# p, \. h
  26. }
    # ^0 l- v) b/ x( @3 f

  27. + V( R% y6 d$ ^) V" `
  28. // 运行worker9 ^1 {! T7 |: o- S6 G, h- N
  29. Worker::runAll();7 p* a; I1 s: l, r- _
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:1 a# k. f2 y6 U0 n& p) }0 l

  31. , g  M" t4 G0 C) K
  32. use Workerman\Worker;; R8 G$ w6 a! D0 u5 e2 f' P
  33. require_once './Workerman/Autoloader.php';
    % A" J( z9 O# g/ L& u$ A$ t

  34. ( {4 e2 R( K% w; n* I; T, E1 e
  35. $worker = new Worker('text://0.0.0.0:2015');: b) ]1 T3 ]5 Q
  36. // 4个进程9 {% t- x( l; _4 G( z/ `
  37. $worker->count = 4;
    # A4 @1 u3 S3 q1 E$ G
  38. // 每个进程启动后在当前进程新增一个Worker监听# G1 y4 H7 I9 W
  39. $worker->onWorkerStart = function($worker)
    # t( ?; E0 o! y9 D, S( T" S% k
  40. {
    ' x# Y- w' Z% v
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ' {* ~( j; i( A. w' Z
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& i$ \1 J- ]: c8 U% F* ]& f" ~
  43.     $inner_worker->reusePort = true;
    ( U9 E. j+ _9 ~+ h
  44.     $inner_worker->onMessage = 'on_message';
    - I( y/ G& D6 ]/ W9 V; c
  45.     // 执行监听。正常监听不会报错' m6 B8 B7 @% ^7 T
  46.     $inner_worker->listen();9 t2 R3 ~2 h( @9 o9 X
  47. };
    & V* }' ~* Z/ \& l

  48. 2 I' h! s, d3 {( S- ^6 M+ v: ~
  49. $worker->onMessage = 'on_message';
    4 H" B; I: v) f5 b+ w
  50. / g, Y- ]# M# p$ I2 ^! w
  51. function on_message($connection, $data)/ f. q: _' i1 @* t
  52. {& S( v3 r0 C; u) H
  53.     $connection->send("hello\n");2 z: J$ c; l0 m6 Y8 S6 h  {  h
  54. }) y& @' U+ ]& m/ ^* s, D

  55. 4 z0 G9 q5 D/ g  M0 K; g
  56. // 运行worker
    / ^2 \, w: W2 n% n& q4 g
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    , R3 d, Z, F8 G+ Y5 |1 z# z# w
  2. use Workerman\Worker;: ]4 |4 y# F  p: f# b7 I
  3. require_once './Workerman/Autoloader.php';  K! }& e, a5 ^2 W+ g4 v
  4. // 初始化一个worker容器,监听1234端口- Y" n5 T+ ]7 b* J& K* M
  5. $worker = new Worker('websocket://0.0.0.0:1234');) j/ X+ ~8 C9 M" ]
  6. 5 C9 t; E5 n7 h0 b' y
  7. /*
    6 s5 R6 l' A% R' l* e3 b6 T: g
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    . S; g' T4 m9 S1 p  X
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)' @+ H3 ~* N* j  H4 t8 K) d
  10. */: u4 n  h1 x/ @5 g- G
  11. $worker->count = 1;
    ) }1 @, ?' n$ i
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口( f- E) Z2 G4 j) R, ]
  13. $worker->onWorkerStart = function($worker)
    3 D+ T( I- h; {+ g% y  ~
  14. {6 m. K9 Y( @+ r
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
      R1 L5 i' {8 T& ?0 {
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    ( \+ R+ j) W1 J- }4 _
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    4 k0 ?9 n  H, p$ [# }
  18.     {
    , O) f- Y9 `+ ]; a7 x% i- V
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    9 x' U7 I5 s' P: j: P! p! |
  20.         $data = json_decode($buffer, true);
    - r' [. h/ E, \9 f4 |
  21.         $uid = $data['uid'];
    4 c4 E+ |- g+ |* @* V
  22.         // 通过workerman,向uid的页面推送数据
    3 x6 [( o% M; k2 {
  23.         $ret = sendMessageByUid($uid, $buffer);
    ) ?2 f, u& [1 i- D: i( h- f
  24.         // 返回推送结果
    1 X1 E- r" N/ z+ ~& |
  25.         $connection->send($ret ? 'ok' : 'fail');
    : w3 N1 q) B5 [- U& z$ q& o
  26.     };
    % r3 [( q5 B) c" o/ g+ `
  27.     // ## 执行监听 ##0 [' `; W' {$ Q5 `( o
  28.     $inner_text_worker->listen();
    % n3 C" x: U, d, v2 k4 J6 t( O! `1 B2 i
  29. };
    . S2 j9 J& D/ `& ?4 b6 {, j% O
  30. // 新增加一个属性,用来保存uid到connection的映射
    7 F( J& L5 x" A$ }
  31. $worker->uidConnections = array();; e1 m" F& O6 f$ T" N, B8 `( v
  32. // 当有客户端发来消息时执行的回调函数& k8 S9 ^2 g0 b" F/ P+ d
  33. $worker->onMessage = function($connection, $data)
    " l- D1 v: Q5 b# }& f" I4 B) }. ?0 m
  34. {
    % C; t: f( V/ n0 H3 B! E
  35.     global $worker;- \( _5 |5 B( m2 V
  36.     // 判断当前客户端是否已经验证,既是否设置了uid& B  M. o9 A3 b' b
  37.     if(!isset($connection->uid))
    . y1 e+ i# B- a. u( u2 ^& p
  38.     {
    . \% q5 K( X, m  ^6 j, s3 q
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)) _  Z! e5 A, v4 _' [$ J
  40.        $connection->uid = $data;
    3 q2 N. f2 ^7 ^4 Q: r3 U3 Q3 }
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    / t6 S2 d, f  r5 ^6 V
  42.         * 实现针对特定uid推送数据
    " y# x1 {. F7 ~/ E9 G9 ~+ T
  43.         */6 b8 S( X( D& n$ y
  44.        $worker->uidConnections[$connection->uid] = $connection;
    - O9 p( g7 F! H, {0 n7 @/ F
  45.        return;, M1 B' M' C7 y8 x/ b: `
  46.     }. i+ R( U" M7 \/ z
  47. };1 a8 S6 Z1 U. ^- I( f2 D
  48. 4 i- g. \$ O5 m; W& D$ V
  49. // 当有客户端连接断开时
    ( ^7 I9 Z, S5 D) y
  50. $worker->onClose = function($connection)) X4 ^" F% G9 d. |: C. @
  51. {
    1 \( ?. ^, m0 w& W7 m( ^
  52.     global $worker;4 |6 e( k/ V* X
  53.     if(isset($connection->uid))4 \+ s" x9 Q" l( b1 @- \; d
  54.     {8 s' I, @4 K  f, p3 S
  55.         // 连接断开时删除映射/ \2 W: q  V& @6 a$ A
  56.         unset($worker->uidConnections[$connection->uid]);
    6 `1 ~, {% l) Q/ q0 i3 }
  57.     }
      e+ m  u; a  F* A% G9 Y
  58. };& l3 {- g5 s0 [
  59. * t9 H! `& W. R5 F: R
  60. // 向所有验证的用户推送数据( W+ {' ~1 Y5 x
  61. function broadcast($message)5 d4 V) \+ `$ |9 i3 @4 ?
  62. {
    , j" O  z& u9 c, X  \! L# i8 e4 L
  63.    global $worker;, S0 I* T8 N: D$ G3 T; K. u
  64.    foreach($worker->uidConnections as $connection)
    ) I2 d  A5 N3 G  Z
  65.    {
    ( ^( A, n9 E- T& z
  66.         $connection->send($message);
    & r$ n% w8 H9 t: \
  67.    }: O3 V& L+ S4 S0 |# I: s* f
  68. }8 d0 {/ G+ J* V$ N* I
  69. * `; W; Y! T" b- M9 B% o5 B
  70. // 针对uid推送数据% \  q7 e6 K7 [3 u' Y" l
  71. function sendMessageByUid($uid, $message)+ H2 R! j" ^  x0 W, Z( {
  72. {# _5 E  u7 a: j! P  _# c  I
  73.     global $worker;! }; E7 T9 S5 Y$ O
  74.     if(isset($worker->uidConnections[$uid]))8 h) I$ f* F6 g, u6 w$ r
  75.     {! E3 u; j7 x, y8 A0 a
  76.         $connection = $worker->uidConnections[$uid];, H2 p/ H  d/ w$ T9 |+ m
  77.         $connection->send($message);
    & _5 Q' \3 X. o& k5 K
  78.         return true;5 \; Q) L; {' Z1 z$ m
  79.     }
    6 k; Q0 _" f' ~
  80.     return false;! i; a1 R7 q6 n* Z8 g
  81. }
    - G1 H5 y( N) o2 A

  82. 1 S  P% b) [6 U
  83. // 运行所有的worker# @5 a: t4 u" t4 Q9 I
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');1 A; H7 h" A3 K  b/ [
  2. ws.onopen = function(){
    ; R5 c) a! w. s, Q9 u" d/ J
  3.     var uid = 'uid1';
    4 O" o* d. S5 s, e4 d* X
  4.     ws.send(uid);8 Q8 n0 e# i- \2 S( V$ e
  5. };
    ; n5 L. U' T% p$ A- {# z
  6. ws.onmessage = function(e){
    ! d$ i& y) I, b- _$ F# B" Y6 k* ~
  7.     alert(e.data);
    9 ^* G& a( e! ]; m  U5 H1 ]
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口# ^; W+ V, C7 C" w
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    9 D1 p& R! I6 f1 H7 G8 E
  3. // 推送的数据,包含uid字段,表示是给这个uid推送5 Y" R+ j2 {: }4 T. c
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');+ o9 P) |. H8 c+ |3 f
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ( m* p$ j& J6 ~9 W9 j
  6. fwrite($client, json_encode($data)."\n");
    / v3 m" o- L& a0 E" O
  7. // 读取推送结果
    : A1 \" l  c& N2 n1 L7 Y8 p0 ~, E
  8. echo fread($client, 8192);
复制代码
0 I  y+ K2 d, a. X; z

; k$ F- L. {& W# ^0 T
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-18 21:12 , Processed in 0.118580 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!