Dify实现自然语言生成SQL并执行
目录
一、需求分析
二、解决思路
问题1:文字描述生成SQL语句
问题2:执行生成的SQL语句
完整解决方案
三、最终效果展示
四、具体实现
1.Agent提示词
2.知识库数据
3.sql执行器工作流创建
3.1 节点1
3.2 节点2
3.3 节点3
3.4 最终配置界面预览
一、需求分析
-
文字描述生成SQL语句
-
执行生成的SQL语句
二、解决思路
问题1:文字描述生成SQL语句
解决步骤:
-
创建Agent应用 创建一个Agent应用,以便后续集成工具执行SQL语句。
-
导入知识库 导入包含数据库表结构的知识库,以便Agent能够理解用户提供的文字描述。
-
编写提示词 设计有效的提示词,帮助Agent准确地将文字描述转换为SQL语句。
问题2:执行生成的SQL语句
解决步骤:
-
创建工作流应用 创建一个工作流应用,用于定义和执行SQL语句。
-
定义工作流节点
-
节点1:开始节点 定义传入变量sql,该变量将包含由Agent生成的SQL语句。
-
节点2:代码执行节点 使用Python实现数据库连接,并执行上一节点传入的sql变量。
-
节点3:结束节点 定义输出变量result,其值为节点2执行SQL语句的结果。
-
-
发布为工具 将上述工作流应用发布为一个工具,命名为“数据库访问”。
完整解决方案
在创建的Agent应用中,集成“数据库访问”工具,使得Agent不仅能生成SQL语句,还能自动执行这些语句并返回结果。
三、最终效果展示
四、具体实现
1.Agent提示词
# 角色设定
您是一个结构化SQL执行管道,必须严格按以下顺序操作:
1. 解析需求 → 2. 生成参数化SQL → 3. 封装为执行指令 → 4. 返回SQL+原始结果
# 输出规范
```json
{
"sql_package": {
"sql": "SELECT...",
"params": ["value1", "value2"]
},
"execution_result": {
"data": [
{"column1": "value1", "column2": "value2"}
],
"rows_affected": 0,
"error": null
}
}
2.知识库数据
数据库: 用户订单管理系统
表: users
描述: 存储注册的用户信息
字段:
user_id (INT PK): 用户唯一标识
username (VARCHAR(50)): 登录名
email (VARCHAR(100)): 电子邮箱
register_date (DATETIME): 注册时间
vip_status (TINYINT): VIP状态(0-非VIP,1-VIP)
表: products
描述: 商品基本信息
字段:
product_id (INT): 商品ID
product_name(VARCHAR):商品名称
category (ENUM):商品类类别 ['电子产品','家居用品','服装']
price(DECIMAL):商品价格(单位是人民币/元)
表: order
描述: 用户订单表
字段:
order_id (INT PK): 订单ID
user_id (INT FK→users.user_id): 用户ID
product_id (INT FK→products.product_id ): 商品ID
order_date (DATETIME): 下单时间
total_amount (DECIMAL(12,2)): 订单金额
status (VARCHAR(20)): 订单状态(0:已签收 1:已退回)
创建表以及初始化数据
-- 创建用户表
CREATE TABLE users (
user_id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
register_date DATETIME NOT NULL,
vip_status TINYINT NOT NULL DEFAULT 0 COMMENT '0-非VIP,1-VIP',
INDEX idx_username (username),
INDEX idx_email (email),
INDEX idx_register_date (register_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='存储注册的用户信息';
-- 创建商品表
CREATE TABLE products (
product_id INT AUTO_INCREMENT PRIMARY KEY,
product_name VARCHAR(100) NOT NULL,
category ENUM('电子产品','家居用品','服装') NOT NULL,
price DECIMAL(10,2) NOT NULL COMMENT '商品价格(单位是人民币/元)'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品基本信息';
-- 创建订单表
CREATE TABLE `order` (
order_id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
product_id INT NOT NULL,
order_date DATETIME NOT NULL,
total_amount DECIMAL(12,2) NOT NULL COMMENT '订单金额',
status VARCHAR(20) NOT NULL COMMENT '0:已签收 1:已退回',
INDEX idx_user_id (user_id),
INDEX idx_order_date (order_date),
INDEX idx_status (status),
FOREIGN KEY (user_id) REFERENCES users(user_id),
FOREIGN KEY (product_id) REFERENCES products(product_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户订单表';
-- 插入用户数据
INSERT INTO users (username, email, register_date, vip_status) VALUES
('张三', 'zhangsan@example.com', '2022-01-15 10:30:00', 1),
('李四', 'lisi@example.com', '2022-02-20 14:15:00', 0),
('王五', 'wangwu@example.com', '2022-03-10 09:45:00', 1),
('赵六', 'zhaoliu@example.com', '2022-04-05 16:20:00', 0),
('钱七', 'qianqi@example.com', '2022-05-12 11:10:00', 1);
-- 插入商品数据
INSERT INTO products (product_name, category, price) VALUES
('iPhone 14', '电子产品', 6999.00),
('华为MateBook', '电子产品', 5999.00),
('真皮沙发', '家居用品', 4599.00),
('陶瓷餐具套装', '家居用品', 299.00),
('纯棉T恤', '服装', 99.00),
('牛仔裤', '服装', 199.00);
-- 插入订单数据
INSERT INTO `order` (user_id, product_id, order_date, total_amount, status) VALUES
(1, 1, '2022-06-01 10:30:00', 6999.00, '0'),
(1, 3, '2022-06-05 14:15:00', 4599.00, '0'),
(2, 2, '2022-06-10 09:45:00', 5999.00, '1'),
(3, 4, '2022-06-15 16:20:00', 299.00, '0'),
(3, 5, '2022-06-20 11:10:00', 99.00, '0'),
(4, 6, '2022-07-01 13:25:00', 199.00, '0'),
(5, 1, '2022-07-05 15:30:00', 6999.00, '0'),
(5, 2, '2022-07-10 10:45:00', 5999.00, '0');
3.sql执行器工作流创建
3.1 节点1
3.2 节点2
此节点贴入的python代码
注意更改请求url地址
import requests
def main(sql: str) -> dict:
url = "http://xxxx:3000/execute"
payload = {"sql": sql}
try:
response = requests.post(
url,
json=payload,
headers={'Content-Type': 'application/json'},
timeout=10
)
# 先检查响应内容是否为空
if not response.text:
return {"result": "Empty response from server"}
try:
data = response.json()
if response.status_code == 200:
return {"result": data}
else:
return {"result": f"Error: {data.get('error', 'Unknown error')}"}
except ValueError:
return {"result": f"Invalid JSON response: {response.text}"}
except requests.exceptions.RequestException as e:
return {"result": f"Request failed: {str(e)}"}
本地启动的python代码:
from flask import Flask, request, jsonify
import mysql.connector
from functools import wraps
app = Flask(__name__)
# 数据库连接配置
config = {
'user': 'xxx',
'password': 'xxx',
'host': 'xxx',
'database': 'xxx',
'raise_on_warnings': True
}
def handle_errors(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
return jsonify({"error": str(e)}), 500
return wrapper
@app.route('/execute', methods=['POST'])
@handle_errors
def execute_sql():
data = request.get_json()
if not data or 'sql' not in data:
return jsonify({"error": "SQL statement is required"}), 400
sql = data['sql']
try:
conn = mysql.connector.connect(**config)
cursor = conn.cursor(dictionary=True) # 使用字典游标获取列名
cursor.execute(sql)
if sql.strip().lower().startswith("select"):
result = cursor.fetchall()
return jsonify({"data": result})
else:
conn.commit()
return jsonify({"affected_rows": cursor.rowcount})
except mysql.connector.Error as err:
return jsonify({"error": f"Database error: {err}"}), 500
finally:
if 'conn' in locals() and conn.is_connected():
cursor.close()
conn.close()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=3000, debug=True)
注意:
如果将代码贴入到本地后一些数据爆红(需要安装所需环境)
参考执行命令:
pip install flask
python -m pip show flask
pip install mysql-connector-python
3.3 节点3
3.4 最终配置界面预览
至此所有配置结束