您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10517|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;/ e* P  t% r; k: k1 a! M0 x$ B
  2. require_once __DIR__ . '/Workerman/Autoloader.php';; N5 M  {  }7 Y# i3 N( }

  3. 0 ]( z+ d$ d/ D# m0 @
  4. $worker = new Worker();
    , N+ L9 W% r8 j
  5. // 4个进程8 X8 C) L$ r( |" ^4 A8 u* F
  6. $worker->count = 4;  n* V# f. v2 m$ c& q
  7. // 每个进程启动后在当前进程新增一个Worker监听& E% G9 @) ]6 R* B* V& E" K
  8. $worker->onWorkerStart = function($worker)
    7 R" D6 N' |% `7 E' G+ g
  9. {  `; G5 U- {- v5 D& A2 ?5 O9 {' _
  10.     /**
    & r/ h5 b' w; c0 `/ k0 u0 c- d3 A
  11.      * 4个进程启动的时候都创建2016端口的Worker
    - ?6 d8 ]6 ~$ T! r" h
  12.      * 当执行到worker->listen()时会报Address already in use错误$ u5 I% Q- t2 M9 H# _6 u
  13.      * 如果worker->count=1则不会报错6 v& F! Y4 a: g6 R' `
  14.      */6 s. P6 y0 n: q, n3 {8 A
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');( Q; t9 v8 b6 D" }4 \9 `' H
  16.     $inner_worker->onMessage = 'on_message';
    1 P) F8 k5 p8 X/ s
  17.     // 执行监听。这里会报Address already in use错误
    " L( K/ q# e8 `& Q; b
  18.     $inner_worker->listen();) w* I  Y- O& G
  19. };, L9 ?/ h# \) X- e* Y3 M  A& Y
  20. $ J* z' S' H5 l1 n( L* s$ y7 ~
  21. $worker->onMessage = 'on_message';
    , X: m! |7 f( f( T! z* \
  22. , V: ~2 a# q& h$ q6 g/ \# h
  23. function on_message($connection, $data)
    4 Z4 @" J6 g' L% C5 e/ n" T
  24. {( C7 e# s- w" a
  25.     $connection->send("hello\n");$ d9 m% R- K5 i4 p
  26. }
    6 O( U: i) ]' B- ~- ]
  27. 5 Y5 m) F8 Z- c+ K8 D
  28. // 运行worker) R' n" W% U8 i  t7 T( ]
  29. Worker::runAll();
    : `7 ?/ O! A: z& H
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    ; z) s/ e. ?( k: p* C

  31. , B" J4 K- {* q! f% r
  32. use Workerman\Worker;: m; o0 Q" p9 Z, c. S
  33. require_once './Workerman/Autoloader.php';
    : ], `' `! t# e$ I3 \- {6 R" {  L7 g

  34. ( \& f' H0 w2 M
  35. $worker = new Worker('text://0.0.0.0:2015');- f& u7 V. L- ]( R. H9 \
  36. // 4个进程# G3 H* N8 S" X$ ]# O  N! w
  37. $worker->count = 4;
    $ X4 P2 v% }* V9 R: i8 ]2 r
  38. // 每个进程启动后在当前进程新增一个Worker监听+ l# f( [% w2 c- @% X
  39. $worker->onWorkerStart = function($worker)" @1 B) s0 O& |! ~
  40. {$ i% m) z7 t" i# E( K
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    1 U7 _9 N2 {, u* T$ X, q, K( z) c
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    , L0 F% [! b( N! e# ]
  43.     $inner_worker->reusePort = true;
    / C2 B& D/ ~& M
  44.     $inner_worker->onMessage = 'on_message';* ]; O, \/ w* N% p1 N/ k
  45.     // 执行监听。正常监听不会报错
    9 S. U! S: E' }
  46.     $inner_worker->listen();
    # ?$ B2 O! d; D  `0 `! E- a
  47. };: S  J' }. y3 `: O" Q. m7 [
  48. * e% G# S8 T' q' {5 d
  49. $worker->onMessage = 'on_message';' q9 _; C+ }# M' j! ]
  50. 4 g2 C  g8 I$ t' T, x" P! P: g
  51. function on_message($connection, $data)
    2 q/ N* \* x& b+ k
  52. {
    4 D; m) D% i. L+ e
  53.     $connection->send("hello\n");
    7 S, {4 w# y& C; o
  54. }
    1 q- y6 U9 U% K0 e- [

  55. * ~% p9 |6 c' U) r3 e
  56. // 运行worker! j4 i1 J$ {4 M
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php' Q! ?0 K% Y& ?, k
  2. use Workerman\Worker;
    & H$ l* u* d0 S% L2 d3 v: g7 U
  3. require_once './Workerman/Autoloader.php';" N* q5 h  n. k9 S
  4. // 初始化一个worker容器,监听1234端口. g7 o9 R. s; `. S1 ?; H7 J0 E6 i, Y
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    ; C" }8 C% c. v2 b

  6. ' U$ `" V2 _6 n# I6 D; \
  7. /** ]0 s8 z9 h% ?9 i
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误4 T" i. A' C1 T8 P# n3 \6 z& w" e3 E+ J
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)/ o9 J2 e. ~+ e, G# X$ O
  10. */5 A' v% G/ i9 I) C2 ?5 k
  11. $worker->count = 1;/ L2 c% F4 B2 |" P8 c3 |& r" [/ Z
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口+ _" s( t0 j9 s! U6 d, }5 ?- `
  13. $worker->onWorkerStart = function($worker)
    # q- C7 Z# @4 U% p+ z
  14. {* D( ?6 D% o: F7 B5 U; o3 R, W
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符" y" `: k. d  x8 U
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');9 ~# i. h5 c( N* e+ J4 J
  17.     $inner_text_worker->onMessage = function($connection, $buffer)7 h: o1 S, O3 O8 {" i( W
  18.     {  {" R" Z- G9 x- q" i4 A
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据5 v3 F0 G" x- Y& B1 L. @/ T) K5 P
  20.         $data = json_decode($buffer, true);
    $ B+ U: {$ `9 H5 e- l
  21.         $uid = $data['uid'];( B. K( Q6 C& d
  22.         // 通过workerman,向uid的页面推送数据
    5 K6 ?4 F, z/ C( N% _* h
  23.         $ret = sendMessageByUid($uid, $buffer);
    7 _* H/ X4 l: J) ^/ y
  24.         // 返回推送结果
    ) N( F& @$ U: X8 k" l
  25.         $connection->send($ret ? 'ok' : 'fail');  `5 _+ E4 x' U8 s: V
  26.     };
    ' P, h2 [6 a) K" d
  27.     // ## 执行监听 ##
    9 N; j* C* {! {' J
  28.     $inner_text_worker->listen();
    4 `$ X) R; A: I5 Q; a, l
  29. };# I4 i) {, o! i8 [
  30. // 新增加一个属性,用来保存uid到connection的映射) C$ v* @8 G; R8 b8 a2 e' {
  31. $worker->uidConnections = array();
    ( |% R; Q( c4 m( R, \7 q3 ?; F6 z0 a
  32. // 当有客户端发来消息时执行的回调函数- u$ r5 d# Z0 v4 r
  33. $worker->onMessage = function($connection, $data)
    . t" L7 s6 x4 s! a4 e( h5 L
  34. {
    1 M% ~8 B/ X9 a! Q) ~6 Y8 H4 P
  35.     global $worker;4 N0 S) R8 ]  t
  36.     // 判断当前客户端是否已经验证,既是否设置了uid2 R! ^# q, ?: K9 r
  37.     if(!isset($connection->uid))
    0 U; j. B% Y7 x* Y0 F; X1 ^* T
  38.     {
    , Q; C, x. n9 ]# U) H
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)  T: q: V8 k# `. }
  40.        $connection->uid = $data;2 _* m) G$ M- o% w" H% I$ W
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    9 c* Q: R0 [$ D$ {$ w
  42.         * 实现针对特定uid推送数据
    $ c/ N! j8 l+ J7 E' b
  43.         */
    / k1 b- _! G/ u( U; V/ e  k2 f
  44.        $worker->uidConnections[$connection->uid] = $connection;" u2 d+ V- R1 P' J5 R, d7 c$ G
  45.        return;
    ! Y% M$ w# B1 T+ p
  46.     }" U& |( _  X/ X$ W( b
  47. };. G/ |1 X8 O  v- z

  48. 1 m* C7 K0 |5 t! [# a" E4 I, N3 }5 e
  49. // 当有客户端连接断开时
    ( l2 b& O& `' c9 L$ N3 \/ H
  50. $worker->onClose = function($connection)
    / ^) [( J) d" K: e( X
  51. {' s: [3 S2 `& z) Z$ O1 F# g
  52.     global $worker;
    $ p4 w6 p/ g, l& X  _/ d
  53.     if(isset($connection->uid))8 q/ w0 g( F- g# Z* ?0 Z0 `. Z
  54.     {6 g* D; `/ T4 g* k( B' E
  55.         // 连接断开时删除映射4 ]/ x' y- N, z0 r) k4 T
  56.         unset($worker->uidConnections[$connection->uid]);
    , P& [. Z) a# }$ j1 W- z/ L
  57.     }
    5 d/ ^4 x$ a$ b% i, Q
  58. };
    ! O1 H( l0 f; a# W

  59.   B4 D  F! l& S. x* n
  60. // 向所有验证的用户推送数据5 M  O( {8 G3 w3 j9 f, P  q
  61. function broadcast($message)* i& Z; R( h" C+ a5 f5 [2 d
  62. {# e# a+ \4 ?% i& X* T3 o! X
  63.    global $worker;* }# b+ r: u, W  d2 o! G& U8 k
  64.    foreach($worker->uidConnections as $connection)& B- o3 F, @$ k) z' Y
  65.    {
    3 j6 J$ _' q# a! c  e! v& j! g+ u; s
  66.         $connection->send($message);
    3 a. V0 t% F0 J$ C) ?: Q1 ]! L
  67.    }9 Z/ ?4 ^) O& t1 M2 j
  68. }! ^$ Z% ]' b" H; q
  69. $ P0 {+ Y$ W+ z4 s
  70. // 针对uid推送数据! ^( r& q- K/ M6 _1 k
  71. function sendMessageByUid($uid, $message)
    1 `$ X1 ?8 \9 L8 p9 H& {5 p( D
  72. {
    ( F2 N1 A) ^. M  n+ ]) m: N6 [: o
  73.     global $worker;
    6 r: y5 {5 C9 a9 j; F5 n
  74.     if(isset($worker->uidConnections[$uid]))- B! I9 r/ M8 {' Y4 i. c9 j% U
  75.     {
    ; h8 N- |1 F3 ^7 z+ `" Y5 j: ]! s
  76.         $connection = $worker->uidConnections[$uid];
    $ O, [9 h* b! S
  77.         $connection->send($message);) v, ]) r8 }% Q& P; \
  78.         return true;# G- p: j. |, T" J& R, e
  79.     }4 n1 h# N" T. v# }5 c4 _1 P3 e
  80.     return false;
    ( L7 K& w: X/ U2 Y* ]& u  ]1 W7 R
  81. }
    + A0 J  a( l7 E5 z/ [

  82. : y" ]+ b8 |9 a% a0 e  ]
  83. // 运行所有的worker
    ! x0 y# W8 R% i( i+ X' E
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    5 x2 @3 @  w# c* ^8 A) s
  2. ws.onopen = function(){
    & H5 y/ h; D+ |* P6 d+ L
  3.     var uid = 'uid1';
    - v+ L7 r& j2 e3 B
  4.     ws.send(uid);* f8 G3 q6 L0 Q6 c. D9 q
  5. };
    , L, p1 g. h2 Z% P
  6. ws.onmessage = function(e){0 w% S+ V9 [. ]7 S5 O
  7.     alert(e.data);
    . `, A/ p/ Y; s, X' G
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    : ~# D% R& z# K: h: t+ R  x# N. ~
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    ) g. ]4 j/ ~- m- L7 a+ I
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    ; W' u; f5 p, j+ p$ J  J
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    3 u( B1 A  I+ c2 Q8 C& L! T
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符+ L7 r3 z; A, s) ]4 P) V
  6. fwrite($client, json_encode($data)."\n");
    ' L' h# i+ w9 }3 @( k* ^1 ], u
  7. // 读取推送结果
    + W2 T7 P) M+ x2 i1 z' W4 L
  8. echo fread($client, 8192);
复制代码
0 e7 D( [0 j; J
5 p9 T; v9 j! J0 J) k. r
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-18 23:58 , Processed in 0.105373 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!