您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14803|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;( Z* F( m5 C) f2 }3 g1 t
  2. require_once __DIR__ . '/Workerman/Autoloader.php';% u2 O# g- E) N7 Q1 @
  3. & r. `- ~6 K& j" i+ t* l# S4 U5 z
  4. $worker = new Worker();
    ) U/ _2 T/ ?$ G+ u$ e3 v
  5. // 4个进程! C' B1 J( {: I' W' {  _& O
  6. $worker->count = 4;: C8 ~1 b" ~; W) O- ]: q: d9 ?  q8 J: B# e
  7. // 每个进程启动后在当前进程新增一个Worker监听7 C6 N  ^, w4 k$ S* T
  8. $worker->onWorkerStart = function($worker)
    . J; b4 z) W1 k: p  k7 W9 W9 D
  9. {
    3 X4 ^9 |& y5 L6 b- }. L
  10.     /**8 ~- b5 \/ x5 |9 P9 D: Q
  11.      * 4个进程启动的时候都创建2016端口的Worker
      h9 q1 F( J& f' q6 G
  12.      * 当执行到worker->listen()时会报Address already in use错误
    ! Q' _$ L9 R* I
  13.      * 如果worker->count=1则不会报错# R! F1 _* M. Z3 D1 Y' V/ a+ }
  14.      */1 z" P/ X# Q9 F' w6 P8 G
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ) f1 f$ ^. `; u+ z1 |
  16.     $inner_worker->onMessage = 'on_message';- ?3 T/ g1 R4 s6 l+ E& L
  17.     // 执行监听。这里会报Address already in use错误
    ' a" r+ v- U7 `( e- x4 e$ `
  18.     $inner_worker->listen();( q. @# w, B7 ~- L# N1 y, e' R
  19. };- I1 T* ?: |8 D4 n- a( d6 j0 @% L

  20. 5 x/ g1 s  x& R( ~* \, F
  21. $worker->onMessage = 'on_message';0 B- l" k6 P1 D7 f
  22. : f8 Y, C$ G9 `- i
  23. function on_message($connection, $data)2 I4 b% ~& K7 k! E9 Y% |8 I
  24. {
    ) _0 m7 E0 S  G& H: w
  25.     $connection->send("hello\n");% ]1 K9 j; C( A! S7 F; m( L
  26. }5 k8 K! L4 ~2 n0 B1 Q. B4 O) a: N# c

  27. 9 t+ e! Z; P! z
  28. // 运行worker
    ! Z3 m' L* @* r: u
  29. Worker::runAll();! H4 H$ B# A" C' y4 [! _6 O9 M
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    2 {$ q) t+ d" ]& m# s* @
  31. 8 h! L. {2 b" j! \, d, i: l
  32. use Workerman\Worker;. w. F+ ?8 V4 H$ [4 Y
  33. require_once './Workerman/Autoloader.php';( S* C1 X6 t  [. S
  34. 2 l( W7 O. r" s% d8 n
  35. $worker = new Worker('text://0.0.0.0:2015');  G# E. ]+ @; ~) I
  36. // 4个进程
    ' {- U8 a; i6 }. C7 B
  37. $worker->count = 4;
    6 Q+ V/ H" S( b+ z' |# L
  38. // 每个进程启动后在当前进程新增一个Worker监听
    ' u( N& D+ e/ T7 d  ^" M
  39. $worker->onWorkerStart = function($worker)
    $ o/ e: a2 Z4 p* D# x2 H. ]/ k# X" r4 h
  40. {' {+ \$ y* c. n) w) y3 p
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');& u# G1 F* D; m4 ?
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
    % f- a, k/ t/ o' Y( \" |
  43.     $inner_worker->reusePort = true;
    ; k# x& X! K  W/ z5 c$ P  \
  44.     $inner_worker->onMessage = 'on_message';
    ' g" R2 u1 n$ ?. g: l6 m( @. g
  45.     // 执行监听。正常监听不会报错' L7 q) A5 ?# g1 W$ G/ o2 i) B
  46.     $inner_worker->listen();
    ) J& n. ?1 L8 R) U! p0 S
  47. };
    # b7 J2 _) ^0 b0 v' \- n

  48. & B& c+ W: h  B: R* |) b) H
  49. $worker->onMessage = 'on_message';3 m: ~  H% Z4 ~+ w

  50. ) J; R" k/ N8 {
  51. function on_message($connection, $data)
    9 m0 s  L9 k2 ~; _2 @/ ~
  52. {' V, y# o0 f/ w  j" n3 v
  53.     $connection->send("hello\n");1 ]  [4 k4 p( b! h$ Q' g0 f0 N
  54. }" Q) F0 o/ f6 s5 j& X2 ~

  55. 4 D. m* F5 U& m
  56. // 运行worker
    / _. x  d) d& B7 n. F
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    2 ^) R! e% q' ]) Y2 q* N  R/ l3 G
  2. use Workerman\Worker;% n; y4 T, s. D: I# G2 _
  3. require_once './Workerman/Autoloader.php';
    8 ]2 E* g+ x0 e$ q
  4. // 初始化一个worker容器,监听1234端口
      s1 ?" y7 f$ F$ Q6 x; N
  5. $worker = new Worker('websocket://0.0.0.0:1234');1 v' X8 d) y2 @' g* Y% L9 [' O
  6. 8 D2 S4 w% I9 z
  7. /*
    4 q; z- b0 g- l# K3 J
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误. v8 t; N  H& d. x; Y' h
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)$ F, W" T5 s$ I+ |( o
  10. */( S# ]* _/ B% B
  11. $worker->count = 1;
      ]: L/ c% d! R# ^) Z
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口6 G9 N. ^+ i- i+ Q* I5 ?9 `) v) q
  13. $worker->onWorkerStart = function($worker)
    / J% W) G7 t6 C2 M3 s- I
  14. {
    / f; d5 p6 P" i! Z! V( a
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 A$ ?& H) n% w2 u0 W. `2 P5 }, t6 P
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    + Z& X$ l$ r2 h* r: m
  17.     $inner_text_worker->onMessage = function($connection, $buffer)" j$ Q! f# [$ q, U
  18.     {
    8 z1 f" U. A1 V; q4 p9 O$ z
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据. v! A- j5 p! j$ X* g  C* b
  20.         $data = json_decode($buffer, true);
    ' @( _' |! k) b2 o
  21.         $uid = $data['uid'];
    ! h  x# f2 ^9 w6 i5 }  m* n
  22.         // 通过workerman,向uid的页面推送数据
    6 M0 m7 g6 |, v3 }% [
  23.         $ret = sendMessageByUid($uid, $buffer);$ X# [$ x7 ]$ o" d8 m
  24.         // 返回推送结果
    0 v1 i8 N; f6 B0 ?
  25.         $connection->send($ret ? 'ok' : 'fail');
    9 r+ V; B7 M9 [
  26.     };
    # |9 m0 N( o  e5 {
  27.     // ## 执行监听 ##
    - T" u6 }; c" \& A( R2 M
  28.     $inner_text_worker->listen();# I: Q% W6 T/ G  \' I3 U
  29. };
    ) F4 V6 z7 P) D( F7 r
  30. // 新增加一个属性,用来保存uid到connection的映射3 d/ J( k7 W* u; W
  31. $worker->uidConnections = array();
    : J! P8 y* E  E; X# i/ O" {8 {! k
  32. // 当有客户端发来消息时执行的回调函数
    $ e2 J% k5 U" f, A- P; ~  r% F
  33. $worker->onMessage = function($connection, $data)0 x9 K0 m) u6 r7 s3 d$ n
  34. {/ A7 d) z- g+ V& k2 ~5 ]
  35.     global $worker;; |6 k6 k4 [- w0 q. k
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    * q  l% d( s( ~7 j0 r, I* L* q
  37.     if(!isset($connection->uid))
    ; x( M4 o: A% v( J3 V6 o
  38.     {
    ( R  L; R. O$ w! F2 e5 n
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)& V3 T  D6 X/ P4 ?3 c
  40.        $connection->uid = $data;/ ^  g' L; \1 w+ L9 I: ~
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,0 i* Z5 }) A( }9 |$ K
  42.         * 实现针对特定uid推送数据
    , {* M9 h" Z' Y/ A1 G8 N( ~3 a
  43.         */
    : ^) [0 _8 \5 K8 r
  44.        $worker->uidConnections[$connection->uid] = $connection;
    ) u1 K4 c6 S7 D. p) i# b7 o
  45.        return;
    $ G5 Z3 n+ W; [" A: _  R
  46.     }
    ; ]: b  f* I7 O0 z# _
  47. };
    0 Y7 C: y8 g4 [! E6 F
  48. ) t1 h/ I7 ^0 }: C1 q  B3 N8 m* |
  49. // 当有客户端连接断开时+ D5 [( A, y# g: i3 T$ U: {  Y
  50. $worker->onClose = function($connection)
    7 b- U! _$ U: A9 m: {- q$ q
  51. {8 y- {; O. O& [) i; [9 L
  52.     global $worker;
    4 H0 V. x' F0 c
  53.     if(isset($connection->uid))4 W+ g2 R& m( V' H5 n) B3 W4 Z
  54.     {
    7 d% [6 I% b3 s" A; g
  55.         // 连接断开时删除映射
    # k; G6 U8 }* K0 N( m: D
  56.         unset($worker->uidConnections[$connection->uid]);  n0 k* t0 Z' L
  57.     }! P6 k& t& a5 Q: p/ p  e, d
  58. };
    - g. F1 E* N% _; L3 i. ~
  59. ! Z+ H% a# v+ {9 u
  60. // 向所有验证的用户推送数据* s+ m/ `9 w) L2 |
  61. function broadcast($message)
      `5 t) T8 c9 P% n5 m' l
  62. {) y" k: g7 j$ o/ Z
  63.    global $worker;, R7 o. s! K9 @
  64.    foreach($worker->uidConnections as $connection)
    ) v- a5 t8 E( x6 Y  W- A0 H8 Y
  65.    {9 i; I4 H  e( K* q( g
  66.         $connection->send($message);
    6 h+ U# h9 U$ R% B- ]8 ^! k
  67.    }  {' ^1 m. D6 \7 w
  68. }
    - r9 X0 P9 M+ C/ M3 x& F
  69. 4 S$ M; L$ d& d& B( {  V: d# b
  70. // 针对uid推送数据* V! ?* ]8 x: h* e- j
  71. function sendMessageByUid($uid, $message)
    + Z& A9 v& q4 ~) }  k  z& ^
  72. {) g# a9 R( _4 L6 Y0 q
  73.     global $worker;: e2 F: N' P) k: s
  74.     if(isset($worker->uidConnections[$uid]))
    + _  Y* j, {! g/ B, z& Y2 i
  75.     {
    . h2 k2 X) E  ^) T$ o
  76.         $connection = $worker->uidConnections[$uid];1 N$ v. R3 E3 o- [+ y
  77.         $connection->send($message);  F0 `2 H" L0 U: p! W
  78.         return true;' X& O! q! ^! ?; p3 Q/ l5 [
  79.     }
    / X/ q# _5 C5 O6 \
  80.     return false;
    5 _- r9 v: U) U/ V- t
  81. }
    $ M0 ?: K9 h) d* b

  82. 4 F$ F, `" d) G2 e% j2 f
  83. // 运行所有的worker
    % j" f5 ^  e2 X: J
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');3 s# u2 M+ ^2 N, z4 V& [0 d
  2. ws.onopen = function(){
    + B9 ~1 o8 x8 ]9 N5 [( M5 ~
  3.     var uid = 'uid1';1 W" A  s3 T1 K" _5 K1 Q+ x
  4.     ws.send(uid);
    0 d+ P+ Z2 g) V8 f# N& D9 S
  5. };
    ( v$ F: ~/ e7 y& {  ?" l
  6. ws.onmessage = function(e){5 |1 }# v/ B+ x6 A1 ^) g* p
  7.     alert(e.data);
    , P+ G. O* A2 N6 G9 @
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口8 D- ?. g7 p3 C9 \2 C) ~* p" [
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    9 V) Y$ B2 M2 x! k; F; W3 B& w
  3. // 推送的数据,包含uid字段,表示是给这个uid推送
    2 T. v' h+ I/ E$ X0 k& A4 u# H' `
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');, g. v( x: i$ J* ]
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% u- b$ e5 T5 Y' L) V( q% H
  6. fwrite($client, json_encode($data)."\n");1 l& v8 X/ o: O) e2 @
  7. // 读取推送结果
      M+ J2 I1 t0 L5 a1 r, x: u
  8. echo fread($client, 8192);
复制代码

7 `, h. w# Z& l7 M; s
% P6 S- n* \8 V
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-16 18:41 , Processed in 0.054067 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!