当前位置: 首页 > article >正文

python elasticsearch 8.x通过代理发起请求方法

由于python elasticsearch v8 engine的源码包中并未开放对于请求添加proxies的支持,导致在某些环境下无法连通外网的es服务。目前网上暂无相关的修改内容,我这边提供下自己修改的动态运行时替换elasticsearch包的源码方法demo

import gzip
import ssl
import time
import requests
from elastic_transport._node._http_requests import RequestsHttpNode
from typing import Any, Optional, Union
from elastic_transport._compat import warn_stacklevel
from elastic_transport._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
from elastic_transport._models import ApiResponseMeta, HttpHeaders, NodeConfig
from elastic_transport.client_utils import DEFAULT, DefaultType, client_meta_version
from elastic_transport._node._base import (
    BUILTIN_EXCEPTIONS,
    RERAISE_EXCEPTIONS,
    BaseNode,
    NodeApiResponse,
    ssl_context_from_node_config,
)


def custom_perform_request(
        self,
        method: str,
        target: str,
        body: Optional[bytes] = None,
        headers: Optional[HttpHeaders] = None,
        request_timeout: Union[DefaultType, Optional[float]] = DEFAULT,
    ) -> NodeApiResponse:
        url = self.base_url + target
        headers = HttpHeaders(headers or ())

        request_headers = self._headers.copy()
        if headers:
            request_headers.update(headers)

        body_to_send: Optional[bytes]
        if body:
            if self._http_compress:
                body_to_send = gzip.compress(body)
                request_headers["content-encoding"] = "gzip"
            else:
                body_to_send = body
        else:
            body_to_send = None

        start = time.time()
        proxies_dict = {
            "http": "http://xx.xx.xx.xx:xx",
            "http": "http://xx.xx.xx.xx:xx",
        }
        request = requests.Request(
            method=method, headers=request_headers, url=url, data=body_to_send
        )
        prepared_request = self.session.prepare_request(request)
        send_kwargs = {
            "timeout": (
                request_timeout
                if request_timeout is not DEFAULT
                else self.config.request_timeout
            )
        }
        send_kwargs.update(
            self.session.merge_environment_settings(  # type: ignore[arg-type]
                prepared_request.url, {}, None, None, None
            )
        )
        send_kwargs.pop('proxies')
        try:
            response = self.session.send(prepared_request, proxies=proxies_dict,  **send_kwargs)  # type: ignore[arg-type]
            data = response.content
            duration = time.time() - start
            response_headers = HttpHeaders(response.headers)

        except RERAISE_EXCEPTIONS:
            raise
        except Exception as e:
            err: Exception
            if isinstance(e, requests.Timeout):
                err = ConnectionTimeout(
                    "Connection timed out during request", errors=(e,)
                )
            elif isinstance(e, (ssl.SSLError, requests.exceptions.SSLError)):
                err = TlsError(str(e), errors=(e,))
            elif isinstance(e, BUILTIN_EXCEPTIONS):
                raise
            else:
                err = ConnectionError(str(e), errors=(e,))
            self._log_request(
                method=method,
                target=target,
                headers=request_headers,
                body=body,
                exception=err,
            )
            raise err from None

        meta = ApiResponseMeta(
            node=self.config,
            duration=duration,
            http_version="1.1",
            status=response.status_code,
            headers=response_headers,
        )
        self._log_request(
            method=method,
            target=target,
            headers=request_headers,
            body=body,
            meta=meta,
            response=data,
        )
        return NodeApiResponse(
            meta,
            data,
        )


RequestsHttpNode.perform_request = custom_perform_request


from elasticsearch import Elasticsearch


es = Elasticsearch(hosts=["http://xx.xx.xxx.xxx:9200"], basic_auth=("elastic", "xxxxxxxxxxxx"), node_class=RequestsHttpNode)
query = {"query": {"match_all": {}}}
response = es.search(index="xxxxxx_prod", body= query)
print(response)


http://www.kler.cn/a/446360.html

相关文章:

  • 杂七杂八的网络安全知识
  • go-zero负载均衡实现原理
  • 提炼关键词的力量:AI驱动下的SEO优化策略
  • electron-vite【实战系列教程】
  • 设计模式-读书笔记
  • python飞机大战游戏.py
  • VMware安装Ubuntu24.04以及安装好后初步使用配置!
  • CSS系列(27)- 图形与滤镜详解
  • List深拷贝后,数据还是被串改
  • 监控易:开启摄像头故障监控的卓越之钥
  • 挑战一个月基本掌握C++(第七天)了解指针,引用,时间,输入输出,结构体,vector容器,数据结构 - 通用完结
  • go 聊天系统项目-5 客户端发消息
  • Kubernetes(k8s)离线部署DolphinScheduler3.2.2
  • C# 动态组合判断条件对数据进行筛选
  • 大厂 Java 架构师面试题全解析
  • 【人工智能数学基础篇】——深入详解矩阵与向量运算及矩阵分解技术,打牢人工智能知识基础
  • OpenHarmony-4.HDI 框架
  • Github 2024-12-21 Rust开源项目日报 Top10
  • react websocket 全局访问和响应
  • Flink CDC 生产环境常用参数总结
  • 解决 Jenkins 克隆 GitHub 仓库时的 SSH 公钥认证问题
  • DC-9笔记
  • GitHub Copilot 现在可以免费使用了!
  • Leetcode Hot 100 【二叉树】104. 二叉树的最大深度
  • Docker 安装 禅道-21.2版本-外部数据库模式
  • 【day09】面向对象——静态成员和可变参数