使用Dify访问数据库(mysql)
1、在本地搭建数据库访问的服务,并使用ngrok暴露到公网。
#sql_tools.py
from flask import Flask, request, jsonify
import mysql.connector
# 数据库连接配置
config = {
'user': 'your_username',
'password': 'your_password',
'host': 'localhost',
'database': 'your_database',
'raise_on_warnings': True
}
# 初始化Flask应用
app = Flask(__name__)
# 连接数据库
def connect_to_database():
try:
conn = mysql.connector.connect(**config)
print("Connected to MySQL database")
return conn
except mysql.connector.Error as err:
print(f"Error: {err}")
return None
# 执行SQL查询
def execute_query(conn, sql):
cursor = conn.cursor()
try:
cursor.execute(sql)
if sql.strip().lower().startswith("select"):
# 如果是查询操作,返回结果
result = cursor.fetchall()
return result
else:
# 如果是插入、更新、删除操作,提交事务并返回受影响的行数
conn.commit()
return cursor.rowcount
except mysql.connector.Error as err:
print(f"Error executing SQL: {err}")
return None
finally:
cursor.close()
# HTTP接口:执行SQL
@app.route('/execute', methods=['POST'])
def execute_sql():
# 获取请求中的SQL语句
data = request.json
if not data or 'sql' not in data:
return jsonify({"error": "SQL statement is required"}), 400
sql = data['sql']
conn = connect_to_database()
if not conn:
return jsonify({"error": "Failed to connect to database"}), 500
# 执行SQL
result = execute_query(conn, sql)
conn.close()
if result is None:
return jsonify({"error": "Failed to execute SQL"}), 500
# 返回结果
return jsonify({"result": result})
# 启动Flask应用
if __name__ == '__main__':
app.run(host='0.0.0.0', port=3000)
2、创建知识库,导入表结构描述。
3、创建数据库访问工作流。
代码执行:
import requests
def main(sql: str) -> dict:
# 定义API的URL
url = "https://xxx.ngrok-free.app/execute"
# 构造请求体
payload = {
"sql": sql
}
# 发送POST请求
try:
response = requests.post(url, json=payload)
# 检查响应状态码
if response.status_code == 200:
# 解析响应数据
result = response.json()
return {
"result": f"{result}"
}
else:
return {
"result": f"请求失败,状态码:{response.status_code},{response.json()}"
}
except requests.exceptions.RequestException as e:
return {
"result": f"请求异常:{e}"
}
4、创建数据库智能体