利用Python和SQLite进行数据处理与优化——从数据库操作到高级数据压缩
引言
在当今的数据驱动时代,有效地管理和处理数据成为了软件开发中的关键挑战之一。本博客将深入探讨如何使用Python结合SQLite数据库来执行一系列数据处理任务,包括数据库表的创建、数据合并以及针对特定需求的数据优化。特别地,我们将展示如何通过算法实现对数据的有效压缩,以满足内存限制或提高数据传输效率的需求。
数据库基础操作
首先,我们需要了解一些基本的SQLite数据库操作。以下代码片段展示了如何连接到SQLite数据库并获取指定表的所有列信息:
def get_columns_info(cursor, table_name):
cursor.execute(f"PRAGMA table_info({table_name});")
return {info[1]: info[2] for info in cursor.fetchall()}
这里,我们定义了一个get_columns_info
函数,它接收一个游标对象和一个表名作为参数,并返回该表中所有列的信息(如列名和类型)。
接下来,我们创建了两个简单的表格voc_table
和voc_table0
,并分别插入了一些示例数据:
def voc_and_chip_len_to_sql():
conn = sqlite3.connect("voc.db")
pd.DataFrame({"voc": list(range(0, 1111)), "voc_id": list(range(0, 1111))}).to_sql(con=conn, name="voc_table", if_exists="replace")
pd.DataFrame({"chip_len_428": list(range(0, 1111)), "chip_len_53": list(range(0, 1111))}).to_sql(con=conn, name="voc_table0", if_exists="replace")
conn.close()
数据合并
为了将多个表的数据合并成一个新的表,我们可以编写如下函数:
def concat_columns_all_table_to_new_table():
# 实现细节...
这个过程涉及读取每个表的数据,去除可能存在的索引列,然后横向拼接这些数据框,并最终将其写入新表中。
数据压缩与优化
面对大量数据时,如何有效压缩数据变得尤为重要。下面的函数演示了如何根据给定的条件选择合适的芯片长度,并进一步对数据进行压缩:
def chips_to_new_voc_id(input_array=[1, 2, 3, 5, 6, 8, 9, 10, 13, 14, 17, 12121, 121212121]):
# 实现细节...
此函数首先标记输入数组中的连续序列,然后根据不同范围内的数值大小,采用不同的策略对其进行分组和编码,最后生成一个紧凑表示形式。
结论
通过上述步骤,我们不仅能够高效地管理SQLite数据库中的数据,还能运用智能算法对数据进行优化和压缩。这对于提升应用程序性能、减少存储空间消耗具有重要意义。希望这篇博客能为你的数据处理项目提供有价值的参考和灵感。记住,随着数据量的增长,合理设计数据结构和采用先进的压缩技术将成为解决大数据问题的关键所在。
import pandas as pd
import numpy as np
import sqlite3
# 选择chiplen
import polars as pl
def get_columns_info(cursor, table_name):
cursor.execute(f"PRAGMA table_info({table_name});")
return {info[1]: info[2] for info in cursor.fetchall()}
def voc_and_chip_len_to_sql():
conn=sqlite3.connect("voc.db")
# cursor=conn.cursor()
pd.DataFrame({"voc":list(range(0,1111)),"voc_id":list(range(0,1111))}).to_sql(con=conn,name="voc_table",if_exists="replace")
conn.close()
conn=sqlite3.connect("voc.db")
# cursor=conn.cursor()
pd.DataFrame({"chip_len_428":list(range(0,1111)),"chip_len_53":list(range(0,1111))}).to_sql(con=conn,name="voc_table0",if_exists="replace")
conn.close()
def concat_columns_all_table_to_new_table():
# 在这里开始
conn=sqlite3.connect("voc.db")
cursor=conn.cursor()
tables = ["voc_table", "voc_table0"]
# 收集所有表的列信息
all_columns_info = {}
for table in tables:
all_columns_info.update(get_columns_info(cursor, table))
print(all_columns_info)
# 创建
cursor.execute("DROP TABLE IF EXISTS merged_voc_table;")
conn.commit()
create_statement = "CREATE TABLE IF NOT EXISTS merged_voc_table ({});".format(",".join(["{} {}".format(k,v) for k,v in all_columns_info.items() if k!="index"]))
print(create_statement)
cursor.execute(create_statement)
conn.commit()
conn.close()
conn=sqlite3.connect("voc.db")
chunk_size=100
for offset in range(0,1111,chunk_size):
one_list=[]
for table_name in tables:
one=pd.read_sql("SELECT * FROM {} LIMIT {} OFFSET {}".format(table_name,chunk_size,offset),con=conn)
if "index" in one.columns:
one=one.drop("index",axis=1)
one_list.append(one)
two=pd.concat(one_list,axis=1)
two.to_sql(name="merged_voc_table", con=conn, if_exists='append', index=False)
cursor=conn.cursor()
cursor.execute("select * from merged_voc_table limit 100")
print(cursor.fetchall())
conn.close()
def src_token_id_to_new_token_id():
# 这里开始进行 制作新表了
conn=sqlite3.connect("voc.db")
# cursor=conn.cursor()
# 给定的voc_id列表
text_token_id = [2, 21]
# 构造SQL查询
query = "SELECT * FROM merged_voc_table WHERE voc_id IN ({})".format(','.join(['?'] * len(text_token_id)))
# 使用pandas.read_sql执行查询并将结果加载到DataFrame中
chips = pd.read_sql(query, conn, params=text_token_id)
# 输出DataFrame
# print(df)
conn.close()
chips=pl.from_pandas(chips)
chip_lens=['chip_len_428', 'chip_len_53']
selected_chip_len=""
for chip_len in chip_lens:
chips_count=len(chips.unique(chip_len))
if chips_count*int(chip_len.split("_")[-1])<8192:
selected_chip_len=chip_len
break
chips = chips[selected_chip_len].to_numpy().tolist()
chip_len = int(selected_chip_len.split("_")[-1])
# 这里就得到了新的表
conn=sqlite3.connect("voc.db")
# cursor=conn.cursor()
# 构造SQL查询
query = "SELECT * FROM merged_voc_table WHERE {} IN ({})".format(selected_chip_len,','.join(['?'] * len(chips)))
# 使用pandas.read_sql执行查询并将结果加载到DataFrame中
new_voc = pd.read_sql(query, conn, params=text_token_id)
# 输出DataFrame
# print(df)
conn.close()
# 进行关联表
print(new_voc)
new_voc["new_voc_id"]=new_voc.index.values
new_voc = pl.from_pandas(new_voc)
new_voc = new_voc["voc_id","new_voc_id"]
text_token_id = pl.DataFrame({"voc_id":text_token_id})
text_token_id=text_token_id.join(new_voc.filter(new_voc["voc_id"].is_in(text_token_id["voc_id"])),on="voc_id",how="left")
return text_token_id,chips,chip_len
# chips 表示与压缩 表示
# 现在的到了新的 voc_id chips 与chips len
# chips 我们要进行进一步的压缩 使用 一个填充值特殊token 分开前面的是 连续值使用首位表示 后面的是非连续值使用 单独表示
# chips 1位表 ,2位表, 3位表 填充特殊token chips 1位表 ,2位表, 3位表
#
def mark_continuous_sequences(arr):
# Step 1: Sort the array in ascending order
sorted_arr = sorted(arr)
# Initialize the result list with zeros, same length as sorted_arr
marks = [0] * len(sorted_arr)
# Initialize the current group marker
current_group = 0
# Step 2 and 3: Iterate over the sorted array and mark continuous sequences
for i in range(1, len(sorted_arr)):
if sorted_arr[i] - sorted_arr[i - 1] == 1:
# If the difference between consecutive elements is 1, they are continuous
marks[i] = current_group
else:
# If not, increment the group marker and assign it to the current element
current_group += 1
marks[i] = current_group
return marks
def chips_to_new_voc_id(input_array = [1, 2, 3, 5,6, 8, 9, 10, 13, 14, 17,12121,121212121]):
# Example usage 连续性省略
output_marks = mark_continuous_sequences(input_array)
output_marks = np.array(output_marks)
print(output_marks) # Output should be [0, 0, 0, 1, 2, 2, 2, 3, 3, 4]
# 分位表示 8192 8192**2 8192**3
input_array=np.array(input_array)
chips_one=input_array<8192
chips_two=(8192<=input_array)&(input_array<8192**2)
chips_thr=(8192**2<=input_array)&(input_array<8192**3)
chips_one_data=input_array[chips_one]
chips_one_mark= output_marks[chips_one]
range_s1_list=[]
s1_list=[]
for i in sorted(set(chips_one_mark)):
if chips_one_mark.tolist().count(i)>1:
s1=chips_one_data[chips_one_mark==i]
range_s1_list.append(s1[0])
range_s1_list.append(s1[-1])
else:
s1=chips_one_data[chips_one_mark==i]
s1_list+=s1.tolist()
chips_two_data=input_array[chips_two]
chips_two_mark= output_marks[chips_two]
range_s2_list=[]
s2_list=[]
if chips_two_data.size>0:
for i in sorted(set(chips_two_mark)):
if chips_two_mark.tolist().count(i)>1:
s2=chips_two_data[chips_two_mark==i]
range_s2_list.append(s2[0]//8192)
range_s2_list.append(s2[0]%8192)
range_s2_list.append(s2[-1]//8192)
range_s2_list.append(s2[-1]%8192)
else:
s2=chips_two_data[chips_two_mark==i]
for k,v in zip(s2//8192,s2%8192):
s2_list+=[k,v]
chips_thr_data=input_array[chips_thr]
chips_thr_mark= output_marks[chips_thr]
range_s3_list=[]
s3_list=[]
if chips_thr_data.size>0:
for i in sorted(set(chips_thr_mark)):
if chips_thr_mark.tolist().count(i)>1:
s3=chips_thr_data[chips_thr_mark==i]
range_s3_list.append(s3[0]//8192**2)
range_s3_list.append(s3[0]%8192**2//8192)
range_s3_list.append(s3[0]%8192**2%8192)
range_s3_list.append(s3[-1]//8192**2)
range_s3_list.append(s3[-1]%8192**2//8192)
range_s3_list.append(s3[-1]%8192**2%8192)
else:
s3=chips_thr_data[chips_thr_mark==i]
for k,v in zip(s3//8192**2,s3%8192**2):
s3_list+=[k,v//8192,v%8192]
chips=[]
if len(range_s1_list)>0:
chips+=["1位"]
chips+=range_s1_list
if len(range_s2_list)>0:
chips+=["2位"]
chips+=range_s2_list
if len(range_s3_list)>0:
chips+=["3位"]
chips+=range_s3_list
chips+=["填充"]
if len(s1_list)>0:
chips+=["1位"]
chips+=s1_list
if len(s2_list)>0:
chips+=["2位"]
chips+=s2_list
if len(s3_list)>0:
chips+=["3位"]
chips+=s3_list
return chips
print(chips_to_new_voc_id())