您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 10515|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 显示全部楼层 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;( F* [) G5 M. j# Y- ^4 S: v
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    / Q+ h' q  C' X4 Y7 m/ r
  3. # A; ~, j3 q+ I0 [# G; K/ g
  4. $worker = new Worker();! B2 W" H, f( N7 }0 Y" j
  5. // 4个进程5 Q5 ^; U0 x6 s  G: S8 j2 B7 u
  6. $worker->count = 4;
    ! O" C) A  ]# O% L4 [
  7. // 每个进程启动后在当前进程新增一个Worker监听
    ; X) j9 }6 m- ^
  8. $worker->onWorkerStart = function($worker)' P; ]( h, u+ T% P' f4 l+ X! P- [+ j
  9. {8 [4 ^3 V$ U0 q& B) [
  10.     /**
    . K- {6 Y+ M) G" U
  11.      * 4个进程启动的时候都创建2016端口的Worker/ C, P8 D8 o% G+ P# H
  12.      * 当执行到worker->listen()时会报Address already in use错误6 z6 }! M3 k8 l. l
  13.      * 如果worker->count=1则不会报错! G$ q; p4 U+ x9 N
  14.      */
    / X/ Q* s- g9 o! f) {3 [9 c
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');2 C/ F5 j* S& P7 z7 M
  16.     $inner_worker->onMessage = 'on_message';
    2 X( o( n8 l: \( W2 Y1 l
  17.     // 执行监听。这里会报Address already in use错误' X/ }5 [! @( |2 V: C3 ~
  18.     $inner_worker->listen();
    / f5 v# P6 F+ u1 U; r8 E# B
  19. };+ L  {9 v; j3 U

  20. ; y5 s( n4 M' Y8 Q
  21. $worker->onMessage = 'on_message';
    + z. T8 g# m- S0 T7 z

  22. - ?1 w2 A3 Z' ]6 z& [) N
  23. function on_message($connection, $data)# k2 O6 _# U& M! \
  24. {
    " M; g* B; K7 q' l: N
  25.     $connection->send("hello\n");
    - M6 R- L7 h2 Z9 ]# c
  26. }/ O2 E0 G- f) h6 C0 i
  27. # n- ]% c0 K7 a% C2 \( _) w
  28. // 运行worker% y0 M9 X1 M. g0 l
  29. Worker::runAll();
    5 q( T" ?8 ~9 M3 R1 J0 ^
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    3 z. O/ o4 d5 O; `, P" b0 R- J
  31. 4 o: M. n) d& ~6 m
  32. use Workerman\Worker;6 q+ m5 i, m$ {0 G  M1 N# m9 n
  33. require_once './Workerman/Autoloader.php';# g9 o% [- d0 O  [8 r$ d3 S

  34. ) `3 Q: {! ]0 f
  35. $worker = new Worker('text://0.0.0.0:2015');
    ! s# i2 i0 `6 X0 ^( _* m0 q, p: k
  36. // 4个进程2 Y+ ]; n: M- Y1 n5 I' g: E+ o/ T! z
  37. $worker->count = 4;* L1 z* w! s% s9 G# z% s* I
  38. // 每个进程启动后在当前进程新增一个Worker监听/ e2 ~. c/ [! ]8 c0 y+ N
  39. $worker->onWorkerStart = function($worker)( F, K1 V4 F4 g$ G3 j4 z5 Z
  40. {5 V( Q/ ?4 c9 s. {: [
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');+ F' n: W3 ~( t) P
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)3 M- H8 D1 @( I% Y9 @6 T- ^+ f' d5 e
  43.     $inner_worker->reusePort = true;& V) N0 e/ u1 K6 ]) z
  44.     $inner_worker->onMessage = 'on_message';" U: W9 G) a$ t0 \
  45.     // 执行监听。正常监听不会报错
    + K! s9 J" s, Q3 E
  46.     $inner_worker->listen();
    0 Y2 E/ z5 S0 o/ l& [
  47. };, O1 Z0 Y( R4 q  W" Y6 Z6 F7 _1 I
  48. % G" S% F* W3 J6 M7 j
  49. $worker->onMessage = 'on_message';
    % @; [' F& r# S$ X5 T1 l

  50. # d4 f3 A; k8 d% L( b
  51. function on_message($connection, $data)
    * V2 S6 T  \! L( M
  52. {, C0 ~" ~) A9 {8 h
  53.     $connection->send("hello\n");
    , c0 e; M3 W' q  C4 W& z' `: ~
  54. }
    2 Z0 @" i8 D: a9 g& S0 H9 ]

  55. 6 r3 O* E: r0 Y. ]4 C; C6 s) F
  56. // 运行worker
    % B& k- C3 r0 S! d! z- K9 j, k
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php) }1 L! x; w. a" m
  2. use Workerman\Worker;4 ~. S+ w/ `3 z, z
  3. require_once './Workerman/Autoloader.php';7 P: s( S4 g& p" d# H' Y3 z6 w* K& U
  4. // 初始化一个worker容器,监听1234端口3 K% h! e; v* T2 r6 @- V
  5. $worker = new Worker('websocket://0.0.0.0:1234');4 O3 R/ D; u. x! G7 X9 o! X0 Y

  6. : w6 F% x8 G( i
  7. /*
    " \, n0 N/ ^( k9 P+ ]
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误& u6 g' ?0 X( c3 ~4 s2 t7 N  ?
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    9 e3 f7 A1 }# H" y  Z7 b0 T. ?9 S4 d0 H
  10. */
    * w6 `; g/ i1 V+ d2 i" {
  11. $worker->count = 1;6 m; k# N2 {( }
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 g+ ~# k$ ]5 }, a! E% G7 g
  13. $worker->onWorkerStart = function($worker)
    ; p7 v& I- m- ]0 i) H2 O
  14. {. n6 m" i4 Y: t  K" f
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符9 U: R2 ?5 d1 ]1 s( k, P# ?
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    2 @) z! ~, [. p# M, W* X
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    9 Z4 n5 e; u$ F$ Q2 N7 I
  18.     {
    5 `2 r' o/ A# |4 B8 J4 j  n
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据  T! u/ g' B% R: l6 r
  20.         $data = json_decode($buffer, true);. R$ O, O& F- P. y$ U) ^
  21.         $uid = $data['uid'];5 t; m8 N3 ?9 B/ ]3 t
  22.         // 通过workerman,向uid的页面推送数据
    , N- _( _5 x/ f6 g, F0 L1 T! q
  23.         $ret = sendMessageByUid($uid, $buffer);
    ) E; `; B5 u7 H- @  E
  24.         // 返回推送结果) c) g+ q2 u3 k" S4 k
  25.         $connection->send($ret ? 'ok' : 'fail');
    " Q' m2 I4 a1 H' ^: J! t/ m
  26.     };5 J: A$ i1 B) R- E# a% M
  27.     // ## 执行监听 ##
    $ M( h5 ^0 L& [* }' K; B5 |1 I
  28.     $inner_text_worker->listen();2 Y1 ^' i6 ]5 {  m" `
  29. };( g3 D1 N* E: S
  30. // 新增加一个属性,用来保存uid到connection的映射# K) L1 X4 J7 |! }# g
  31. $worker->uidConnections = array();
    ( J" z6 @/ b% m& U
  32. // 当有客户端发来消息时执行的回调函数/ L  S" `* t5 j9 W8 B6 U: s' I
  33. $worker->onMessage = function($connection, $data)
    ) A3 l: M5 F: {2 G  h: K
  34. {
    : d( m0 q; @7 \2 @& \
  35.     global $worker;
    4 T9 u2 }. @( |+ Z: m4 M
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    " W% n& @% K3 d7 c7 \5 Q0 M
  37.     if(!isset($connection->uid)), @! Q, o/ M+ P! v
  38.     {
    6 V( @; p+ p! @( y
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    9 \" Q: D/ C2 B6 a0 j
  40.        $connection->uid = $data;
    9 M" y  u7 J/ ~# Q
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    ' Z) F4 V* ^/ K
  42.         * 实现针对特定uid推送数据: O- b2 _: L* Q( L3 ~3 O
  43.         */( I8 C( H- T) n. j
  44.        $worker->uidConnections[$connection->uid] = $connection;9 W- |0 M1 ?2 F0 b5 ~
  45.        return;
      r( g7 D2 k8 x5 E# z- r& }3 d
  46.     }1 b/ x) y# Y. ?3 M. w, [: K  M
  47. };
    6 U7 X0 i/ C* k( ]

  48. * b$ x* }. S$ r& j1 ]) U6 f
  49. // 当有客户端连接断开时6 A" w$ i0 |) {2 W
  50. $worker->onClose = function($connection)3 L1 l& Y# |; O+ e) E
  51. {# f  b0 d+ I: Q0 D$ \, r" q3 n8 ?
  52.     global $worker;. u; Z- @9 @* R4 y
  53.     if(isset($connection->uid))
    ' [0 v% n  I; L$ i8 S; O
  54.     {$ s) X0 M$ B! |( l. r
  55.         // 连接断开时删除映射
    + O3 y$ F, p9 O+ L: l2 Y
  56.         unset($worker->uidConnections[$connection->uid]);; l. \" o' L5 Y) @
  57.     }
    & D/ S; w4 E9 c8 |: F
  58. };( v$ R  a7 d! o- P6 [' p

  59. ; T0 _- U# ^/ N. `0 l8 ?$ w8 A
  60. // 向所有验证的用户推送数据
    7 K: D( c7 }% a$ F" O! t, }
  61. function broadcast($message)
    - Y2 A  l0 b9 h4 n2 H
  62. {
    + M7 W/ y3 @) K3 g' J+ ]
  63.    global $worker;
    6 f% d+ T+ m$ u4 V7 F6 K& C3 F8 E
  64.    foreach($worker->uidConnections as $connection)) P5 n5 N% `" I! S& z% s6 ?
  65.    {
    1 o* B4 o% v3 k2 _5 _$ @% s
  66.         $connection->send($message);# @$ Z/ \' T0 ^7 j* ]% T- b" A$ q8 t6 {
  67.    }
    4 S) J2 Q1 {) I9 X! E1 d
  68. }
    ) e! i% F$ q' Q
  69. + b# A6 q: `- {$ [
  70. // 针对uid推送数据7 I8 A0 _  e; \9 p
  71. function sendMessageByUid($uid, $message)
    3 Z4 q5 E( @' F  Y. Z
  72. {( T. H% i) _, g2 }8 }
  73.     global $worker;
    ( b. _- d4 f  {
  74.     if(isset($worker->uidConnections[$uid]))
    % N# h: T& A1 E  t
  75.     {
    & k1 Q7 c: p; d; h9 [% `
  76.         $connection = $worker->uidConnections[$uid];* x3 w+ R! I5 J* Y
  77.         $connection->send($message);, b3 A8 [* z" V7 u% U
  78.         return true;! F0 W" V* {3 v
  79.     }' x+ [: G$ U; J* A
  80.     return false;
    . O6 ?/ T6 D7 S
  81. }
    7 v* ?/ g7 w6 t# d4 M" F% G- t7 r
  82. & z! y+ M& E% K, \9 J. W
  83. // 运行所有的worker
    " e8 h8 G5 l9 v% U) ]8 i
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');& p  j) |' V( e; p& d) d) X$ \- S
  2. ws.onopen = function(){
    % f6 a& R# G3 B: ^
  3.     var uid = 'uid1';. |" L& q6 l2 {1 M* n
  4.     ws.send(uid);
    . y6 n3 B! h; n
  5. };
    2 H. l' }, U1 E) Y: ]5 g
  6. ws.onmessage = function(e){
    / F8 A2 h$ j$ K# K9 `
  7.     alert(e.data);
    * [( k0 a) S3 z6 _9 n1 K8 r% F4 R
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    1 J$ e: |# z; Y4 L
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 R0 L, D2 z. D" l  h
  3. // 推送的数据,包含uid字段,表示是给这个uid推送5 \1 z$ o* s" b
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');" b- K; u/ O. w. j
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    . w- J( J7 b5 G& }$ f
  6. fwrite($client, json_encode($data)."\n");
    - g3 @: K1 T, e4 T% E
  7. // 读取推送结果
    ' U0 X5 L2 R5 m( p7 }
  8. echo fread($client, 8192);
复制代码
( _3 L. M; J* w( o2 B
; ~8 e, S7 s' |' @. }/ z; q
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-5-18 23:28 , Processed in 0.105811 second(s), 19 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!