您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15071|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    0 s5 g% F8 ^, w; z8 w
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    ' K: f9 t: d7 |9 B" J" d) U
  3. / `8 Y( @& g4 A% {3 k
  4. $worker = new Worker();
    & n; v1 t( }/ f5 h$ F
  5. // 4个进程3 k5 T5 m$ ^" I% R8 Z
  6. $worker->count = 4;; y! S  J8 l: J1 K# \4 K0 B
  7. // 每个进程启动后在当前进程新增一个Worker监听/ O$ y: a) w. G4 F1 _! D
  8. $worker->onWorkerStart = function($worker)6 }# U, g( V2 M
  9. {
    3 s/ F& ?% p& Z7 \/ B: C
  10.     /**
    1 G! J! i0 F# g# ]
  11.      * 4个进程启动的时候都创建2016端口的Worker
    5 m# }& n. n# }/ W7 e
  12.      * 当执行到worker->listen()时会报Address already in use错误
    5 }3 Z  r: u" Z; f
  13.      * 如果worker->count=1则不会报错5 M& \) S  G. ?! D9 P  C- B7 ?4 V
  14.      */
      ?7 q$ t/ r: t1 A( A3 O4 F/ P
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    " {2 `; D2 Z+ k% N
  16.     $inner_worker->onMessage = 'on_message';, n! d) _- b  V& b6 {- u- ^# h6 _% C
  17.     // 执行监听。这里会报Address already in use错误
    - \0 v8 l5 r6 K! j* ]% p
  18.     $inner_worker->listen();
    & K  R- j) O2 ?2 u
  19. };3 t6 i; v" x! |4 a- {- z4 m

  20. ; H" Q" _( i% y7 l3 b
  21. $worker->onMessage = 'on_message';% w+ Y2 @) C$ K) U& N

  22. + V" T. |5 x5 N  @4 v" x& }
  23. function on_message($connection, $data)$ e+ K* k, p0 X5 a: }
  24. {6 U" e# X- d' K! \& @" E( Z
  25.     $connection->send("hello\n");
    0 E3 S# h! K; P. Z2 j4 V9 M8 k
  26. }4 }0 a2 Q9 R/ A
  27. 4 N8 O3 K  K, t9 p3 ~9 C
  28. // 运行worker$ `0 B; u- ?+ ?2 g4 f: {9 Y9 B
  29. Worker::runAll();
      n2 R1 Y# w/ v" i+ Z+ C
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    ! \+ ?# e2 @) x& q8 L  C
  31. : ^- }6 m( s" f, ^8 H' q
  32. use Workerman\Worker;: u* n7 g- S7 L
  33. require_once './Workerman/Autoloader.php';
      S" N% t6 W8 U, o  W( L8 s* ]% K* k

  34. : ?  g! m. _, o; z9 s! M
  35. $worker = new Worker('text://0.0.0.0:2015');
    ! e6 B7 V1 E, `" a$ }/ v
  36. // 4个进程; y  i/ t4 K& K: j1 [! A- S
  37. $worker->count = 4;: a) |- t3 Y9 ?3 J
  38. // 每个进程启动后在当前进程新增一个Worker监听
    % u2 X7 b2 D; w4 d1 n( {- c
  39. $worker->onWorkerStart = function($worker)  I/ D0 H' @3 K) k* j: @! g& c
  40. {
    . e  H9 Z( a. z7 P$ [  C8 x
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    , u- a' m2 V- f' b" l6 y
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)& h) |1 s# Q( l/ D, L5 K
  43.     $inner_worker->reusePort = true;
    9 d1 v: s( ^4 c9 ]4 w- q4 n
  44.     $inner_worker->onMessage = 'on_message';- [0 I; C4 q/ @& m
  45.     // 执行监听。正常监听不会报错1 \# h8 I+ h; j  X2 o
  46.     $inner_worker->listen();8 O) n6 B1 M4 ?" U' t
  47. };  i5 E6 W! G* X) D

  48. 8 a5 N2 S) X5 J# z) G, s0 T5 `
  49. $worker->onMessage = 'on_message';8 }6 C, Y4 o9 i8 x
  50. 0 E; Q' ]$ f) j  [
  51. function on_message($connection, $data)* ~( @  F5 w3 ~' j/ V
  52. {" b$ E& K) v9 E9 e
  53.     $connection->send("hello\n");1 e7 {2 G. n( w; p* ^
  54. }5 Y. \# K9 I- k8 R! ~! A
  55. & z1 D7 |" z, V( r3 L3 {
  56. // 运行worker" N1 b3 Z$ q6 I5 B6 G
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php: p  Y% U1 z7 k( t$ Z+ o6 i" A
  2. use Workerman\Worker;5 w) v( M; E: r. w% \: P7 x0 A. f5 \
  3. require_once './Workerman/Autoloader.php';0 g8 ]" h# I2 d, x" Z7 a- y
  4. // 初始化一个worker容器,监听1234端口
    9 \  P3 W' E/ f7 _; ~" D% m
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    8 l9 W0 G7 }0 w# O

  6. 7 f" E; q: o& b" b/ p
  7. /*8 F1 w# n; u9 S# O3 D  D* d
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    & q7 Y2 M9 C& \; l
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    / D# A& \) z6 G7 g1 i# W* k
  10. */
    ( q2 S( h: F' g7 `+ G, M
  11. $worker->count = 1;8 X) l8 L" A* r! E
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
    1 ]5 K9 h) n% Z  c9 [+ I( v
  13. $worker->onWorkerStart = function($worker)
    8 \9 z' x- p2 s, f
  14. {
    + o" k( P; L; E
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 J$ J. b, A! ^3 x+ J0 u
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');  U! g% x5 e- `$ L: u
  17.     $inner_text_worker->onMessage = function($connection, $buffer)9 c8 l  N) G$ W' r6 Y
  18.     {7 U9 ?- J3 F4 b" D0 |6 N. n) b, ?
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    $ c; E# o3 d4 [' h% d
  20.         $data = json_decode($buffer, true);/ @9 d  p0 \! `" K+ |+ a# n
  21.         $uid = $data['uid'];
    0 {% R  q! ]& S
  22.         // 通过workerman,向uid的页面推送数据) ^" @' k1 u+ I1 o
  23.         $ret = sendMessageByUid($uid, $buffer);
    " a7 e9 c7 c. N0 W5 Q$ I% S
  24.         // 返回推送结果7 V( n1 j$ ?+ e. A" O
  25.         $connection->send($ret ? 'ok' : 'fail');
    - [3 B( a: P) S( {+ j9 G$ @$ |" V
  26.     };# W$ C. E" y5 o# q: A
  27.     // ## 执行监听 ##* \+ r8 L4 u6 H8 o
  28.     $inner_text_worker->listen();+ s! N( ?, R* _' n4 O, u
  29. };
    ( X4 t5 ^5 H8 f! |
  30. // 新增加一个属性,用来保存uid到connection的映射5 T, z. d( z2 f/ }" d2 {
  31. $worker->uidConnections = array();" X/ N5 {& C' g* S* W9 w
  32. // 当有客户端发来消息时执行的回调函数; E" h7 k  z5 L0 M* l, Y- ?
  33. $worker->onMessage = function($connection, $data). }* N2 x4 E8 l! T
  34. {; P3 l; R3 Q9 C( ^
  35.     global $worker;
    7 i6 |2 y/ H& V% v7 [( ~' Q& ?7 C/ i8 C
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    ( I& m' A- f  D( C
  37.     if(!isset($connection->uid)); z/ Q: x9 R' {3 V
  38.     {( y7 W5 v2 q+ U; L$ }  Y1 ~7 B
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    % h1 r/ m8 X9 j, j" P& y
  40.        $connection->uid = $data;/ l6 Y# q, V1 w' X7 R. P* u6 d
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,& ]3 C% z/ C4 ]
  42.         * 实现针对特定uid推送数据$ `" K# e+ ?* @' ]; E
  43.         */
    + u5 h9 v5 K+ h( H) i/ Y( a
  44.        $worker->uidConnections[$connection->uid] = $connection;
    " E; L4 @$ ~6 Q; O) j
  45.        return;
    - H' L( Y7 K8 U1 ~3 c6 s% n
  46.     }
    5 L7 M9 m, N/ m# c) `8 E  P
  47. };4 v0 `; ?/ j5 _: A# s

  48. " {# c( g" }( T: U
  49. // 当有客户端连接断开时% x% F! K2 s. P8 M9 P$ k2 m+ L
  50. $worker->onClose = function($connection)+ {0 Z, }8 S% g/ \9 x' Y" x( A
  51. {6 Y, o4 J4 f0 A) [* b. t" l
  52.     global $worker;2 x# o. M, u6 G/ k" w
  53.     if(isset($connection->uid))
    / B+ d) C2 {# A+ H
  54.     {7 C2 Q9 B0 N2 @; O% k) z0 ?
  55.         // 连接断开时删除映射
    1 ^; |# W: J6 m* B; B% p- T9 \
  56.         unset($worker->uidConnections[$connection->uid]);! F% t: V& A" Z7 g. _& u: a
  57.     }) `+ Y% S* B) ^' [% Z
  58. };" I: J0 o% _. K! h
  59. 5 b# w  w% ]+ [2 i5 t' i3 g
  60. // 向所有验证的用户推送数据+ X2 N8 }* ^1 a! j" S
  61. function broadcast($message)
    , U/ w" o# ]& |8 N+ o
  62. {5 K4 X, E$ N$ z7 e) W& D
  63.    global $worker;
    / F* V6 s. F' }
  64.    foreach($worker->uidConnections as $connection)" S& Y3 o$ s. c8 o4 F2 ~! x
  65.    {1 P% [3 E! B7 a8 k8 L- j# [
  66.         $connection->send($message);
    3 q8 A" O8 k1 b" N: X; u0 v+ L+ |
  67.    }3 Z/ M3 r. Q+ x
  68. }
    " q& Q! {  u6 m) z

  69. 7 x2 q4 F% j, `: T& u( D- \
  70. // 针对uid推送数据! I7 W: P; f3 Q+ G7 f
  71. function sendMessageByUid($uid, $message)
    % S# A5 Z% t* g/ D
  72. {, V( @4 X, K0 r- O( T1 B7 X0 T2 G! i
  73.     global $worker;
    - ]& ~  e& C% o; f) T" ^3 S
  74.     if(isset($worker->uidConnections[$uid]))
    ; v( O# N0 e) i9 Y0 X2 y' \% s. Z
  75.     {
    4 _" {4 W% e2 d) @
  76.         $connection = $worker->uidConnections[$uid];) |( m; r( r8 K; s+ o7 @
  77.         $connection->send($message);
    % J1 t! e% ~' C- P- u8 u& f; {
  78.         return true;+ E6 ]1 `6 e) X7 w$ }( J
  79.     }
    8 \+ t' m, L9 R5 V0 ]# f; b9 i) W
  80.     return false;; ^5 L" d  `/ ]1 S& w
  81. }
    ! u& o6 A" G2 d: s
  82. 6 k6 r; w0 F/ U$ T9 g; _% C: O  I! A* E
  83. // 运行所有的worker
    - v4 s& V% n; h
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');9 W4 Z; b8 P1 n) v6 e3 k
  2. ws.onopen = function(){3 p' U* v, e) @; n
  3.     var uid = 'uid1';
    . r  v5 q- N3 ~* S! O
  4.     ws.send(uid);
    ! ^* F+ w( j! I6 _) V% ^1 _# P9 [
  5. };9 S  f2 g' P+ F
  6. ws.onmessage = function(e){
    " h% ]1 D2 H7 r. T, o
  7.     alert(e.data);9 P: K0 c  e, B+ e# j
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口8 X1 d% a0 @+ l) d9 |" ~1 I
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    , A4 R% P5 Q4 p3 d6 b
  3. // 推送的数据,包含uid字段,表示是给这个uid推送; e3 I7 m' Z9 K$ }) L
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    8 @8 ^" P' }: |+ x1 Z! v
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符9 S  ]* J! V1 G7 Q5 u  m5 b
  6. fwrite($client, json_encode($data)."\n");
    5 m3 @$ a. w* _; g3 I1 r% E4 s
  7. // 读取推送结果
    : `, h6 w+ p% ~- P; ^
  8. echo fread($client, 8192);
复制代码

$ J! r& r3 Z3 F5 u5 _8 b. \7 j# o! m" Y% ]  D
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-4-30 21:43 , Processed in 0.086355 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!