您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14802|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    $ E) P# H  S- M* |3 q
  2. require_once __DIR__ . '/Workerman/Autoloader.php';1 p( {* ]) O$ N
  3. 5 r) C9 ~" w! L9 o8 I" p" X/ E
  4. $worker = new Worker();& v3 ?- ?0 u) W. n
  5. // 4个进程' y# a( W" B; m
  6. $worker->count = 4;
    6 S0 H/ h6 e, n# v; s2 N4 W
  7. // 每个进程启动后在当前进程新增一个Worker监听
    $ r# ]6 D2 l2 a  K- D
  8. $worker->onWorkerStart = function($worker)8 ?- g0 U" S1 R
  9. {+ T( ^! @8 B& v* ^# ^: i4 w- v
  10.     /**' a; G% R0 k. {2 s, p% Q9 w+ q8 O& o
  11.      * 4个进程启动的时候都创建2016端口的Worker
    , I; l, f2 l; G. G- f: Z6 T/ u$ p
  12.      * 当执行到worker->listen()时会报Address already in use错误
    & E" J* Y/ o% }( i. t# }1 @
  13.      * 如果worker->count=1则不会报错, S9 g7 D# T1 i/ P' Y/ F
  14.      */
    5 W2 b# q" H7 M
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');8 P6 }! C- Y, S) x3 {/ U6 P
  16.     $inner_worker->onMessage = 'on_message';) d$ e  j- b& F$ i2 Y, A: n
  17.     // 执行监听。这里会报Address already in use错误7 r7 w, x+ C0 D; B, T: i
  18.     $inner_worker->listen();. v( y/ c8 l! t$ J8 i# w& r
  19. };+ f# v+ u3 ?4 z; i: F
  20. 4 h/ R2 Y- w$ o
  21. $worker->onMessage = 'on_message';
    4 w) `7 Q+ T  p4 U( A' o
  22. , O1 V+ p' `/ L* p7 ^' p7 f
  23. function on_message($connection, $data)
    . O: o% j6 ^, C$ M
  24. {
    : m+ Y) X* ?  y
  25.     $connection->send("hello\n");7 d. U& S6 g- a9 p! S
  26. }6 W/ i' {1 z5 D& ^: m' F0 H
  27. 4 v, w9 H/ |9 K+ W4 Y# L7 L
  28. // 运行worker
    ( c' c7 [" y2 [0 C( f. i
  29. Worker::runAll();/ B9 N: t  ?& e
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:/ y/ T/ N7 _% d2 E, v  K

  31. 4 s" X1 M, i  T- Y4 u8 m
  32. use Workerman\Worker;
    1 ?6 I0 C2 M6 ^/ h! Z8 G/ _+ m
  33. require_once './Workerman/Autoloader.php';
    ( F7 D) K% l, ?# P& G8 d
  34. 6 b& _2 O7 o2 N0 M, O$ Q& y0 b
  35. $worker = new Worker('text://0.0.0.0:2015');! P. T0 a/ f$ b+ q& S6 Q- k, H9 p
  36. // 4个进程
    , O2 q' u5 l# G/ e/ n- X
  37. $worker->count = 4;; N0 l% }) [. J6 P
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ! n! D5 v( U/ f' b* H: H  F
  39. $worker->onWorkerStart = function($worker)  D( M, W7 {2 a
  40. {* M. R  u0 N) ]
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ; ~( T0 v4 R; v+ ?  u
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    4 ^! w6 s0 h# L+ z1 g* @  U
  43.     $inner_worker->reusePort = true;' G+ x, o* C0 A# P2 q
  44.     $inner_worker->onMessage = 'on_message';+ t  V  C5 p9 a/ q, D) d
  45.     // 执行监听。正常监听不会报错2 g0 }9 a) g& A( O
  46.     $inner_worker->listen();
    ( @6 o- i7 k. K- t0 g: \. ]" m1 u
  47. };
    % K+ `0 v& A6 Z) |$ N+ G

  48. 9 ]4 p. V! ]5 O# M& u2 v
  49. $worker->onMessage = 'on_message';
    9 }- z: y, j3 R# g! ^, }9 p

  50. & e( C# J4 A  {6 h5 w
  51. function on_message($connection, $data)- }2 ^' H+ [7 }$ b' y8 q
  52. {
    % P6 Z9 }, |" T
  53.     $connection->send("hello\n");
    + [( ^. ]" k7 \8 o# r& g. S
  54. }
    9 s+ s# F& f9 i

  55. 4 }  u8 Q' {, U+ @+ G: c
  56. // 运行worker
    7 u9 K. m7 Z# u2 \
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php7 C4 y# q$ W' k; Y
  2. use Workerman\Worker;2 z5 N% N- G2 ~) c: @
  3. require_once './Workerman/Autoloader.php';
    ! E* m+ N4 Q0 p( i
  4. // 初始化一个worker容器,监听1234端口
    2 N% S( ^/ t% w/ Y7 f9 g7 o
  5. $worker = new Worker('websocket://0.0.0.0:1234');" v: \' K5 H. ?& t
  6. 1 {1 _: E4 w3 e9 G3 U1 G
  7. /*
    + H0 L; b! k5 H
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误: k6 O) h5 Q$ L6 u
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true). e9 v! I8 _! f  v3 v. x! b
  10. */
    5 ]. Y9 M% G) H3 l8 p! s5 o9 }! ~
  11. $worker->count = 1;
    . ]% a6 P7 \  [; ^. {" [- a# ?7 n
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口' t8 y7 J6 h6 k* c
  13. $worker->onWorkerStart = function($worker); P7 n& K, U5 @2 G- T" M( K
  14. {
    ; ~; K6 e/ [% T5 L! D
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符( P2 t# q! r  `" e3 j( ^
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');/ E9 ?* O/ T1 D3 c
  17.     $inner_text_worker->onMessage = function($connection, $buffer)' m& B8 \8 s" A( i+ H( `
  18.     {
    1 n2 n: P: y/ @# E2 X/ W! q
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据
    + g8 m1 r& o' s% I
  20.         $data = json_decode($buffer, true);
    4 ~# G  n8 o" _1 T- \: A, g
  21.         $uid = $data['uid'];$ @: d5 f  f0 I! F) J. ^6 M+ N* F3 J
  22.         // 通过workerman,向uid的页面推送数据, w5 Q8 j  T% v/ t/ k. G& @. `
  23.         $ret = sendMessageByUid($uid, $buffer);0 z( X/ k. f* c
  24.         // 返回推送结果. [+ D# ^8 d% ]2 }' e
  25.         $connection->send($ret ? 'ok' : 'fail');
    & P+ X8 M+ @/ R5 n$ ~: V) o
  26.     };+ p0 a0 M- r6 ^! l3 O5 J3 o6 [
  27.     // ## 执行监听 ##; f% _& \$ Z) E# ~
  28.     $inner_text_worker->listen();
    ) k! o# n7 H* U$ m0 N* l
  29. };' Q$ B/ A" D0 D0 g  n8 I5 ], g7 e
  30. // 新增加一个属性,用来保存uid到connection的映射. e% `7 R: u7 z  n: Q  I+ c7 E) n
  31. $worker->uidConnections = array();
    3 @4 s  W  O8 z' Z0 p
  32. // 当有客户端发来消息时执行的回调函数5 ?6 M7 s) l' ]9 u; [
  33. $worker->onMessage = function($connection, $data)( r- x; y) O" z7 Q/ H
  34. {
    ( U) g; ~# ^- B* B  B
  35.     global $worker;( [* U+ W* c3 s! }6 e0 M( }; K! z
  36.     // 判断当前客户端是否已经验证,既是否设置了uid, {" E, M3 H  J
  37.     if(!isset($connection->uid))
    ( i: ], p# a8 h+ _" J( t' S3 b. I
  38.     {
    $ Y8 C" o/ J( _2 U
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证); m4 k# N' [1 a, Y6 z3 O* m
  40.        $connection->uid = $data;
    + E8 w" b+ @* u4 q* p
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,8 |  n0 A6 }; j
  42.         * 实现针对特定uid推送数据
    ; Z& a# k: K. L: Z& S9 B
  43.         */* @4 T  Z8 j$ h6 T0 O" f  V
  44.        $worker->uidConnections[$connection->uid] = $connection;
    & b5 @+ F; M8 g* I
  45.        return;, i9 Q9 }: E- D) g
  46.     }
    1 t- h" S% {+ Z' Z1 y
  47. };
    + u% O" Q/ m/ c5 R4 Q

  48. + A7 U* b' d7 o2 |3 E* T3 R* E
  49. // 当有客户端连接断开时
    " r- k% a  a/ {( ~
  50. $worker->onClose = function($connection)  G  Z3 \# C9 t2 s
  51. {
    & O8 Z% h/ k" r6 @. i
  52.     global $worker;3 F, j& l5 p& Q6 r
  53.     if(isset($connection->uid))( I0 \/ M0 q5 i, l
  54.     {
    0 A. m# D  W" V8 c# S! Y
  55.         // 连接断开时删除映射
    ; C) @' h! K  x4 J
  56.         unset($worker->uidConnections[$connection->uid]);2 v7 {# p1 X3 F/ x( s7 w) ]
  57.     }
    , W$ f( Y0 d2 F2 n* n0 |
  58. };
    ( @. j& o3 j  F: n. n1 i1 p3 I

  59. 1 B+ Q; _: U( p. f
  60. // 向所有验证的用户推送数据% D6 l1 X0 E% |2 `# t# @/ O
  61. function broadcast($message)
    $ \6 t; v: ]) ~% x
  62. {( g0 k8 q5 _8 n4 X" p# Q: L
  63.    global $worker;& j/ S- F  _; v1 N% S( M
  64.    foreach($worker->uidConnections as $connection)/ O/ H4 D: q9 s5 V; j
  65.    {
    , l) y1 S& N2 q9 T" u5 Z; b
  66.         $connection->send($message);# H- X) k, I) x1 Q8 j5 v
  67.    }  d- X7 Z" g) c4 a
  68. }) V6 w# K7 g* C% t

  69. ! E4 U/ V# U' A' c  A$ n0 c
  70. // 针对uid推送数据
    ( q5 S9 B, \0 ~, `
  71. function sendMessageByUid($uid, $message)
    2 }* A" _3 w+ `  w# D9 o
  72. {, M( Q3 @5 Q5 U# K+ D% H3 n8 f7 k
  73.     global $worker;: V8 Z% |  M* ?! a0 p( e
  74.     if(isset($worker->uidConnections[$uid]))8 J: c: N/ r4 G9 M1 s* y% {& d$ c6 j
  75.     {  Z' F$ B' P, ~
  76.         $connection = $worker->uidConnections[$uid];
    2 a' U/ M, T  u9 Z0 `. U
  77.         $connection->send($message);3 [, k, q- H' M: ?! R! U
  78.         return true;; v, o2 f7 ^& q- K! Z. X; c
  79.     }  x1 D# a4 Q8 A+ R
  80.     return false;
    0 f) Y9 L. \) A  a9 Z9 D
  81. }
    / r+ {/ k+ M% B' K+ C1 h; I; q2 _  H
  82. ! U+ P  n" U2 |6 b$ v
  83. // 运行所有的worker; S' b6 y/ O! X2 M4 U+ t2 Q
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    # R* [; L- c# ?3 q
  2. ws.onopen = function(){9 ~+ r: S6 T9 O8 b) ^
  3.     var uid = 'uid1';
    8 _1 k) Y$ P$ }! P1 `1 ]7 X6 G
  4.     ws.send(uid);
    ' R! Y; a' X% P1 i3 @% b
  5. };
    & ^$ s1 M) W9 B1 N
  6. ws.onmessage = function(e){
    ' T& f! r& v3 F8 \" Z3 Q3 @# R
  7.     alert(e.data);; E: u7 N) D& N* n6 W" ^
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口5 l4 _" K9 S& t5 {2 g
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);. M$ M: a( C: J2 k$ N. F% _
  3. // 推送的数据,包含uid字段,表示是给这个uid推送/ J! m5 X: G2 s8 T
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');8 X: |7 b( r& B  K  |6 w3 U
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    + ]; y  \$ T! ?% b7 z
  6. fwrite($client, json_encode($data)."\n");
    % b/ _+ }1 m7 P1 M( s
  7. // 读取推送结果
    9 o  |+ ?7 Z6 ]; g% P+ U9 O
  8. echo fread($client, 8192);
复制代码

% x* n: F1 N# D( W. j2 b; H% W+ a" M$ w) `. j) v5 [$ C  C8 S: ?
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 18:10 , Processed in 0.053948 second(s), 20 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!