使用 Excel 实现绩效看板的自动化
引言
在日常工作中,团队的绩效监控和管理是确保项目顺利进行的重要环节。然而,面临着以下问题:
- 数据分散:系统中的数据难以汇总,缺乏一个宏观的团队执行情况视图。
- 看板缺失:系统本身可能无法提供合适的Dashboard,导致数据分析困难。
- 手动操作繁琐:数据采集、汇总和分析过程繁琐且耗时。
本文将介绍如何利用免费的软件和工具(如 Python、MySQL、Excel 等)实现绩效看板的自动化。通过 邮件自动推送和接收 实现数据采集,结合 MySQL 数据库的沉淀和计算,最终在 Excel 中实现数据的定时刷新和展示。
先来看效果, 需要知道整体团队的进度,团队的进度要通过自动化邮件发送接收,并且每半个小时更新数据。
环境要求
为了实现本项目的自动化流程,以下是所需的软件和工具:
-
Python 3.x:
- 用于自动采集、推送、接收数据,并将数据存储到 MySQL 数据库。
- 安装地址:Python 官网(免费开源)
-
MySQL 数据库:
- 用于存储绩效数据,并生成视图供 Excel 使用。
- 安装地址:MySQL 官网(免费开源)
-
Excel:
- 用于制作绩效看板,并通过 ODBC 连接 MySQL 数据库。
- 安装地址:Microsoft Office 官网(免费试用或开源替代方案如 LibreOffice)
-
ODBC 驱动程序:
- 用于连接 MySQL 数据库和 Excel。
- 安装地址:MySQL ODBC Connector(免费开源)
-
邮件客户端(如 Outlook):
- 用于自动发送和接收邮件。
- 安装地址:Microsoft Outlook(免费试用或使用免费开源邮件客户端如 Thunderbird)
-
Python 库:
pandas
:用于数据处理。mysql-connector-python
:用于连接 MySQL 数据库。schedule
:用于定时任务。imaplib
和email
:用于处理邮件接收。win32com
:用于操作 Outlook。- 安装命令:
bash
pip install pandas mysql-connector-python schedule
我们可以利用免费的软件和工具(如 Python、MySQL、Excel 等)实现绩效看板的自动化。
整体流程:
-
数据采集:
- 自动采集:通过 Python 定时任务采集数据。
- 自动推送:通过 Python 调用 Outlook 发送邮件。
- 自动接收:通过 Python 接收邮件并提取数据。
-
数据汇总与存储:
- 使用 Python 将数据存储到 MySQL 数据库。
- 利用 MySQL 的 SQL 功能生成视图。
-
数据计算与整理:
- 在 MySQL 数据库中对数据进行计算和整理。
-
可视化看板制作:
- 在 Excel 中通过 ODBC 连接 MySQL 数据库,制作绩效看板。
-
定时刷新:
- 使用 Excel 的 VBA 宏实现定时刷新数据。
这种方法不仅合法且免费,还能显著提高团队的绩效管理效率。
流程步骤
为了实现 Excel 绩效看板的自动化,整个流程可以分为以下几个步骤:
1. 数据采集
1.1 python定时任务
- 使用 Python 定时任务从系统 API、文件或其他外部数据源中采集绩效数据。
- 数据采集可以通过定时任务(如
schedule
库)自动执行。
示例代码:
代码功能概述
这段 Python 代码的主要功能是通过定时任务调度(使用 schedule
库)来执行一系列的批处理文件(.bat
文件),并对系统窗口进行管理和监控。以下是代码的主要功能模块:
-
窗口管理:
- 检查系统中是否存在特定标题的窗口。
- 关闭特定标题的窗口。
-
定时任务调度:
- 使用
schedule
库设置定时任务,在指定时间运行特定的批处理文件。
- 使用
-
批处理文件执行:
- 使用
subprocess.Popen
执行批处理文件,并在新控制台中运行。
- 使用
-
日期检查:
- 检查当前日期是否符合特定条件,以决定是否运行某些任务。
-
主循环:
- 持续运行调度器,检查是否有任务需要执行。
python schedule.py
import calendar
import time
from datetime import datetime, timedelta
import schedule
import os
import subprocess
import win32gui
import win32con
path_newfssc = r'D:\\FsscProject\\lsh\\newfssc\\'
findtxt = 'C:\Windows\System32\cmd.exe'
findtxt2 = 'C:\WINDOWS\system32\cmd.exe'
def check_window():
hd = win32gui.GetDesktopWindow()
# 获取所有子窗口
hwndChildList = []
# EnumChildWindows 为指定的父窗口枚举子窗口
win32gui.EnumChildWindows(hd, lambda hwnd, param: param.append(hwnd), hwndChildList)
for hwnd in hwndChildList:
title = win32gui.GetWindowText(hwnd)
find = False
# print("句柄:", hwnd, "标题:", win32gui.GetWindowText(hwnd))
if findtxt in title:
find = True
if findtxt2 in title:
find = True
if find == True:
return True
return False
def close_window():
hd = win32gui.GetDesktopWindow()
# 获取所有子窗口
hwndChildList = []
# EnumChildWindows 为指定的父窗口枚举子窗口
win32gui.EnumChildWindows(hd, lambda hwnd, param: param.append(hwnd), hwndChildList)
for hwnd in hwndChildList:
title = win32gui.GetWindowText(hwnd)
find = False
# print("句柄:", hwnd, "标题:", win32gui.GetWindowText(hwnd))
if findtxt in title:
find = True
if findtxt2 in title:
find = True
if 'Chromium' in title:
find = True
if find == True:
try:
print("special句柄:", hwnd, "标题:", win32gui.GetWindowText(hwnd))
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
except:
print("can not erase window")
def TrService():
print_time('dashboard.py is running at ')
p = subprocess.Popen(r'd:\\dashboard.bat',creationflags=subprocess.CREATE_NEW_CONSOLE)
# schedule.every().day.at('09:00').do(TrService)
# schedule.every().day.at('10:00').do(TrService)
# schedule.every().day.at('11:00').do(TrService)
# schedule.every().day.at('12:00').do(TrService)
# schedule.every().day.at('13:00').do(TrService)
# schedule.every().day.at('14:00').do(TrService)
# schedule.every().day.at('15:00').do(TrService)
# schedule.every().day.at('16:00').do(TrService)
# schedule.every().day.at('17:00').do(TrService)
print_time('SCHEDULE is running at ')
while True:
schedule.run_pending()
time.sleep(1)
1.2 dashboard.bat 执行脚本
脚本必须使用另外的 CMD 执行,因为 python schedule.py 会占用一个 cmd, 所以这个 schedule 的设计很重要!每次执行时 ,通过 bat 脚本新增 cmd 视窗执行 另外的python。
python D:\FsscProject\lsh\dashbaord\dashboard.py
1.3 自动推送
- 当bat脚本执行时,使用 Python 通过邮件将绩效数据发送到指定邮箱。
- 邮件内容可以包含附件(如 excel 文件)
示例代码:
python
import datetime
import time
import smtplib
import win32com.client
import schedule
import pymysql
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from prettytable import PrettyTable
import pandas as pd
import os
path = r'D:\\dashboard\\'
time_obj = datetime.datetime.now()
str_time = time_obj.strftime('%Y%m%d %H%M')
excelName_wl = 'wl' + str_time + '.xlsx'
att_file_wl = os.path.join(path, excelName_wl)
excelName_11 = 'nd1100' + str_time + '.xlsx'
att_file_11 = os.path.join(path, excelName_11)
def getdkList():
try:
conn = getECSPrdMysqlConn()
sql = " SELECT * FROM v_db_wl; "
df = pd.read_sql(sql, conn)
df.to_excel(att_file_wl, na_rep='', index=False)
sql11 = " SELECT * FROM v_nd1100_open_list; "
df11 = pd.read_sql(sql11, conn)
df11.to_excel(att_file_11, na_rep='', index=False)
except Exception as e:
print('处理失败:', e)
print(e)
finally:
conn.close()
to_attr = 'fssc-mis.autocn@lsh.com'
subject = 'nd1100监测数据报告'
msg= """尊敬的用户您好:
Dashboard监控数据统计如下,如有异常,详细明细请参考附件。
系统邮件,请勿回复
Best regards
"""
try:
outlook = win32com.client.Dispatch('outlook.application')
for account in outlook.Session.Accounts:
# 赋值发件账户
send_account = account
break
mail = outlook.CreateItem(0)
mail._oleobj_.Invoke(*(64209, 0, 8, 0, send_account))
mail.To = to_attr
mail.Subject = subject
mail.Body = msg
mail.Attachments.Add(att_file_wl)
mail.Attachments.Add(att_file_11)
mail.send
print('发送邮件:' + subject)
# print('收件人:' + item[2] + ' | 被抄送人:' + item[3])
# send_email(to_rep,to_cc, subjectxt, msg, file_excel_name)
print('执行完成')
print("------------------done----------------")
except smtplib.SMTPException as e:
print(e)
print("Error: 无法发送邮件")
def getECSPrdMysqlConn():
"""
获取:RPA数据库连接
:return:
"""
strHost = ''
# strHost = 'xxxx'
strPort = 3306
strUserName = 'xxxx'
strPassWord = 'xxxx'
strdb = 'ecs_fssc'
retry_count = 1000000
init_connect_count = 0
connect_res = True
while connect_res and init_connect_count < retry_count:
try:
conn = pymysql.connect(host=strHost, port=strPort, db=strdb, user=strUserName, password=strPassWord, charset="utf8")
connect_res = False
except pymysql.Error as e:
init_connect_count += 1
print("第:" + str(init_connect_count) + "次数据库连接失败,尝试重连...,错误信息:{0}".format(e))
return conn
getdkList()
代码功能概述
这段 Python 代码的主要功能是从 MySQL 数据库中提取数据,将数据保存为 Excel 文件,并通过 Outlook 发送包含这些 Excel 文件的邮件。以下是代码的主要功能模块:
-
数据库连接与数据提取:
- 使用
pymysql
连接到 MySQL 数据库。 - 执行 SQL 查询,将查询结果保存为 Excel 文件。
- 使用
-
邮件发送:
- 使用
win32com.client
操作 Outlook 应用程序。 - 构造邮件内容,并附加生成的 Excel 文件。
- 使用
-
定时任务(未启用):
- 代码中未启用定时任务,但可以通过
schedule
库实现定时运行。
- 代码中未启用定时任务,但可以通过
-
错误处理:
- 捕获数据库连接和邮件发送中的异常,并打印错误信息。
代码详细解析
1. 导入模块
python
import datetime
import time
import smtplib
import win32com.client
import schedule
import pymysql
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from prettytable import PrettyTable
import pandas as pd
import os
- **
datetime
和time
**:用于处理日期和时间。 - **
smtplib
**:用于发送邮件(未在代码中使用)。 - **
win32com.client
**:用于操作 Windows 系统的 Outlook 应用程序。 - **
schedule
**:用于定时任务调度(未在代码中使用)。 - **
pymysql
**:用于连接 MySQL 数据库。 - **
email.mime
**:用于构造邮件内容(未在代码中使用)。 - **
prettytable
**:用于生成表格(未在代码中使用)。 - **
pandas
**:用于处理和保存数据到 Excel 文件。 - **
os
**:用于文件路径操作。
2. 定义路径和文件名
python
path = r'D:\\dashboard\\'
time_obj = datetime.datetime.now()
str_time = time_obj.strftime('%Y%m%d %H%M')
excelName_wl = 'wl' + str_time + '.xlsx'
att_file_wl = os.path.join(path, excelName_wl)
excelName_11 = 'nd1100' + str_time + '.xlsx'
att_file_11 = os.path.join(path, excelName_11)
- **
path
**:定义 Excel 文件的存储路径。 - **
time_obj
**:获取当前时间。 - **
str_time
**:将当前时间格式化为YYYYMMDD HHMM
的字符串。 - **
excelName_wl
和excelName_11
**:定义两个 Excel 文件的名称,包含时间戳。 - **
att_file_wl
和att_file_11
**:定义两个 Excel 文件的完整路径。
3. 数据库连接函数
python
def getECSPrdMysqlConn():
"""
获取:RPA数据库连接
:return:
"""
strHost = ''
strPort = 3306
strUserName = 'xxxx'
strPassWord = 'xxxx'
strdb = 'ecs_fssc'
retry_count = 1000000
init_connect_count = 0
connect_res = True
while connect_res and init_connect_count < retry_count:
try:
conn = pymysql.connect(host=strHost, port=strPort, db=strdb, user=strUserName, password=strPassWord, charset="utf8")
connect_res = False
except pymysql.Error as e:
init_connect_count += 1
print("第:" + str(init_connect_count) + "次数据库连接失败,尝试重连...,错误信息:{0}".format(e))
return conn
- 功能:连接到 MySQL 数据库。
- 实现:
- 使用
pymysql.connect
方法连接数据库。 - 如果连接失败,会重试最多
retry_count
次。
- 使用
- 返回值:返回数据库连接对象。
4. 数据提取与保存
python
def getdkList():
try:
conn = getECSPrdMysqlConn()
sql = " SELECT * FROM v_db_wl; "
df = pd.read_sql(sql, conn)
df.to_excel(att_file_wl, na_rep='', index=False)
sql11 = " SELECT * FROM v_nd1100_open_list; "
df11 = pd.read_sql(sql11, conn)
df11.to_excel(att_file_11, na_rep='', index=False)
except Exception as e:
print('处理失败:', e)
print(e)
finally:
conn.close()
- 功能:从数据库中提取数据并保存为 Excel 文件。
- 实现:
- 调用
getECSPrdMysqlConn
获取数据库连接。 - 执行 SQL 查询,将结果保存为 Pandas 数据框(
DataFrame
)。 - 使用
to_excel
方法将数据框保存为 Excel 文件。 - 如果发生异常,打印错误信息。
- 无论是否发生异常,最终都会关闭数据库连接。
- 调用
5. 邮件发送
python
to_attr = 'fssc-mis.autocn@lsh.com'
subject = 'nd1100监测数据报告'
msg= """尊敬的用户您好:
Dashboard监控数据统计如下,如有异常,详细明细请参考附件。
系统邮件,请勿回复
Best regards
"""
try:
outlook = win32com.client.Dispatch('outlook.application')
for account in outlook.Session.Accounts:
send_account = account
break
mail = outlook.CreateItem(0)
mail._oleobj_.Invoke(*(64209, 0, 8, 0, send_account))
mail.To = to_attr
mail.Subject = subject
mail.Body = msg
mail.Attachments.Add(att_file_wl)
mail.Attachments.Add(att_file_11)
mail.send
print('发送邮件:' + subject)
except smtplib.SMTPException as e:
print(e)
print("Error: 无法发送邮件")
- 功能:通过 Outlook 发送邮件,并附加生成的 Excel 文件。
- 实现:
- 定义收件人邮箱地址和邮件主题。
- 使用
win32com.client.Dispatch
创建 Outlook 应用程序对象。 - 遍历 Outlook 的账户列表,选择第一个账户作为发件账户。
- 创建邮件对象,设置收件人、主题和正文。
- 使用
Attachments.Add
方法附加 Excel 文件。 - 调用
mail.send
发送邮件。 - 如果发生异常,打印错误信息。
6. 主程序
python
getdkList()
- 功能:调用
getdkList
函数,执行数据提取和邮件发送的完整流程。
1.4 绩效数据提取
在python代码中分别有两段sql 语句,获取绩效看板所需要的
sql = " SELECT * FROM v_db_wl; "
sql11 = " SELECT * FROM v_nd1100_open_list; "
工作量数据
解释如下
CREATE VIEW
v_db_wl
(
审单日,
业务类型,
审单人,
单量
) AS
SELECT
`a`.`审单日` AS `审单日`,
`a`.`业务类型` AS `业务类型`,
`a`.`审单人` AS `审单人`,
COUNT(1) AS `单量`
FROM
`v_dash_wl_nd1100` `a`
WHERE
(
`a`.`审单日` >= (curdate() - interval 31 DAY))
GROUP BY
`a`.`审单日`,
`a`.`业务类型`,
`a`.`审单人`;
说明
这段 SQL 代码的目的是创建一个名为 v_db_wl
的视图(View),用于统计过去 31 天内的审单数据。视图是一个虚拟表,基于 SQL 查询的结果集,用户可以像操作普通表一样操作视图。
以下是代码的详细解析:
1. 创建视图的语法
sql
CREATE VIEW v_db_wl (
审单日,
业务类型,
审单人,
单量
) AS
- **
CREATE VIEW
**:用于创建视图。 - **
v_db_wl
**:视图的名称。 - **
(审单日, 业务类型, 审单人, 单量)
**:定义视图的列名。 - **
AS
**:表示视图的内容由后续的SELECT
查询定义。
2. 查询语句
sql
SELECT
`a`.`审单日` AS `审单日`,
`a`.`业务类型` AS `业务类型`,
`a`.`审单人` AS `审单人`,
COUNT(1) AS `单量`
- **
SELECT
**:从表中选择数据。 - **
a.审单日 AS 审单日
**:从表v_dash_wl_nd1100
中选择列审单日
,并将其命名为视图中的列审单日
。 - **
COUNT(1) AS 单量
**:统计每个分组中的记录数,并将其命名为视图中的列单量
。
3. 数据来源
sql
FROM
`v_dash_wl_nd1100` `a`
- **
v_dash_wl_nd1100
**:数据来源于另一个视图(或表)。 - **
a
**:为表v_dash_wl_nd1100
设置别名,方便后续引用。
4. 数据过滤条件
sql
WHERE
(
`a`.`审单日` >= (curdate() - interval 31 DAY)
)
- **
WHERE
**:用于过滤数据。 - **
a.审单日 >= (curdate() - interval 31 DAY)
**:- **
curdate()
**:返回当前日期(不包含时间部分)。 - **
interval 31 DAY
**:表示 31 天的时间间隔。 - **
curdate() - interval 31 DAY
**:计算当前日期之前的第 31 天。 - **
a.审单日 >= (curdate() - interval 31 DAY)
**:筛选出审单日
在当前日期前 31 天内的记录。
- **
5. 数据分组
sql
GROUP BY
`a`.`审单日`,
`a`.`业务类型`,
`a`.`审单人`;
- **
GROUP BY
**:将数据按指定的列分组。 - **
a.审单日, a.业务类型, a.审单人
**:- 按
审单日
、业务类型
和审单人
进行分组。 - 每个分组对应一行结果,统计每个分组的记录数。
- 按
视图的功能总结
- 视图名称:
v_db_wl
。 - 视图的列:
审单日
:审单日期。业务类型
:业务的类型。审单人
:执行审单的人员。单量
:每个审单日、业务类型和审单人组合的记录数。
- 数据来源:从视图(或表)
v_dash_wl_nd1100
中提取数据。 - 过滤条件:只包含
审单日
在当前日期前 31 天内的记录。 - 分组规则:按
审单日
、业务类型
和审单人
分组,统计每组的记录数。
视图的用途
该视图的主要用途是统计过去 31 天内每天的审单情况,具体包括:
- 每天的审单量。
- 按业务类型和审单人分类的审单量。
通过这个视图,用户可以快速查询过去 31 天内的审单数据,而无需每次都编写复杂的查询语句。
待处理单据
CREATE VIEW
v_nd1100_open_list
(
OUT_DATE,
CREATE_DATE,
BILL_CODE,
表單類型,
流程类型,
业务类型代码,
业务类型,
SHARE_TASK_STATUS
) AS
SELECT
`st`.`OUT_DATE` AS `OUT_DATE`,
`st`.`CREATE_DATE` AS `CREATE_DATE`,
`bm`.`BILL_CODE` AS `BILL_CODE`,
`f_c`(`bm`.`BILL_DEFINE_ID`) AS `表單類型`,
`F_GETN`(`bm`.`F_LCLX`) AS `流程类型`,
concat('YN',`F_GETC`(`bm`.`F_YWLX`)) AS `业务类型代码`,
`F_GETN`(`bm`.`F_YWLX`) AS `业务类型`,
`st`.`SHARE_TASK_STATUS` AS `SHARE_TASK_STATUS`
FROM
(`t_share_runtime_task` `st`
JOIN
`t_bill_main_area` `bm`
ON
((
`st`.`RUN_OBJECT_ID` = `bm`.`BILL_MAIN_ID`)))
WHERE
((
-(-(`bm`.`BILL_CODE`)) = 'PA03250303016194')
AND (
`st`.`SHARE_TASK_TYPE` = 'SHARE_ACCOUNTANT_FIRST_APPROVAL')
AND (
`st`.`SHARE_TASK_STATUS` IN ('DISPATCHING',
'OPEN')));
SQL 代码解析
这段 SQL 代码的目的是创建一个名为 v_nd1100_open_list
的视图(View),用于查询特定条件下的任务数据。视图是一个虚拟表,基于 SQL 查询的结果集,用户可以像操作普通表一样操作视图。
以下是代码的详细解析:
1. 创建视图的语法
sql
CREATE VIEW v_nd1100_open_list (
OUT_DATE,
CREATE_DATE,
BILL_CODE,
表單類型,
流程类型,
业务类型代码,
业务类型,
SHARE_TASK_STATUS
) AS
- **
CREATE VIEW
**:用于创建视图。 - **
v_nd1100_open_list
**:视图的名称。 - **
(OUT_DATE, CREATE_DATE, BILL_CODE, 表單類型, 流程类型, 业务类型代码, 业务类型, SHARE_TASK_STATUS)
**:定义视图的列名。 - **
AS
**:表示视图的内容由后续的SELECT
查询定义。
2. 查询语句
sql
SELECT
`st`.`OUT_DATE` AS `OUT_DATE`,
`st`.`CREATE_DATE` AS `CREATE_DATE`,
`bm`.`BILL_CODE` AS `BILL_CODE`,
`f_c`(`bm`.`BILL_DEFINE_ID`) AS `表單類型`,
`F_GETN`(`bm`.`F_LCLX`) AS `流程类型`,
concat('YN',`F_GETC`(`bm`.`F_YWLX`)) AS `业务类型代码`,
`F_GETN`(`bm`.`F_YWLX`) AS `业务类型`,
`st`.`SHARE_TASK_STATUS` AS `SHARE_TASK_STATUS`
- **
SELECT
**:从表中选择数据。 - 列名映射:
st.OUT_DATE
:从表t_share_runtime_task
中选择列OUT_DATE
,并将其命名为视图中的列OUT_DATE
。st.CREATE_DATE
:从表t_share_runtime_task
中选择列CREATE_DATE
,并将其命名为视图中的列CREATE_DATE
。bm.BILL_CODE
:从表t_bill_main_area
中选择列BILL_CODE
,并将其命名为视图中的列BILL_CODE
。f_c(bm.BILL_DEFINE_ID)
:调用函数f_c
,传入BILL_DEFINE_ID
列的值,结果命名为表單類型
。F_GETN(bm.F_LCLX)
:调用函数F_GETN
,传入F_LCLX
列的值,结果命名为流程类型
。concat('YN', F_GETC(bm.F_YWLX))
:将字符串'YN'
和函数F_GETC
的返回值拼接,结果命名为业务类型代码
。F_GETN(bm.F_YWLX)
:调用函数F_GETN
,传入F_YWLX
列的值,结果命名为业务类型
。st.SHARE_TASK_STATUS
:从表t_share_runtime_task
中选择列SHARE_TASK_STATUS
,并将其命名为视图中的列SHARE_TASK_STATUS
。
3. 数据来源
sql
FROM
(`t_share_runtime_task` `st`
JOIN
`t_bill_main_area` `bm`
ON
(`st`.`RUN_OBJECT_ID` = `bm`.`BILL_MAIN_ID`))
- **
FROM
**:指定数据来源。 - **
t_share_runtime_task
**:主表,别名为st
。 - **
t_bill_main_area
**:从表,别名为bm
。 - **
JOIN
**:表示内连接(INNER JOIN),只返回两个表中匹配的记录。 - **
ON st.RUN_OBJECT_ID = bm.BILL_MAIN_ID
**:连接条件,表示t_share_runtime_task
表的RUN_OBJECT_ID
列与t_bill_main_area
表的BILL_MAIN_ID
列相等。
4. 数据过滤条件
sql
WHERE
((-(-(`bm`.`BILL_CODE`)) = 'PA03250303016194')
AND (`st`.`SHARE_TASK_TYPE` = 'SHARE_ACCOUNTANT_FIRST_APPROVAL')
AND (`st`.`SHARE_TASK_STATUS` IN ('DISPATCHING', 'OPEN')));
- **
WHERE
**:用于过滤数据。 - 过滤条件:
- **
-(-bm.BILL_CODE) = 'PA03250303016194'
**:- 这里的
-(-bm.BILL_CODE)
是多余的,等价于bm.BILL_CODE = 'PA03250303016194'
。 - 过滤出
BILL_CODE
列值为'PA03250303016194'
的记录。
- 这里的
- **
st.SHARE_TASK_TYPE = 'SHARE_ACCOUNTANT_FIRST_APPROVAL'
**:- 过滤出
SHARE_TASK_TYPE
列值为'SHARE_ACCOUNTANT_FIRST_APPROVAL'
的记录。
- 过滤出
- **
st.SHARE_TASK_STATUS IN ('DISPATCHING', 'OPEN')
**:- 过滤出
SHARE_TASK_STATUS
列值为'DISPATCHING'
或'OPEN'
的记录。
- 过滤出
- **
视图的功能总结
- 视图名称:
v_nd1100_open_list
。 - 视图的列:
OUT_DATE
:任务完成日期。CREATE_DATE
:任务创建日期。BILL_CODE
:单据代码。表單類型
:单据类型(通过函数f_c
计算)。流程类型
:流程类型(通过函数F_GETN
计算)。业务类型代码
:业务类型代码(通过函数F_GETC
和字符串拼接计算)。业务类型
:业务类型(通过函数F_GETN
计算)。SHARE_TASK_STATUS
:任务状态。
- 数据来源:
- 主表:
t_share_runtime_task
。 - 从表:
t_bill_main_area
。 - 连接条件:
st.RUN_OBJECT_ID = bm.BILL_MAIN_ID
。
- 主表:
- 过滤条件:
BILL_CODE = 'PA03250303016194'
。SHARE_TASK_TYPE = 'SHARE_ACCOUNTANT_FIRST_APPROVAL'
。SHARE_TASK_STATUS IN ('DISPATCHING', 'OPEN')
。
视图用途
该视图的主要用途是查询特定单据代码(BILL_CODE = 'PA03250303016194'
)下,任务类型为 'SHARE_ACCOUNTANT_FIRST_APPROVAL'
且任务状态为 'DISPATCHING'
或 'OPEN'
的任务数据。
通过这个视图,用户可以快速获取符合条件的任务信息,而无需每次都编写复杂的查询语句。
1.1 -1.4 的过程 是在远端的主机,获取数据推送
1.5 自动接收
- 为了能自动接收,在dashboard,这段也要使用定时任务,与1.1 - 1.2 的方式相同,定时呼叫 Python 从邮箱中接收邮件并提取附件或正文中的数据。
定时任务示例代码,这里就不展开说明了:
import calendar
import time
from datetime import datetime, timedelta
import schedule
import os
import subprocess
import win32gui
import win32con
path_newfssc = r'/lsh/newfssc\\'
findtxt = 'C:\Windows\System32\cmd.exe'
findtxt2 = 'C:\WINDOWS\system32\cmd.exe'
def check_window():
hd = win32gui.GetDesktopWindow()
# 获取所有子窗口
hwndChildList = []
# EnumChildWindows 为指定的父窗口枚举子窗口
win32gui.EnumChildWindows(hd, lambda hwnd, param: param.append(hwnd), hwndChildList)
for hwnd in hwndChildList:
title = win32gui.GetWindowText(hwnd)
find = False
# print("句柄:", hwnd, "标题:", win32gui.GetWindowText(hwnd))
if findtxt in title:
find = True
if findtxt2 in title:
find = True
if find == True:
return True
return False
def close_window():
hd = win32gui.GetDesktopWindow()
# 获取所有子窗口
hwndChildList = []
# EnumChildWindows 为指定的父窗口枚举子窗口
win32gui.EnumChildWindows(hd, lambda hwnd, param: param.append(hwnd), hwndChildList)
for hwnd in hwndChildList:
title = win32gui.GetWindowText(hwnd)
find = False
# print("句柄:", hwnd, "标题:", win32gui.GetWindowText(hwnd))
if findtxt in title:
find = True
if findtxt2 in title:
find = True
if 'Chromium' in title:
find = True
if find == True:
try:
print("special句柄:", hwnd, "标题:", win32gui.GetWindowText(hwnd))
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
except:
print("can not erase window")
def print_time(task):
print(task, datetime.now())
def TrService():
print_time('dashboard.py is running at ')
p = subprocess.Popen(r'd:\\dashboard.bat',creationflags=subprocess.CREATE_NEW_CONSOLE)
def window_close():
print_time('window_close is running at ')
r = check_window()
if r == True:
close_window()
schedule.every().day.at('09:10').do(TrService)
schedule.every().day.at('10:10').do(TrService)
schedule.every().day.at('11:10').do(TrService)
schedule.every().day.at('12:10').do(TrService)
schedule.every().day.at('13:10').do(TrService)
schedule.every().day.at('14:10').do(TrService)
schedule.every().day.at('15:10').do(TrService)
schedule.every().day.at('16:10').do(TrService)
schedule.every().day.at('17:10').do(TrService)
schedule.every().day.at('09:40').do(TrService)
schedule.every().day.at('10:40').do(TrService)
schedule.every().day.at('11:40').do(TrService)
schedule.every().day.at('12:40').do(TrService)
schedule.every().day.at('13:40').do(TrService)
schedule.every().day.at('14:40').do(TrService)
schedule.every().day.at('15:40').do(TrService)
schedule.every().day.at('16:40').do(TrService)
schedule.every().day.at('17:40').do(TrService)
schedule.every().day.at('18:40').do(TrService)
print_time('SCHEDULE is running at ')
while True:
schedule.run_pending()
time.sleep(1)
代码功能概述
这段 Python 代码的主要功能是通过定时任务调度(使用 schedule
库)来执行特定的批处理文件(.bat
文件),并对系统窗口进行管理和监控。以下是代码的主要功能模块:
-
窗口管理:
- 检查系统中是否存在特定标题的窗口。
- 关闭特定标题的窗口。
-
定时任务调度:
- 使用
schedule
库设置定时任务,在指定时间运行特定的批处理文件。
- 使用
-
批处理文件执行:
- 使用
subprocess.Popen
执行批处理文件,并在新控制台中运行。
- 使用
-
主循环:
- 持续运行调度器,检查是否有任务需要执行。
接收邮件的Python
代码功能概述
这段 Python 代码的主要功能是从 Outlook 邮箱中提取邮件及其附件,并根据附件的内容将其存储到 MySQL 数据库中。以下是代码的主要功能模块:
-
日志记录:
- 使用全局变量
logtext
记录程序运行日志。
- 使用全局变量
-
邮件配置:
- 从 Excel 文件中读取邮件配置信息(如邮箱账号、文件夹名称等)。
-
邮件处理:
- 使用
win32com.client
操作 Outlook 邮箱。 - 根据主题、时间等条件过滤邮件。
- 提取邮件的附件并保存到本地文件夹。
- 使用
-
附件处理:
- 根据附件文件名判断其类型(如
wl
或nd1100
)。 - 将附件中的数据导入到 MySQL 数据库的相应表中。
- 根据附件文件名判断其类型(如
-
数据库操作:
- 使用
mysql.connector
连接到 MySQL 数据库。 - 执行 SQL 语句,将数据插入或更新到相应的表中。
- 使用
-
错误处理:
- 捕获并记录程序运行中的异常。
代码详细解析
1. 日志记录
python
logtext = ''
def log(intext):
global logtext
logtext = logtext + '\n' + str(datetime.now())[:19] + ' : ' + intext
- 功能:记录程序运行日志。
- 实现:通过全局变量
logtext
存储日志信息,每次调用log
函数时追加新的日志内容。
2. 邮件配置
python
from_adr = 'fssc-mis.autocn@lsh.com'
fld_itm1 = 'dashboard'
flg_itm1 = 'Y'
# ... 其他配置
- 功能:定义邮件处理的配置参数。
- 问题:部分配置(如文件夹名称、是否启用子文件夹等)被注释掉,改为硬编码值。
3. 邮件处理
python
outlook = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
- 功能:连接 Outlook 应用程序。
- 实现:使用
win32com.client
操作 Outlook 的 MAPI 接口。
3.1 邮箱文件夹选择
python
if flg_itm1 == 'Y':
try:
root_folder = outlook.Folders.Item(from_adr).Folders.Item(fld_itm1)
except Exception as e:
log('1.1填写资料夹名称有误:' + folder_name + str(e))
flag_err = 'Y'
- 功能:选择邮箱的主文件夹。
- 错误处理:如果文件夹不存在,记录错误日志并设置错误标志。
3.2 邮件过滤
python
if flg_tim == 'Y' and datetime.strptime(str(message.ReceivedTime)[:19], "%Y-%m-%d %H:%M:%S") <= dt1 : continue
if flg_sub == 'Y' and not filt_sub in message.Subject : continue
- 功能:根据时间和主题过滤邮件。
- 实现:
- 如果启用了时间过滤,跳过早于指定时间的邮件。
- 如果启用了主题过滤,跳过不包含指定关键字的邮件。
3.3 附件保存
python
for x in range(1, num_attach + 1):
attachment = attachments.Item(x)
file_name = f"{str(message.ReceivedTime)[:10]}_{attachment.FileName}"
full_path = os.path.join(folder_a, file_name)
attachment.SaveASFile(full_path)
attachment_paths.append(full_path)
- 功能:保存邮件附件到本地文件夹。
- 实现:
- 遍历邮件的所有附件。
- 使用邮件的接收时间生成文件名前缀。
- 将附件保存到指定文件夹,并记录完整路径。
4. 数据库操作
python
cursor = conn.cursor()
for excel_path in attachment_paths:
try:
df = pd.read_excel(excel_path, sheet_name='Sheet1')
- 功能:将附件中的数据导入到 MySQL 数据库。
- 实现:
- 使用
pandas.read_excel
读取附件中的 Excel 文件。 - 根据文件名判断数据类型(如
wl
或nd1100
)。 - 执行相应的 SQL 语句,将数据插入到数据库中。
- 使用
4.1 数据表 t_db_wl
的处理
python
if 'wl' in excel_path.lower():
cursor.execute("TRUNCATE TABLE t_db_wl")
insert_sql = """INSERT INTO t_db_wl
(审单日, 业务类型, 审单人, 单量)
VALUES (%s, %s, %s, %s)"""
for _, row in df.iterrows():
cursor.execute(insert_sql, (
row['审单日'].date(),
row['业务类型'],
row['审单人'],
int(row['单量'])
))
- 功能:将
wl
文件中的数据插入到t_db_wl
表中。 - 实现:
- 清空表
t_db_wl
。 - 遍历 Excel 文件的每一行,执行插入操作。
- 清空表
4.2 数据表 t_db_nd1100_open
的处理
python
elif 'nd' in excel_path.lower():
cursor.execute("TRUNCATE TABLE t_db_nd1100_open")
insert_sql = """INSERT INTO t_db_nd1100_open
(OUT_DATE, CREATE_DATE, BILL_CODE, 表單類型,
流程类型, 业务类型代码, 业务类型, SHARE_TASK_STATUS)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"""
for _, row in df.iterrows():
cursor.execute(insert_sql, (
row['OUT_DATE'],
row['CREATE_DATE'],
row['BILL_CODE'],
row['表單類型'],
row['流程类型'],
row['业务类型代码'],
row['业务类型'],
row['SHARE_TASK_STATUS']
))
- 功能:将
nd1100
文件中的数据插入到t_db_nd1100_open
表中。 - 实现:
- 清空表
t_db_nd1100_open
。 - 遍历 Excel 文件的每一行,执行插入操作。
- 清空表
5. 错误处理
python
except Exception as e:
log(f'处理附件失败 {excel_path}: {str(e)}')
- 功能:捕获并记录附件处理中的异常。
6. 提交数据库操作
python
conn.commit()
cursor.close()
- 功能:提交数据库事务并关闭数据库连接。
完整代码展示
import os
from datetime import datetime, timedelta
import logging
import numpy as np
import pandas as pd
import win32com.client
import mysql.connector
from mysql.connector import Error
# 配置日志记录
logging.basicConfig(
filename='mail_processing.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 全局变量
attachment_paths = [] # 用于存储附件路径
file_log = r'E:\Project\FSSC\dashboard\mail_log.xlsx' # 邮件日志文件
folder_a = r'E:\Project\FSSC\dashboard' # 附件存储文件夹
# 数据库连接配置
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': '123456', # 请修改为实际密码
'database': 'ofssc' # 请修改为实际数据库名
}
def log_info(message):
"""记录信息日志"""
logging.info(message)
def log_error(message):
"""记录错误日志"""
logging.error(message)
def connect_to_db():
"""连接到 MySQL 数据库"""
try:
conn = mysql.connector.connect(**DB_CONFIG)
log_info("成功连接到数据库")
return conn
except Error as e:
log_error(f"数据库连接失败: {e}")
raise
def process_emails(outlook, from_adr, fld_itm1, flg_itm1, fld_itm2, flg_itm2, flag_err):
"""处理邮件"""
root_folder = None
if flg_itm1 == 'Y':
try:
root_folder = outlook.Folders.Item(from_adr).Folders.Item(fld_itm1)
except Exception as e:
log_error(f"1.1 邮箱文件夹名称有误: {e}")
flag_err = 'Y'
if flg_itm2 == 'Y' and not flag_err:
try:
root_folder = root_folder.Folders.Item(fld_itm2)
except Exception as e:
log_error(f"1.2 邮箱子文件夹名称有误: {e}")
flag_err = 'Y'
if flag_err:
return None
messages = root_folder.Items
messages.Sort("[ReceivedTime]", True)
return messages
def save_attachments(message, folder_a, attachment_paths):
"""保存邮件附件"""
num_attach = len(message.Attachments)
attach_file = ''
for x in range(1, num_attach + 1):
attachment = message.Attachments.Item(x)
file_name = f"{message.ReceivedTime.date()}_{attachment.FileName}"
full_path = os.path.join(folder_a, file_name)
attachment.SaveASFile(full_path)
attach_file += f"|{full_path}"
attachment_paths.append(full_path)
log_info(f"附件已保存: {full_path}")
if num_attach > 0:
log_info(f"下载附件: {attach_file}")
def process_attachments(attachment_paths):
"""处理附件并将数据导入数据库"""
conn = connect_to_db()
cursor = conn.cursor()
try:
for excel_path in attachment_paths:
try:
df = pd.read_excel(excel_path, sheet_name='Sheet1')
log_info(f"处理附件: {excel_path}")
if 'wl' in excel_path.lower():
process_wl_file(df, excel_path, cursor)
elif 'nd' in excel_path.lower():
process_nd_file(df, excel_path, cursor)
else:
log_error(f"文件 {excel_path} 不符合任何处理条件")
except Exception as e:
log_error(f"处理附件失败 {excel_path}: {e}")
conn.commit()
except Exception as e:
conn.rollback()
log_error(f"数据库操作失败: {e}")
finally:
cursor.close()
conn.close()
def process_wl_file(df, excel_path, cursor):
"""处理 wl 文件"""
try:
# 清空表
cursor.execute("TRUNCATE TABLE t_db_wl")
# 提取时间戳并插入更新记录
base_name = os.path.splitext(os.path.basename(excel_path))[0]
wl_index = base_name.lower().index('wl')
time_str = base_name[wl_index + 2:wl_index + 15]
cursor.execute("INSERT INTO t_db_updatetime (lastupdate) VALUES (%s)", (time_str,))
# 插入数据
insert_sql = """INSERT INTO t_db_wl
(审单日, 业务类型, 审单人, 单量)
VALUES (%s, %s, %s, %s)"""
for _, row in df.iterrows():
cursor.execute(insert_sql, (
row['审单日'].date() if isinstance(row['审单日'], datetime) else row['审单日'],
row['业务类型'],
row['审单人'],
int(row['单量'])
))
except Exception as e:
log_error(f"处理 wl 文件失败 {excel_path}: {e}")
def process_nd_file(df, excel_path, cursor):
"""处理 nd 文件"""
try:
# 清空表
cursor.execute("TRUNCATE TABLE t_db_nd1100_open")
# 插入数据
insert_sql = """INSERT INTO t_db_nd1100_open
(OUT_DATE, CREATE_DATE, BILL_CODE, 表單類型,
流程类型, 业务类型代码, 业务类型, SHARE_TASK_STATUS)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"""
for _, row in df.iterrows():
cursor.execute(insert_sql, (
row['OUT_DATE'].strftime("%Y-%m-%d %H:%M:%S") if isinstance(row['OUT_DATE'], pd.Timestamp) else row['OUT_DATE'],
row['CREATE_DATE'].strftime("%Y-%m-%d %H:%M:%S") if isinstance(row['CREATE_DATE'], pd.Timestamp) else row['CREATE_DATE'],
row['BILL_CODE'],
row['表單類型'],
row['流程类型'],
row['业务类型代码'],
row['业务类型'],
row['SHARE_TASK_STATUS']
))
except Exception as e:
log_error(f"处理 nd 文件失败 {excel_path}: {e}")
def main():
"""主程序"""
try:
# 连接 Outlook
outlook = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
# 邮件配置
from_adr = 'your email account'
fld_itm1 = '收件箱名称'
flg_itm1 = 'Y'
fld_itm2 = ''
flg_itm2 = ''
flag_err = ''
# 处理邮件
messages = process_emails(outlook, from_adr, fld_itm1, flg_itm1, fld_itm2, flg_itm2, flag_err)
if not messages:
log_error("邮件处理失败,检查配置和文件夹名称")
return
# 遍历邮件并保存附件
for message in messages:
save_attachments(message, folder_a, attachment_paths)
# 处理附件
process_attachments(attachment_paths)
log_info("执行完成")
except Exception as e:
log_error(f"程序运行失败: {e}")
if __name__ == "__main__":
main()
2. 数据汇总与存储
2.1 自动存储
- 使用 Python 的
pandas
库将接收到的数据存储到 MySQL 数据库。
table layout
CREATE TABLE
t_db_wl
(
审单日 DATE,
业务类型 VARCHAR(20) COLLATE utf8mb4_general_ci NOT NULL,
审单人 VARCHAR(20),
单量 DECIMAL(42,0),
id INT(-1) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (id)
)
CREATE TABLE
t_db_nd1100_open
(
OUT_DATE DATETIME,
CREATE_DATE DATETIME,
BILL_CODE VARCHAR(50) COLLATE utf8mb4_general_ci,
表單類型 VARCHAR(50) COLLATE utf8mb4_general_ci,
流程类型 VARCHAR(50) COLLATE utf8mb4_general_ci,
业务类型代码 VARCHAR(50) COLLATE utf8mb4_general_ci,
业务类型 VARCHAR(50) COLLATE utf8mb4_general_ci,
SHARE_TASK_STATUS VARCHAR(50) COLLATE utf8mb4_general_ci,
ID INT(-1) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (ID)
)
python
CREATE TABLE
t_db_updatetime
(
lastupdate VARCHAR(50)
)
3. 数据计算与整理
- 在 MySQL 数据库中对数据进行必要的计算和整理,生成适合展示的报表。
利用数据库 SQL 自动统计产生视图
- 在 MySQL 数据库中创建视图,便于后续计算和展示。
v_db_vl_by_30day
示例 SQL:
v_db_vl_by_30day
的视图(View)。视图是基于一个或多个表的预定义查询,可以像表一样被查询和使用。以下是对该语句的详细说明:
1. 视图名称
sql
CREATE VIEW v_db_vl_by_30day
- v_db_vl_by_30day:这是新建视图的名称。按照命名习惯,前缀
v_
通常表示这是一个“视图”(view),后面的db_vl_by_30day
可能代表“单量按30天统计”(具体含义需结合业务背景)。
2. 视图列定义
sql
(
审单日,
单据量,
折算单量,
标准工时人力
)
- 审单日:显示审核日期。
- 单据量:统计一定时间范围内的单据总数。
- 折算单量:对某些单据量进行换算后的值,保留一位小数。
- 标准工时人力:基于工时量计算的标准工时所需的人力,保留一位小数。
3. 视图查询语句
a. 选择字段并设置别名
sql
SELECT
`a`.`审单日` AS `审单日`,
SUM(`a`.`单据量`) AS `单据量`,
ROUND(SUM(`a`.`折算单量`),1) AS `折算单量`,
ROUND((SUM(`a`.`工时量`) / 420),1) AS `标准工时人力`
- **
a
.审单日
AS审单日
**:选择v_db_wl_time
视图中的审单日
字段,并保持相同的列名。 - SUM(
a
.单据量
) AS单据量
:计算过去30天内每天的单据总量。 - ROUND(SUM(
a
.折算单量
),1) AS折算单量
:计算过去30天内每天折算后的单据总量,并保留一位小数。 - ROUND((SUM(
a
.工时量
) / 420),1) AS标准工时人力
:计算过去30天内每天的总工时量除以420(假设420为标准工时,如每小时的工时量),得到所需的标准工时人力,并保留一位小数。
b. 数据来源
sql
FROM `v_db_wl_time` `a`
- **
v_db_wl_time
**:这是数据来源的视图或表,名称前加了别名a
,便于在查询中引用。 - 注意:视图是基于另一个视图 (
v_db_wl_time
) 创建的,这在实际应用中是可行的,但需要确保底层视图的数据准确性和性能。
c. 过滤条件
sql
WHERE
(
`a`.`审单日` >= (CURDATE() - INTERVAL 30 DAY)
)
- **
CURDATE()
**:返回当前日期(不包含时间部分)。 - **
INTERVAL 30 DAY
**:表示时间间隔为30天。 - 整体含义:筛选出
审单日
在当前日期前30天及以后的记录,即过去30天内的数据。
d. 分组
sql
GROUP BY `a`.`审单日`;
- **
GROUP BY
**:按照审单日
进行分组,以便对每个日期分别计算单据量
、折算单量
和标准工时人力
。
4. 完整的视图创建语句
sql
CREATE VIEW v_db_vl_by_30day (
审单日,
单据量,
折算单量,
标准工时人力
) AS
SELECT
`a`.`审单日` AS `审单日`,
SUM(`a`.`单据量`) AS `单据量`,
ROUND(SUM(`a`.`折算单量`),1) AS `折算单量`,
ROUND((SUM(`a`.`工时量`) / 420),1) AS `标准工时人力`
FROM
`v_db_wl_time` `a`
WHERE
`a`.`审单日` >= CURDATE() - INTERVAL 30 DAY
GROUP BY
`a`.`审单日`;
SQL:v_db_vl_yesterday
CREATE VIEW
v_db_vl_yesterday
(
审单日
) AS
SELECT
MAX(`v_db_wl_time`.`审单日`) AS `审单日`
FROM
`v_db_wl_time`
WHERE
((
`v_db_wl_time`.`审单日` < curdate())
AND (
`v_db_wl_time`.`单据量` > 0));
该视图 v_db_vl_yesterday
的主要功能是获取 v_db_wl_time
中最新的一个日期,该日期满足以下两个条件:
- 早于当前日期:即历史数据,不包括今天的数据。
- 存在单据量:即在该日期至少有一条记录的
单据量
大于零。
通过 MAX(v_db_wl_time.审单日)
,视图返回满足上述条件的最大日期,通常可以理解为“昨天”或最近的一个有活动的日期。
SQL: v_db_wl_time_last_2days
CREATE VIEW
v_db_wl_time_last_2days
(
审单人,
审单日,
单据量,
业务类型名称,
折算单量
) AS
SELECT
`a`.`审单人` AS `审单人`,
`a`.`审单日` AS `审单日`,
`a`.`单据量` AS `单据量`,
`y`.`业务类型名称` AS `业务类型名称`,
`a`.`折算单量` AS `折算单量`
FROM
((`v_db_wl_time` `a`
JOIN
`t_db_ywlx` `y`
ON
((
`y`.`业务类型` = `a`.`业务类型`)))
JOIN
`v_db_vl_yesterday` `d`
ON
((
`a`.`审单日` >= `d`.`审单日`)));
这段 SQL 语句试图创建一个视图 v_db_wl_time_last_2days
,通过连接多个视图和表来获取审核人员、审核日期、单据量、业务类型名称及折算单量等信息。
SQL v_db_td_wl_time
该视图 v_db_td_wl_time
的主要功能是:
- 汇总任务信息:从
t_db_nd1100_open
和t_dash_ppp_checker_std_v
表中提取任务的基本信息和标准时间。 - 过滤特定任务:仅包含
PPP_MONTH
为'202502'
、TIME_BREAK
不为'Y'
且TASK_DEF_ID
为'ND1100'
的任务。 - 计算到期天数:根据当前时间和任务的到期时间,计算剩余的工作天数,考虑了工作时间和非工作时间的影响。
CREATE VIEW
v_db_td_wl_time
(
单据,
表單類型,
流程类型,
业务类型,
任务日期,
到期时间,
到期日,
std_time,
到期天数
) AS
SELECT
`a`.`BILL_CODE` AS `单据`,
`a`.`表單類型` AS `表單類型`,
`a`.`流程类型` AS `流程类型`,
`a`.`业务类型` AS `业务类型`,
`a`.`CREATE_DATE` AS `任务日期`,
`a`.`OUT_DATE` AS `到期时间`,
str_to_date(`a`.`OUT_DATE`,'%Y-%m-%d') AS `到期日`,
`ex`.`STD_TIME` AS `std_time`,
(
CASE
WHEN (CAST(`a`.`OUT_DATE` AS DATE) = curdate())
THEN ROUND((timestampdiff(HOUR,now(),`a`.`OUT_DATE`) / 9),1)
ELSE ((((to_days(`a`.`OUT_DATE`) - to_days(now())) - 1) + ROUND((timestampdiff(MINUTE,
now(),concat(curdate(),' 18:00:00')) / 540),1)) + (
CASE
WHEN (CAST(`a`.`OUT_DATE` AS TIME) BETWEEN '09:00:00' AND '18:00:00')
THEN ROUND((timestampdiff(MINUTE,concat(CAST(`a`.`OUT_DATE` AS DATE),
' 09:00:00'),`a`.`OUT_DATE`) / 540),1)
WHEN (CAST(`a`.`OUT_DATE` AS TIME) < '09:00:00')
THEN 0
ELSE 1
END))
END) AS `到期天数`
FROM
(`t_db_nd1100_open` `a`
JOIN
`t_dash_ppp_checker_std_v` `ex`
ON
((
`a`.`业务类型代码` = `ex`.`BUSINESS_TYPE`)))
WHERE
((
`ex`.`PPP_MONTH` = '202502')
AND (
`ex`.`TIME_BREAK` <> 'Y')
AND (
`ex`.`TASK_DEF_ID` = 'ND1100'));
到期天数计算
- 复杂逻辑:
-
当天到期:
sql
WHEN (CAST(`a`.`OUT_DATE` AS DATE) = CURDATE()) THEN ROUND((TIMESTAMPDIFF(HOUR, NOW(), `a`.`OUT_DATE`) / 9), 1)
- 说明:如果
OUT_DATE
是今天,计算当前时间到到期时间的小时差,除以9(可能是标准工时小时数),并保留一位小数,得到剩余的工作天数。
- 说明:如果
-
非当天到期:
sql
ELSE ( ((((to_days(`a`.`OUT_DATE`) - to_days(now())) - 1) + ROUND((timestampdiff(MINUTE, now(), concat(curdate(),' 18:00:00')) / 540),1)) + ( CASE WHEN (CAST(`a`.`OUT_DATE` AS TIME) BETWEEN '09:00:00' AND '18:00:00') THEN ROUND((timestampdiff(MINUTE, concat(CAST(`a`.`OUT_DATE` AS DATE), ' 09:00:00'), `a`.`OUT_DATE`) / 540),1) WHEN (CAST(`a`.`OUT_DATE` AS TIME) < '09:00:00') THEN 0 ELSE 1 END)) )
- 说明:
- 计算从当前时间到到期日期之间的工作日天数,考虑了工作时间(9:00-18:00)和非工作时间。
- 使用
to_days
计算日期差,并结合分钟差来计算剩余的工作时间。 540
分钟可能代表标准工作小时(9小时 * 60分钟)。
- 说明:
-
4. 可视化看板制作
4.1 配置 ODBC 数据源
- 在 Excel 中配置 ODBC 数据源,连接 MySQL 数据库。
4.2 加载数据
- 在 Excel 中加载 MySQL 数据库中的视图,生成动态数据表。
4.3 制作看板
- 使用 Excel 的图表功能(如柱状图、折线图)和公式(如
SUMIF
、COUNTIF
)展示绩效数据。
5. 定时刷新
5.1 使用 VBA 宏实现定时刷新
- 在 Excel 中编写 VBA 宏,定时刷新数据连接。
示例 VBA 代码:
vba
Sub RefreshQuery()
' 声明工作簿对象变量
Dim wb As Workbook
' 检查名为 "dashboard.xlsm" 的工作簿是否已打开
On Error Resume Next ' 忽略错误,继续执行下一行
Set wb = Workbooks("dashboard.xlsm") ' 尝试设置 wb 为 "dashboard.xlsm"
On Error GoTo 0 ' 恢复正常的错误处理
' 如果工作簿未打开,显示消息框提示用户
If wb Is Nothing Then
MsgBox "工作簿 'dashboard.xlsm' 未打开!", vbExclamation
Else
' 如果工作簿已打开,设置 ws 为该工作簿的活动工作表
Set ws = wb.ActiveSheet
End If
' 设置下一次刷新的时间为当前时间加上 30 分钟
NewTime = Now + TimeValue("00:30:00")
' 获取当前日期的整点时间
today = Round(Now, 0)
' 设置 n1 为当前时间加 1 天,并设置为早上 8:15 的时间
n1 = Round(Now + 1, 0) + TimeValue("08:15:00")
' 如果当前时间小于等于 18:00,则设置刷新时间为 NewTime
If Time <= "18:00:00" Then
timeset = NewTime
Else
' 如果当前时间大于 18:00,则设置刷新时间为第二天的早上 8:15
timeset = n1
End If
' 刷新工作簿中的所有查询
ActiveWorkbook.RefreshAll
' 遍历当前工作簿中的所有查询,并刷新指定的查询
Dim qry As WorkbookQuery
For Each qry In ThisWorkbook.Queries
' 检查查询名称是否在指定的列表中
If qry.Name = "查询3" Or qry.Name = "v_db_wl_time_last_2days" Or qry.Name = "v_db_productivity" Or _
qry.Name = "raw_user" Or qry.Name = "raw_sla" Or qry.Name = "更新时间" Then
qry.Refresh ' 刷新指定的查询
End If
Next qry
' 禁用所有警告消息,防止弹出提示框干扰自动化流程
Application.DisplayAlerts = False
' 暂停执行 10 秒,等待刷新操作完成
Application.Wait Now + TimeValue("00:00:10")
' 刷新多个数据透视表
With Worksheets("pivot单量与人力推移")
.PivotTables("数据透视表13").PivotCache.Refresh
End With
With Worksheets("pivot SLA")
.PivotTables("数据透视表20").PivotCache.Refresh
End With
With Worksheets("pivot人力需求")
.PivotTables("数据透视表16").PivotCache.Refresh
End With
With Worksheets("v_db_wl_time_last_2days")
.PivotTables("数据透视表39").PivotCache.Refresh
End With
' 返回 "dashborad" 工作表并刷新其数据透视表
With Worksheets("dashborad")
.PivotTables("数据透视表26").PivotCache.Refresh
' 选择特定区域并设置背景颜色为特定值(6299648 为某种颜色的代码)
With .Range("AN24:AZ50").Interior
.Pattern = xlSolid
.Color = 6299648
End With
.Range("A1").Select ' 选择 A1 单元格
End With
' 注释掉重复的刷新和格式化代码(建议删除或移除以优化性能)
' (以下部分代码被重复,通常不需要多次执行相同的操作)
' 应用程序将在指定的 timeset 时间再次调用 RefreshQuery 过程
Application.OnTime timeset, "RefreshQuery"
End Sub
详细注释说明
-
工作簿检查:
- 代码首先尝试检查名为
"dashboard.xlsm"
的工作簿是否已经打开。如果未打开,则显示警告消息并停止后续操作。
- 代码首先尝试检查名为
-
设置刷新时间:
- 根据当前时间决定下一次刷新的时间:
- 如果当前时间在 18:00 之前,刷新时间设置为当前时间的 30 分钟后。
- 如果当前时间已超过 18:00,刷新时间设置为第二天的早上 8:15。
- 根据当前时间决定下一次刷新的时间:
-
刷新查询:
- 使用
ActiveWorkbook.RefreshAll
刷新工作簿中的所有查询。 - 遍历所有查询,并仅刷新名称匹配特定列表的查询,以节省时间和资源。
- 使用
-
禁用警告和等待:
Application.DisplayAlerts = False
禁用所有警告消息,防止在刷新过程中弹出提示框中断自动化流程。Application.Wait
暂停代码执行 10 秒,给刷新操作足够的时间完成。
-
刷新数据透视表:
- 对多个指定的工作表中的数据透视表进行缓存刷新,以确保数据是最新的。
- 在
"dashborad"
工作表中,选择特定区域并设置其背景颜色,可能是为了高亮显示某些数据或标记刷新完成。
-
安排下次刷新:
- 使用
Application.OnTime
方法,在之前设定的timeset
时间点再次调用RefreshQuery
过程,实现定时自动刷新功能。
- 使用
5.2 运行 VBA 宏
- 按
Alt + F11
打开 VBA 编辑器,插入模块并运行RefreshQuery
。 就会 每隔30分钟刷新数据了