您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10404|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;& H3 i4 L( B, {( q+ i* k) O+ o
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    0 s! [% `8 ^( a/ b2 m

  3. , V) o; m9 w' o+ s" g
  4. $worker = new Worker();! H4 [6 M: a  a
  5. // 4个进程8 J* _$ {2 ~, J
  6. $worker->count = 4;
    ; p! v  c' T4 T3 ~
  7. // 每个进程启动后在当前进程新增一个Worker监听) G5 u% L5 l0 O+ ]* M% n& s" @
  8. $worker->onWorkerStart = function($worker)' ?8 ?; j. V- x( L
  9. {/ ~. g2 S3 o$ E. w* q. M
  10.     /**$ c: @+ I* O# w" U2 Q; m
  11.      * 4个进程启动的时候都创建2016端口的Worker) [1 C& e/ |9 }* t
  12.      * 当执行到worker->listen()时会报Address already in use错误
    , f6 H& W0 B0 ?3 t1 B% X6 \
  13.      * 如果worker->count=1则不会报错, W6 @9 D, I, M0 V, ^
  14.      */
    1 L! E  `, z% Y9 n- D8 M+ B4 ~
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');8 }9 G5 |# h8 |1 a- Z
  16.     $inner_worker->onMessage = 'on_message';
    - {) W0 `% v- @9 a
  17.     // 执行监听。这里会报Address already in use错误
    6 X# K& y) I1 q1 s
  18.     $inner_worker->listen();
    # `' l$ l! \0 M0 O# I, i! p( Y% @
  19. };
    , b- o! B+ b% E! O8 r

  20. ; l# M- M8 N  c6 j& {% o5 H
  21. $worker->onMessage = 'on_message';/ h! W' I, m! p
  22. ( S& x9 D# z1 D; P9 d' D, t3 J9 E* X
  23. function on_message($connection, $data)/ Z8 k$ N* N( Y+ R
  24. {& |* r/ q$ f* N9 {
  25.     $connection->send("hello\n");
    . a& D6 ^0 i1 w
  26. }
    0 W0 D0 L8 d+ e  L2 Y9 s
  27. # y: M. \! `9 n0 `" O% @6 ?$ {
  28. // 运行worker
    0 P0 B, o! K3 c
  29. Worker::runAll();
    + x5 j4 p! c7 A7 ^3 Q0 L$ v$ z  o
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:: P8 v5 }1 t# ?9 w( \$ r

  31. 8 K: e/ E4 w# x, @( j0 ^
  32. use Workerman\Worker;/ c9 ~* z3 A% R* S
  33. require_once './Workerman/Autoloader.php';
    % ~" P% m; ~, n3 K% I4 C
  34. ! s/ w2 \2 T) E. n% T  y/ P  @
  35. $worker = new Worker('text://0.0.0.0:2015');0 D0 X; O8 z, w  @
  36. // 4个进程
    % H. v- }: r, J6 r7 O
  37. $worker->count = 4;1 l. X: E% x* V) I: [( R" |7 g
  38. // 每个进程启动后在当前进程新增一个Worker监听
    6 E( b0 ], @& V; [4 j' Q
  39. $worker->onWorkerStart = function($worker)" _' ^' a# M, g, X
  40. {5 ~2 R" N# [' U
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');9 Z; N0 i7 s  l5 }0 U( f3 b
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)2 u2 W% ~2 e$ j& D0 c* B3 m; T& [
  43.     $inner_worker->reusePort = true;
    ' j1 H/ h% w6 L5 Z# G
  44.     $inner_worker->onMessage = 'on_message';7 R1 k' I) F) z% B& n! Q; }
  45.     // 执行监听。正常监听不会报错
    # Y( |9 B# z3 ^
  46.     $inner_worker->listen();$ H4 ?( q" D* q6 k7 t
  47. };# r7 A4 Y- H# @) z' t3 ]" x

  48. & J+ s. C. S1 f- \. `- z
  49. $worker->onMessage = 'on_message';; R" l8 \, h2 Q) ]; s& s, J

  50. 8 J8 H0 V4 k4 L1 Z8 y# G
  51. function on_message($connection, $data); S" s1 j) }4 S7 ^7 Z4 H- ^
  52. {
    ! S! B% r" z% q
  53.     $connection->send("hello\n");
    4 \7 w1 X" v' `6 d1 x
  54. }6 x+ B' H# K6 g( i

  55. ' T* O  E4 v! d3 \
  56. // 运行worker
    ' Y& m! P5 ?9 E' M! y
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php5 x3 I; L8 }* i% N; h) r
  2. use Workerman\Worker;/ Z8 r2 j, P3 W% G4 R0 E, K
  3. require_once './Workerman/Autoloader.php';
    1 m" s- P, a- @1 l
  4. // 初始化一个worker容器,监听1234端口3 e3 k7 Z/ |" }4 _" R: v; y
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ) D# r( S% A. t% [
  6. # m. f0 g9 G% Y" o/ |
  7. /*
    5 {2 N* d' {6 K. J
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    4 P) z% x, r. l- y1 C
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)- Y' G. F5 y1 O2 W% X- W$ m% s
  10. */
    - k/ @+ e/ y1 ]2 F
  11. $worker->count = 1;
    7 p6 u9 M/ }# f+ U! w/ w
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口) k7 B( @2 D, d4 e  }* N' Q( T
  13. $worker->onWorkerStart = function($worker)
    , b, ^! F' Q, y& r' A+ h! c
  14. {
    3 ~' z, b1 c1 p% ]0 b/ O$ ]
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符9 y' a* ^8 P( I: h. T5 w
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');: P2 x: w( @. A/ z* g
  17.     $inner_text_worker->onMessage = function($connection, $buffer): n& d% K4 `) P8 f& \
  18.     {
    . M4 `3 E  b. I: F( |
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据0 I, x" {6 q# M( @; z* v- Q. ?- X
  20.         $data = json_decode($buffer, true);7 z! U. ^0 k0 o- Q
  21.         $uid = $data['uid'];( F4 z' B3 n2 u1 \& n
  22.         // 通过workerman,向uid的页面推送数据. @( ?! T+ }! z1 @8 @5 _
  23.         $ret = sendMessageByUid($uid, $buffer);( m5 K# _9 N- r  {/ L
  24.         // 返回推送结果
      L0 N8 @3 j$ I% l
  25.         $connection->send($ret ? 'ok' : 'fail');
    6 M8 y! W. D( I) F8 a/ d2 S& l6 C
  26.     };  W$ ?$ q8 H- `, J
  27.     // ## 执行监听 ##
    ; t0 }# |9 R, X! g3 U1 k
  28.     $inner_text_worker->listen();) i( V$ @% t1 [: d& G6 \$ O
  29. };
    0 ~4 T/ z0 P/ f- U* c# G4 p- J) Y  n
  30. // 新增加一个属性,用来保存uid到connection的映射
    ' [+ u! x- U" q$ j  O  x
  31. $worker->uidConnections = array();
    ' x# `* z) ^1 G' C. f
  32. // 当有客户端发来消息时执行的回调函数; D# u0 u+ A7 h. o; F
  33. $worker->onMessage = function($connection, $data). U+ M- d$ W1 ]$ v% k# G
  34. {4 U4 {# N( y( k
  35.     global $worker;5 `  T4 T* |4 J- V# L& s. ^' E
  36.     // 判断当前客户端是否已经验证,既是否设置了uid, z  n* A4 n, V: o
  37.     if(!isset($connection->uid))
    $ ~+ F% b( m% y4 c1 J! Q) P- N7 u
  38.     {; F/ v1 {$ K) t7 [. x( d
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)' T& `% G4 B1 Q- R( }, |4 }, ?  Q
  40.        $connection->uid = $data;- z! }: d" |# r6 F6 r$ `5 u
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    # d- N9 G. P+ a- a, |2 e
  42.         * 实现针对特定uid推送数据0 n8 t9 C& ^. o1 g
  43.         */
    # v+ c7 |. X( g  Z2 Y5 ]7 p+ _
  44.        $worker->uidConnections[$connection->uid] = $connection;
    . [0 `: c3 x4 m  J
  45.        return;% x8 Q% e) X. ~; s2 X
  46.     }+ M3 f5 ]8 S4 n) S  ~$ b
  47. };
    % \; a& S4 G5 D9 W, c: i
  48. 5 X) t/ h3 v  Q5 I$ f, s
  49. // 当有客户端连接断开时
    3 I( c( f2 \: i% S) T1 D
  50. $worker->onClose = function($connection)
    / G! F  y' Y1 d6 R
  51. {
    # a! J1 @8 t( ]7 G
  52.     global $worker;- ?& J$ q, M$ L. N7 q) L
  53.     if(isset($connection->uid))
    ' A3 `+ ?. f) V" d4 d
  54.     {1 _9 B. s% t$ y4 y: a6 N
  55.         // 连接断开时删除映射
    + X& z$ V. y9 L- W8 U5 q( b( N
  56.         unset($worker->uidConnections[$connection->uid]);3 A8 n  y7 i+ F" ^0 s/ |: u5 P
  57.     }
    * v+ f8 P6 {; {( F
  58. };1 A) t7 v% x6 r' ^) ?! e, U0 u- P, V* g

  59. $ ~9 k8 q0 Y! V6 g  B/ N
  60. // 向所有验证的用户推送数据& T' n9 X# S6 k1 _" J! ]+ K
  61. function broadcast($message)$ `# I  {: \  n* R
  62. {
    3 t$ Q; x2 {) y$ F7 J# D- S4 a: S
  63.    global $worker;/ X& g( P7 ?% s+ ^4 Y/ k, a, p
  64.    foreach($worker->uidConnections as $connection)
    1 g) P4 _( @% D- o+ Z, }
  65.    {
    1 w) v8 Z( F; O" c" l9 R" c5 E2 D
  66.         $connection->send($message);6 v% [: X3 [3 i) [1 V; A0 v" f: y
  67.    }5 T4 {. v7 q2 ~- q
  68. }( o6 W- A' e2 E1 b7 M

  69. $ J3 ]+ \- q+ a+ u2 h0 Y/ Q
  70. // 针对uid推送数据- t  K  p; [0 g9 m8 `9 _
  71. function sendMessageByUid($uid, $message)
    ( b7 h* z2 |2 v7 q' W
  72. {- Z8 [/ I3 m& Z$ j+ W
  73.     global $worker;' W7 n; ^$ y9 u$ G
  74.     if(isset($worker->uidConnections[$uid]))9 U% Y0 E8 u* ?# E) k4 Q
  75.     {$ f8 \. f! ?& @* t( c" k
  76.         $connection = $worker->uidConnections[$uid];
    " O; V9 H0 D* Q
  77.         $connection->send($message);: Z3 ~  u5 o' U" j3 W! _
  78.         return true;7 Q& o, e8 r9 O+ ^' R& z0 S
  79.     }' y' C; z6 a( l' U: t4 e
  80.     return false;
    # A4 e! l8 J/ h
  81. }
    - v- V8 f# v2 H

  82. - |/ c9 ]/ f: Q
  83. // 运行所有的worker+ k5 s3 i. X* V0 C
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    8 {% K! E9 O3 _, q6 {, L! j
  2. ws.onopen = function(){" a. H) G$ g7 h5 j4 w6 Y
  3.     var uid = 'uid1';0 l7 g1 d& j* U* u7 O
  4.     ws.send(uid);' _: U+ y* U0 i( U
  5. };
    ) K2 n- K: D2 q( J
  6. ws.onmessage = function(e){5 f0 v: o3 `  j+ r2 K$ x* x* k
  7.     alert(e.data);
    * B6 E8 i6 q, p4 r; L, A
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口( W# Y, \9 Q  {- @; f* x
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    # _4 F8 @, Y: @
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    6 ^8 ~, p0 n  E6 I. j/ s
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');( W$ `  R* i4 i
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    4 H# x' k5 y# L0 o) Z! x
  6. fwrite($client, json_encode($data)."\n");
    & E' [8 D) M( \' `
  7. // 读取推送结果
    * I( r& e# w3 ?$ J& |. W
  8. echo fread($client, 8192);
复制代码
" |( q0 V; |$ {, \* M; f
8 P* S8 A9 Y, m# h+ G& C% ?
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-7 21:01 , Processed in 0.132627 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!