当前位置: 首页 > article >正文

使用 SQL 和表格数据进行问答和 RAG(7)—将表格数据(CSV 或 Excel 文件)加载到向量数据库(ChromaDB)中

将表格数据(CSV 或 Excel 文件)加载到向量数据库(ChromaDB)中。这里定义的类 PrepareVectorDBFromTabularData,它的主要功能是读取表格数据文件到DataFrame中、生成嵌入向量、并将这些数据存储在向量数据库的集合中,同时对注入的数据进行验证。


代码结构与功能分析

1. 类的简介
  • 目标:将 CSV 或 Excel 文件的数据转换为向量并存储到 ChromaDB 中。
  • 主要方法
    • run_pipeline:运行整个数据处理管道。
    • _load_dataframe:加载数据文件为 Pandas DataFrame。
    • _prepare_data_for_injection:根据文件内容生成向量和相关元数据。
    • _inject_data_into_chromadb:将数据注入到 ChromaDB 中。
    • _validate_db:验证向量数据库的集合内容。

2. 初始化 (__init__)
def __init__(self, file_directory:str) -> None:
    self.APPCFG = LoadConfig()
    self.file_directory = file_directory
  • 参数
    • file_directory:待处理文件的路径。
  • 功能
    • 加载配置对象 self.APPCFG,其中包含数据库和嵌入生成器的实例。
    • 初始化文件路径 file_directory

3. 运行数据处理管道 (run_pipeline)
def run_pipeline(self):
    self.df, self.file_name = self._load_dataframe(file_directory=self.file_directory)
    self.docs, self.metadatas, self.ids, self.embeddings = self._prepare_data_for_injection(df=self.df, file_name=self.file_name)
    self._inject_data_into_chromadb()
    self._validate_db()
  • 功能
    • 加载数据文件(调用 _load_dataframe)。
    • 准备向量和相关元数据(调用 _prepare_data_for_injection)。
    • 将数据注入到 ChromaDB 集合中(调用 _inject_data_into_chromadb)。
    • 验证注入是否成功(调用 _validate_db)。

4. 加载数据文件 (_load_dataframe)
def _load_dataframe(self, file_directory: str):
    file_names_with_extensions = os.path.basename(file_directory)
    file_name, file_extension = os.path.splitext(file_names_with_extensions)
    if file_extension == ".csv":
        df = pd.read_csv(file_directory)
        return df, file_name
    elif file_extension == ".xlsx":
        df = pd.read_excel(file_directory)
        return df, file_name
    else:
        raise ValueError("The selected file type is not supported")
  • 参数
    • file_directory:待加载的文件路径。
  • 功能
    • 根据文件扩展名(.csv.xlsx),将文件加载为 Pandas DataFrame。
    • 返回加载的数据和文件名(不带扩展名)。
  • 异常
    • 如果文件类型不被支持,则抛出 ValueError

5. 准备数据 (_prepare_data_for_injection)
def _prepare_data_for_injection(self, df:pd.DataFrame, file_name:str):
    docs = []
    metadatas = []
    ids = []
    embeddings = []
    for index, row in df.iterrows():
        output_str = ""
        for col in df.columns:
            output_str += f"{col}: {row[col]},\n"
        response = self.APPCFG.OpenAIEmbeddings.embed_documents(output_str)[0]
        embeddings.append(response)
        docs.append(output_str)
        metadatas.append({"source": file_name})
        ids.append(f"id{index}")
    return docs, metadatas, ids, embeddings
  • 参数
    • df:待处理的 Pandas DataFrame。
    • file_name:文件名,用于生成元数据。
  • 功能
    • 遍历 DataFrame 的每一行,将行数据格式化为字符串 output_str
    • 使用 OpenAIEmbeddings.embed_documents 为字符串生成向量。
    • 保存生成的文档、元数据、唯一 ID 和向量。
  • 返回值
    • 文档列表、元数据列表、ID 列表和向量列表。

6. 注入数据到 ChromaDB (_inject_data_into_chromadb)
def _inject_data_into_chromadb(self):
    chroma_client = self.APPCFG.chroma_client
    existing_collections = chroma_client.list_collections()
    collection_name = self.APPCFG.collection_name
    existing_collection_names = [collection.name for collection in existing_collections]
    
    if collection_name in existing_collection_names:
        collection = chroma_client.get_collection(name=collection_name)
        print(f"Retrieved existing collection: {collection_name}")
    else:
        collection = chroma_client.create_collection(name=collection_name)
        print(f"Created new collection: {collection_name}")

    collection.add(
        documents=self.docs,
        metadatas=self.metadatas,
        embeddings=self.embeddings,
        ids=self.ids
    )
    print("Data is stored in ChromaDB.")
  • 功能
    • 检查集合是否已存在。如果存在,则获取;否则,创建新集合。
    • 将文档、元数据、嵌入向量和 ID 添加到集合中。
  • 异常处理
    • 避免重复创建集合。

7. 验证数据库内容 (_validate_db)
def _validate_db(self):
    vectordb = self.APPCFG.chroma_client.get_collection(name=self.APPCFG.collection_name)
    print("Number of vectors in vectordb:", vectordb.count())
  • 功能
    • 获取集合并打印其中向量的数量,确认数据是否注入成功。

代码运行结果:
在这里插入图片描述

总结

这段代码的整体流程如下:

  1. 加载 CSV 或 Excel 文件,转换为 Pandas DataFrame。
  2. 遍历 DataFrame 的每一行,生成文档、元数据和嵌入向量。
  3. 将生成的数据注入到 ChromaDB 的集合中。
  4. 验证数据库集合中的向量数量,确保注入成功。

需要注意文件格式支持、嵌入生成器和 ChromaDB 客户端的兼容性问题。

完整代码:

import os
import pandas as pd
from utils.load_config import LoadConfig
import pandas as pd


class PrepareVectorDBFromTabularData:
    """
    This class is designed to prepare a vector database from a CSV and XLSX file.
    It then loads the data into a ChromaDB collection. The process involves
    reading the CSV file, generating embeddings for the content, and storing 
    the data in the specified collection.
    
    Attributes:
        APPCFG: Configuration object containing settings and client instances for database and embedding generation.
        file_directory: Path to the CSV file that contains data to be uploaded.
    """
    def __init__(self, file_directory:str) -> None:
        """
        Initialize the instance with the file directory and load the app config.
        
        Args:
            file_directory (str): The directory path of the file to be processed.
        """
        self.APPCFG = LoadConfig()
        self.file_directory = file_directory
        
        
    def run_pipeline(self):
        """
        Execute the entire pipeline for preparing the database from the CSV.
        This includes loading the data, preparing the data for injection, injecting
        the data into ChromaDB, and validating the existence of the injected data.
        """
        self.df, self.file_name = self._load_dataframe(file_directory=self.file_directory)
        self.docs, self.metadatas, self.ids, self.embeddings = self._prepare_data_for_injection(df=self.df, file_name=self.file_name)
        self._inject_data_into_chromadb()
        self._validate_db()


    
    def _load_dataframe(self, file_directory: str):
        """
        Load a DataFrame from the specified CSV or Excel file.
        
        Args:
            file_directory (str): The directory path of the file to be loaded.
            
        Returns:
            DataFrame, str: The loaded DataFrame and the file's base name without the extension.
            
        Raises:
            ValueError: If the file extension is neither CSV nor Excel.
        """
        file_names_with_extensions = os.path.basename(file_directory)
        print(file_names_with_extensions)
        file_name, file_extension = os.path.splitext(
                file_names_with_extensions)
        if file_extension == ".csv":
            df = pd.read_csv(file_directory)
            return df, file_name
        elif file_extension == ".xlsx":
            df = pd.read_excel(file_directory)
            return df, file_name
        else:
            raise ValueError("The selected file type is not supported")
        

    def _prepare_data_for_injection(self, df:pd.DataFrame, file_name:str):
        """
        Generate embeddings and prepare documents for data injection.
        
        Args:
            df (pd.DataFrame): The DataFrame containing the data to be processed.
            file_name (str): The base name of the file for use in metadata.
            
        Returns:
            list, list, list, list: Lists containing documents, metadatas, ids, and embeddings respectively.
        """
        docs = []
        metadatas = []
        ids = []
        embeddings = []
        for index, row in df.iterrows():
            output_str = ""
            # Treat each row as a separate chunk
            for col in df.columns:
                output_str += f"{col}: {row[col]},\n"
            response = self.APPCFG.OpenAIEmbeddings.embed_documents(output_str)[0]
            embeddings.append(response)
            docs.append(output_str)
            metadatas.append({"source": file_name})
            ids.append(f"id{index}")
        return docs, metadatas, ids, embeddings

    def _inject_data_into_chromadb(self):
        """
        Inject the prepared data into ChromaDB.
        
        Raises an error if the collection_name already exists in ChromaDB.
        The method prints a confirmation message upon successful data injection.
        """
        chroma_client = self.APPCFG.chroma_client
        # 列出所有集合的名称
        existing_collections = chroma_client.list_collections()

        collection_name = self.APPCFG.collection_name #"titanic_small"

        # 获取所有集合
        existing_collections = chroma_client.list_collections()

        # 提取集合名称
        existing_collection_names = [collection.name for collection in existing_collections]

        if collection_name in existing_collection_names:
            # 如果集合存在,获取它
            collection = chroma_client.get_collection(name=collection_name)
            print(f"Retrieved existing collection: {collection_name}")
        else:
            # 如果集合不存在,创建它
            collection = chroma_client.create_collection(name=collection_name)
            print(f"Created new collection: {collection_name}")

        collection.add(
            documents=self.docs,
            metadatas=self.metadatas,
            embeddings=self.embeddings,
            ids=self.ids
        )
        print("==============================")
        print("Data is stored in ChromaDB.")     

    def _validate_db(self):
        """
        Validate the contents of the database to ensure that the data injection has been successful.
        Prints the number of vectors in the ChromaDB collection for confirmation.
        """
        vectordb =  self.APPCFG.chroma_client.get_collection(name=self.APPCFG.collection_name)
        print("==============================")
        print("Number of vectors in vectordb:", vectordb.count())
        print("==============================")

http://www.kler.cn/a/473514.html

相关文章:

  • 攻防世界 ics-07
  • SpringCloud系列教程:微服务的未来(十)服务调用、注册中心原理、Nacos注册中心
  • C语言基本知识复习浓缩版:标识符、函数、进制、数据类型
  • 腾讯云AI代码助手编程挑战赛-凯撒密码解码编码器
  • 【Qt】QtConcurrent
  • spring boot 多数据源集成mysql、postgresql、phoenix、doris等
  • MySql---进阶篇(十一)----游标,条件处理程序,存储函数
  • Bash语言的计算机基础
  • 【优选算法】Binary-Blade:二分查找的算法刃(下)
  • 一款FPGA芯片开发的核心板(EP4CE6核心板)
  • WebRTC 的优缺点详细解析
  • 怎麼在iPhone iOS(Wi-Fi/蜂窩數據)上查找IP地址?
  • vue js实现时钟以及刻度效果
  • HTML5 波动动画(Pulse Animation)详解
  • 微信小程序中使用weui组件库
  • 基于知识蒸馏的跨模态目标检测方法总结
  • 【问题记录】npm create vue@latest报错
  • 后勤管理系统|Java|SSM|VUE| 前后端分离
  • 系统分析师笔记
  • 上门按摩系统架构与功能分析
  • PHP语言的正则表达式
  • 面向强化学习的状态空间建模:RSSM的介绍和PyTorch实现
  • STM32之一种双通路CAN总线消息备份冗余处理方法(十三)
  • 工业级千兆路由器 5G+WIFI6 高速稳定串口采集
  • 计算机毕业设计hadoop+spark知网文献论文推荐系统 知识图谱 知网爬虫 知网数据分析 知网大数据 知网可视化 预测系统 大数据毕业设计 机器学习
  • 系统架构设计师考点—软件工程基础知识