您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14666|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    - P; A  N# M# R  z% I; j
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    / z. B  I' n7 ^% J: ]9 _

  3. ' u5 p4 Z) |. H5 ^* k# L) k: b" a
  4. $worker = new Worker();- h  M2 s% s; ~( `- i
  5. // 4个进程  f# G( v& P  ~4 V8 P9 M
  6. $worker->count = 4;0 w) m# Q5 n  v; }
  7. // 每个进程启动后在当前进程新增一个Worker监听2 I6 p6 s$ o, `' x
  8. $worker->onWorkerStart = function($worker)
    $ l' u% I7 s5 Y. ~; G/ K( E
  9. {- I. @5 `/ X5 E, X+ q- r  k% W7 o
  10.     /**# c; z! i* t- B
  11.      * 4个进程启动的时候都创建2016端口的Worker9 e+ n, J9 z4 K; K% a  f# m
  12.      * 当执行到worker->listen()时会报Address already in use错误
    9 r: O2 X9 o- W( y% \' t
  13.      * 如果worker->count=1则不会报错* u: W  S/ y% I7 F; K( s3 u
  14.      */" Y6 O) X: R3 k) G
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');' m% L# S4 I1 Y
  16.     $inner_worker->onMessage = 'on_message';7 E! G# O9 r0 h) \% B5 `& p
  17.     // 执行监听。这里会报Address already in use错误
    " O: C& N% A, S4 \' j' k
  18.     $inner_worker->listen();
    % I+ k; h3 ^  j
  19. };9 G" b5 x1 d) E* ]; O( q% B; B7 [0 E

  20. 9 ^: h' R; v8 [! E6 k# i& @
  21. $worker->onMessage = 'on_message';
    0 C* M+ m! C0 ^0 F' f; X
  22. ! F5 Q) U+ R& ^& {- }
  23. function on_message($connection, $data)
    . v) H6 M. [7 t3 T
  24. {
    : h# ^5 I8 W- _8 i% a# U, P% c' n
  25.     $connection->send("hello\n");
    ) f' ]) n, N( G+ g1 d; R
  26. }4 L" q" S1 b% ~7 @
  27. , Y/ Q  B7 X, A. U0 B4 A; P* ^% V/ p
  28. // 运行worker: z: O( T. c' c4 Z9 j+ z
  29. Worker::runAll();. e- R, N0 W' B7 |! j1 I
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:5 l% z: k0 J* T
  31. ' _. @+ v( W+ u+ p1 L& m
  32. use Workerman\Worker;1 `7 {3 C4 B- T
  33. require_once './Workerman/Autoloader.php';" B: a+ B; T) P" `  }- ?# e3 T

  34. . _! Y4 K" C" M; r/ j8 H/ g9 i, `
  35. $worker = new Worker('text://0.0.0.0:2015');
    ( |, X$ G' K8 r5 h
  36. // 4个进程
    - V0 |5 _6 H2 c% w, ]% T- i* U
  37. $worker->count = 4;
    " e3 F4 P$ V' |% g- f: D
  38. // 每个进程启动后在当前进程新增一个Worker监听; y' B  o( `4 {/ W3 v. U
  39. $worker->onWorkerStart = function($worker)* T( S7 F  Q! v
  40. {4 ^1 I1 }% r* j/ u' v6 q
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    & A3 K& Z9 r; t! E2 d7 a) j" v  ?7 ^! Q
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    , {0 n: r5 Q7 P, s) q& I
  43.     $inner_worker->reusePort = true;
    - J7 S6 V0 c8 b8 X
  44.     $inner_worker->onMessage = 'on_message';' U# _& `3 ^+ B: j  U% g
  45.     // 执行监听。正常监听不会报错
    0 D$ V3 a6 x5 w8 L+ A. L& R$ L
  46.     $inner_worker->listen();" V# Z! _7 H5 V( A. @8 {) Q9 L
  47. };5 N, u) v' D6 B+ k8 `9 ?7 {% @! P

  48. 9 A0 {: U/ l* v8 [( H
  49. $worker->onMessage = 'on_message';8 j8 R  A3 P8 H
  50. * ?6 r. U/ p3 |1 h/ V* E4 M0 Z
  51. function on_message($connection, $data)
    % C2 _* Z! {# X
  52. {
    . E( ^+ ~2 b( R* H8 S
  53.     $connection->send("hello\n");  J9 z: U, G7 |
  54. }
      Y1 t0 B, d5 W$ p
  55. ! r* H4 T. G4 r8 T8 o
  56. // 运行worker6 |" ~, `8 P2 _: D6 e" Q
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    3 l1 L, k8 q0 L, w
  2. use Workerman\Worker;
    ; n* c6 v8 J, x* M/ A1 [
  3. require_once './Workerman/Autoloader.php';6 N; Z; J7 K6 M
  4. // 初始化一个worker容器,监听1234端口
    : w) Q7 g# e" J* a/ ?
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    8 D9 J4 ~% ^: N% V# D8 j; Y# }

  6. , M, y9 f: T% S. G0 ~5 F% p
  7. /*
    / ~% n& z' _; V! r) L
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    9 M8 |7 T$ x$ W: c+ X
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)( _/ r4 f/ x6 \* Z' B) Z& J4 W
  10. */: \; i* H5 A$ D: `" M: |+ K
  11. $worker->count = 1;8 C& ?. M- C; V: u2 ]7 Q- P; B
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    % L9 P: s. z- n$ A9 m
  13. $worker->onWorkerStart = function($worker)
    , w+ r( x! e" g* \
  14. {# s4 ?1 A% I" H; U9 V
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    ) j! Y( |- A5 z. [# k
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    # Q3 g( O& W5 A* ?
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    + E- i+ v) c8 B) C, l5 v
  18.     {( @5 ~& b5 h9 Z& ?
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    , F3 M. t# |# ]. U
  20.         $data = json_decode($buffer, true);
    2 V. [) c0 m  T3 ~& g
  21.         $uid = $data['uid'];/ d8 C8 f: H7 U8 L/ J7 c. a
  22.         // 通过workerman,向uid的页面推送数据( a2 F* T3 g* c7 u: }
  23.         $ret = sendMessageByUid($uid, $buffer);. q/ r5 I, H8 J- p7 t
  24.         // 返回推送结果$ U/ b+ O- R6 U! x- @( o& H# w
  25.         $connection->send($ret ? 'ok' : 'fail');
    $ y! \$ w+ x  j$ X" A. s- Y0 d7 I2 |
  26.     };! ?# e8 o7 |( Z6 D+ L4 w
  27.     // ## 执行监听 ##
    ; `. a" l6 f  d: Q' {
  28.     $inner_text_worker->listen();& O! g& K. ~! g: J6 J
  29. };
    7 M0 S( J" m( a
  30. // 新增加一个属性,用来保存uid到connection的映射
    $ V- u; G* l( U& ^( h5 [
  31. $worker->uidConnections = array();" j6 M& |" W( R
  32. // 当有客户端发来消息时执行的回调函数8 w/ m5 l, p, r: ]+ |
  33. $worker->onMessage = function($connection, $data)
    7 a6 j" W! q$ V' B6 i* x# g/ l* u( T
  34. {$ \1 g' B* M" G* `
  35.     global $worker;
    * y5 m; ?* B! _% C+ b( O2 E( ?
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    5 A4 o4 f+ J8 H9 |3 v" @9 d! j
  37.     if(!isset($connection->uid))
    " S' `  n" n  c2 n" _3 E. s
  38.     {- t$ u. F' B  V: k& S. A& V( ^
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)  ~& T1 _. ?8 M- W7 t
  40.        $connection->uid = $data;
    2 w+ B5 @/ ]) o) ^; L* w
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    ) c0 E$ u% N- S* ]% |5 k
  42.         * 实现针对特定uid推送数据9 D. }( R" c* r
  43.         */8 E$ ~3 _7 t" X" F, X% O- R' N4 O
  44.        $worker->uidConnections[$connection->uid] = $connection;
    7 Z8 }4 o" W9 _% u
  45.        return;
    4 y& B1 M6 Q% l: e( ?
  46.     }
    & u$ W* O' B8 s
  47. };
    8 [" }, o* P* K* |

  48. : r7 z5 g4 c( x7 I: `4 Z
  49. // 当有客户端连接断开时/ b9 H3 G- G- G; E3 L+ C$ j- h, ]
  50. $worker->onClose = function($connection)7 b, C' \$ F- s1 H7 o
  51. {4 |+ E" c, x: q+ L$ D5 h& X! J, w
  52.     global $worker;
    7 S7 P; S) `" Z  [
  53.     if(isset($connection->uid))# {  Q  Y! G* W$ v  G( u
  54.     {
    / ~9 w( e+ W0 \& m3 H. k
  55.         // 连接断开时删除映射  S0 X. V1 g- j3 r, L
  56.         unset($worker->uidConnections[$connection->uid]);% ~3 Y4 e3 B0 ]& t1 m
  57.     }
    $ O0 M; x+ A( @) b' ~( }
  58. };. {4 b: ]6 x; ?: t
  59. : H& Y3 L. m$ \3 f8 k& u6 m# A
  60. // 向所有验证的用户推送数据! P6 ]' Y& U$ \/ D
  61. function broadcast($message)
    9 O. L+ O6 {- {, |" U
  62. {0 W. o- w$ n9 [( S
  63.    global $worker;+ V8 W  d) o# k" i+ G' W' Q
  64.    foreach($worker->uidConnections as $connection)! Y9 w2 o5 ~+ r, ?6 Z2 E2 L( [
  65.    {# e  z- y2 z8 E
  66.         $connection->send($message);
    9 N8 x) E/ W. [8 S; C
  67.    }
    6 ]3 v- f- @8 n; R8 |0 |. r4 c% C: [& I
  68. }
    % E; q$ B$ F$ W5 u
  69. 2 K) [$ Q$ U: ?
  70. // 针对uid推送数据  k- l: v* b5 m9 u8 r
  71. function sendMessageByUid($uid, $message)
    , k! t' z1 [5 E, F7 P2 @' O
  72. {
    " s) p" x/ i4 q8 I) g
  73.     global $worker;4 \5 k+ }1 H! j+ K9 @
  74.     if(isset($worker->uidConnections[$uid]))
    % B+ e/ v1 Z2 X8 X$ z& M# ?0 h
  75.     {
    7 s" t4 w3 y) k: g
  76.         $connection = $worker->uidConnections[$uid];5 X3 S4 v5 U6 d. l7 _" U
  77.         $connection->send($message);
    ) Q) \! Q, H$ |0 s, O
  78.         return true;
    $ F( v  E8 w( D2 y/ o
  79.     }6 s' G, E. E5 M: ]2 }( [
  80.     return false;7 {5 S3 ~# |: I) m! {
  81. }
    $ W3 b% w% p3 H9 T( V; o
  82. : ~; |8 N/ |+ _- D- I  t" w
  83. // 运行所有的worker
    ! o' U* c( p! p9 T/ Q9 q$ C
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');$ p- e2 h& P6 W& @" `0 l+ j2 c
  2. ws.onopen = function(){
    6 O6 a! ]$ Z1 f( Z) W4 i* F
  3.     var uid = 'uid1';0 E0 C) k5 {$ g
  4.     ws.send(uid);
    5 W" B+ S6 n5 q7 o% D
  5. };, T+ K8 r7 _. E( B7 D0 Q
  6. ws.onmessage = function(e){3 M4 I# d  u& T
  7.     alert(e.data);. L% n& U, F# M5 F% k
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    0 q' ^% I6 ]: j
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);+ a: V9 O$ Y; m3 L1 z# p. ]
  3. // 推送的数据,包含uid字段,表示是给这个uid推送5 |* O1 l) N( G8 I( z; R
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    $ R0 ~# W8 U( J
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符7 \6 ]+ r1 n: A6 \7 C
  6. fwrite($client, json_encode($data)."\n");/ I  C4 Q7 {# o
  7. // 读取推送结果
    + O$ X. T; v; r" q  q
  8. echo fread($client, 8192);
复制代码
9 H1 B, R' W9 n4 q# I) F: H
; \; q$ x8 X4 }% ?7 ?
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-1-30 16:02 , Processed in 0.052256 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!