当前位置: 首页 > article >正文

php:使用Ratchet类实现分布式websocket服务

一、前言

        最近需要做一个有关聊天的小程序,逻辑很简单,所以不打算用Swoole和workerman之类的,最后选择了Ratchet,因为简单易用,适合小型websocket服务。

二、问题

        但是目前我的项目是分布式环境,统一通过Nginx的反向代理分配到多个不同服务器,那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢?这就涉及到了分布式websocket服务,但是我不希望太复杂,所以采用了消息队列的方式实现效果。

三、安装Ratchet类

直接使用composer安装,我用版本是"cboden/ratchet": "^0.2.8",比较老了。

composer require cboden/ratchet

四、创建websocket服务

因为php是同步阻塞型语言,通常每次请求都会从头到尾执行完成,在PHP中,所有的代码都按照顺序执行,直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列,就需要用到ratchet中的事件循环机制,实现异步非阻塞通信效果。

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;

set_time_limit(0);
ini_set('default_socket_timeout', -1);
/**
 * Websocket_Server
 */
class ControllerWebsocket_Server
{
    public function indexAction(){

        try {

            $port = 8083;

            // 创建事件循环(使用该机制实现异步非阻塞通信)
            $loop = LoopFactory::create();

            // 创建 React Socket 服务器
            $socket = new ReactSocket($loop);
            $socket->listen($port, '0.0.0.0'); // 指定监听的端口和地址

            // 启动 WebSocket 服务器
            $server = new IoServer(
                new WsServer(
                    new \ModelWebsocket_Handler($loop)
                ),
                $socket,
                $loop
            );

            // 启动事件循环
            $loop->run();

        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }
}

其中ModelWebsocket_Handler是封装好的websocket操作类

<?php
//载入Ratchet类库
require_once APP_PATH.'vendor/autoload.php';
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;

/**
 * websocket服务端-相关操作
 */
class ModelWebsocket_Handler implements MessageComponentInterface {

    //数据缓存
    const REDIS_KEY_RESOURCE_DATA_MAP = 'h:websocket:resource:data:map';

    //客户端
    public $clients;

    public function __construct($loop) {
        $this->clients = new \SplObjectStorage();
        $this->subscribeMessage($loop);
    }

    /**
     * 连接建立时的逻辑
     */
    public function onOpen(ConnectionInterface $conn) {
        $this->clients->attach($conn);
        echo "New connection! ({$conn->resourceId})\n";

        //获取连接请求的参数
        $params = [];
        $queryString = $conn->WebSocket->request->getQuery();
        parse_str($queryString, $params);

        //存储资源id相关数据
        $this->setResourceDataMap($conn->resourceId, $params);
    }

    /**
     * 收到消息时的逻辑
     */
    public function onMessage(ConnectionInterface $from, $msg) {
        echo "Received message: {$msg}\n";
        foreach ($this->clients as $client) {
            if ($client === $from) {
                continue;
            }

            //发送消息
            $client->send($msg);
        }
    }

    /**
     * 连接关闭时的逻辑
     */
    public function onClose(ConnectionInterface $conn) {
        $this->delResourceDataMap($conn->resourceId);
        $this->clients->detach($conn);
        echo "Connection {$conn->resourceId} has disconnected\n";
    }

    /**
     * 错误处理逻辑
     */
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "An error occurred: {$e->getMessage()}\n";
        $this->delResourceDataMap($conn->resourceId);
        $conn->close();
    }

    /**
     * 存储资源id相关数据
     * 
     * @param  string  $resourceId
     * @param  array   $data
     * @return bool
     */
    public function setResourceDataMap($resourceId, $data) {
        $redis = Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);
        $rs = $redis->hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));
        return $rs;
    }

    /**
     * 获取资源id相关数据
     * 
     * @param  string  $resourceId
     * @return array
     */
    public function getResourceDataMap($resourceId) {
        $redis = Comm_Redis::init(true);
        $rs = $redis->hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);
        return json_decode($rs, true) ?: [];
    }

    /**
     * 删除资源id相关数据
     * 
     * @param  string  $resourceId
     * @return bool
     */
    public function delResourceDataMap($resourceId) {
        $redis = Comm_Redis::init(true);
        $rs = $redis->hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);
        return $rs;
    }

    /**
     * 订阅消息
     */
    public function subscribeMessage($loop){
        $loop->addPeriodicTimer(1, function () {
            //在这里可以使用redis订阅消息、也可以使用kafka消费消息,然后再比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用
            foreach ($this->clients as $client) {
                $client->send("测试");
            } 
        });
    }
}

其中:subscribeMessage方法监听消息队列,收到消息之后比对自身是否存在相应用户的连接,如果存在则发送,不存在则过滤,达到分布式webSocket服务的作用。

当然如果你能直接找到用户所连接的服务器,并且可以直接推给相应的服务器,那更好,可以节省流量开销和一些额外的逻辑处理。


http://www.kler.cn/a/407325.html

相关文章:

  • leetcode 50个简单和中等难度的题
  • Maven maven项目构建的生命周期 Maven安装配置 IDEA 配置 Maven
  • xiaolin coding 图解网络笔记——HTTP篇
  • 数据结构-7.Java. 对象的比较
  • 自制游戏:监狱逃亡
  • ElasticSearch学习篇17_《检索技术核心20讲》最邻近检索-局部敏感哈希、乘积量化PQ思路
  • 第三百二十八节 Java网络教程 - Java网络TCP客户端套接字
  • PLC的指令全集1+TIA PORTAL仿真(西门子S7 1200)
  • 浮点数的表示—IEEE754标准
  • c#:winform引入bartender
  • 【大数据技术基础】 课程 第5章 HBase的安装和基础编程 大数据基础编程、实验和案例教程(第2版)
  • Windows之使用putty软件以ssh的方式连接Linux中文显示乱码
  • Django+Nginx+uwsgi网站使用Channels+redis+daphne实现简单的多人在线聊天及消息存储功能
  • 大疆上云api开发
  • /etc/sudoers 文件格式解读
  • VM虚拟机装MAC后无法联网,如何解决?
  • 飞凌嵌入式旗下教育品牌ElfBoard与西安科技大学共建「科教融合基地」
  • android 性能分析工具(03)Android Studio Profiler及常见性能图表解读
  • 绝世唐门:雨浩黑发泪痣形象,王东无新建模,动画漫画对比凸显
  • 百度在下一盘大棋
  • 简述C++STL-队列
  • PHP 二分法查找算法
  • React.memo 的使用
  • [Redis#4] string | 常用命令 | + mysql use:cache | session
  • 摄影:相机控色
  • Linux系统Docker部署开源在线协作笔记Trilium Notes与远程访问详细教程