当前位置: 首页 > article >正文

利用Python和SQLite进行数据处理与优化——从数据库操作到高级数据压缩

引言

在当今的数据驱动时代,有效地管理和处理数据成为了软件开发中的关键挑战之一。本博客将深入探讨如何使用Python结合SQLite数据库来执行一系列数据处理任务,包括数据库表的创建、数据合并以及针对特定需求的数据优化。特别地,我们将展示如何通过算法实现对数据的有效压缩,以满足内存限制或提高数据传输效率的需求。

数据库基础操作

首先,我们需要了解一些基本的SQLite数据库操作。以下代码片段展示了如何连接到SQLite数据库并获取指定表的所有列信息:

def get_columns_info(cursor, table_name):
    cursor.execute(f"PRAGMA table_info({table_name});")
    return {info[1]: info[2] for info in cursor.fetchall()}

这里,我们定义了一个get_columns_info函数,它接收一个游标对象和一个表名作为参数,并返回该表中所有列的信息(如列名和类型)。

接下来,我们创建了两个简单的表格voc_tablevoc_table0,并分别插入了一些示例数据:

def voc_and_chip_len_to_sql():
    conn = sqlite3.connect("voc.db")
    pd.DataFrame({"voc": list(range(0, 1111)), "voc_id": list(range(0, 1111))}).to_sql(con=conn, name="voc_table", if_exists="replace")
    pd.DataFrame({"chip_len_428": list(range(0, 1111)), "chip_len_53": list(range(0, 1111))}).to_sql(con=conn, name="voc_table0", if_exists="replace")
    conn.close()
数据合并

为了将多个表的数据合并成一个新的表,我们可以编写如下函数:

def concat_columns_all_table_to_new_table():
    # 实现细节...

这个过程涉及读取每个表的数据,去除可能存在的索引列,然后横向拼接这些数据框,并最终将其写入新表中。

数据压缩与优化

面对大量数据时,如何有效压缩数据变得尤为重要。下面的函数演示了如何根据给定的条件选择合适的芯片长度,并进一步对数据进行压缩:

def chips_to_new_voc_id(input_array=[1, 2, 3, 5, 6, 8, 9, 10, 13, 14, 17, 12121, 121212121]):
    # 实现细节...

此函数首先标记输入数组中的连续序列,然后根据不同范围内的数值大小,采用不同的策略对其进行分组和编码,最后生成一个紧凑表示形式。

结论

通过上述步骤,我们不仅能够高效地管理SQLite数据库中的数据,还能运用智能算法对数据进行优化和压缩。这对于提升应用程序性能、减少存储空间消耗具有重要意义。希望这篇博客能为你的数据处理项目提供有价值的参考和灵感。记住,随着数据量的增长,合理设计数据结构和采用先进的压缩技术将成为解决大数据问题的关键所在。

import  pandas as pd
import numpy as np
import sqlite3
# 选择chiplen
import polars  as pl



def get_columns_info(cursor, table_name):
    cursor.execute(f"PRAGMA table_info({table_name});")
    return {info[1]: info[2] for info in cursor.fetchall()}

def  voc_and_chip_len_to_sql():
        
    conn=sqlite3.connect("voc.db")
    # cursor=conn.cursor()
    pd.DataFrame({"voc":list(range(0,1111)),"voc_id":list(range(0,1111))}).to_sql(con=conn,name="voc_table",if_exists="replace")
    conn.close()



    conn=sqlite3.connect("voc.db")
    # cursor=conn.cursor()
    pd.DataFrame({"chip_len_428":list(range(0,1111)),"chip_len_53":list(range(0,1111))}).to_sql(con=conn,name="voc_table0",if_exists="replace")
    conn.close()

def concat_columns_all_table_to_new_table():
    # 在这里开始  

    conn=sqlite3.connect("voc.db")
    cursor=conn.cursor()
    tables = ["voc_table", "voc_table0"]

    # 收集所有表的列信息
    all_columns_info = {}
    for table in tables:
        all_columns_info.update(get_columns_info(cursor, table))

        
    print(all_columns_info)

    # 创建 

    cursor.execute("DROP TABLE IF EXISTS merged_voc_table;")
    conn.commit()
    create_statement = "CREATE TABLE IF NOT EXISTS merged_voc_table ({});".format(",".join(["{} {}".format(k,v) for k,v in all_columns_info.items() if k!="index"]))
    print(create_statement)
    cursor.execute(create_statement)
    conn.commit()
    conn.close()

    conn=sqlite3.connect("voc.db")
    chunk_size=100
    for offset in range(0,1111,chunk_size):
        one_list=[]
        for table_name in tables:
            one=pd.read_sql("SELECT * FROM {} LIMIT {} OFFSET {}".format(table_name,chunk_size,offset),con=conn)
            if "index" in one.columns:
                one=one.drop("index",axis=1)
            one_list.append(one)
        two=pd.concat(one_list,axis=1)
        two.to_sql(name="merged_voc_table", con=conn, if_exists='append', index=False)
    cursor=conn.cursor()
    cursor.execute("select * from merged_voc_table limit 100")
    print(cursor.fetchall())
    conn.close()





def  src_token_id_to_new_token_id():

    # 这里开始进行 制作新表了
    conn=sqlite3.connect("voc.db")
    # cursor=conn.cursor()
    # 给定的voc_id列表
    text_token_id = [2, 21]

    # 构造SQL查询 
    query = "SELECT * FROM merged_voc_table WHERE voc_id IN ({})".format(','.join(['?'] * len(text_token_id)))

    # 使用pandas.read_sql执行查询并将结果加载到DataFrame中
    chips = pd.read_sql(query, conn, params=text_token_id)
    # 输出DataFrame
    # print(df)
    conn.close()

    chips=pl.from_pandas(chips)
    chip_lens=['chip_len_428', 'chip_len_53']
    selected_chip_len=""

    for chip_len in chip_lens:
        chips_count=len(chips.unique(chip_len))
        if  chips_count*int(chip_len.split("_")[-1])<8192:
            selected_chip_len=chip_len
            break
    chips = chips[selected_chip_len].to_numpy().tolist()
    chip_len = int(selected_chip_len.split("_")[-1])



    # 这里就得到了新的表
    conn=sqlite3.connect("voc.db")
    # cursor=conn.cursor()
    # 构造SQL查询 
    query = "SELECT * FROM merged_voc_table WHERE {} IN ({})".format(selected_chip_len,','.join(['?'] * len(chips)))
    # 使用pandas.read_sql执行查询并将结果加载到DataFrame中
    new_voc = pd.read_sql(query, conn, params=text_token_id)
    # 输出DataFrame
    # print(df)
    conn.close()


    # 进行关联表
    print(new_voc)
    new_voc["new_voc_id"]=new_voc.index.values
    new_voc = pl.from_pandas(new_voc)
    new_voc = new_voc["voc_id","new_voc_id"]

    text_token_id = pl.DataFrame({"voc_id":text_token_id})
    text_token_id=text_token_id.join(new_voc.filter(new_voc["voc_id"].is_in(text_token_id["voc_id"])),on="voc_id",how="left")
    return text_token_id,chips,chip_len


# chips 表示与压缩  表示
# 现在的到了新的 voc_id  chips 与chips len

# chips 我们要进行进一步的压缩  使用  一个填充值特殊token  分开前面的是 连续值使用首位表示 后面的是非连续值使用  单独表示
# chips 1位表 ,2位表,    3位表  填充特殊token  chips 1位表 ,2位表,    3位表 
# 

def mark_continuous_sequences(arr):
    # Step 1: Sort the array in ascending order
    sorted_arr = sorted(arr)
    
    # Initialize the result list with zeros, same length as sorted_arr
    marks = [0] * len(sorted_arr)
    
    # Initialize the current group marker
    current_group = 0
    
    # Step 2 and 3: Iterate over the sorted array and mark continuous sequences
    for i in range(1, len(sorted_arr)):
        if sorted_arr[i] - sorted_arr[i - 1] == 1:
            # If the difference between consecutive elements is 1, they are continuous
            marks[i] = current_group
        else:
            # If not, increment the group marker and assign it to the current element
            current_group += 1
            marks[i] = current_group
    
    return marks

def chips_to_new_voc_id(input_array = [1, 2, 3, 5,6, 8, 9, 10, 13, 14, 17,12121,121212121]):
    # Example usage  连续性省略 
    
    output_marks = mark_continuous_sequences(input_array)
    output_marks = np.array(output_marks)
    print(output_marks)  # Output should be [0, 0, 0, 1, 2, 2, 2, 3, 3, 4]
    # 分位表示  8192   8192**2 8192**3
    
    input_array=np.array(input_array)
    chips_one=input_array<8192
    chips_two=(8192<=input_array)&(input_array<8192**2)
    chips_thr=(8192**2<=input_array)&(input_array<8192**3)

    chips_one_data=input_array[chips_one]
    chips_one_mark= output_marks[chips_one]

    range_s1_list=[]
    s1_list=[]
    for i in  sorted(set(chips_one_mark)):
        if chips_one_mark.tolist().count(i)>1:
            s1=chips_one_data[chips_one_mark==i]
            range_s1_list.append(s1[0])
            range_s1_list.append(s1[-1])
        else:
            s1=chips_one_data[chips_one_mark==i]
            s1_list+=s1.tolist()



    chips_two_data=input_array[chips_two]
    chips_two_mark= output_marks[chips_two]
    range_s2_list=[]
    s2_list=[]
    if  chips_two_data.size>0:

        for i in  sorted(set(chips_two_mark)):
            if chips_two_mark.tolist().count(i)>1:
                s2=chips_two_data[chips_two_mark==i]
                range_s2_list.append(s2[0]//8192)
                range_s2_list.append(s2[0]%8192)
                range_s2_list.append(s2[-1]//8192)
                range_s2_list.append(s2[-1]%8192)
            else:
                s2=chips_two_data[chips_two_mark==i]
                for k,v in zip(s2//8192,s2%8192):
                    s2_list+=[k,v]



    chips_thr_data=input_array[chips_thr]
    chips_thr_mark= output_marks[chips_thr]
    range_s3_list=[]
    s3_list=[]
    if  chips_thr_data.size>0:

        for i in  sorted(set(chips_thr_mark)):
            if chips_thr_mark.tolist().count(i)>1:
                s3=chips_thr_data[chips_thr_mark==i]
                range_s3_list.append(s3[0]//8192**2)
                range_s3_list.append(s3[0]%8192**2//8192)
                range_s3_list.append(s3[0]%8192**2%8192)
                range_s3_list.append(s3[-1]//8192**2)
                range_s3_list.append(s3[-1]%8192**2//8192)
                range_s3_list.append(s3[-1]%8192**2%8192)
            
            else:
                s3=chips_thr_data[chips_thr_mark==i]
                for k,v in zip(s3//8192**2,s3%8192**2):
                    s3_list+=[k,v//8192,v%8192]

    chips=[]
    if len(range_s1_list)>0:
        chips+=["1位"]
        chips+=range_s1_list
    if len(range_s2_list)>0:
        chips+=["2位"]
        chips+=range_s2_list
    if len(range_s3_list)>0:
        chips+=["3位"]
        chips+=range_s3_list
    chips+=["填充"]
    if len(s1_list)>0:
        chips+=["1位"]
        chips+=s1_list
    if len(s2_list)>0:
        chips+=["2位"]
        chips+=s2_list
    if len(s3_list)>0:
        chips+=["3位"]
        chips+=s3_list
    return chips
print(chips_to_new_voc_id()) 

http://www.kler.cn/a/542900.html

相关文章:

  • git本地建的分支,删除后内容还能找回
  • 笔记4——列表list
  • 【DeepSeek服务器繁忙,请稍后再试...如何解决?】
  • Mysql 函数解析
  • 为什么mysql默认RR(repeat read可重复读)隔离级别
  • 网络工程师 (29)CSMA/CD协议
  • 除了try...catch,还有其他处理异步错误的方法吗?
  • clickhouse replicatedmergetree 恢复
  • 自制游戏——斗罗大陆
  • Unity使用iTextSharp导出PDF-02基础结构及设置中文字体
  • 2月10日QT
  • IGBT的两级关断
  • 线程池-抢票系统性能优化
  • python学opencv|读取图像(五十九)使用cv2.dilate()函数实现图像膨胀处理
  • 【Golang学习之旅】Go + MySQL 数据库操作详解
  • Pandas数据填充(fill)中的那些坑:避免机器学习中的数据泄露
  • react国际化配置react-i18next详解
  • 使用WebUI访问本地Deepseek(Ollama集成Open WebUI)
  • office 365 更新后打不开word问题
  • depcheck检查node.js项目中未使用和缺失依赖的工具
  • 免费在腾讯云Cloud Studio部署DeepSeek-R1大模型
  • encodeURI(),encodeURIComponent()区别
  • AF3 gdt函数解读
  • nginx安装并部署前端项目【包括Linux与Windows系统】
  • 前端性能分析常见内容
  • C语言蓝桥杯1003: [编程入门]密码破译