当前位置: 首页 > article >正文

Python教程:使用 Python 和 PyHive 连接 Hive 数据库

目录

1. 引言

2. 类的设计思路

2.1 类的基本结构

3. 连接到 Hive

3.1 连接方法

4. 执行查询

4.1 查询返回 DataFrame

4.2 查询返回列表

5. 基本的数据库操作

5.1 创建表

5.2 插入数据

5.3 更新数据

5.4 删除数据

6. 表的描述信息和数据库操作

6.1 获取表描述

6.2 列出所有数据库

6.3 切换数据库

7. 关闭连接

8. 使用示例

9.完整代码

 


在大数据时代,Hive 提供了一种简便的方式来处理和分析大规模的数据集。本文将通过一个简单的 Python 类 HiveConnectionManager 来展示如何使用 PyHive 库连接到 Hive 数据库,并执行基本的数据库操作。

1. 引言


在数据处理过程中,数据库连接是一个重要的组成部分。Python 提供了许多库来帮助我们快速连接到各种数据库。PyHive 是一个专门为 Hive 设计的 Python 库,它简化了与 Hive 的交互。本文将通过创建一个 HiveConnectionManager 类来展示如何使用 PyHive 进行数据库操作。

首先,需要安装相关依赖库

pip install pyhive thrift pandas

2. 类的设计思路


HiveConnectionManager 类的设计旨在实现 Hive 数据库的基本 CRUD(创建、读取、更新、删除)操作。类的主要功能包括:

  • 建立与 Hive 的连接
  • 执行查询并返回结果
  • 创建表格
  • 插入、更新和删除数据
  • 获取表的描述信息
  • 列出所有数据库
  • 切换当前数据库
  • 关闭连接

2.1 类的基本结构

class HiveConnectionManager:
    def __init__(self, host, port=10000, username=None, database=None):
        # 初始化连接参数
        self.host = host
        self.port = port
        self.username = username
        self.database = database
        self.connection = None
        self.cursor = None

3. 连接到 Hive


连接到 Hive 是进行数据库操作的第一步。我们使用 hive.Connection 函数来建立连接并生成游标。

3.1 连接方法

def connect(self):
    try:
        self.connection = hive.Connection(
            host=self.host,
            port=self.port,
            username=self.username,
            database=self.database
        )
        self.cursor = self.connection.cursor()
        print("Hive 连接成功")
    except Exception as e:
        print(f"连接失败: {e}")

在这个方法中,我们捕捉异常以确保在连接失败时可以得到相应的提示。

4. 执行查询


执行查询是数据库操作的核心部分。我们提供了两个执行查询的方法:一个返回 DataFrame 结果,另一个返回列表。

4.1 查询返回 DataFrame

def execute_query_to_dataframe(self, query):
    try:
        self.cursor.execute(query)
        result = self.cursor.fetchall()
        columns = [desc[0] for desc in self.cursor.description]
        return pd.DataFrame(result, columns=columns)
    except Exception as e:
        print(f"执行查询失败: {e}")
        return None

4.2 查询返回列表

def execute_query(self, query):
    try:
        self.cursor.execute(query)
        return self.cursor.fetchall()
    except Exception as e:
        print(f"执行查询失败: {e}")
        return None

这两个方法允许用户执行任意 SQL 查询并获取结果。DataFrame 格式的返回结果便于后续的数据分析和处理。

5. 基本的数据库操作


5.1 创建表

def create_table(self, table_name, columns):
    query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})"
    self.execute_query(query)
    print(f"表 {table_name} 创建成功")

5.2 插入数据

def insert_data(self, table_name, values):
    query = f"INSERT INTO {table_name} VALUES ({values})"
    self.execute_query(query)
    print(f"数据插入到表 {table_name} 成功")

5.3 更新数据

def update_data(self, table_name, set_statement, where_condition):
    query = f"UPDATE {table_name} SET {set_statement} WHERE {where_condition}"
    self.execute_query(query)
    print(f"表 {table_name} 更新成功")

5.4 删除数据

def delete_data(self, table_name, where_condition):
    query = f"DELETE FROM {table_name} WHERE {where_condition}"
    self.execute_query(query)
    print(f"表 {table_name} 数据删除成功")

6. 表的描述信息和数据库操作


6.1 获取表描述

def describe_table(self, table_name):
    query = f"DESCRIBE {table_name}"
    return self.execute_query_to_dataframe(query)

6.2 列出所有数据库

def list_databases(self):
    query = "SHOW DATABASES"
    return self.execute_query_to_dataframe(query)

6.3 切换数据库

def use_database(self, database_name):
    query = f"USE {database_name}"
    self.execute_query(query)
    print(f"当前数据库切换为 {database_name}")

7. 关闭连接


在操作结束后,务必要关闭数据库连接,以释放资源。

def close(self):
    if self.cursor:
        self.cursor.close()
    if self.connection:
        self.connection.close()
    print("Hive 连接已关闭")

8. 使用示例


下面是如何使用 HiveConnectionManager 类的一个完整示例:

if __name__ == "__main__":
    hive_manager = HiveConnectionManager(host='your_hive_host', username='your_username')
    
    hive_manager.connect()
    hive_manager.use_database('your_database')
    
    # 创建表
    hive_manager.create_table('test_table', 'id INT, name STRING')
    
    # 插入数据
    hive_manager.insert_data('test_table', '(1, "John Doe")')
    
    # 查询数据
    df = hive_manager.execute_query_to_dataframe('SELECT * FROM test_table')
    print(df)
    
    # 更新数据
    hive_manager.update_data('test_table', 'name = "Jane Doe"', 'id = 1')
    
    # 删除数据
    hive_manager.delete_data('test_table', 'id = 1')
    
    hive_manager.close()

9.完整代码


from pyhive import hive
import pandas as pd
import rich


class HiveConnectionManager:
    def __init__(self, host, port=10000, username=None, database=None):
        """
        初始化 HiveConnectionManager 类的实例。

        :param host: Hive 服务器的主机名或 IP 地址。
        :param port: Hive 服务器的端口,默认为 10000。
        :param username: 连接 Hive 的用户名。
        :param database: 连接时要使用的默认数据库。
        """
        self.host = host
        self.port = port
        self.username = username
        self.database = database
        self.connection = None
        self.cursor = None

    def connect(self):
        """
        建立 Hive 连接。

        :return: None
        """
        try:
            self.connection = hive.Connection(
                host=self.host,
                port=self.port,
                username=self.username,
                database=self.database
            )
            self.cursor = self.connection.cursor()
            print("Hive 连接成功")
        except Exception as e:
            print(f"连接失败: {e}")

    def execute_query_to_dataframe(self, query):
        """
        执行查询并返回 DataFrame 结果。

        :param query: 要执行的 SQL 查询字符串。
        :return: 包含查询结果的 Pandas DataFrame,如果执行失败则返回 None。
        """
        try:
            self.cursor.execute(query)
            result = self.cursor.fetchall()
            columns = [desc[0] for desc in self.cursor.description]
            return pd.DataFrame(result, columns=columns)
        except Exception as e:
            print(f"执行查询失败: {e}")
            return None

    def execute_query(self, query):
        """
        执行查询并返回结果。

        :param query: 要执行的 SQL 查询字符串。
        :return: 查询结果的列表,如果执行失败则返回 None。
        """
        try:
            self.cursor.execute(query)
            return self.cursor.fetchall()
        except Exception as e:
            print(f"执行查询失败: {e}")
            return None

    def create_table(self, table_name, columns):
        """
        创建 Hive 表。

        :param table_name: 要创建的表的名称。
        :param columns: 表中列的定义字符串(例如,"id INT, name STRING")。
        :return: None
        """
        query = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})"
        self.execute_query(query)
        print(f"表 {table_name} 创建成功")

    def insert_data(self, table_name, values):
        """
        向表中插入数据。

        :param table_name: 要插入数据的表的名称。
        :param values: 要插入的值的字符串(例如,"(1, 'John Doe')")。
        :return: None
        """
        query = f"INSERT INTO {table_name} VALUES ({values})"
        self.execute_query(query)
        print(f"数据插入到表 {table_name} 成功")

    def update_data(self, table_name, set_statement, where_condition):
        """
        更新表中的数据。

        :param table_name: 要更新的表的名称。
        :param set_statement: 设置的更新语句(例如,"name = 'Jane Doe'")。
        :param where_condition: 更新的条件(例如,"id = 1")。
        :return: None
        """
        query = f"UPDATE {table_name} SET {set_statement} WHERE {where_condition}"
        self.execute_query(query)
        print(f"表 {table_name} 更新成功")

    def delete_data(self, table_name, where_condition):
        """
        从表中删除数据。

        :param table_name: 要删除数据的表的名称。
        :param where_condition: 删除的条件(例如,"id = 1")。
        :return: None
        """
        query = f"DELETE FROM {table_name} WHERE {where_condition}"
        self.execute_query(query)
        print(f"表 {table_name} 数据删除成功")

    def describe_table(self, table_name):
        """
        获取表的描述信息。

        :param table_name: 要描述的表的名称。
        :return: 包含表描述信息的 Pandas DataFrame。
        """
        query = f"DESCRIBE {table_name}"
        return self.execute_query_to_dataframe(query)

    def list_databases(self):
        """
        列出所有数据库。

        :return: 包含所有数据库名称的 Pandas DataFrame。
        """
        query = "SHOW DATABASES"
        return self.execute_query_to_dataframe(query)

    def use_database(self, database_name):
        """
        切换当前数据库。

        :param database_name: 要切换到的数据库名称。
        :return: None
        """
        query = f"USE {database_name}"
        self.execute_query(query)
        print(f"当前数据库切换为 {database_name}")

    def close(self):
        """
        关闭连接。

        :return: None
        """
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
        print("Hive 连接已关闭")


if __name__ == "__main__":
    hive_manager = HiveConnectionManager(host='your_hive_host', username='your_username')
    
    hive_manager.connect()
    hive_manager.use_database('your_database')
    
    # 创建表
    hive_manager.create_table('test_table', 'id INT, name STRING')
    
    # 插入数据
    hive_manager.insert_data('test_table', '(1, "John Doe")')
    
    # 查询数据
    df = hive_manager.execute_query_to_dataframe('SELECT * FROM test_table')
    print(df)
    
    # 更新数据
    hive_manager.update_data('test_table', 'name = "Jane Doe"', 'id = 1')
    
    # 删除数据
    hive_manager.delete_data('test_table', 'id = 1')
    
    hive_manager.close()

 


http://www.kler.cn/news/289169.html

相关文章:

  • windows核心编程,纤程使用---->UNIX服务器应用程序移植到Windows中
  • 在更新python虚拟环境pip版本时,由于更新失败导致pip丢失的修复
  • GD - EmbeddedBuilder - 给已有工程换MCU
  • 【IPV6从入门到起飞】2-1 获取你的IPV6(手机、CPE等)
  • LVS Keepalived nginx haproxy 区别
  • 设计模式-原型适配器桥接外观
  • 基于大数据分析景区消费行为影响因素研究【消费等级预测、携程,去哪网数据抓取】
  • 亿图图示下载安装教程EdrawMax Pro 13版超详细图文教程
  • 前端面试:webSocket如何兼容低浏览器?
  • 安防监控视频平台LntonAIServer视频智能分析平台新增视频质量诊断功能
  • 机器学习数学公式推导之降维
  • Python加载 TorchScript 格式的 ResNet18 模型分类该模型进行预测并输出预测的类别和置信度
  • 【运维监控】prometheus+node exporter+grafana 监控linux机器运行情况(2)
  • 【wsl2】从C盘迁移到G盘
  • redroid搭建云手机学习笔记(一)
  • C++ ─── List的模拟实现
  • django orm的Q和~Q的数据相加并不一定等于总数
  • Golang | Leetcode Golang题解之第380题O(1)时间插入、删除和获取随机元素
  • [SDK]-按钮静态文本与编辑框控件
  • Vue-cli的使用
  • MySQL三大日志详解
  • 【区块链 + 房产建筑】透明建造系统 | FISCO BCOS应用案例
  • Windows安装docker,启动ollama运行open-webui使用AIGC大模型写周杰伦歌词
  • Unity实战案例 2D小游戏HappyGlass(模拟水珠)
  • 解剖学上合理的分割:通过先验变形显式保持拓扑结构|文献速递--基于深度学习的医学影像病灶分割
  • 域与活动目录
  • Mudo03 vscode配置相应的文件的搜索路径,库文件的搜索路径以及想要的链接库
  • 【Redis之一:下载安装Redis】
  • Java 入门指南:Java 并发编程 —— 并发容器 ConcurrentSkipListMap
  • P7492 [传智杯 #3 决赛] 序列