您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14665|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;/ m5 c7 }# j4 X6 ]/ N* h
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    * ?" v9 y) F& G! Y3 l' X
  3. - [) Z0 V6 ]6 J! l4 S& j
  4. $worker = new Worker();
    * v* z& v, \/ k7 M6 F
  5. // 4个进程
    , M* \8 }8 i" h5 m' S
  6. $worker->count = 4;' e( a9 ~. {4 p/ j; O
  7. // 每个进程启动后在当前进程新增一个Worker监听' S# c; l1 [4 c+ \0 `. T
  8. $worker->onWorkerStart = function($worker)( _7 U" K. m& V  V3 `# H' E$ j
  9. {: j" S, B/ a4 f' O) @
  10.     /**
    0 ~) ~/ L  H: v, a1 b
  11.      * 4个进程启动的时候都创建2016端口的Worker! {6 d3 Z6 Y6 h* I, X
  12.      * 当执行到worker->listen()时会报Address already in use错误
    9 J# T$ D; C1 T1 m) S, ?, h% s  c
  13.      * 如果worker->count=1则不会报错; V. Y7 r5 X' D: K. t8 O7 C
  14.      */
    . d- g; @7 ^4 O
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');. [, g2 }9 @9 V
  16.     $inner_worker->onMessage = 'on_message';  C) f2 Q5 R6 _9 t" F3 J" J
  17.     // 执行监听。这里会报Address already in use错误- o. F, f0 u) s% Q
  18.     $inner_worker->listen();
    9 G! N- }3 I4 M& m2 k" b6 u/ s4 t" D
  19. };% \/ I0 o" z" A
  20. & [0 w7 o: q* k0 A% H
  21. $worker->onMessage = 'on_message';3 X* c, |1 N: @4 ^1 w4 [1 m& F- c

  22. ( e- ]( X6 I5 B3 C: r1 ~7 I( @
  23. function on_message($connection, $data): G( y& C7 S" m1 A
  24. {, s1 R9 q" W6 f0 e, i. |
  25.     $connection->send("hello\n");
    8 \, Y, p; o: e* m7 C& B  q4 ^! R
  26. }7 f9 Z0 p8 @# {2 j& e+ ~' R

  27. ) @: ]# E% L* E
  28. // 运行worker. |2 m8 u8 j% [0 v. B; J
  29. Worker::runAll();, I9 J- \+ B, T
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    " f( L8 C1 @. U7 ?9 ^1 \
  31. 8 ^  \3 @# b2 Q8 `+ ?4 o
  32. use Workerman\Worker;
    7 W* S: [* H. |& t% h- r
  33. require_once './Workerman/Autoloader.php';; D! q! @8 [$ x! [+ [

  34. " T9 s! t* d2 y) s# L% }
  35. $worker = new Worker('text://0.0.0.0:2015');2 |" q/ q" y8 m8 P4 {, {5 c
  36. // 4个进程
    $ j& Q4 X3 g9 Y
  37. $worker->count = 4;& \# e9 E8 R' W; D5 P
  38. // 每个进程启动后在当前进程新增一个Worker监听
    3 X5 G' K7 S/ P9 i1 j( r
  39. $worker->onWorkerStart = function($worker)! v& p% }& k5 H7 i3 R, {
  40. {% \  a& A( P. M+ `% C
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    - s5 K0 l6 v$ P, A4 \
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    ( g3 O0 D: F/ w  K# ~5 C) j- i
  43.     $inner_worker->reusePort = true;
    $ h7 f6 z7 _& M  O1 _( ?
  44.     $inner_worker->onMessage = 'on_message';
    # [5 d; ?" f+ L# W" x
  45.     // 执行监听。正常监听不会报错/ X9 l' W: d' i% _3 x; d
  46.     $inner_worker->listen();0 v1 E" N% L& C; E' G
  47. };
    : s" s# O% ?7 L7 w
  48. " [0 k$ p. a) o- e* _+ {
  49. $worker->onMessage = 'on_message';' H6 l7 x7 ^; ]4 g# J; P
  50. / X! o  [; \2 K: {3 m9 `
  51. function on_message($connection, $data)
    * F  \/ \7 Z# B# {' p" K5 I* t
  52. {
    ( p7 P* L- l5 y, ^2 p% x+ [
  53.     $connection->send("hello\n");
    ) N7 k* _8 k2 ]! ~) x
  54. }, L& r; Q. b- ~7 r9 i3 s0 y
  55. 8 F8 q# W% D3 }$ ~& ~( o
  56. // 运行worker
    0 t; g6 \6 u* z& t. x6 z
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php& P8 {. T+ H/ s& x6 j& Z& y; l
  2. use Workerman\Worker;  b9 G* B3 Z: E7 c* ]
  3. require_once './Workerman/Autoloader.php';" F$ z" k) {: L
  4. // 初始化一个worker容器,监听1234端口
    1 A" Y) {, W" r& B
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    - R7 E$ f/ K# T0 Z, S. ^& ~

  6. ) O' u1 I9 [2 T) x4 m2 E
  7. /*
    - w1 i( W- t, S3 t+ F
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误( j5 V  C5 E: F) n) i) T5 b; |# v1 e
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
      \7 Y# Y9 ]( d
  10. */
      U: P2 y7 A" {1 A& Y
  11. $worker->count = 1;: v* G6 r8 `" {! `
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    . `/ m6 Y$ i* e, @$ e$ G3 n6 W
  13. $worker->onWorkerStart = function($worker)
    / V' f( x8 p3 u% M8 o& @! Q: o
  14. {- t3 U5 K  A3 C2 I- T1 z/ _
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    - U7 R. Y" Z- @
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');6 v( v. `/ g% ]& s
  17.     $inner_text_worker->onMessage = function($connection, $buffer)) S5 I" [$ B! ], v" A
  18.     {
    ( a, C$ ^. _4 b" y) Z
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    0 X5 e+ T* H  B3 w! k5 E
  20.         $data = json_decode($buffer, true);/ P! x! i. ^& n3 ~' l
  21.         $uid = $data['uid'];% d( o- S$ c: K; H, `6 {
  22.         // 通过workerman,向uid的页面推送数据5 S1 ~7 F! Q1 m
  23.         $ret = sendMessageByUid($uid, $buffer);) o4 z8 _7 ?0 V0 {% g
  24.         // 返回推送结果
    2 b" u3 M! r  |! `
  25.         $connection->send($ret ? 'ok' : 'fail');1 ~# K- u( U9 }0 x: x
  26.     };
    9 p: V( G% V3 e3 c2 y4 q3 Y
  27.     // ## 执行监听 ##" {/ z' m; Z3 v5 @7 e
  28.     $inner_text_worker->listen();
    2 n0 g# d  p5 K/ T3 e0 f5 L
  29. };3 F9 c( Z! d! a8 B+ o: f
  30. // 新增加一个属性,用来保存uid到connection的映射
    5 P# U# k0 n# u1 s; R; u
  31. $worker->uidConnections = array();" D$ h7 b- w* ]2 M) o
  32. // 当有客户端发来消息时执行的回调函数) x8 f" y2 j# N, S1 ~
  33. $worker->onMessage = function($connection, $data)
    7 E% a4 ]7 L1 f  n4 I! u
  34. {
    6 G7 ^3 Z6 [8 d( v7 C+ @' y  {
  35.     global $worker;9 }# N2 l! }5 {
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    1 ?# o1 e$ |0 N, Z
  37.     if(!isset($connection->uid))6 i; l# @8 b1 o" E
  38.     {, H5 i, E) k6 s/ |+ m
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    + h! M( m' X5 v+ U+ a9 [
  40.        $connection->uid = $data;
    4 e2 ^7 o: O3 X9 y* W0 Y
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,# l* b! Z0 R* V9 h/ K! Q- P
  42.         * 实现针对特定uid推送数据; G  b. e$ S8 i9 e4 d2 P- q& Q
  43.         */- d& r" I4 l/ |# }' Z4 g1 g
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ; \0 _7 p2 r' U1 f
  45.        return;
    5 f+ ?" J% k/ Q5 l0 i9 R  U
  46.     }
    7 N3 v! Y3 E5 ?4 n' ]
  47. };
    / u/ T: ?9 B, r! f+ e6 X  R2 j

  48. 4 T8 g2 L5 p! h) S+ ~
  49. // 当有客户端连接断开时8 I% P; R' f' Q
  50. $worker->onClose = function($connection)0 V( V9 ]. B8 ]) K/ \$ x8 i% U
  51. {
    " a  t1 g0 K8 k9 e& c
  52.     global $worker;! n* |& R- j, @* f
  53.     if(isset($connection->uid))
    6 Q: G9 ~7 _: H% u/ p* V
  54.     {+ u" x: }) q1 l1 ?0 X6 r/ g. P) w3 ?
  55.         // 连接断开时删除映射) y: `7 Q8 O0 |/ g1 r4 S+ f( j
  56.         unset($worker->uidConnections[$connection->uid]);6 z: i% p# l: V. _5 s
  57.     }$ r9 h: U% c+ y5 q
  58. };
    . @0 a1 _% S4 [* N

  59. ' u0 {4 @5 H2 d
  60. // 向所有验证的用户推送数据. R7 r. G7 D+ o8 f( h
  61. function broadcast($message)% T$ r/ n# C6 g! ~0 M2 e/ k
  62. {
    " a- R) N. s. C+ e( [  x8 E
  63.    global $worker;
    & _* k% W) H7 @$ K
  64.    foreach($worker->uidConnections as $connection)
    8 h, a' {8 Z: i2 C
  65.    {2 X: r/ c2 M8 f4 f  r& R
  66.         $connection->send($message);: ^- h7 @  ]) [
  67.    }/ b+ j: ~3 E  Q* q. C6 p
  68. }$ T0 M) o7 u7 I' c
  69. 8 O8 h. b. U! s! e3 _7 r% h
  70. // 针对uid推送数据
    8 A. H' h% ]( F- D; _
  71. function sendMessageByUid($uid, $message)
    : j# |) ~" i& X$ g% ]6 d( J
  72. {4 O: {6 Q+ ~4 g) w; L
  73.     global $worker;- V% N# l  ~# S, `) {
  74.     if(isset($worker->uidConnections[$uid]))
      L9 P: j9 C+ Y: ~
  75.     {# O/ v% t6 ]' h; c, V  W# H6 b
  76.         $connection = $worker->uidConnections[$uid];! K* T4 S0 I! @1 C
  77.         $connection->send($message);. v4 g$ s8 ]0 _9 Q
  78.         return true;3 c+ q/ D, d' W4 ?/ d
  79.     }
    1 O" f/ w2 H+ u8 O' M7 Y
  80.     return false;8 f( p2 N) K  C8 }0 u: ?. n4 y
  81. }+ v1 [: g# V1 M+ k

  82. 0 d% O* t+ t6 @! v
  83. // 运行所有的worker
    / Q! I5 s6 [5 _/ h8 l
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    ! g& Q2 l4 {& z3 ?
  2. ws.onopen = function(){
    $ X5 s  ~7 A7 e+ S1 v
  3.     var uid = 'uid1';- c+ @; ?* Y1 a  {+ Q; P
  4.     ws.send(uid);: y$ M! X# R0 C& V
  5. };+ \+ k9 X) ~3 W4 X" @/ j
  6. ws.onmessage = function(e){) Q& c' e( W1 _9 `' J
  7.     alert(e.data);5 l# Y0 f% L3 x. Q' I5 E/ ?  X
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    : `; w7 @9 P: ~7 M2 |, t
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    2 N$ t4 j( H5 Y3 a/ }# N7 O0 x7 `
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    3 Y* ]; N2 f/ X$ C" `
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');/ o3 J: p& J! _1 m7 L3 a( V3 m4 \. ~
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ s$ P3 @9 x3 @/ w5 U$ w( g  p
  6. fwrite($client, json_encode($data)."\n");# c4 H6 }  u" c0 q) [# j
  7. // 读取推送结果' ^* X( ]' p( \  o. j
  8. echo fread($client, 8192);
复制代码

" p! J7 r7 {: q- v/ A0 d8 M' J% A! _
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 15:55 , Processed in 0.066373 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!