【Python实战】-- csv数据汇总
系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、批量解压
- 二、Summary
  - 1.源码
  - 2.优化(时间复杂度)
- 三、库文件
  - 1.查看已安装的所有库
- 总结
前言
一、批量解压
文件夹格式
解压后
二、Summary
遍历指定路径下所有满足条件的 csv 文件,并按需求将每个文件中指定区域的数据汇总到同一个 sheet。
1.源码
代码如下(示例):
#xlwt只支持xls格式,xlsx格式需要用openpyxl或pandas
# -*- coding: utf-8 -*-
# coding=gbk
import pandas as pd
import os
import xlrd
import xlwt
import csv
from xlutils.copy import copy
from openpyxl import Workbook
from openpyxl import load_workbook
from os.path import dirname
from decimal import Decimal
from openpyxl.utils.dataframe import dataframe_to_rows
# 读写2007 excel
import openpyxl
from openpyxl.styles import numbers
from openpyxl.styles import Alignment
import glob
def some_function():
    """Copy ``col1`` and ``row1`` into the module globals ``global_var`` / ``global_var_r``.

    NOTE(review): ``col1`` and ``row1`` are read as module-level names; they are
    not defined in the part of the file visible here, so calling this raises
    ``NameError`` unless they are assigned elsewhere -- confirm against callers.
    """
    global global_var, global_var_r
    global_var = col1
    global_var_r = row1
def get_allfile_msg(file_dir):
    """Return the top-level listing of *file_dir*, keeping only spreadsheet files.

    Only the first entry produced by ``os.walk`` is used, so sub-directories
    are reported by name but never descended into.

    Args:
        file_dir: Directory to inspect.

    Returns:
        A ``(root, dirs, files)`` tuple in which ``files`` holds the names
        ending in ``.xls``, ``.xlsx`` or ``.csv``. When ``os.walk`` yields
        nothing (for example the directory does not exist),
        ``(file_dir, [], [])`` is returned so callers can always unpack three
        values instead of hitting ``TypeError`` on an implicit ``None``.
    """
    wanted_suffixes = ('.xls', '.xlsx', '.csv')
    for root, dirs, files in os.walk(file_dir):
        # Return on the first iteration: only the top level is of interest.
        return root, dirs, [name for name in files if name.endswith(wanted_suffixes)]
    return file_dir, [], []
def get_allfile_url(root, files):
    """Build ``root + "/" + name`` for every name in *files*.

    Args:
        root: Directory prefix, used verbatim.
        files: Iterable of file names.

    Returns:
        A new list of joined paths, in the same order as *files*.
    """
    return [root + "/" + name for name in files]
def get_file_name(path, suffix=('.xlsx', '.xls', '.csv')):
    """Recursively collect the files under *path* whose names end with *suffix*.

    The previous implementation accepted ``suffix`` but never applied it (every
    file was returned) and used a mutable list as the default value.

    Args:
        path: Directory to walk recursively.
        suffix: A single suffix string or an iterable of suffix strings.

    Returns:
        A list of full paths (``os.path.join(root, name)``) of matching files.
    """
    # str.endswith needs a str or a tuple; normalise lists and other iterables.
    suffixes = (suffix,) if isinstance(suffix, str) else tuple(suffix)
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(path)
        for name in files
        if name.endswith(suffixes)
    ]
def convert_csv_to_xlsx(csv_file_path, xlsx_file_path):
    """Read *csv_file_path* with pandas and write it to *xlsx_file_path*.

    The DataFrame index is not written to the Excel file.
    """
    pd.read_csv(csv_file_path).to_excel(xlsx_file_path, index=False)
# Read a CSV file with pandas.
def read_csv_file(file_path):
    """Load a comma-separated file into a DataFrame of untyped (object) cells.

    The original call passed both ``sep=r"\\s+\\s{0}"`` and ``delimiter=','``.
    ``delimiter`` is an alias for ``sep``, and current pandas raises
    ``ValueError`` when both are given; older versions let ``delimiter`` win.
    Only the comma separator is kept, which matches that older behaviour.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A ``pandas.DataFrame`` whose column names come from the second line of
        the file and whose cells are kept as objects (``dtype=object``).
    """
    # Options tried previously and kept for reference: error_bad_lines=False
    # (skip malformed lines), encoding='gbk' / 'iso-8859-1', header=0.
    return pd.read_csv(
        file_path,
        encoding='latin1',
        sep=',',
        dtype=object,
        quotechar="'",
        doublequote=True,
        engine="python",
        header=1,  # the second line of the file is the header row
    )
# Try to convert a value to a decimal number.
def try_convert_to_decimal(value):
    """Return ``Decimal(value)``, or ``None`` when *value* cannot be converted.

    The original ``except`` clause named ``DecimalException`` without importing
    it, so any failed conversion surfaced as ``NameError`` instead of ``None``.

    Args:
        value: Anything accepted by the ``Decimal`` constructor.

    Returns:
        The converted ``Decimal``, or ``None`` on failure.
    """
    # Imported locally because the file only imports ``Decimal`` at the top.
    from decimal import DecimalException

    try:
        return Decimal(value)
    except (TypeError, ValueError, DecimalException):
        # Malformed strings raise InvalidOperation (a DecimalException);
        # unsupported types raise TypeError; bad tuples raise ValueError.
        return None
# Sum one column of a CSV file.
def sum_csv_column(filename, column_index):
    """Sum the integer values found in one column of a CSV file.

    The first line is treated as a header and skipped. The original used
    ``csv.DictReader`` together with ``next()``, which silently dropped the
    first data row and then failed with ``KeyError`` because dict rows cannot
    be indexed by position; a plain ``csv.reader`` is used instead.

    Args:
        filename: Path of the CSV file.
        column_index: Zero-based position of the column to sum.

    Returns:
        The integer total. Rows too short to contain the column are skipped;
        an empty file yields ``0``.

    Raises:
        ValueError: If a cell in the column is not an integer literal.
    """
    total = 0
    with open(filename, mode='r', newline='') as csvfile:
        csvreader = csv.reader(csvfile, delimiter=",")
        # Skip the header row; the default keeps an empty file from raising.
        next(csvreader, None)
        for row in csvreader:
            if len(row) > column_index:
                total += int(row[column_index])
    return total
def calculate_sum_in_csv(file_path, row_start, row_end, col_start, col_end):
    """Sum a rectangular region of a UTF-8 CSV file.

    Args:
        file_path: Path of the CSV file (its first line is the header).
        row_start: First data row of the region (positional, inclusive).
        row_end: End data row of the region (positional, exclusive).
        col_start: First column of the region (positional, inclusive).
        col_end: End column of the region (positional, exclusive).

    Returns:
        The sum of the per-column sums of the selected region.
    """
    frame = pd.read_csv(file_path, encoding='utf-8')
    window = frame.iloc[row_start:row_end, col_start:col_end]
    per_column_totals = window.sum()
    return per_column_totals.sum()
def get_top_nine_dirs(file_path):
    """Return the 9th component of *file_path* when split on ``os.path.sep``.

    The original returned ``os.path.sep.join(parts[8])``; because ``parts[8]``
    is a single string, that inserted a path separator between each of its
    characters. The component is now returned unchanged.

    Args:
        file_path: Path string using the platform separator.

    Returns:
        The component at index 8 of the split path.

    Raises:
        IndexError: If the path has fewer than nine components.
    """
    parts = file_path.split(os.path.sep)
    # To get the first nine levels instead: os.path.sep.join(parts[:9])
    return parts[8]
def get_nth_directory_name(file_path, n):
    """Return the *n*-th (1-based) component of *file_path*, or ``None``.

    The path is split on ``os.path.sep``; ``None`` is returned when it has
    fewer than *n* components.
    """
    parts = file_path.split(os.path.sep)
    return parts[n - 1] if len(parts) >= n else None
def extract_last_part_of_path(path):
    """Return the final component of *path* (everything after the last separator)."""
    _head, tail = os.path.split(path)
    return tail
def is_number(string):
    """Report whether *string* can be parsed by ``Decimal``.

    The original caught ``DecimalError``, a name that does not exist, so any
    non-numeric input raised ``NameError`` instead of returning ``False``.

    Args:
        string: Value to test, normally a ``str``.

    Returns:
        ``True`` if ``Decimal(string)`` succeeds, otherwise ``False``. Values
        such as ``"NaN"`` and ``"Infinity"`` are accepted by ``Decimal`` and
        therefore count as numbers.
    """
    # Imported locally because the file only imports ``Decimal`` at the top.
    from decimal import InvalidOperation

    try:
        Decimal(string)  # attempt the conversion; the result is not needed
    except (InvalidOperation, TypeError, ValueError):
        return False
    return True
# Find the marker cell in a CSV file and publish its position.
def search_str(filename, search_char):
    """Locate the cell equal to ``"1S"`` and store its position in module globals.

    Scanning stops after the first row that contains the marker. For each
    match in that row the position is printed, and the module globals are
    updated: ``global_var`` receives the 1-based column and ``global_var_r``
    receives the 1-based row minus 2 (presumably to skip header lines --
    TODO confirm against the code that consumes it). When the marker is not
    found, or the file cannot be read, the globals are left untouched.

    Changes from the original:
      * the file is read once up front, so the printed row count is the real
        total (the original called ``list(csvreader)`` inside the loop, which
        exhausted the reader and counted only the rows after the match);
      * the bare ``except: pass`` is replaced by handling of read errors only,
        with a message instead of silent failure;
      * the unused ``total`` local is removed.

    Args:
        filename: Path of the CSV file to scan.
        search_char: Currently unused, see the note in the body.

    Returns:
        None. Results are delivered through ``global_var`` / ``global_var_r``.
    """
    global global_var, global_var_r  # results are published via module globals

    # NOTE(review): ``search_char`` is accepted but ignored; the marker is
    # hard-coded. Left as-is because callers are not visible here and may rely
    # on it -- confirm before switching to ``character = search_char``.
    character = "1S"

    try:
        with open(filename, 'r', newline='') as csvfile:
            rows = list(csv.reader(csvfile, delimiter=","))
    except (OSError, UnicodeDecodeError, csv.Error) as exc:
        # Stay best-effort like the original, but report the failure.
        print(f"search_str: could not read {filename}: {exc}")
        return

    num_rows = len(rows)
    result = []
    for row_index, row in enumerate(rows, start=1):
        for col_index, cell in enumerate(row, start=1):
            if character != cell:
                continue
            result.append((row_index, col_index))
            print(result)
            print(f"Cell data:{cell},Row index:{row_index},Column index:{col_index}")
            print('已查到该值', cell)
            print(f'"{character}"在第{row_index}行,第{col_index}列被找到')
            print(f'总行数:"{num_rows}"')
            global_var = col_index
            global_var_r = row_index - 2
        if result:
            # The original effectively stopped after the first matching row.
            break
if __name__ == '__main__':
#新建excel,自定义标题
#*********************************************************************
wb = Workbook()
ws = wb.active
ws.title="Summary"
#设置所有单元格的对齐方式为居中
alignment = Alignment(horizontal='center',vertical='center')
titlesS1 = ['data1','data2','data3']
titlesS2 = ['data4','data5','data6']
#第一列波段设置区域
ws.cell(row = 1,column = 1).value = '判定'
ws.cell(row = 1,column = 1).alignment = alignment
ws.cell(row = 5,column = 1).value = '文件名'
ws.cell(row = 5,column = 1).alignment = alignment
ws.cell(row = 6,column = 1).value = 'wave'
ws.cell(row = 6,column = 1).alignment = alignment
for l in range(380,1051):
ws.cell(l-373,1).value = l
ws.cell(l-373,1).alignment = alignment
continue
#*****************************************************************
#读取指定文件夹
#file_dir = os.getcwd()
file_dir = r"D:\Users\gxcaoty\Desktop\vivo客户数据\901NG"
root, dirs, files = get_allfile_msg(file_dir)
allFile_url = get_allfile_url(root, files)
dir_numbers = len(dirs) #file_dir下的文件夹个数
user_input = input("请输入S1或S2\n")
count = 0
files_chose = []
try:
for root,dirs,files in os.walk(file_dir):
for file_path in glob.glob(os.path.join(root,'*.csv')):
if '39781A-901' in file_path and 'Add' not in file_path:
print(file_path)
count += 1
c = count
m = c - 1
#print(m)
files_chose.append(file_path)
#print(files_chose)
for xl in files_chose:
#print(xl) #xl为路径+文件名
last_part = extract_last_part_of_path(xl)
#print(last_part) #filename为文件名
filename = xl
csv_data = read_csv_file(filename)
df = csv_data
if user_input == "S1":
df = df.iloc[:,1:4]
df = df.astype(float)
#print(df)
#反射率标准
#**********************************************************
wave1start = 430
wave1end =