您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10518|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    # G2 s2 f" ^: H, L
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    1 E$ v& g7 W5 ]' k8 ]& R
  3. 5 f1 g$ w6 ~9 X# L- X6 G* t
  4. $worker = new Worker();: q3 [) C$ R% S! _; x
  5. // 4个进程& t3 s( s! B4 X# Q
  6. $worker->count = 4;
    6 n- |: q! M' g4 K7 Z
  7. // 每个进程启动后在当前进程新增一个Worker监听
    0 \* Q* j+ q/ E
  8. $worker->onWorkerStart = function($worker)
    , U. Y( A- u) G; t5 i
  9. {% G; W3 `3 M( t& V0 S
  10.     /**
    - d2 Y% V, z& R; S
  11.      * 4个进程启动的时候都创建2016端口的Worker! H) v& Y7 P( V7 g6 i
  12.      * 当执行到worker->listen()时会报Address already in use错误
    7 q! ~, N; G/ M/ N1 Y3 u
  13.      * 如果worker->count=1则不会报错! h( a% Y6 c$ f# C1 B: `
  14.      */" N% ]4 \  Y# G5 y) W( h9 P7 Y8 k
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');2 S- E4 Z6 `% B! |( R0 w% X' ~
  16.     $inner_worker->onMessage = 'on_message';/ K, r  z; }9 a5 |) d- e# O
  17.     // 执行监听。这里会报Address already in use错误; q7 b5 h( \6 t4 ^0 D
  18.     $inner_worker->listen();
    $ r* [( [8 E) ~3 p- m/ F* r, \' v6 a
  19. };
    4 W& P4 d/ s; n2 ]) J
  20. 4 w# x. {  t+ ?% q% G: E) k
  21. $worker->onMessage = 'on_message';, b* Y5 g) b, ^) X

  22. 6 h% H+ o7 R, G& C) Z% y3 r
  23. function on_message($connection, $data)
    : F9 N/ Q  l" Y& T# P& D( B* A# G0 s
  24. {
    6 D- o* }/ D* P- c2 X
  25.     $connection->send("hello\n");
    9 a+ v1 f! [- u4 A; o
  26. }
    6 E' v. k5 _% ~

  27. : ?5 w7 C, l+ Y0 f4 W
  28. // 运行worker8 U8 X% j( v0 u- Y
  29. Worker::runAll();
    ( M1 @6 K' z- A( `* u+ S; y$ b
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    / k! S* ^7 M, b. P7 q9 ?

  31. % @% w  ~8 b( `8 K5 \% |, b
  32. use Workerman\Worker;
    ) z0 }5 _, n9 d9 E
  33. require_once './Workerman/Autoloader.php';. ]/ m  a$ V! k" H7 j
  34. + m, X& g4 w/ B0 O7 y' b. n
  35. $worker = new Worker('text://0.0.0.0:2015');
    , n% v0 G  r# m2 M
  36. // 4个进程9 d6 Z4 t9 b; ?+ C
  37. $worker->count = 4;, _. j, C# p2 f# T0 }$ p0 Y0 ^
  38. // 每个进程启动后在当前进程新增一个Worker监听
    - z+ n5 [* l) U$ Z( G. {( G
  39. $worker->onWorkerStart = function($worker)
    + ]  f# B! h. Z3 P
  40. {
    4 Z2 o( j* J/ Q3 s
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    " r( K! w& Q7 R8 e
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    $ L/ B! A) m. U4 X7 a
  43.     $inner_worker->reusePort = true;. {7 x3 Z! m0 p5 z. W; m9 B
  44.     $inner_worker->onMessage = 'on_message';
    5 P. e& P2 s9 Q0 K" v; f, a; O
  45.     // 执行监听。正常监听不会报错! l! |2 F. V/ A! U3 q0 G
  46.     $inner_worker->listen();7 X6 l: ~' ?- p: h- F9 U3 v
  47. };
    ( K  ~- n3 v. D0 ~

  48. - u; o* W! ]& n8 w" i6 k% P
  49. $worker->onMessage = 'on_message';
    + }: p; n3 p# A- U

  50. 8 u1 [4 G& k- s2 k7 z
  51. function on_message($connection, $data)4 f8 k* t- }1 _. }, H
  52. {2 n: w# e# u) m& F
  53.     $connection->send("hello\n");
    5 x' L& o0 }" K
  54. }
    5 h" M/ c5 l2 s+ d# R' @8 \

  55. + x7 F2 r6 @6 q6 `0 ~
  56. // 运行worker
    8 m1 P5 ]& [  H: W; g* x% ]
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php' [7 E3 W/ V3 X& p
  2. use Workerman\Worker;
    , _) l+ \# ^' K, I2 A7 ~7 S
  3. require_once './Workerman/Autoloader.php';
    % d. u% _5 @' y
  4. // 初始化一个worker容器,监听1234端口
    7 C- n1 u9 w  u) U9 W5 }0 |% m
  5. $worker = new Worker('websocket://0.0.0.0:1234');
      Z* G+ `4 H5 y, M; b
  6. 2 a8 I) M+ d, ?3 G" l
  7. /*
    ( F% c% `' T5 T' Z8 D" J
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误0 m' _& n7 j1 x1 K
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    % K7 @  a. k3 J0 I
  10. */
    & V2 O, s1 l5 j1 P+ H
  11. $worker->count = 1;5 A- o8 t$ K: {4 w
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口# u1 s# {" f; m8 R* n3 {0 @/ t
  13. $worker->onWorkerStart = function($worker)! h1 N* x2 ?7 b3 L7 D" c3 J7 B
  14. {
    % }9 c* q, s0 W, b
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    7 r" b- b2 y- X  J5 @
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    $ T. r2 m: K: d( o: [* j" E
  17.     $inner_text_worker->onMessage = function($connection, $buffer)( ?; _5 ~) J; N# T* e
  18.     {
    / G' l2 @& @& `+ C" m
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    % t, ~, F' e6 ?; S
  20.         $data = json_decode($buffer, true);
    " i, U, p2 ^& V/ S; H8 |& r8 z8 t
  21.         $uid = $data['uid'];
    ; h% L7 u% Z! D
  22.         // 通过workerman,向uid的页面推送数据  U! S) |! L$ b4 x; f+ y) K- {
  23.         $ret = sendMessageByUid($uid, $buffer);
    ) J1 q- H% a# ]0 T! b
  24.         // 返回推送结果; |% [9 f0 U( ^6 u; n
  25.         $connection->send($ret ? 'ok' : 'fail');
    1 f5 }! Q2 L  A0 O
  26.     };
    6 y8 \; ^; t3 H9 k" Y+ f* x, r3 e& P
  27.     // ## 执行监听 ##
    2 x8 `  q- B! G6 c3 U+ i
  28.     $inner_text_worker->listen();
    + ]# [. ?# [! w; B8 n: y9 `
  29. };
    2 i4 v7 q; [, M6 w4 g" G
  30. // 新增加一个属性,用来保存uid到connection的映射4 Z: n5 ?- u7 x: [+ `* S, d
  31. $worker->uidConnections = array();
      b6 l  r0 h$ H, n: A) g7 }8 G, g7 r
  32. // 当有客户端发来消息时执行的回调函数
    : Q% ?6 J& _# z1 H  ?0 r6 [! `4 [
  33. $worker->onMessage = function($connection, $data)
    # N2 M6 u& L9 A' V" d
  34. {- ?! i1 f- v1 D7 b% z* B8 o
  35.     global $worker;
    ' [# ^* e1 J6 k( U7 d0 T: c
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    # h, i( R& L6 F. {5 F4 v2 V
  37.     if(!isset($connection->uid))
    : ], v) Z; n: M9 t# L* }
  38.     {
    ' I! h( a3 c" i9 t, x5 O5 t
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)1 t0 U6 A% F2 k
  40.        $connection->uid = $data;
    - D- z$ G' T4 [* I# u/ \. u
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,  |0 S7 p, Y; |) ?7 c# s3 p
  42.         * 实现针对特定uid推送数据
    8 B7 U# i. ?$ c7 b; C- {6 l  {; W) x
  43.         */) a* u( d+ ~& ^% Q/ o
  44.        $worker->uidConnections[$connection->uid] = $connection;
    , H  U5 y6 Y8 H; @$ k' c2 z/ b2 f
  45.        return;4 Q2 F* Q1 \7 }
  46.     }- e" ~9 U6 F& _; G
  47. };# O  s  Q! Z, \/ K5 N0 m5 a) K5 U; q
  48. * P' o7 d; N" ~! c3 f. f/ X
  49. // 当有客户端连接断开时; D0 V: N5 s; B; j- i( u
  50. $worker->onClose = function($connection)) L' c$ i4 q6 w- k! t/ W% h
  51. {
    0 l9 p0 A( T, r+ w5 l
  52.     global $worker;3 o) Q) K% v5 S2 S/ v" w8 Z
  53.     if(isset($connection->uid))0 q2 V5 t; p: H
  54.     {: `7 }0 j( {4 F" m8 J, C
  55.         // 连接断开时删除映射
    * J* {( z( q" m7 o! ?; r
  56.         unset($worker->uidConnections[$connection->uid]);
    8 Y) k: G) M% C0 ?! K4 N5 e
  57.     }
    4 y" v& y( P1 z7 G$ ?) q5 v
  58. };
    0 w1 m- E4 Q3 Z* j- Y
  59. & L' V& i# L; z* c6 \
  60. // 向所有验证的用户推送数据
    * _( q  h: A* P  _/ y2 [
  61. function broadcast($message)
    0 M/ E7 v% I' P# |/ d) j$ {1 W( k
  62. {1 }2 A5 ^* w2 Z: X, [5 j& ?9 R
  63.    global $worker;8 I% F1 B  l" N5 O
  64.    foreach($worker->uidConnections as $connection)
    $ B1 }6 g2 b8 P; i) Y! m# p* R
  65.    {6 X. e& R( K7 x) O# d: U. C; H
  66.         $connection->send($message);0 Q/ E$ M! p  o( P- S* X
  67.    }7 v6 m3 @* V, V/ A8 e
  68. }- u% [8 D  [1 N# i
  69. ' ~+ j- K( M% K0 h8 ?' J# v4 m7 p
  70. // 针对uid推送数据
    4 \' X# g8 m# X* c- g, N8 ~
  71. function sendMessageByUid($uid, $message)
    % F+ R; t0 K8 y
  72. {- u5 ~/ k% X9 q/ i
  73.     global $worker;* |6 G% x4 ~2 h( H( c$ Y( x
  74.     if(isset($worker->uidConnections[$uid]))+ l: o! N/ f3 N6 Y1 y
  75.     {
    , B, n4 k. _3 n8 {
  76.         $connection = $worker->uidConnections[$uid];4 r5 a' u. E" L6 J8 \- }+ A" c
  77.         $connection->send($message);2 R2 A3 X% e" V7 ?) l
  78.         return true;+ s2 e% @" }$ r9 G  i
  79.     }+ e# s% ~  L, b+ n3 W( Q
  80.     return false;
    . L# ]( I9 j+ R6 T& v$ j
  81. }) `/ ]3 ~5 V# l: m& t4 q0 U
  82. 3 J. N% k1 i* |6 r9 ]% s. T- O# ~9 K
  83. // 运行所有的worker9 A6 P4 y( u+ D/ ?8 Z: H' k
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    9 P1 v0 V, [4 b. O( T  X, u6 d
  2. ws.onopen = function(){
    ; f5 s/ e- z0 f! q) j
  3.     var uid = 'uid1';9 I+ y8 M6 y3 _+ l% [* N% f9 k
  4.     ws.send(uid);9 M* s8 T  d: @$ _! g
  5. };/ I( c. ^2 R/ m3 r* e. m1 P
  6. ws.onmessage = function(e){
    / c! ~( N* N$ A+ M. T* A$ V% ]1 l
  7.     alert(e.data);
    # Z- L' _8 H+ h
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口; {  {1 C0 a3 z* u+ m3 s3 {
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);4 `) _  K5 \# l- N
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    * P& p. x$ T& b) d7 p, ~
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');" H4 F" w, U# u
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    ; l8 f- ]( z' k/ I" }
  6. fwrite($client, json_encode($data)."\n");6 K; s/ o* z( v  j) Q" t, J7 `
  7. // 读取推送结果
    ! v& l7 w- D. o8 D2 `1 c' }# I3 u
  8. echo fread($client, 8192);
复制代码
! H( K6 G0 Q7 c% e  U2 M

  X/ E' P6 J' w: n
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-19 00:26 , Processed in 0.146869 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!