当前位置: 首页 > article >正文

python操作Elasticsearch

使用elasticsearch 6.x版本,操作es数据。

#! -*- coding:utf-8 -*
import time

from elasticsearch import Elasticsearch, helpers


class EstUtil:
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(EstUtil, cls).__new__(cls, *args, **kwargs)
        return cls._instance

    def __init__(self):
        # todo hosts不传有默认的,可传hosts=[]
        self.es = Elasticsearch(timeout=300, max_retries=3)

    def is_exists_index(self, index_name):
        """
        是否存在索引
        :param index_name: 索引名称
        :return:
        """
        print(index_name)
        return self.es.indices.exists(index=index_name)

    def get_all_indices(self, index_=None):
        """
        获取所有索引
        :param index_name: 根据名称获取所有的索引
        :return:
        """
        if not index_:
            # 只返回所有索引名
            all_indices = self.es.indices.get_alias().keys()
        else:
            # 返回索引的信息,状态等
            all_indices = self.es.cat.indices(index=index_, format="json")
        return all_indices

    def index(self, index_name, body):
        """
        插入单条数据
        :param index_name: 索引名称
        :param body: 请求体数据dict
        :return: boolean
        {
          "name": "aaa",
          "age": 18
        }
        """
        # 如果pip安装的elasticsearch版本低于7.x,加入 doc_type='_doc'
        response = self.es.index(index=index_name, body=body, doc_type='_doc')

        # 检查响应
        if response['result'] == 'created':
            return True
        else:
            return False

    def batch_insert(self, index_name, body_list):
        """
        批量插入数据
        :param index_name: 索引名称
        :param body_list: 请求体数据list
        :return:
        """
        data_list = list()
        for data in body_list:
            action = {
                "_index": index_name,
                "_type": '_doc',
                '_source': data
            }
            data_list.append(action)

        return helpers.bulk(self.es, data_list)

    def update(self, index_name, doc_id, update_body):
        """
        :param index_name: 索引名
        :param doc_id: 记录id
        :param update_body: 更新的dict
        :return: boolean
        """
        # 如果pip安装的elasticsearch版本低于7.x,加入 doc_type='_doc'
        response = self.es.update(index=index_name, id=doc_id, body=update_body, doc_type="_doc")

        # 检查编辑是否成功
        if response['result'] == 'updated':
            return True
        else:
            return False

    def batch_update(self, index_name, body_list):
        """
        批量编辑数据
        :param index_name: 索引名称
        :param body_list: 请求体数据list
        :return:
        """
        data_list = list()
        for data in body_list:
            _id = data.pop("id")
            action = {
                "_op_type": "update",
                "_index": index_name,
                'doc': data,
                "_id": _id
            }
            data_list.append(action)

        return helpers.bulk(self.es, data_list)

    def delete(self, index_name, doc_id):
        """
        删除单条数据
        :param index_name: 索引名称
        :param doc_id: 记录id
        :return:  boolean
        """
        response = self.es.delete(index=index_name, id=doc_id)

        # 检查删除是否成功
        if response['result'] == 'deleted':
            return True
        else:
            return False

    def batch_delete(self, index_name, body_list):
        """
        批量删除数据
        :param index_name: 索引名称
        :param body_list: 请求体数据list
        :return:
        """
        data_list = list()
        for data in body_list:
            _id = data.get("id")
            action = {
                "_op_type": "delete",
                "_index": index_name,
                "_id": _id
            }
            data_list.append(action)
        return helpers.bulk(self.es, data_list)

    def delete_index(self, index_name):
        """
        删除单个索引
        :param index_name: 索引名称
        :return:
        """
        return self.es.indices.delete(index=index_name)['acknowledged']

    def create_index(self, index_name, field_type_dict, number_of_shards, number_of_replicas):
        """
        传入简单的键值对
        :param index_name 索引名称
        :param field_type_dict 字段名称,类型字典
        :param number_of_shards 分片数量
        :param number_of_replicas 副本数量
        :return: 创建成功
        """

        if self.is_exists_index(index_name):
            raise ValueError('索引已存在:%s' % index_name)
        body = {}
        settings = {
            'number_of_shards': number_of_shards,
            'number_of_replicas': number_of_replicas
        }
        mappings = {}
        index_type = {}
        properties = {}
        for key, value in field_type_dict.items():
            properties[key] = {'type': value}
        index_type['properties'] = properties
        mappings['_doc'] = index_type
        body['settings'] = settings
        body['mappings'] = mappings
        response = self.es.indices.create(index=index_name, body=body)
        return response

    def create_index_by_body(self, index_name, body):
        """
        自定义参数创建索引
        :param index_name 索引名称
        :param body 组装好的创建dict
        :return:
        """
        """
        示例参数:
            {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            },
            "aliases": {
                "test_log": {}
            },
            "mappings": {
                "properties": {
                    "create_time": {
                        "type": "date"
                    },
                    "status": {"type": "integer"},
                    "dev_ip": {"type": "ip"},
                    "dev_uuid": {"type": "keyword"},
                    "user": {
                        "properties": {
                            "name": {
                                "type": "text"
                            },
                            "age": {
                                "type": "integer"
                            },
                            "email": {
                                "type": "keyword"
                            }
                        }
                    },
                    "infos": {
                        "type": "nested",
                        "dynamic": False,
                        "properties": {
                            "v_id": {"type": "keyword"},
                            "v_name": {"type": "keyword"},
                            "v_desc": {"type": "text"}
                        }
                    },
                    "remark": {"type": "text"},
                    "show_num": {"type": "long", "doc_values": False, "index": False}
                }
            }
        }
        """
        if self.is_exists_index(index_name):
            raise ValueError('索引已存在:%s' % index_name)

        response = self.es.indices.create(index=index_name, body=body)
        return response

    def search(self, index_name, request_body):
        """
        查询数据
        :param index_name: 索引名称
        :param request_body: 查询dsl
        :return:
        """
        return self.es.search(index=index_name, body=request_body, timeout="5m")

    def search_page(self, index_name, request_body, page, size):
        """
        分页查询数据(数据量超过10000条时,直接使用from和size参数可能会导致性能问题)
        :param index_name: 索引名称
        :param request_body: 查询dsl
        :param page: 页码
        :param size: 每页条数
        :return:
        """
        from_value = (page - 1) * size  # 偏移量
        return self.es.search(index=index_name, body=request_body, from_=from_value, size=size)

    def search_after(self, index_name, search_after_body):
        """
        分页查询(基于上一次查询的最后一个文档的排序值来进行下一次查询,查询中必须包含 sort 字段,并且这个字段的值需要唯一)
        :param index_name: 索引名称
        :param search_after_body 查询dsl
        :return:
        """
        # search_after_body参数,search_after字段,传入最后一次的排序值,第一次不用传
        # search_after_body参数中,需要包含size,sort
        # 排序值必须有唯一性,如果具有相同的排序值,search_after无法正确地定位到下一页的开始位置,导致数据重复或遗漏,如时间排序可能重复
        # 采用多个字段排序,或使用类似_id的唯一值
        # eg:{"size": 1, "sort": [{"create_time": {"order": "desc"}}, {"_id": {"order": "asc"}}]}

        response = self.es.search(index=index_name, body=search_after_body)

        hits = response.get('hits').get('hits')
        search_after_body = {'search_after': hits[-1].get('sort') if hits else []}
        return search_after_body, response

    def search_scroll(self, index_name, scroll, body):
        """
        分页查询,第一次执行(scroll 是一种基于游标的分页方式,每次搜索创建一个快照)
        :param index_name: 索引名称
        :param scroll: scroll保持时间,1m为1分钟
        :param body: 查询条件体dict
        :return:
        """
        # 初始化scroll查询
        response = self.es.search(index=index_name, scroll=scroll, body=body)
        scroll_id = response['_scroll_id']
        return scroll_id, response

    def search_next(self, scroll_id, scroll):
        """
        与上方的search_scroll结合使用
        :param scroll_id: 上一次的快照id
        :param scroll: scroll快照保持时间,1m为1分钟,如果查询数据超过1分钟,会报错
        :return:
        """
        response = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
        scroll_id = response['_scroll_id']
        return scroll_id, response


if __name__ == '__main__':
    es = EstUtil()

    create_index_body = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "aliases": {
            "test_log": {}
        },
        "mappings": {
            "properties": {
                "create_time": {
                    "type": "date"
                },
                "status": {"type": "integer"},
                "dev_ip": {"type": "ip"},
                "dev_uuid": {"type": "keyword"},
                "user": {
                    "properties": {
                        "name": {
                            "type": "text"
                        },
                        "age": {
                            "type": "integer"
                        },
                        "email": {
                            "type": "keyword"
                        }
                    }
                },
                "infos": {
                    "type": "nested",
                    "dynamic": False,
                    "properties": {
                        "v_id": {"type": "keyword"},
                        "v_name": {"type": "keyword"},
                        "v_desc": {"type": "text"}
                    }
                },
                "remark": {"type": "text"},
                "show_num": {"type": "long", "doc_values": False, "index": False}
            }
        }
    }

    index_name = "test_data_index"
    # 创建索引
    create_respone = es.create_index_by_body(index_name, create_index_body)
    print(create_respone)
    # 输出 {u'index': u'test_data_index', u'acknowledged': True, u'shards_acknowledged': True}

    # 查看所有的索引
    indices = es.get_all_indices()
    print(indices)

    # 插入单条数据
    insert_body = {
        "create_time": 1720601022255,
        "status": 201,
        "dev_ip": "192.168.1.101",
        "dev_uuid": "123e4567e89b12d3a456426614174000",
        "user": {
            "name": "战三",
            "age": 30,
            "email": "zhansan@example.com"
        },
        "infos": [
            {
                "v_id": "123e4567e89b12d3a456426614174000",
                "v_name": "战三",
                "v_desc": "描述啦啦啦啦啦啦"
            }
        ],
        "remark": "描述!!!!!",
        "show_num": 6789
    }
    insert_resp = es.index(index_name, insert_body)
    print(insert_resp)

    # 更新数据/给某记录动态增加字段数据(send_time建索引时不存在,可更新增加)
    update_body = {
        "doc": {
            "send_time": int(time.time())
        }
    }
    update_resp = es.update(index_name, "OWps6pEBay2ae5Uuwei-", update_body)
    print(update_resp)

    # 基础查询
    resp2 = es.search(index_name, {"size": 10, "from": 1})
    print(resp2)

    # 分页查询1
    search_body = {
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "send_time": 1726226611
                        }
                    }
                ]
            }
        }
    }
    resp1 = es.search_page(index_name, search_body, 1, 1)
    print(resp1)

    # 分页查询2
    scroll = "1m"
    scroll_id, response = es.search_scroll(index_name, scroll, {"size": 10})
    print(scroll_id, response)
    while response["hits"]["hits"]:
        scroll_id, response = es.search_next(scroll_id, scroll)
        print(scroll_id, response)

    # after分页两种写法,分页查询3-1
    after_search = {"size": 1, "sort": [{"create_time": {"order": "desc"}}, {"_id": {"order": "asc"}}]}
    search_after_body, resp3 = es.search_after(index_name, after_search)
    print(search_after_body, resp3)
    while resp3 and resp3["hits"]["hits"]:
        after_search["search_after"] = search_after_body.get("search_after")
        search_after_body, resp3 = es.search_after(index_name, after_search)

    # after分页两种写法,分页查询3-2
    search_after = None
    while True:
        if search_after:
            after_search["search_after"] = search_after
        search_after_body, resp3 = es.search_after(index_name, after_search)

        if not resp3["hits"]["hits"]:
            break

        search_after = search_after_body["search_after"]


http://www.kler.cn/a/411751.html

相关文章:

  • .net的winfrom程序 窗体透明打开窗体时出现在屏幕右上角
  • Linux 系统管理
  • 【FPGA】Verilog:利用 4 个串行输入- 串行输出的 D 触发器实现 Shift_register
  • 深入浅出:JVM 的架构与运行机制
  • PGSQL物化视图(Materialized View)
  • 【Google Cloud】Private Service Connect 托管式服务
  • PHP md5函数 生成的字符串是多少位的
  • 一个开源轻量级的服务器资源监控平台,支持告警推送
  • 应用商店双弹窗“APP在向用户申请权限时未同步告知用户申请此权限的理由”驳回uni-app应用上线的解决方法
  • 从零开始学GeoServer源码(二)添加支持arcgis切片功能
  • 小程序-基于java+SpringBoot+Vue的网上花店微信小程序设计与实现
  • Android Toast信息定位分析介绍
  • 基于Matlab实现Gabo滤波器(源码)
  • java虚拟机——JVM中,内存的哪些区域被划分为线程私有、哪些区域是线程共享的
  • 从0开始深度学习(32)——循环神经网络的从零开始实现
  • 常用的数据结构
  • llama-factory 系列教程 (七),Qwen2.5-7B-Instruct 模型微调与vllm部署详细流程实战
  • Springboot 整合 Java DL4J 构建自然语言处理之机器翻译系统
  • 实现一个可配置的TCP设备模拟器,支持交互和解析配置
  • Linux下环境基础开放工具
  • 林业产品推荐系统:Spring Boot解决方案
  • flink学习(7)——window
  • 基于SpringBoot+Vue的智慧社区网站-无偿分享 (附源码+LW+调试)
  • 企业后端多租户管理平台
  • Redis底层数据结构
  • c++音视频学习环境搭建