当前位置: 首页 > article >正文

Python Flask 数据库开发

Python Flask 数据库开发

  • 引言
  • 环境配置
  • 创建 Flask 应用,连接数据库
  • 定义路由
  • 定义模型
    • 创建表
    • 创建 API
  • 数据库直接操作
  • 启动 Flask 应用
    • app.py 示例
    • 运行 Flask
    • 访问应用
  • 展望

引言

在现代 web 开发中,Python 的 Flask 框架因其轻量和灵活性受到广泛欢迎。结合数据库技术,Flask 可以高效地管理和处理数据,使开发者能够快速构建功能强大的应用程序。无论是选择 MySQL 还是 PostgreSQL,掌握数据库与 Flask 的集成至关重要。本文将探讨如何在 Flask 中设置和使用数据库,涵盖从环境配置到基本操作的各个方面,便于我们去实现数据驱动的应用。

环境配置

在开始之前,需要确保 Python 环境已安装 Flask 模块。可以通过 pip 安装 Flask 和 SQLAlchemy(Flask 的 ORM 工具):

pip install Flask Flask-SQLAlchemy

接下来,根据所选数据库的类型(如 MySQL 或 PostgreSQL),部署相应的数据库服务。
详情可参考:MySQL && PostgreSQL 数据库部署

创建 Flask 应用,连接数据库

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
# 创建了一个 Flask 应用实例。__name__ 是 Python 的一个特殊变量,它指向当前模块的名字。

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://user:password@localhost/dbname'
# mysql://user:password@localhost/dbname 的格式是:
# mysql:数据库类型。
# user:数据库用户名。
# password:数据库密码。
# localhost:数据库服务器地址(这里是本地)。
# dbname:要连接的数据库名称。

db = SQLAlchemy(app)
# 创建了一个 SQLAlchemy 对象,并将 Flask 应用实例传递给它。这样,SQLAlchemy 就可以使用 Flask 应用的配置来管理数据库操作

# @app.route 是 Flask 中的装饰器,用于定义路由。它将一个 URL 路径与一个视图函数关联起来。简单来说,当用户访问指定的 URL 时,Flask 会调用对应的视图函数并返回结果
@app.route('/')
def home():
    return 'Hello, Flask!'

if __name__ == '__main__':
    app.run(debug=True)

定义路由

@app.route 是 Flask 中的装饰器,用于定义路由。它将一个 URL 路径与一个视图函数关联起来。简单来说,当用户访问指定的 URL 时,Flask 会调用对应的视图函数并返回结果。例如:

@app.route('/')
def home():
    return 'Hello, Flask!'

在这个例子中,访问根 URL (/) 时,用户会看到 “Hello, Flask!” 的消息。通过使用不同的路径和方法(如 GET 或 POST),我们可以创建丰富的 Web 应用。

定义模型

使用 SQLAlchemy 的 ORM(对象关系映射)定义数据库模型是与数据库交互的第一步。数据库模型是用于定义和组织数据库中数据结构的抽象框架。它描述了数据的类型、关系、约束以及如何在数据库中存储和检索数据。

一个清晰的数据库模型会大大提高开发效率和数据管理的质量,比如说:

  • 创建表:通过模型定义表的结构(字段类型、约束等)。
  • 执行 CRUD 操作:模型使得创建、读取、更新和删除数据变得简单。
  • 维护数据完整性:模型可以设置约束(如唯一性、外键),确保数据的正确性。
  • 简化查询:通过模型,可以使用 ORM(对象关系映射)库,轻松编写查询。
# 创建一个名为 DiagnosisCategory 的类,继承自 db.Model,这是 SQLAlchemy 中所有模型类的基类
class DiagnosisCategory(db.Model):
	__tablename__ = 'diagnosis_category'  # 设置表名
	
	# 定义一个名为 id 的列,类型为 Integer。primary_key=True 表示这个列是主键,用于唯一标识每个用户。
    id = db.Column(db.Integer, primary_key=True)
    
	# 定义一个名为 name 的列,类型为 String,最大长度为 80。unique=True 表示这个列的值在数据库中必须是唯一的,nullable=False 表示该字段不能为空。
    name = db.Column(db.String(80), unique=True, nullable=False)

    def __repr__(self):
        return f'<User {self.username}>'

创建表

要创建 diagnosis_category这张表,只需在应用上下文中调用 db.create_all(),就会自动生成 User 表。这样,数据库中就会有一个新的 DiagnosisCategory 表 (diagnosis_category) 用于存储用户信息。

class DiagnosisCategory(db.Model):
    __tablename__ = 'diagnosis_category'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

    def __repr__(self):
        return f'<User {self.username}>'

# 创建表
with app.app_context():
    db.create_all()

创建 API

使用 Flask-Restless 创建的 API 可以通过 HTTP 请求进行 CRUD 操作。其中,collection_name 定义了访问特定资源的 URL 路径。

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_restless import APIManager

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///yourdatabase.db'
db = SQLAlchemy(app)

# 定义模型
class DiagnosisCategory(db.Model):
    __tablename__ = 'diagnosis_category'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

# 创建数据库表
with app.app_context():
    db.create_all()
    
# 创建 API 管理器
manager = APIManager(app, session=db.session)

# 创建 API 端点
manager.create_api(DiagnosisCategory, collection_name='category',
                   methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])

访问这个 API 的路径将是 /api/category,前端代码(ts、js等)可以通过这个路径对后端服务器进行 CRUD 操作。

// 获取所有分类(GET 请求):
GET /api/category

// 创建新分类(POST 请求):
POST /api/category

// 获取特定分类(GET 请求,假设 ID 为 1):
GET /api/category/1

// 更新特定分类(PUT 请求,假设 ID 为 1):
PUT /api/category/1

// 删除特定分类(DELETE 请求,假设 ID 为 1):
DELETE /api/category/1

数据库直接操作

在 Flask 中,也可以使用 SQLAlchemy 提供的 API 直接操作数据库,使用 Python 对象进行 CRUD。

# 创建
new_user = User(username='example')	# 创建新用户实例
db.session.add(new_user) # 将新用户添加到会话
db.session.commit()	# 提交事务以保存更改

# 查询
users = User.query.all() # # 查询所有用户

# 更新
user = User.query.first() # # 查询第一个用户
user.username = 'new_username'
db.session.commit()

# 删除
db.session.delete(user) # 删除用户
db.session.commit()

启动 Flask 应用

app.py 示例

我们以一个实际应用为例:连接远程主机(10.2.0.92)上的 MySQL数据库,创建 t_diag_category、t_diag_model、t_diag_test_suite、t_diag_test_target、t_diag_test_result 五张表,定义相应的 CRUD 接口,实现基于表的SN字段查重、关系表获取等额外操作。

# -*- coding: UTF-8 -*-

import os
import shutil
import sys
import argparse
import json
from flask import Flask, make_response, request, jsonify, send_from_directory
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
from flask_restless import APIManager
from datetime import datetime


CURRENT_PATH = os.path.dirname(os.path.realpath(sys.argv[0]))
DB_SERVER = '10.2.0.92'
DB_PORT = '3306'
DB_USERNAME = 'root'
DB_PASSWORD = '123456'
DB_NAME = 'diagnosisdev'

app = Flask(__name__)
db = None
manager = None

def initialize():
    global app
    global db
    global manager

    # Database
    app.config[
        'SQLALCHEMY_DATABASE_URI'] = f'mysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_SERVER}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    db = SQLAlchemy(app)
    manager = APIManager(app, session=db.session)
    
    class DiagnosisCategory(db.Model):
        __tablename__ = 't_diag_category'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        parent_id = db.Column(db.Integer, nullable=False, default=-1)
        name = db.Column(db.Text)
        create_user = db.Column(db.Integer)
        create_time = db.Column(db.DateTime, default=datetime.utcnow)
        modify_user = db.Column(db.Integer)
        modify_time = db.Column(db.DateTime, default=datetime.utcnow)
        remark = db.Column(db.Text)
        enabled = db.Column(db.Integer, default=1)

    class DiagnosisModel(db.Model):
        __tablename__ = 't_diag_model'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        parent_id = db.Column(db.Integer, nullable=False, default=-1)
        category_id = db.Column(
            db.Integer, db.ForeignKey('t_diag_category.id'))
        category = db.relationship('DiagnosisCategory', backref='t_diag_model')
        name = db.Column(db.Text)
        create_user = db.Column(db.Integer)
        create_time = db.Column(db.DateTime, default=datetime.utcnow)
        modify_user = db.Column(db.Integer)
        modify_time = db.Column(db.DateTime, default=datetime.utcnow)
        remark = db.Column(db.Text)
        enabled = db.Column(db.Integer, default=1)
        image_url = db.Column(db.Text)

    class TestSuite(db.Model):
        '''
        for test:
        curl http://127.0.0.1:50000/api/test_suite -H "Accept: application/vnd.api+json"
        curl http://127.0.0.1:50000/api/test_suite/1 -H "Accept: application/vnd.api+json"
        curl -X POST http://127.0.0.1:50000/api/test_suite -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d '{"data":{"type": "test_suite", "attributes": {"name": "xxx", "data": "123"}}}'
        curl -X PATCH http://127.0.0.1:50000/api/test_suite/1 -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d  '{"data":{"type": "test_suite", "id": "1", "attributes": {"name": "xxx", "data": "123"}}}'
        '''
        __tablename__ = 't_diag_test_suite'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        name = db.Column(db.Text)
        type = db.Column(db.Text)
        model_id = db.Column(db.Integer, db.ForeignKey('t_diag_model.id'))
        model = db.relationship('DiagnosisModel', backref='t_diag_test_suite')
        report_name = db.Column(db.Text)
        data = db.Column(db.Text)
        create_user = db.Column(db.Integer)
        create_time = db.Column(db.DateTime, default=datetime.utcnow)
        modify_user = db.Column(db.Integer)
        modify_time = db.Column(db.DateTime, default=datetime.utcnow)
        remark = db.Column(db.Text)
        enabled = db.Column(db.Integer, default=1)
        sop_url = db.Column(db.Text)
        case_type = db.Column(db.INT)
        # ALTER TABLE t_diag_test_suite ADD COLUMN sop_url TEXT;
        # ALTER TABLE t_diag_test_suite DROP COLUMN sop_url;

    class TestTarget(db.Model):
        __tablename__ = 't_diag_test_target'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        parent_id = db.Column(db.Integer, nullable=False, default=-1)
        model_id = db.Column(db.Integer, db.ForeignKey('t_diag_model.id'))
        model = db.relationship('DiagnosisModel', backref='t_diag_test_target')
        name = db.Column(db.Text)
        sn = db.Column(db.Text)
        is_hardware_changed = db.Column(db.Integer, default=0)
        create_user = db.Column(db.Integer)
        create_time = db.Column(db.DateTime, default=datetime.utcnow)
        modify_user = db.Column(db.Integer)
        modify_time = db.Column(db.DateTime, default=datetime.utcnow)
        remark = db.Column(db.Text)
        enabled = db.Column(db.Integer, default=1)

    class TestResult(db.Model):
        __tablename__ = 't_diag_test_result'
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        target_id = db.Column(
            db.Integer, db.ForeignKey('t_diag_test_target.id'))
        target = db.relationship('TestTarget', backref='t_diag_test_result')
        test_suite_id = db.Column(
            db.Integer, db.ForeignKey('t_diag_test_suite.id'))
        test_suite = db.relationship('TestSuite', backref='t_diag_test_result')
        name = db.Column(db.Text)
        data = db.Column(db.Text)
        create_user = db.Column(db.Integer)
        create_time = db.Column(db.DateTime, default=datetime.now)
        modify_user = db.Column(db.Integer)
        modify_time = db.Column(db.DateTime, default=datetime.now)
        remark = db.Column(db.Text)
        enabled = db.Column(db.Integer, default=1)

    with app.app_context():
        db.create_all()
        manager.create_api(DiagnosisCategory, collection_name='category',
                           methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
        manager.create_api(DiagnosisModel, collection_name='model',
                           methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
        manager.create_api(TestSuite, collection_name='test_suite',
                           methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
        manager.create_api(TestTarget, collection_name='test_target',
                           methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])
        manager.create_api(TestResult, collection_name='test_result',
                           methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH'])


@app.route('/')
def hello():
    return 'Hello, world!'


@app.route('/api/is_scrapped_sn/<sn>', methods=['GET'])
def is_scrapped_sn(sn):
    try:
        result = db.session.execute(
            text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND scrap = 1'),  {'sn': sn})
        return '1' if list(result)[0][0] > 0 else '0'
    except Exception as e:
        print(e)
        return '0'


@app.route('/api/is_duplicate_primary_sn/<sn>', methods=['GET'])
def is_duplicate_primary_sn(sn):
    try:
        result = db.session.execute(
            text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND enabled = 1 AND parent_id = -1'),  {'sn': sn})
        return '1' if list(result)[0][0] > 0 else '0'
    except Exception as e:
        print(e)
        return '0'


@app.route('/api/is_duplicate_relationship_sn/<sn>', methods=['GET'])
def is_duplicate_relationship_sn(sn):
    try:
        result = db.session.execute(
            text('SELECT COUNT(1) FROM t_diag_test_target WHERE sn = :sn AND enabled = 1 AND parent_id != -1'),  {'sn': sn})
        return '1' if list(result)[0][0] > 0 else '0'
    except Exception as e:
        print(e)
        return '0'


@app.route('/api/disable_sn/<sn>', methods=['GET'])
def disable_sn(sn):
    with db.session.begin():
        try:
            db.session.execute(
                text('UPDATE t_diag_test_target SET enabled = 0 WHERE parent_id IN (SELECT id FROM t_diag_test_target WHERE sn = :sn)'),  {'sn': sn})
            db.session.execute(
                text('UPDATE t_diag_test_target SET enabled = 0 WHERE sn = :sn'), {'sn': sn})
            db.session.commit()
            return '0'
        except Exception as e:
            print(e)
            db.session.rollback()
            return '-1'

@app.route('/api/get_relationship_sn/<sn>', methods=['GET'])
def get_relationship_sn(sn):
    try:
        result = db.session.execute(
            text('WITH RECURSIVE cte(id, parent_id, sn, psn) AS (SELECT id, parent_id, sn, psn FROM v_diag_valid_test_target WHERE sn = :sn UNION ALL SELECT t1.id, t1.parent_id, t1.sn, t1.psn FROM v_diag_valid_test_target t1, cte WHERE t1.psn = cte.sn) SELECT * FROM cte'),  {'sn': sn})
        data = [row[2] for row in result.fetchall()]
        print(data)
        return json.dumps(data)
    except Exception as e:
        print(e)
        return jsonify('[]')


@app.route('/api/get_relationship_table/<sn>', methods=['GET'])
def get_relationship_table(sn):
    try:
        result = db.session.execute(
            text("WITH RECURSIVE cte(sn, path) AS (SELECT sn, CAST(sn AS CHAR) FROM v_diag_valid_test_target WHERE sn = :sn UNION ALL SELECT t1.sn, CONCAT(cte.path, '.', t1.sn) FROM v_diag_valid_test_target t1 INNER JOIN cte ON t1.psn = cte.sn) SELECT path FROM cte ORDER BY path"), {'sn': sn})
        output = {}
        for data in result.fetchall():
            keys = data[0].split('.')
            temp = output
            for key in keys:
                temp.setdefault(key, {"info": get_sn_info(key), "subsn": {}})
                temp = temp[key]["subsn"]
        json_output = json.dumps(output)
        return json_output
    except Exception as e:
        print(e)
        return jsonify('[]')


@app.route('/api/sn_scrap', methods=['POST'])
def sn_scrap():
    try:
        sn_lists = request.json
        sql_query = """
            UPDATE t_diag_test_target
            SET enabled = 0 WHERE enabled = 1
            AND parent_id IN (SELECT id FROM (SELECT id FROM t_diag_test_target WHERE sn IN :sn_list) AS T1)
            OR (sn IN :sn_list)
        """
        db.session.execute(text(sql_query), {"sn_list": tuple(sn_lists)})
        sql_query = "UPDATE t_diag_test_target SET scrap = 1 WHERE sn IN :sn_list"
        db.session.execute(text(sql_query),  {"sn_list": tuple(sn_lists)})
        db.session.commit()
        return '0'
    except Exception as e:
        db.session.rollback()
        print(e)
        return '-1'


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('path', nargs='*')
    parser.add_argument('--db_name', type=str, default='diagnosisdev',
                        help="mode: dev or null")
    parser.add_argument('--db_server', type=str, default='10.2.0.92',
                        help="database server ip address")
    parser.add_argument('--db_port', type=str, default='3306',
                        help="database server port")
    parser.add_argument('--db_username', type=str, default='root',
                        help="database server username")
    parser.add_argument('--db_password', type=str, default='123456',
                        help="database server password")
   
    args = parser.parse_args()
    DB_SERVER = args.db_server
    DB_PORT = args.db_port
    DB_USERNAME = args.db_username
    DB_PASSWORD = args.db_password
    DB_NAME = args.db_name
   
    initialize()
    app.run(host='0.0.0.0', port=50000, debug=True)

运行 Flask

在终端中导航到你的 Flask 应用文件所在的目录,然后运行:

python app.py

访问应用

打开浏览器,访问 http://127.0.0.1:5000/,你应该能看到 “Hello, Flask!” 的消息。

使用 curl 工具与运行在本机(127.0.0.1)的50000端口上的API进行交互。

  • 获取测试套件列表

    curl http://127.0.0.1:50000/api/test_suite -H "Accept: application/vnd.api+json"
    

    这个命令用于获取所有测试套件的列表。它向 /api/test_suite 发送一个GET请求,并指定接受的内容类型为 application/vnd.api+json,即JSON API格式。

  • 获取特定ID的测试套件

    curl http://127.0.0.1:50000/api/test_suite/1 -H "Accept: application/vnd.api+json"
    

    这个命令用于获取ID为1的测试套件的详细信息。它向 /api/test_suite/1 发送一个GET请求,同样指定接受的内容类型为 application/vnd.api+json。

  • 创建新的测试套件

    curl -X POST http://127.0.0.1:50000/api/test_suite -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d '{"data":{"type": "test_suite", "attributes": {"name": "xxx", "data": "123"}}}'
    

    这个命令用于创建一个新的测试套件。它向 /api/test_suite 发送一个POST请求,并在请求体中提供要创建的测试套件的数据。这里指定了 Content-Type 为 application/vnd.api+json,表示发送的数据是JSON API格式,同时指定接受的内容类型也是 application/vnd.api+json。请求体中包含了一个 data 对象,其 type 为 test_suite,attributes 中包含了测试套件的名称和数据。

  • 更新特定ID的测试套件

    curl -X PATCH http://127.0.0.1:50000/api/test_suite/1 -H "Content-Type: application/vnd.api+json" -H "Accept: application/vnd.api+json" -d  '{"data":{"type": "test_suite", "id": "1", "attributes": {"name": "xxx", "data": "123"}}}'
    

    这个命令用于更新ID为1的测试套件的信息。它向 /api/test_suite/1 发送一个PATCH请求,并在请求体中提供更新后的测试套件数据。与创建请求类似,这里也指定了 Content-Type 和 Accept 为 application/vnd.api+json。请求体中的 data 对象包含了 type(test_suite)、id(1)以及更新后的 attributes。

这些命令展示了如何使用 curl 工具与遵循JSON API规范的API进行基本的CRUD(创建、读取、更新、删除)操作。注意,实际的API实现可能会对这些请求有不同的要求,比如认证、权限等,这些在示例中没有体现。

展望

通过以上步骤,我们知道了如何在 Flask 应用中集成 MySQL 或 PostgreSQL 数据库。后续,可以深入学习更复杂的查询和数据库迁移等高级主题,以充分利用 Flask 和数据库的强大功能。


http://www.kler.cn/a/371659.html

相关文章:

  • Unity2D初级背包设计后篇 拓展举例与不足分析
  • 【 Verdi实用技巧-Part-3】
  • 如何在 Ubuntu 22.04 上安装 Nagios 服务器教程
  • matlab编写分段Hermite插值多项式
  • Flutter:封装一个自用的bottom_picker选择器
  • Leecode刷题C语言之字符串中最大的3位相同数字
  • Modbus TCP报文协议(ModbusTCP)
  • H5底部输入框点击弹起来的时候被软键盘遮挡bug
  • QT编译报错:-1: error: cannot find -lGL
  • 淘宝商品评价API的获取与应用
  • Prometheus自定义PostgreSQL监控指标
  • 直接删除Github上的文件
  • [flask] flask-mail邮件发送
  • 论区块链技术及应用
  • 网络安全领域推荐职位
  • Data+AI下的数据飞轮:如何重塑企业增长
  • SpringBoot 解析@Value注解型解析注入时机以及原理
  • GPT-4V 是什么?
  • springboot工作原理以及自动装配原理
  • 软考高级架构 - 7.3 - 软件架构风格 - 超详细讲解+精简总结
  • Stable Diffusion 3.5发布:图像生成新纪元,多模态AI的突破!
  • 宽带自动获取ip地址好不好:利与弊的深度剖析
  • 【云原生】云原生后端:监控与观察性
  • STM32 SRAM写入16位数据时死机问题
  • 数据分析案例-苹果品质数据可视化分析+建模预测
  • React核心思维模型(一)