当前位置: 首页 > article >正文

Flask中创建多线程和多进程

创建多进程

import multiprocessing
from applications.create_app import create_app
from applications.models.app.app import AppData
from common.model.base_model import db
from sqlalchemy.orm.attributes import flag_modified


app = create_app()


def worker(record_id, new_id):
    with app.app_context():
        # 读取数据并加锁
        # record = db.session.query(AppData).filter_by(id=record_id).with_for_update().first()
        record = db.session.query(AppData).filter(AppData.id == record_id).first()

        # 更新数据
        if not record.data.get("updated_ids"):
            record.data["updated_ids"] = []
        record.data["updated_ids"].append(new_id)
        flag_modified(record, "data")

        # 提交事务
        db.session.commit()


def test_thread():

    # 创建多个进程,模拟多个worker同时更新同一条数据
    processes = []
    for i in range(1, 9):
        p = multiprocessing.Process(target=worker, args=(1, i))
        processes.append(p)
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    # 检查更新结果
    with app.app_context():
        record = db.session.query(AppData).filter(AppData.id == 1).first()
        print(record.data["updated_ids"])


if __name__ == "__main__":
    test_thread()

创建多线程

def test_():
    test_thread()


def test_thread():
    # 创建两个线程,模拟两个worker同时更新同一条数据
    thread1 = threading.Thread(target=worker, args=(1, 1))
    thread2 = threading.Thread(target=worker, args=(1, 2))
    thread3 = threading.Thread(target=worker, args=(1, 3))
    thread4 = threading.Thread(target=worker, args=(1, 4))
    thread5 = threading.Thread(target=worker, args=(1, 5))
    thread6 = threading.Thread(target=worker, args=(1, 6))
    thread7 = threading.Thread(target=worker, args=(1, 7))
    thread8 = threading.Thread(target=worker, args=(1, 8))

    # 启动线程
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread5.start()
    thread6.start()
    thread7.start()
    thread8.start()

    # 等待线程完成
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    thread5.join()
    thread6.join()
    thread7.join()
    thread8.join()

    # 检查更新结果
    record = AppData.query.filter(AppData.id == 1).first()
    print(record.data["updated_ids"])  # [1, 2]


def worker(record_id, new_id):
    with db.app.app_context():
        # 读取数据并加锁
        # record = db.session.query(AppData).filter_by(id=record_id).first()
        record = db.session.query(AppData).filter_by(id=record_id).with_for_update().first()

        # 更新数据
        if not record.data.get("updated_ids"):
            record.data["updated_ids"] = []
        record.data["updated_ids"].append(new_id)
        flag_modified(record, "data")

        # 提交事务
        db.session.commit()

http://www.kler.cn/a/321786.html

相关文章:

  • C语言编程练习:验证哥德巴赫猜想 进制转换 rand函数
  • 容器技术在持续集成与持续交付中的应用
  • 基于Java Springboot快递物流管理系统
  • 【Rust 编程语言工具】rustup-init.exe 安装与使用指南
  • 【MySQL 保姆级教学】事务的自动提交和手动提交(重点)--上(13)
  • 基于Spring Boot的计算机课程管理:工程认证的实践
  • 黑龙江等保托管:全面解析与实践指南
  • IOS-IPA签名工具 request_post 任意文件读取复现
  • Stable Diffusion 使用详解(13)--- 3D纹理增强
  • C#邮件发送:实现自动化邮件通知完整指南!
  • 【Verilog学习日常】—牛客网刷题—Verilog企业真题—VL62
  • 软考高级:敏捷开发 SCRUM
  • 后端Java-SpringBoot整合MyBatisPlus步骤(超详细)
  • LabVIEW界面输入值设为默认值
  • 基于SSM+小程序的英语学习交流平台管理系统(学习3)(源码+sql脚本+视频导入教程+文档)
  • OpenHarmony(鸿蒙南向开发)——小型系统内核(LiteOS-A)【用户态内存调测】
  • RabbitMQ下载安装运行环境搭建
  • zTasker自动化任务的小工具
  • Sql Server时间转换之查询时间格式不对--CONVERT(NVARCHAR,CreateTime,23) 转换出来有时分秒
  • React Native、Uni-app、Flutter优缺点对比
  • 【个人笔记】线程和线程池的状态以及转换方式
  • html,js,react三种方法编写helloworld理解virtual dom
  • 【重学 MySQL】三十九、Having 的使用
  • 地平线静态目标检测 MapTR 参考算法-V1.0
  • 手写WBXslider 组件 (标签为微博小程序,需要改成对应的标签,或方法)
  • 80%的程序员当不了架构师?那考软考作用在哪?