当前位置: 首页 > article >正文

tronado websocket

server代码:

from flask import Flask
from flask_cors import CORS
from tornado import websocket, ioloop, web
from tornado.web import FallbackHandler
from tornado.wsgi import WSGIContainer

from apps.monitor import init_app
# from apps.utils import NumpyJSONProvider
from apps.visualization.views import bp as visualization_bp

# from apps.visualization.websocket import EchoWebSocket

# Flask.json_provider_class = NumpyJSONProvider

app = Flask(__name__)
CORS(app)
init_app(app)

app.register_blueprint(visualization_bp)


class EchoWebSocket(websocket.WebSocketHandler):
    """tornado websocket class"""

    connections = set()

    def open(self):
        print(f"A client connected.{self}")
        self.connections.add(self)

    def on_message(self, message):
        print(message)
        self.write_message("You said: " + message)

    def on_close(self):
        print(f"A client disconnected.{self}")
        self.connections.remove(self)

     @classmethod
    def broadcast(cls, message):
        """broadcast message"""
        for client in cls.connections:
            client.write_message(message)


if __name__ == "__main__":
    server = web.Application(
        [
            (r"/ws", EchoWebSocket),
            (r".*", FallbackHandler, dict(fallback=WSGIContainer(app))),  # 用tronado代理flask
        ]
    )
    server.listen(65500)
    ioloop.IOLoop.current().start()

client 代码:

import tornado
from tornado.websocket import websocket_connect


class WSClient(object):

    def __init__(self, url):
        self.url = url
        self.ioloop = tornado.ioloop.IOLoop.current()

    def start(self):
        websocket_connect(
            self.url,
            # self.ioloop,
            callback=self.on_connected,
            on_message_callback=self.on_message)
        self.ioloop.start()

        def on_connected(self, f):
        try:
            conn = f.result()
            conn.write_message("hello")
        except Exception as e:
            print(str(e))
            self.ioloop.stop()

    def on_message(self, msg):
        print('receive message: ', msg)
        if msg.endswith("hello"):
            self.ioloop.stop()


if __name__=='__main__':
    wsc = WSClient('ws://127.0.0.1:65500/ws')
    wsc.start()

运行结果:image

用tronado 代理falsk 时与websocket 一起使用:http://123.56.139.157:8082/article/22/2379729/detail.html


http://www.kler.cn/a/311084.html

相关文章:

  • 今日 AI 简报 | 开源 RAG 文本分块库、AI代理自动化软件开发框架、多模态统一生成框架、在线图像背景移除等
  • Postman上传图片如何处理
  • MySQL与Oracle对比及区别
  • 系统架构设计师论文
  • SOLIDWORKS代理商鑫辰信息科技
  • Oracle 11g rac 集群节点的修复过程
  • Java基础:Api 文档注释,字符串种类,String字符串创建,特点及常用方法
  • 【洛谷】P1546 [USACO3.1] 最短网络 Agri-Net 的题解
  • SqlServer自定义类型的使用
  • 【数据结构-一维差分】力扣1893. 检查是否区域内所有整数都被覆盖
  • 无人机滑环的核心特点及其应用分析
  • [论文笔记] LLM端侧小模型篇——1、剪枝量化的latency
  • MySQl篇(基本介绍)(持续更新迭代)
  • Leetcode—删除有序数组的重复项
  • 408算法题leetcode--第七天
  • Llama 3.1 大模型指令微调提升中文能力
  • 【系统架构设计师-2019年真题】案例分析-答案及详解
  • Scikit-learn 学习笔记
  • 尚品汇-秒杀商品存入缓存、Redis发布订阅实现状态位(五十一)
  • 全球首个!复旦大学冯建峰团队开发数字孪生脑平台,具备 860 亿神经元规模
  • 旷视轻量化网络shufflenet算法解读
  • MySQL——数据库的高级操作(二)用户管理(3)删除普通用户
  • 机器学习1--概述
  • Linux创建虚拟磁盘并分区格式化
  • 「Netmarble 小镇」活动来了:踏上穿越标志性世界的旅程!
  • OpenHarmony鸿蒙( Beta5.0)智能门铃开发实践