您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 15067|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;( T3 e8 ^/ {& w, s6 `5 {* Z
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    2 R7 l6 G8 R9 K8 h' j6 r- Z( i

  3. 5 y9 b: b; F" ?' t$ ~5 x
  4. $worker = new Worker();# T; I/ Z$ ?( T/ T5 x2 j
  5. // 4个进程
    . T" M/ o+ ~; K1 n
  6. $worker->count = 4;* c2 O% A" ^- M+ X' v
  7. // 每个进程启动后在当前进程新增一个Worker监听
    . b' x* Y, V- x1 l. U* E" B
  8. $worker->onWorkerStart = function($worker)% B1 D0 }( `+ u
  9. {5 j# g' i; G7 U" C
  10.     /**
    ) A6 E1 d6 [) W+ M' M$ C% [5 s
  11.      * 4个进程启动的时候都创建2016端口的Worker0 @8 ?  I/ V: l
  12.      * 当执行到worker->listen()时会报Address already in use错误' _/ z1 u# C3 Q. A6 x
  13.      * 如果worker->count=1则不会报错2 @& y! G0 `8 ~
  14.      */9 F5 r1 o- s( a0 S
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');, X, G% \" O6 Z. d' l  y* U
  16.     $inner_worker->onMessage = 'on_message';. \3 ]) B5 E; S/ u  G, F
  17.     // 执行监听。这里会报Address already in use错误- _& F, d+ y$ e/ }4 h( f3 q
  18.     $inner_worker->listen();
    ' c- p/ w8 m, I8 y) c
  19. };( q- @0 k+ u) L% e

  20. ( ]  m2 T1 [" y2 R( R
  21. $worker->onMessage = 'on_message';
    ; i8 Y4 @, C2 d: n
  22. 4 I, e7 A' q/ A! D! b
  23. function on_message($connection, $data)' r3 b4 f% B5 ~( }- c. A
  24. {9 h3 r7 k" _, u9 D; p  d- \! A
  25.     $connection->send("hello\n");
    ! Q5 i' [+ P" Z: b/ [* v
  26. }
    6 T/ y  w( i  w7 j: @
  27. / S- G, }- d& Q+ T6 ?6 |
  28. // 运行worker" J. b; z1 |1 V' C
  29. Worker::runAll();, l" ?1 M) f" T- i# p  ~8 O- j( q! O# a
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    , D' G/ i6 ?1 u1 T: S' D* k
  31. 4 m; K: N. m7 C1 A! F- A
  32. use Workerman\Worker;
    5 {- U; K/ X' ^& r3 v' f) m/ H, J
  33. require_once './Workerman/Autoloader.php';0 [, Y; Y/ v; m5 x9 K3 V3 p6 F

  34. . x2 x; N8 G* ]
  35. $worker = new Worker('text://0.0.0.0:2015');
    . C" S' ^* w2 G2 O
  36. // 4个进程
    ; X% ~" J, j  M% @* Y- T; N$ q
  37. $worker->count = 4;
      A0 ~! X6 H& L, g2 U) `
  38. // 每个进程启动后在当前进程新增一个Worker监听; N- X- g9 _5 c, d# _. s3 U
  39. $worker->onWorkerStart = function($worker)6 f% D; I* F+ e2 L. ]# @# {
  40. {0 k  j4 G, _) r
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    / @0 v; r5 u7 d8 I" ?3 }7 q
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    - L/ f: U7 `% b+ C- e& U- S. G, |
  43.     $inner_worker->reusePort = true;  |0 M2 n/ o. u; S' o- l- |
  44.     $inner_worker->onMessage = 'on_message';9 c# ^6 G7 ^( ^/ ^) |
  45.     // 执行监听。正常监听不会报错
    ) ~9 L# L7 u6 w" R
  46.     $inner_worker->listen();
    % \5 i' u3 M2 |% y: S
  47. };
    : F0 H* g+ _5 }3 @( M  H1 X. {) d

  48. ( X7 N$ n. L. x1 j
  49. $worker->onMessage = 'on_message';
    5 A; H) I6 q9 j% Z: ~# K
  50. 2 O9 z6 o9 g7 b- V
  51. function on_message($connection, $data)
    1 G# D* c& X& u" h) E+ F5 i7 Z
  52. {
    5 }1 T! j! c% D2 [' y
  53.     $connection->send("hello\n");( e0 }1 ^1 {, p9 H, |" k( y
  54. }
    . R- g+ k$ e# U$ \3 l/ A' N& B. V
  55. & S" p$ [: \8 L7 O; W* P/ o. y
  56. // 运行worker
    ; |1 N3 ]" I3 v/ N8 v. W
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    3 ^- ]: n( H$ H1 v/ e# V8 ]  L7 e* J
  2. use Workerman\Worker;+ d6 K% K# j! D5 ?
  3. require_once './Workerman/Autoloader.php';
    : [5 R3 v) y4 j2 I) L  t: X2 h) V
  4. // 初始化一个worker容器,监听1234端口
    9 {: A4 i, V0 T" p
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    , [( w1 y+ U; F2 b1 I0 i( p5 @
  6. / \9 R( ~6 x2 ^; u  x
  7. /*
    4 U7 T9 m$ N6 V: s) n' g
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误1 G/ S0 L1 H* ?- a
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)! w3 L. R3 S$ E; W6 _: ?  Z0 a! K
  10. */6 h) }- R/ k- z4 G9 w
  11. $worker->count = 1;
      i) S2 N" K: R
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口9 _, T! @9 |/ \) @# x7 G! c: R% b
  13. $worker->onWorkerStart = function($worker), A( W6 K. c& R, ^4 D  X$ B7 F: S
  14. {
    1 F% Q, v8 q. \" }) v3 U. @
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    % Y( @* i8 T" v
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    ; H( m* d3 ^$ Q$ n9 q: e: ?( q/ k1 s# }
  17.     $inner_text_worker->onMessage = function($connection, $buffer)1 B! z: v5 e7 ?  |
  18.     {6 V% K& O. D3 @) q. A7 t8 u, w
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    ) a& W8 I/ R) {/ v2 M! F0 l
  20.         $data = json_decode($buffer, true);
    8 k4 ?4 K3 a! Z* i4 b
  21.         $uid = $data['uid'];
    " l0 {8 v  V" g, f+ `1 D* \9 G) x% t3 m
  22.         // 通过workerman,向uid的页面推送数据
      N0 i' Q% ~# w* N. P1 f
  23.         $ret = sendMessageByUid($uid, $buffer);
    0 m4 u. X, U9 [' j
  24.         // 返回推送结果$ E8 Z8 ]2 f5 b/ U$ Q5 S" \
  25.         $connection->send($ret ? 'ok' : 'fail');
    2 `* n# V/ ~9 n' z& _- |4 t4 O
  26.     };
    9 e0 y' p2 v* r/ Z( C
  27.     // ## 执行监听 ##* ^  X8 x6 z( a! Y( s: ]
  28.     $inner_text_worker->listen();# {8 t% J3 y4 @4 `
  29. };6 b) B& P% x$ G( K, S
  30. // 新增加一个属性,用来保存uid到connection的映射+ u5 V! k4 V! K. ]4 L
  31. $worker->uidConnections = array();
    ! D8 F9 W9 H/ ]+ K
  32. // 当有客户端发来消息时执行的回调函数5 Y( t- x! }" q, y
  33. $worker->onMessage = function($connection, $data)
    ! ~! ^# H: K- h0 a0 ~- o" L7 M
  34. {+ S/ u  P1 Y& {2 z6 U6 w2 ^
  35.     global $worker;
    3 n3 O2 X7 e  ^4 n
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    0 c: D/ k1 [- ~/ S$ ?- T  O( I3 D7 h
  37.     if(!isset($connection->uid))5 [& _# R# G; n% X
  38.     {
    1 O7 O3 L0 g; b8 b& `0 p0 g
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    ! ~% r- b  Z8 q7 Q
  40.        $connection->uid = $data;8 R, ]1 z2 }2 t4 M) t$ W0 \
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    " q8 Z& `' Z2 H4 Y0 u
  42.         * 实现针对特定uid推送数据& Q, c5 V5 ^# S, Y4 d% w0 }. }" {
  43.         */
    , E; e* C0 i. r
  44.        $worker->uidConnections[$connection->uid] = $connection;; ^; X4 N% z1 ^  @
  45.        return;, g; L: K" z0 w2 ?& n# |$ G( y8 s
  46.     }+ v. F2 t; b* y5 r
  47. };6 x# ^0 d) B7 X/ h* G; b% J& {

  48. 4 h; f: K# q$ F. w9 w7 w, A
  49. // 当有客户端连接断开时4 ]9 E2 `8 C1 u+ a# v" |' o
  50. $worker->onClose = function($connection)
      T7 _+ l) d' j0 ?
  51. {7 `# d. u, \* q- H1 B1 `
  52.     global $worker;
    * ]" u7 D, j' g. O6 K9 X+ L- u, y
  53.     if(isset($connection->uid))
    # V0 q  d  a8 L* p( N$ A
  54.     {7 M7 F) A0 D5 o) r% k! `
  55.         // 连接断开时删除映射( S1 ]% J$ V3 K+ h' Z
  56.         unset($worker->uidConnections[$connection->uid]);1 l9 ~' {# b4 A9 F" f
  57.     }- u2 u+ j  _" c: J9 c6 O9 i/ q! [
  58. };1 _3 `, d2 H5 s  O, Q5 }
  59. # v" ^$ k9 M5 V! [- N; r7 J, ?
  60. // 向所有验证的用户推送数据" y1 m# A) z( y5 w  }6 S# m
  61. function broadcast($message)- U! F7 E- d, R
  62. {# W9 ]; A. S' ]
  63.    global $worker;! D/ B! }; Z( [" b
  64.    foreach($worker->uidConnections as $connection)
    # R2 r1 V  S+ |- W
  65.    {7 r! Z+ \! i# I- v+ |+ @# Y& v$ i
  66.         $connection->send($message);* W2 ?5 X# {; D2 z5 K( u) q0 U0 s  z
  67.    }
    6 l5 D+ h5 w5 R+ j7 S) C0 N
  68. }
    / F: |3 Q- _+ @& Q" c" `& x
  69. 8 k: W( R; l4 S# K0 u6 G
  70. // 针对uid推送数据
    ! ~& E1 @' x" f1 Q4 a  w
  71. function sendMessageByUid($uid, $message)
    * X8 r, Y- g" b1 K( j. [! Z
  72. {
    * @% K/ ?) Z' [/ l! `: j0 [2 M
  73.     global $worker;
      [, h; I2 k$ \# i* l1 i; U' Q
  74.     if(isset($worker->uidConnections[$uid]))% [  F4 s+ y" j/ C4 y
  75.     {, j! s& y8 ?# w3 C
  76.         $connection = $worker->uidConnections[$uid];
    1 W. u  {! c: o/ y* o9 B
  77.         $connection->send($message);, }3 T  ^; j. t2 a, ^' F: s' {' }8 a
  78.         return true;4 V& D8 E' {- W* q0 d/ B
  79.     }0 R2 T' `6 v, h  _  \
  80.     return false;6 V" ?$ `2 g  h+ \3 u3 G$ e
  81. }
    ; g" Z) l3 J; f& N4 G  R
  82. 1 i, k- E' c- z: D$ @
  83. // 运行所有的worker
    6 Q' q3 R( v2 O- ~
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');4 j5 P1 u5 w" Q1 V
  2. ws.onopen = function(){
    ( m# _' y; V- p# \
  3.     var uid = 'uid1';
    ; \3 @; C% B: _
  4.     ws.send(uid);
    " G1 J8 h6 ^2 g$ P
  5. };
    4 g) Z0 q7 e4 e9 [4 F# t: C
  6. ws.onmessage = function(e){  G; l" W. C# J$ s- Y) ~
  7.     alert(e.data);
    & p; m; b" |- W1 R# y! g% X1 r4 B
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    $ o, X2 L  H: d8 n1 m
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);# S" I+ ~7 {2 t' o6 z
  3. // 推送的数据,包含uid字段,表示是给这个uid推送2 ~4 K! Y! J- _8 ]  A9 x
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    5 W- z- q8 G' E- z8 d" r
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符4 ^! R3 j! \3 W) X
  6. fwrite($client, json_encode($data)."\n");
    # @& Q0 w5 Q: \  y  N9 j5 A9 n
  7. // 读取推送结果+ G5 W# O* ^: C/ L* s4 q
  8. echo fread($client, 8192);
复制代码
& @5 V/ C! [6 H- q+ W
1 g" @4 }2 m: j
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-4-30 20:50 , Processed in 0.056801 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!