您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14805|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    + B2 A4 S; ~+ M* [5 \5 J" B
  2. require_once __DIR__ . '/Workerman/Autoloader.php';: O* }/ U! X/ [
  3. ! N/ o5 n  n- J& N% s- O
  4. $worker = new Worker();
    ) B0 ?) ^3 M6 s
  5. // 4个进程
    , ]$ @0 ]/ x+ \2 _( b* [0 B8 z6 @
  6. $worker->count = 4;
    " C" g4 d  w/ C  W! Q5 [
  7. // 每个进程启动后在当前进程新增一个Worker监听
    ) z* |3 Q0 D1 g' Z2 I4 Z9 a
  8. $worker->onWorkerStart = function($worker)
    3 O9 p6 Y2 R' I9 z0 h0 A/ P
  9. {4 c& e0 Y0 y# J) Q- S+ ^7 e# }6 K
  10.     /**
    - y  a; s% \5 C  R
  11.      * 4个进程启动的时候都创建2016端口的Worker9 o. W& I; X: Q( C
  12.      * 当执行到worker->listen()时会报Address already in use错误9 ^& z6 B" ^: O  H" G
  13.      * 如果worker->count=1则不会报错
    9 ?' S! G8 W: Q+ g
  14.      */
    , o" ?! W/ l% ]' E" m5 u. w
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');. Q  x  _) w8 P& Q
  16.     $inner_worker->onMessage = 'on_message';4 A8 f  k5 m2 P- ]
  17.     // 执行监听。这里会报Address already in use错误
    1 ^/ l+ G, z. E8 k2 \
  18.     $inner_worker->listen();
    1 Z+ P' h8 f3 r( U1 X4 k. F# s
  19. };
    ! T+ j& E$ \$ e
  20. ) M" H2 J4 y5 y) J  P2 v
  21. $worker->onMessage = 'on_message';+ H& O( S8 I6 T) Q2 E$ i

  22. , m" k( T1 l- H! o* k
  23. function on_message($connection, $data)
    " W) X) R& }, D! ]( V9 E( b/ k
  24. {
    % `2 b7 {  f7 ]. d
  25.     $connection->send("hello\n");
    / M% d( V$ M4 k1 l
  26. }
    . A( L0 c. x) Y+ Y, v

  27. 7 e4 @5 f3 I9 s
  28. // 运行worker: d/ l# ?8 V, T$ \% R8 U
  29. Worker::runAll();
    3 L. [7 j0 J6 ?) Y* h; @! k  Z
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    6 w7 c# r. h. S1 Q

  31.   `3 W6 M# N1 a3 J& H
  32. use Workerman\Worker;
    6 ^# H' l6 c4 S+ i
  33. require_once './Workerman/Autoloader.php';/ J' w$ o, k- g! n7 H0 B
  34. 0 {, v7 }9 F/ S9 h2 P+ o/ ]
  35. $worker = new Worker('text://0.0.0.0:2015');
    / C: Q0 r8 {: D& ^& Q# u# g
  36. // 4个进程
    . l: e+ {1 k4 f6 m
  37. $worker->count = 4;4 n  ~- S( v4 x) |3 X
  38. // 每个进程启动后在当前进程新增一个Worker监听
    2 c3 x0 _8 k" f. _  E, z
  39. $worker->onWorkerStart = function($worker)7 M# C+ @9 ^" J6 K
  40. {
    1 \; t. a* Y0 M; T6 p6 O. e
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');0 h9 e: f) V/ o( p; Y3 N
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& _  L) U, v  t! m0 d7 H
  43.     $inner_worker->reusePort = true;7 v( E, B' [( O% F$ J
  44.     $inner_worker->onMessage = 'on_message';# W0 ]% y1 x, ~8 o: c9 O# D
  45.     // 执行监听。正常监听不会报错
    ( X0 d+ B. s) h9 K- L9 a
  46.     $inner_worker->listen();0 W0 r9 L( y7 [
  47. };
    / }$ ^* @9 ~$ v) J
  48. # u3 u4 H( S. {3 N
  49. $worker->onMessage = 'on_message';
    : t1 X2 B7 m4 x) K. ]& j

  50. / L. V1 J1 T# r& y8 c& ~
  51. function on_message($connection, $data), |+ g5 \0 h6 {  b0 w+ g/ H) `
  52. {! W, Z0 ~6 j9 ~% x3 J
  53.     $connection->send("hello\n");2 k$ L- D  N  e0 _: j) _9 e
  54. }
    ) C/ U: v% z% x& Z' G

  55. % I$ v7 o  a! r1 m$ m
  56. // 运行worker/ [. V9 c4 {7 s% D( J$ m
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php' m# x" l8 `& l! x
  2. use Workerman\Worker;9 H% T5 |6 a7 K! T; {% Z
  3. require_once './Workerman/Autoloader.php';  E, Y1 m  X2 `- H
  4. // 初始化一个worker容器,监听1234端口2 V2 C7 Z& T, ]! D# Y7 z
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ! ^! [' G1 }8 H. o- N( v- d# R

  6. 9 s0 ?/ m5 q* r  A
  7. /*/ |1 I6 u; c3 x) g. M
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误  w1 Y9 G" ?" C& f" q2 o  _  d. v
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    9 g* I% N4 M8 x; E$ \" F2 p5 i
  10. */
    ' e5 R2 _2 \9 U; o( ~$ v! {
  11. $worker->count = 1;6 j6 }5 Y  ?. E  @1 [& s% O9 W7 l- o
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 z- I5 f) B* R( A
  13. $worker->onWorkerStart = function($worker)* j, i* e& Y8 d; q
  14. {+ U  {7 ~( s! _8 s
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    9 R- @$ ~2 K/ |
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    7 p$ B: E9 R5 H
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( m: h! w2 B3 `2 l( {# r
  18.     {! ], i' E9 g" l$ a3 O. N
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    2 W! G) O$ h# L! j& ]
  20.         $data = json_decode($buffer, true);
    8 q! T* y+ v5 ^& d: \+ a& I
  21.         $uid = $data['uid'];- l4 L& _% _/ J
  22.         // 通过workerman,向uid的页面推送数据
    ! }" }$ \; p3 H+ F% _2 n; n
  23.         $ret = sendMessageByUid($uid, $buffer);+ l4 l9 B. D6 j4 s, \; o7 {
  24.         // 返回推送结果
    0 p2 C6 S+ P3 p, @
  25.         $connection->send($ret ? 'ok' : 'fail');
    , m' t9 E9 n0 j" F! _* A7 s( ?
  26.     };- t" i2 `2 w' T
  27.     // ## 执行监听 ##/ \$ M4 U2 k9 p& w* `* M8 B
  28.     $inner_text_worker->listen();6 ?+ {+ M0 X0 p6 ?4 w3 k
  29. };4 Q  N6 G# u$ _8 d4 O
  30. // 新增加一个属性,用来保存uid到connection的映射7 N3 l% a, M# T) e& e9 T0 g! i
  31. $worker->uidConnections = array();
    0 [) m# v) y8 G' H
  32. // 当有客户端发来消息时执行的回调函数
    * K7 ?! Q' F3 t, [* X0 B% ^' e
  33. $worker->onMessage = function($connection, $data)6 y- H6 h) J- n
  34. {8 y8 K* `1 i' S9 X7 _
  35.     global $worker;
    " C& Z: ]' G! ~/ W0 G
  36.     // 判断当前客户端是否已经验证,既是否设置了uid' Q9 ]' L4 T. F8 F
  37.     if(!isset($connection->uid))) G& h/ r' N( |+ Q4 a
  38.     {! H1 A2 J4 E7 G
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* |- e6 }* m1 P: P; m7 ~
  40.        $connection->uid = $data;
    / c) k3 d4 f( F3 C. M& l* T
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,: l. l+ ]& o' f: Q6 C  k) l
  42.         * 实现针对特定uid推送数据% h' c! d" ?+ y/ ?; O
  43.         */( F9 T6 r  o( g. ?
  44.        $worker->uidConnections[$connection->uid] = $connection;
    : v8 K3 }2 C; R  \/ k7 V* h
  45.        return;8 u5 i) `; ^7 g8 Q5 b+ l& T# i
  46.     }9 @; A( s" o8 O0 G  K' f; A% i
  47. };
    , x3 A. u% B6 q. f+ |2 e
  48. 0 D# \/ t) [+ Y1 n
  49. // 当有客户端连接断开时
    0 E( ?) s8 f) O5 K8 G0 I
  50. $worker->onClose = function($connection)
    ; A  s! h  |# Y4 z; L8 T; J) o% t
  51. {' A" ?$ ], @7 x: U, [
  52.     global $worker;, Q0 g+ v% J0 e4 I8 {
  53.     if(isset($connection->uid))
    0 ~! {: I4 a6 D0 [4 j
  54.     {9 R' Z: ~) `; c& q; C9 K
  55.         // 连接断开时删除映射- ?/ z1 ]7 f6 T! ^
  56.         unset($worker->uidConnections[$connection->uid]);
    4 K( k" S2 P$ ?' F
  57.     }; @: ]* g* V) j3 Y1 A
  58. };
    . d3 y0 t, {$ t/ N- n
  59. ) e- m% R5 K9 ]
  60. // 向所有验证的用户推送数据
    / h0 y7 s0 J+ ^. L7 s( i9 K/ [
  61. function broadcast($message)
    2 R8 _4 S8 d8 D. n: o+ A; |
  62. {
    5 }. a; ]! O; R) f3 e( M: x' s
  63.    global $worker;
    $ L% p# O6 N. Q9 r+ e- W
  64.    foreach($worker->uidConnections as $connection)& r; Z1 F! y! g* O$ S% ^
  65.    {
    0 V1 e# M0 V! v5 K
  66.         $connection->send($message);9 o: O. |' |# v7 C
  67.    }. @3 @4 S2 e7 o; d4 H9 W, R
  68. }' Q0 V- A' @0 s9 u0 p

  69. ( |7 J+ Q/ J0 Q! \: ^
  70. // 针对uid推送数据8 f' D3 Z$ Q2 M* j' i& z; [
  71. function sendMessageByUid($uid, $message)
    2 F+ h9 z0 p( b
  72. {
    / v0 I, s% P, ^5 ?( R
  73.     global $worker;
    ! F8 d0 z, l$ j8 M  h* @
  74.     if(isset($worker->uidConnections[$uid]))' L8 {& j7 ^% o2 A* i- Y
  75.     {
    $ P7 @5 a8 M- B( M
  76.         $connection = $worker->uidConnections[$uid];7 u; P3 W9 R& P1 x
  77.         $connection->send($message);8 w  S* c+ L: {6 ~& {+ _, c
  78.         return true;
    ' O3 I+ S6 A: P- }1 g
  79.     }! v7 w, \0 }6 n' ]$ c' V( B7 z# b* p
  80.     return false;  p4 l1 j4 ]. H( M  s
  81. }
    7 Q6 p# d! B6 E( C# y3 ~* v

  82. : m; V) H" d  V: w
  83. // 运行所有的worker7 k7 p. u  K9 k% ?
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');: A& i0 Z# a  g$ q  ^2 A
  2. ws.onopen = function(){: t" W5 ]% G0 v# L1 y& F
  3.     var uid = 'uid1';& e0 ?( s7 Q% s6 V, S. B/ a- X
  4.     ws.send(uid);% ~* c& f" H3 l+ j, D
  5. };
    5 U+ [* h9 T/ Y, D: {' A
  6. ws.onmessage = function(e){* b  Y4 [  H5 M
  7.     alert(e.data);
    5 W9 p7 |+ O8 L+ y5 p# A8 a
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口' M4 J  N4 M. V5 v
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);- q9 {* j& g0 P( m# a: a
  3. // 推送的数据,包含uid字段,表示是给这个uid推送9 y5 v  m: b" j, n
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    ' v( u( S. k# O! d! i4 d  U
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ o8 {: N4 v9 w/ w
  6. fwrite($client, json_encode($data)."\n");
    ) X1 J1 P# v, G
  7. // 读取推送结果
    , F0 E1 R7 Q  N. `: {  D9 i$ n
  8. echo fread($client, 8192);
复制代码
9 I1 }* m- C. B

; Z' a( i- f0 R( Q  z
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 18:45 , Processed in 0.087087 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!