当前位置: 首页 > article >正文

Flask 与 SocketIO 正确初始化及最佳实践调试

1、问题

我使用Flask和Flask-SocketIO 来做 Websocket 链接。前期正常使用,但是后期布置修改什么导致Websocket连接失败。排查需求,才发现初始化不正常导致。

SocketIO 和 Flask 应用的初始化顺序和引用循环的问题

2、环境

python-engineio==4.11.1
python-socketio==5.12.0
Flask-SocketIO==5.3.6

3、正常初始化【单文件】

from flask import Flask, render_template
from flask_socketio import SocketIO

app = Flask(__name__)
socketio = SocketIO(app)  # 初始化 SocketIO

# 默认路由,用于渲染 HTML 页面
@app.route('/')
def index():
    return render_template('index.html')

# 出现消息后,率先执行此处
@socketio.on("message", namespace="/ws")
def socket(message):
    print(f"接收到消息: {message}")
    for i in range(1, 10):
        socketio.sleep(1)
        print(f"发送消息: {i}")
        socketio.emit("response",           # 绑定通信
                      {"data": i},           # 返回socket数据
                      namespace="/ws")


# 当websocket连接成功时,自动触发connect默认方法
@socketio.on("connect", namespace="/ws")
def connect():
    print("链接建立成功..")


# 当websocket连接失败时,自动触发disconnect默认方法
@socketio.on("disconnect", namespace="/ws")
def disconnect():
    print("链接建立失败..")

if __name__ == '__main__':
    socketio.run(app, debug=True, host='0.0.0.0', port=5000,allow_unsafe_werkzeug=True)

4、多文件初始化【两个文件】

主要是 manage.py 和 init.py 两个文件。
在这里插入图片描述
app/init.py

from flask import Flask
from flask_socketio import SocketIO
from flasgger import Swagger
from flask_cors import CORS
from flask_migrate import Migrate
from app.config import config
from app.extension import db

# 创建一个全局的 socketio 对象,但不立即初始化
socketio = SocketIO()

# 其他 swagger 配置保持不变...

def create_app(DevelopmentConfig=None):
    if DevelopmentConfig is None:
        DevelopmentConfig = 'development'

    app = Flask(__name__)
    app.config['SECRET_KEY'] = '123456'
    
    # 加载配置项
    app.config.from_object(config.get(DevelopmentConfig))
    
    from app.api import config_blueprint
    # 注册蓝图
    config_blueprint(app)
    
    # 数据库配置初始化
    config_extensions(app)
    
    # 数据库迁移相关代码...
    from app.api.models.EsModels import sysEsLogs,sysCretitLogs
    from app.api.models.NetModels import sysNetLogs
    from app.api.models.UpsModels import sysUpsLogs
    migrate = Migrate(app, db)
    
    # Swagger初始化
    Swagger(app, config=swagger_config, template=swagger_template)
    
    # CORS配置
    CORS(app, resources={r'/*': {'origins': '*'}}, supports_credentials=True)
    
    # 初始化 SocketIO
    socketio.init_app(app, cors_allowed_origins="*")
    
    return app

socket_events.py

from app import socketio

# Socket.IO 事件处理
@socketio.on("message", namespace="/ws")
def socket(message):
    print(f"接收到消息: {message}")
    for i in range(1, 10):
        socketio.sleep(1)
        print(f"发送消息: {i}")
        socketio.emit("response",
                     {"data": i},
                     namespace="/ws")

@socketio.on("connect", namespace="/ws")
def connect():
    print("链接建立成功..")

@socketio.on("disconnect", namespace="/ws")
def disconnect():
    print("链接建立失败..") 

manage.py

# -*- coding: utf-8 -*-
# @Time    : 2024/5/2 12:01
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : manage.py
# @Software: PyCharm
import atexit
import os
import sys

from apscheduler.schedulers.background import BackgroundScheduler

from app import create_app, socketio
from app.common.util.LogHandler import log
from app.crontab.NetCronTab import NetworkMonitor
from app.crontab.UpsCronTab import UPSMonitor
from app.socket_events import *
# 默认为开发环境,按需求修改 production  development
config_name = 'development'

app = create_app(config_name)
# 解决中文乱码
# app.json.ensure_ascii = False

# from skywalking import agent, config
# config.init(agent_collector_backend_services='192.168.82.105:11800', agent_name='python-flask@devTenant',agent_instance_name="python-flask")
# agent.start()

# 数据库迁移
def start_scheduler():
    # 创建一个后台调度器
    scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
    from app.crontab.EsCronTab import EsIndexCron
    scheduler.add_job(func=EsIndexCron, trigger='interval', minutes=1)
    log.info("EsIndexCron 定时任务启动成功")


    # scheduler.add_job(func=send_alert, trigger="interval", seconds=20)
    # 启动调度器
    scheduler.start()
    atexit.register(lambda: scheduler.shutdown())


if __name__ == '__main__':
    # 获取当前文件的绝对路径
    current_file = os.path.abspath(__file__)
    base_dir = os.path.dirname(current_file)
    # 将项目目录添加到 sys.path
    if base_dir not in sys.path:
        sys.path.append(base_dir)
    if not app.debug or app.testing:
        start_scheduler()
        log.info("定时任务启动成功")
    socketio.run(app, debug=True, use_reloader=True, host='0.0.0.0', port=5001)
    # use_reloader=False, threaded=True,

在这里插入图片描述

在postman输入地址和监听事件

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述


http://www.kler.cn/a/459260.html

相关文章:

  • 【代码】Python|Windows 批量尝试密码去打开加密的 Word 文档(docx和doc)
  • GeoTrust True BusinessID Wildcard
  • 【C++】: std::tie 用法详解
  • vim里搜索关键字
  • 使用连字符容易出错,尽量使用驼峰式的
  • 谷粒商城项目125-spring整合high-level-client
  • 解读目前AI就业岗位——大语言模型(LLM)应用工程师学习路线、就业前景及岗位全解析
  • 电脑缺失libcurl.dll怎么解决?详解电脑libcurl.dll文件丢失问题
  • Rocky9网络基本连接配置
  • SpringBoot开发——整合 Elasticsearch 实现数据高效搜索
  • 【数据结构】线性数据结构——队列
  • 如何用jmeter工具进行性能测试
  • .net core 的网络编程
  • 线性代数概念整理笔记
  • python去水印
  • HAL 库 HAL_UARTEx_ReceiveToIdle_IT 函数解析
  • 《深入挖掘Python加解密:自定义加密算法的设计与实现》
  • 2-200基于Matlab-GUI的SVM和ANN的废弃金属分类、分等级系统
  • 力扣面试题 41 - 魔术索引 C语言解法 二分查找
  • 2024-12-30-g++
  • PawSQL性能巡检平台 (3) - 慢查询采集和优化
  • Python入门系列二-控制结构与函数
  • 在WSL的系统中配置免密和GitHub传输数据(SSH)
  • 自研国产零依赖前端UI框架实战008 用户表单以及随机ID
  • 网络原理(六): UDP 协议
  • nacos-gateway动态路由