您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14804|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;- A* g1 N, V; M) E4 P
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ( j8 j2 a! ]( K* Z7 z
  3. ! F3 x! F3 S5 F/ z$ \
  4. $worker = new Worker();- K2 U% u+ c! P1 Q5 O; |* e% e
  5. // 4个进程
    ) K1 E; d$ Y, e" _
  6. $worker->count = 4;
    " @* P  E  G, K8 S; o7 M- `$ P
  7. // 每个进程启动后在当前进程新增一个Worker监听' f! L! }1 M0 f: [  T4 ^/ h
  8. $worker->onWorkerStart = function($worker)" _, D* @7 f5 p- o2 A1 t
  9. {' L0 O- D7 z3 m; z9 e' H' L
  10.     /**) s5 z9 @; R8 ?
  11.      * 4个进程启动的时候都创建2016端口的Worker* Z3 h- v. G) F5 N& R
  12.      * 当执行到worker->listen()时会报Address already in use错误5 N0 o- R6 a8 m3 ~
  13.      * 如果worker->count=1则不会报错6 `/ T" u  _; _/ j5 v' `
  14.      */% a  S+ w  {1 K# k
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');  J/ L. O9 V' z. M3 b  A
  16.     $inner_worker->onMessage = 'on_message';
    2 s) _  p) v  }
  17.     // 执行监听。这里会报Address already in use错误
    ! ~* O7 g7 L; O  f) N
  18.     $inner_worker->listen();# p: f" W+ ^9 e  |' ^
  19. };6 R* D# F7 p. X. x0 Q% F
  20. # r- J) Z, ]6 u+ H! D  F2 o
  21. $worker->onMessage = 'on_message';
    . |2 B) Q. R* Z3 `. B
  22. ( k8 ]3 t4 V1 X9 v! i& h5 f3 Y, J
  23. function on_message($connection, $data)
      i4 y# S7 C2 i* u) l  i& K6 P
  24. {
    9 v; N6 q1 Q/ h2 c6 ]: \
  25.     $connection->send("hello\n");
    6 o9 X4 \: z2 c' Q, H
  26. }6 m0 J( O+ G( H8 j2 c- E+ W

  27. ' S" j: h3 I7 ?4 O9 j/ q; Z
  28. // 运行worker; E$ \5 ]: R# h, Q6 n; \
  29. Worker::runAll();
    ' M; {# E! z3 g3 \/ A8 d
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:9 X* W5 b1 ?; [3 M+ Y9 S
  31. ) o* A' ]7 h: X
  32. use Workerman\Worker;
    7 T6 z( x3 \& _2 r1 N
  33. require_once './Workerman/Autoloader.php';
    4 J0 Q9 @) Z) Z$ Y) I4 v3 w7 X5 c

  34. ! {1 e  A, e0 H4 D7 Y
  35. $worker = new Worker('text://0.0.0.0:2015');) C- d$ K0 z0 g  N  U
  36. // 4个进程
    2 S1 N. p% M# r' N8 V* _
  37. $worker->count = 4;
    ! U- h1 d5 ~: J9 d* Y
  38. // 每个进程启动后在当前进程新增一个Worker监听: d1 ?5 }+ u: p8 m
  39. $worker->onWorkerStart = function($worker)/ A/ P" }' M; O. ^
  40. {2 l8 \! [$ r6 ?. q3 ?
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');% E' |" w$ s6 K1 B- Y3 o
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)7 R% ^4 w& ~! L5 @8 H6 G
  43.     $inner_worker->reusePort = true;
    ! G7 o7 T& w0 e
  44.     $inner_worker->onMessage = 'on_message';
    $ t3 P5 i7 ~  Q% s
  45.     // 执行监听。正常监听不会报错# p. U' i3 C6 _& [' i+ N
  46.     $inner_worker->listen();
    : o% ~6 t$ l- j
  47. };2 I  V  \4 L) q4 r4 i4 @, C
  48.   a4 D7 W% |! J+ _
  49. $worker->onMessage = 'on_message';
    ) H, k! Y: F+ A; j9 d$ d

  50. * X; A, B( f  c4 I1 A. i
  51. function on_message($connection, $data)  N) y" t" C- m- W/ [' e
  52. {
    % H' Z4 s) [! \& e
  53.     $connection->send("hello\n");
    + u) |7 j. u/ G
  54. }
    / L, \3 @7 N! f

  55. ' M( m0 {4 n" `
  56. // 运行worker, p/ j" Z1 I+ L' h
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php. ^# }2 E, Q% Z- q3 ^
  2. use Workerman\Worker;
    6 i5 o9 L) i$ O, W( M- p. R" s' ?
  3. require_once './Workerman/Autoloader.php';/ L8 ?/ c. `3 \
  4. // 初始化一个worker容器,监听1234端口
    / _9 h- a: Q6 G5 W0 M
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    3 ?% D, j1 _( k& E* F. t4 G- M

  6. ) H- V9 z; C- s! i+ h
  7. /*
    6 Y! @% I& R3 H0 O* a+ Q9 ?# X$ a
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    8 E  {; c: L. p$ f
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    + g0 T6 X$ O; e0 L: I2 [0 V8 j
  10. */
    . C+ C% {8 d& d( q& w
  11. $worker->count = 1;
    - x, T+ P& U; K& ^# T4 d, }" |
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    + A8 I5 a/ j  y0 D! X. x- y
  13. $worker->onWorkerStart = function($worker)
    / _0 ~' f! l0 m# w1 }7 G  v
  14. {
    " @3 l! D: {8 T# n. m
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 J+ e1 r- w' ~8 o. I# M
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');0 X1 H+ Q" I: j% B+ O" g
  17.     $inner_text_worker->onMessage = function($connection, $buffer)% o) S5 n2 X+ X. F1 q
  18.     {
    5 w( i4 J. w; s; G: r) c+ ]; s
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据, i3 S/ S2 {  W+ j1 J" @, V8 s) f. ^
  20.         $data = json_decode($buffer, true);
    " a1 J' m# g5 @3 j0 P' w
  21.         $uid = $data['uid'];
    $ ^  ]  D/ L: l
  22.         // 通过workerman,向uid的页面推送数据
    ; C) |; S8 L# l
  23.         $ret = sendMessageByUid($uid, $buffer);$ D) k' n0 ^& v, ?: [  H2 U
  24.         // 返回推送结果
    ' D) {( ?7 {) z7 {
  25.         $connection->send($ret ? 'ok' : 'fail');
    / f4 E7 X* N6 Z1 B3 z6 N$ U0 p8 y
  26.     };1 Q7 L4 t$ O7 B
  27.     // ## 执行监听 ##
    8 s! k* a, g5 T; [
  28.     $inner_text_worker->listen();$ O) I" E0 {, P. }; p
  29. };% D) k2 U1 p0 w8 j2 o0 l) o, U) c
  30. // 新增加一个属性,用来保存uid到connection的映射, _! k. `& y9 w. \9 ^; w2 a
  31. $worker->uidConnections = array();
    7 ~& w+ C$ W: C
  32. // 当有客户端发来消息时执行的回调函数
    ) b, |& P# q  q0 o9 A- \" `  L
  33. $worker->onMessage = function($connection, $data)! Q% R- F7 |# ]4 n
  34. {
    " b! n6 G/ n% [" T! g$ Q0 v
  35.     global $worker;- M( a& Y/ O- P  q" U
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    2 n  K2 L- Q7 `1 `
  37.     if(!isset($connection->uid))
    % h, ]* ~% N, M* w
  38.     {' T7 {/ V; a7 ~4 x' U
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    1 ^. D; }8 c4 g- b# X
  40.        $connection->uid = $data;
    2 Y& u/ J! e" ^
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,- e* t$ d+ B, Q) M
  42.         * 实现针对特定uid推送数据8 O/ r4 ^8 i+ z3 K5 }9 E
  43.         */
    / B! R- f$ ?, ~
  44.        $worker->uidConnections[$connection->uid] = $connection;- c  c5 B: J0 N! ^9 D$ U
  45.        return;
    $ W+ {' [1 h, l, g
  46.     }
    % O& K% U+ U/ C; L1 [( N0 c
  47. };
    " j- ~/ ~8 \; s, {2 k

  48. ' }( q* m/ P- `3 R- @
  49. // 当有客户端连接断开时
    ( Q2 J: Z% ]% h) F$ |5 S8 c! F
  50. $worker->onClose = function($connection)
    2 e1 J9 t! X4 @5 B
  51. {
    / e! O6 X! r0 K. U% o/ a
  52.     global $worker;
    3 H9 n% }: H, s3 d8 F
  53.     if(isset($connection->uid))" |7 `2 M/ K. k. {
  54.     {+ {3 K# P! Q+ R& _: v3 {7 D7 ~+ ]1 Z
  55.         // 连接断开时删除映射' f; C3 W3 g- j8 `
  56.         unset($worker->uidConnections[$connection->uid]);7 \, Q$ W3 E: b% I3 w) g, }9 B5 E
  57.     }% v/ q) h: A; @' r7 \+ s2 X
  58. };5 q/ K6 x$ h* y$ M) v+ S

  59. ! N1 D, ~( a& N' D& q6 h: e
  60. // 向所有验证的用户推送数据; ~! ^- @) l3 a% J1 b5 w
  61. function broadcast($message), P% q7 @6 P1 V% V
  62. {7 @2 J( ~- j8 `- s- C( _
  63.    global $worker;
    2 G) D2 J4 N. `) I
  64.    foreach($worker->uidConnections as $connection)
    % h0 X* a9 J1 Y0 Q: Y# `- z
  65.    {
    ' J& A+ P7 ]0 `" c7 G
  66.         $connection->send($message);
    ' |: ?. f7 h) O1 ~, |7 e1 [# i
  67.    }4 D" a2 N2 f+ X
  68. }6 r/ k( J- ~6 i* l
  69. : z* _$ b! v" z+ W/ |3 {
  70. // 针对uid推送数据+ N; p& j2 T! \, D- K
  71. function sendMessageByUid($uid, $message)' l# ^1 O. u$ @: v
  72. {- h' K) a0 {9 [/ N" S* b+ [! p
  73.     global $worker;4 Q9 W4 p# I( a
  74.     if(isset($worker->uidConnections[$uid]))% v6 m# C! h$ S) l0 V, \6 P$ o* Q$ q
  75.     {, E$ Z5 M3 e( [
  76.         $connection = $worker->uidConnections[$uid];/ Y+ Y/ G* d6 M% u/ R& J
  77.         $connection->send($message);
    , z7 g# A6 P: [9 }; ~
  78.         return true;& F2 r8 l+ h, E1 x; p% [
  79.     }! [7 G# H: M: ]
  80.     return false;
    1 q* S2 E8 Y: @1 |% _# @
  81. }- F+ t, y; v+ U2 I, z; ?
  82. 6 ?4 U% X1 W/ Z0 w6 w% b  I
  83. // 运行所有的worker9 r7 C6 X( y' B- n: S4 Z7 T
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    6 V' u; m5 o3 _% G3 r# ?
  2. ws.onopen = function(){. l) n. ?1 f) X& {; \; v0 t9 Z
  3.     var uid = 'uid1';8 q3 j8 D: m+ J4 H
  4.     ws.send(uid);
    ( T+ R0 `9 _( Y# l6 i
  5. };2 X0 i9 v! p1 O
  6. ws.onmessage = function(e){
    2 U" \. m% e, I3 l
  7.     alert(e.data);
    % g" K- ~( @. h- U  ]3 k
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    2 l) k* w, u8 L" P% r7 Z
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    1 G( C. G* _' y- e  O% S
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    / I+ j& H1 l' k0 w
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    6 h' i; u, `8 P$ b' p/ s. c# b
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    * O: e8 M- G' ?0 ]0 X) S! I
  6. fwrite($client, json_encode($data)."\n");9 W. D. _" f# |" }( B0 I
  7. // 读取推送结果7 t, O' `9 h% [3 Y. i
  8. echo fread($client, 8192);
复制代码
* D/ ^) N+ v' l" }7 ?8 N5 q3 y
2 u$ R. ]6 D; u
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 18:42 , Processed in 0.055247 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!