当前位置: 首页 > article >正文

【最后203篇系列】004 -Smarklink

说明

这个用来替代nginx。

最初是希望用nginx进行故障检测和负载均衡,花了很多时间,大致的结论是:nginx可以实现,但是是在商业版里。非得要找替代肯定可以搞出来,但是太麻烦了(即使是nginx本身的配置也很烦)。更重要的是,我突然意识到其实我对这方面的需求大幅提升了,nginx无法实现我最核心的功能。

于是我打算自己做一个网关。以下是两者的差别:
在这里插入图片描述

我并不是要做一个新的产品来替代nginx,只是需要对我的应用服务进行非常精细的控制,然后我问了下大模型,是否可行:

使用 Tornado 构建自定义应用网关 是一个非常有挑战性但也非常灵活的选择。通过 Tornado 的异步特性和强大的编程能力,你可以实现非常精细的控制,满足特定的业务需求。以下是实现这一目标的思路和建议:


1. Tornado 作为应用网关的核心功能

你可以利用 Tornado 实现以下网关功能:

反向代理
  • 使用 Tornado 的 AsyncHTTPClienthttpclient 模块,将客户端请求转发到后端服务。
  • 示例:
    from tornado.httpclient import AsyncHTTPClient
    
    class ProxyHandler(tornado.web.RequestHandler):
        async def get(self):
            http_client = AsyncHTTPClient()
            response = await http_client.fetch("http://backend-service" + self.request.uri)
            self.write(response.body)
    
负载均衡
  • 实现自定义的负载均衡算法(如轮询、加权轮询、最小连接数等)。
  • 示例:
    backend_servers = ["http://backend1", "http://backend2", "http://backend3"]
    current_server = 0
    
    class LoadBalancerHandler(tornado.web.RequestHandler):
        async def get(self):
            global current_server
            backend_url = backend_servers[current_server]
            current_server = (current_server + 1) % len(backend_servers)
            http_client = AsyncHTTPClient()
            response = await http_client.fetch(backend_url + self.request.uri)
            self.write(response.body)
    
动态路由
  • 根据请求的路径、头部或其他条件,将请求路由到不同的后端服务。
  • 示例:
    class RouterHandler(tornado.web.RequestHandler):
        async def get(self):
            if self.request.path.startswith("/api/v1"):
                backend_url = "http://api-v1-service"
            elif self.request.path.startswith("/api/v2"):
                backend_url = "http://api-v2-service"
            else:
                backend_url = "http://default-service"
            http_client = AsyncHTTPClient()
            response = await http_client.fetch(backend_url + self.request.uri)
            self.write(response.body)
    
限流和限速
  • 使用 Tornado 的异步特性实现请求速率限制。
  • 示例:
    from tornado.ioloop import IOLoop
    from tornado.web import RequestHandler
    from tornado.queues import Queue
    
    request_queue = Queue(maxsize=100)  # 限制并发请求数
    
    class RateLimitHandler(RequestHandler):
        async def get(self):
            await request_queue.put(None)
            try:
                # 处理请求
                self.write("Hello, world!")
            finally:
                request_queue.get_nowait()
    
身份验证和授权
  • 在网关层实现统一的身份验证(如 JWT 验证、OAuth 等)。
  • 示例:
    import jwt
    
    class AuthHandler(tornado.web.RequestHandler):
        def prepare(self):
            token = self.request.headers.get("Authorization")
            if not token:
                self.set_status(401)
                self.finish("Unauthorized")
            try:
                payload = jwt.decode(token, "secret_key", algorithms=["HS256"])
                self.current_user = payload["user"]
            except jwt.InvalidTokenError:
                self.set_status(401)
                self.finish("Invalid token")
    
日志和监控
  • 在网关层记录请求日志,并集成监控工具(如 Prometheus)。
  • 示例:
    class LoggingHandler(tornado.web.RequestHandler):
        def on_finish(self):
            print(f"Request: {self.request.method} {self.request.uri} - Status: {self.get_status()}")
    

2. Tornado 网关的优势

  • 高度定制化:你可以完全控制网关的行为,实现特定的业务逻辑。
  • 灵活性:Tornado 的异步特性使其能够高效处理高并发请求。
  • 与 Python 生态无缝集成:可以轻松集成其他 Python 库(如数据库驱动、消息队列等)。

3. Tornado 网关的挑战

  • 开发成本高:需要从头实现许多功能(如负载均衡、健康检查、限流等)。
  • 性能瓶颈:Tornado 是 Python 编写的,性能可能不如 Nginx 这样的 C 语言实现。
  • 维护复杂性:自定义网关需要更多的测试和维护工作。

4. 建议的架构

如果你决定使用 Tornado 构建网关,建议采用以下架构:

  1. Nginx 作为前置网关

    • 处理 SSL/TLS 终止、静态文件缓存、DDoS 防护等。
    • 将动态请求转发到 Tornado 网关。
  2. Tornado 作为应用网关

    • 实现精细的流量控制、动态路由、身份验证等功能。
    • 将请求转发到后端的微服务或应用服务器。
  3. 监控和日志

    • 集成 Prometheus、Grafana 等工具,实时监控网关性能。
    • 使用 ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志分析。

5. 示例代码

以下是一个简单的 Tornado 网关示例:

import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClient

class ProxyHandler(tornado.web.RequestHandler):
    async def get(self):
        backend_url = "http://backend-service" + self.request.uri
        http_client = AsyncHTTPClient()
        response = await http_client.fetch(backend_url)
        self.write(response.body)

def make_app():
    return tornado.web.Application([
        (r"/.*", ProxyHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

总结

使用 Tornado 构建自定义应用网关是可行的,尤其适合需要高度定制化的场景。你可以实现反向代理、负载均衡、动态路由、限流、身份验证等功能,但需要注意开发成本和性能瓶颈。如果可能,建议将 Nginx 作为前置网关,Tornado 作为应用网关,结合两者的优势,构建高性能、高可用的系统。

看看性能是不是ok:
在这里插入图片描述

在这里插入图片描述

所以,总体上tornado在性能上可能会比nginx低一个量级:

  • 1 简单的基础服务可以继续走nginx,或者直连,这部分假设无问题。
  • 2 复杂的、核心的服务走smartlink。这部分的处理本身就非常“昂贵”,比如用大模型处理数据,天然的TPS就比较低,smartlink额外的开销比起来不算啥,是可行的(或者说现在的计算资源太多了,费点没啥)。即使是较低配的生产机也应该有8个核,花1个核做网关在我看来是值得的。

smartlink 可以带来什么:

  • 1 权限控制。过去我一般不去设置权限,这样开发和使用比较方便,但这一步肯定是要做的。通过smartlink,仅开放一个对外端口,访问其他服务时必须提供对应口令,在smartlink完成鉴权。
  • 2 流量及质量。现在有一类服务要求比较高,按照固定周期统计可能会有些滞后。通过smartlink(结合redis), 可以实时的对流量进行控制,获取请求的响应头,将成功(2xx)、失败(5xx)等不同类型的错误进行实时统计和通知。另外可以统计实时的影响速度,由别的算法应用读取,来进行分发优化。
  • 3 优化与强化。这个问题是最为核心且重要的,当我们花费成千上万的人民币在进行数据的处理时,我们知道做的不够好。通常来说,算法很难在第一版达到完美。然而当服务进入生产状态时,我们又很难去停下来再次改进;即使我们离线改好了新版算法,又担心上线时出bug。这样的最终结果是:不出问题不改进。smartlink在增加cpu开销时,也给了足够强大的控制逻辑,可以解决这些问题。 这里不讨论细节,只看机制上的可行性。当请求到达时,smarklink会按照实时加载的策略(通过redis)决定请求是否可以多发。比如除了将请求实时转向对应的稳定版服务,也发到kafka。收到请求的处理结果后,除了返回用户,也发一份到kafka。这个过程会增加2次写kafka的操作,对于秒级级的请求来说是可以接受的(假设请求30秒超时)。数据到了kafka之后,可以有多个消费者准实时的执行新版本服务的请求,然后和当前服务的结果一并进行效果评估与比对。首先会排除的问题是bug引起的请求失败,然后新版本服务可以准实时的进行评估、打标(RLHF)。评估结束后,可以采用手动/自动的方法,修改redis中的数据,从而使请求平滑切换到新的服务。这个服务是经过了效果评估和上线测试的,不会有低级错误。

内容

本次的目标先实现几个基本功能

功能:

  • 1 转发
  • 2 health-check
  • 3 负载均衡

1 测试样例

  • 1 转发

在一台主机上,启动一个微服务(实体识别),通过smartlink转发服务到此位置

  • 2 health-check

当请求报连接错误时,smartlink会将对应服务记为不可用;当服务恢复时,会将其标记为可用。

  • 3 负载均衡(least time)

再启动一个服务,加入并在服务中进行时间扰动。要求smartlink在将服务转发时会依据其响应时间动态分配。

2 实验

我发现今天可能写不完全部了,内容有点多,我先把完成的有部分整理上。

2.1 基础配置部分 base_config.py

这里先手动配置了一个测试服务。假设每个服务的可用资源是一个字典, 并且认为一定有一个默认的服务版本(base)

...
base_config.server_dict = {}
base_config.server_dict['test_server'] = {}
base_config.server_dict['test_server']['base'] = "http://IP:PORT/get_lng_lat/" 

2.2 数据模型 data_model.py

约定(建议性质)了模型的输入和输出。本来是想强制的,但那样难度太高了,而且作为一个网关,目前的主要任务还是能够进行有效转发。未来可以尽量往这个标准靠。

# 服务输入
class StandardInput(BaseModel):
    server_name : str  # 要请求的服务名
    params : dict = {} #  请求参数
    data : list = []# 请求数据 - 一般以lod的方式输入


# 服务输出
class StandardOutput(BaseModel):
    server_name: str = 'default'
    status : bool  = True
    msg : str  = 'ok'
    data : list = [] # 返回数据 - 以lod的方式返回
    duration : int = 0 # ms 

2.3 服务 server.py

2.3.1 基本部分导入部分

import tornado.httpserver  # http服务器
import tornado.ioloop  # ?
import tornado.options  # 指定服务端口和路径解析
import tornado.web  # web模块
from tornado.options import define, options
import os.path  # 获取和生成template文件路径
import tornado.httpclient  
from Basefuncs import * 
from base_config import base_config
from data_model import * 
# 配置日志
logger = get_base_logger()

import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, datetime):
            return obj.__str__()
        if isinstance(obj, dd.timedelta):
            return obj.__str__()
        else:
            return super(MyEncoder, self).default(obj)

2.3.2 基础转发

# 基础转发处理器
class BaseForwardHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        self.set_header("Access-Control-Allow-Origin", "*")
        self.set_header("Access-Control-Allow-Headers", "Content-Type")
        self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")

    def options(self):
        self.set_status(204)
        self.finish()

    # 被转发的请求
    async def forward_request(self, target_url, body=None, headers=None):
        # 获取原始请求的 body 和 headers
        # 如果没有传递 body 和 headers,则使用原始请求的 body 和 headers
        if body is None:
            body = self.request.body
        if headers is None:
            headers = self.request.headers

        # 使用 Tornado 的异步 HTTP 客户端转发请求
        http_client = tornado.httpclient.AsyncHTTPClient()
        try:
            logger.info(f"Forwarding request to {target_url}")
            response = await http_client.fetch(
                target_url,
                method="POST",
                headers=headers,
                body=body
            )
            self.write(response.body)

        except tornado.httpclient.HTTPError as e:
            logger.error(f"HTTP Error: {e.code} - {str(e)}")
            self.set_status(e.code)
            self.write(json.dumps({"error": str(e)}))
            # 触发告警通知
            await self.trigger_alert(e)
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            self.set_status(500)
            self.write(json.dumps({"error": "Internal Server Error"}))
            # 触发告警通知
            await self.trigger_alert(e)


    async def trigger_alert(self, error):
        """
        触发告警通知的逻辑
        """
        alert_message = f"Service Error: {str(error)}"
        logger.error(f"Sending alert: {alert_message}")
        
        # 示例:发送告警到内部系统(如邮件、Slack、Webhook 等)
        try:
            # 假设我们有一个发送告警的函数
            await self.send_alert_to_internal_system(alert_message)
        except Exception as e:
            logger.error(f"Failed to send alert: {str(e)}")

    async def send_alert_to_internal_system(self, message):
        """
        发送告警到内部系统的逻辑
        """
        # 示例:使用 HTTP 客户端发送告警到 Webhook
        http_client = tornado.httpclient.AsyncHTTPClient()
        alert_url = "http://internal-alert-system/endpoint"
        body = json.dumps({"message": message})
        try:
            await http_client.fetch(
                alert_url,
                method="POST",
                headers={"Content-Type": "application/json"},
                body=body
            )
            logger.info("Alert sent successfully")
        except Exception as e:
            logger.error(f"Failed to send alert: {str(e)}")
            raise e

2.3.3 应用的转发逻辑

import time 
class AIServer(BaseForwardHandler):
    print('AIServer Working ', get_time_str1())
    async def post(self):
        standard_output = StandardOutput()
        print('解析请求体')
        # 1. 解析请求体
        try:
            request_data = json.loads(self.request.body)
            print("Parsed request data:", request_data)
        except json.JSONDecodeError:
            self.set_status(400)
            standard_output.status = False 
            standard_output.msg  = "Invalid JSON format"

            self.write(json.dumps(standard_output.dict()))
            return

        # 2. 验证请求格式
        try:
            standard_input = StandardInput(**request_data)
        except (TypeError, ValueError) as e:
            # 如果 request_data 不符合 StandardInput 的要求,捕获 TypeError 或 ValueError
            self.set_status(400)
            standard_output.status = False 
            standard_output.msg  = f"Invalid input data: {str(e)}"

            self.write(json.dumps(standard_output.dict()))
            return

        # 3.转发请求
        target_server_dict =  base_config.server_dict.get(standard_input.server_name)
        if target_server_dict is None:
            self.set_status(400)
            standard_output.status = False 
            standard_output.msg  = f"Invalid input data: No Such Service {standard_input.server_name}"
            self.write(json.dumps(standard_output.dict()))
            return

        # 这里控制版本
        await self.forward_request(target_server_dict['base'])

然后进行调用

请求端:

import httpx
data_dict =   {'server_name':'test_server', 'address': '上海九六广场'}
resp = httpx.post('http://localhost:9001/ai_server/' ,json = data_dict)
In [31]: resp.json()
Out[31]:
{'name': 'get_lng_lat',
 'status': True,
 'msg': 'ok',
 'duration': 86,
 'data': {'status': 0,
  'lng': 121.5312514401146,
  'lat': 31.23340608690243,
  'precise': 1,
  'confidence': 75,
  'comprehension': 100,
  'level': '购物'}}

服务端:

...
解析请求体
Parsed request data: {'server_name': 'test_server', 'address': '上海九六广场'}
2025-02-02 01:09:04 - my_logger - INFO - Forwarding request to http://IP/get_lng_lat/
2025-02-02 01:09:04 - tornado.access - INFO - 200 POST /ai_server/ (127.0.0.1) 158.67ms
...

可以看到转发成功了,原始的请求大约80ms,因为转发带来的额外延时大约也是80ms。所以如果对于实时性要求特别高的可能还需要注意。后续再加上其他的额外处理,单个请求最终的延时可能到达200ms,甚至500ms。对于类似大模型之类的调用来说,一般会花费数秒到数十秒来处理一个请求,这种开销就可以接受。


http://www.kler.cn/a/530463.html

相关文章:

  • 深入浅出并查集(不相交集合实现思路)
  • ASP.NET Core 启动并提供静态文件
  • 【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】1.27 线性代数王国:矩阵分解实战指南
  • 工具的应用——安装copilot
  • Mysql的主从复制及扩展功能
  • 从0到1:C++ 开启游戏开发奇幻之旅(二)
  • 蓝桥杯C语言程序设计赛备赛指南
  • 2025年2月2日(tcp3次握手4次挥手)
  • 【UE】 APlayerState
  • elasticsearch8.15 高可用集群搭建(含认证Kibana)
  • 代码讲解系列-CV(一)——CV基础框架
  • 如何运行Composer安装PHP包 安装JWT库
  • 面试题整理:Java多线程(二)多线程、死锁、乐观锁悲观锁、线程池
  • 002 mapper代理开发方式-xml方式
  • ArkTS渲染控制
  • 31不同整数的技术问题-青训营刷题
  • LeetCode 404.左叶子之和
  • MySQL 进阶专题:笛卡尔积内连接外连接详解(JOIN ON/ OUTER JOIN)
  • C++进阶: 红黑树及map与set封装
  • 智慧物业管理系统实现社区管理智能化提升居民生活体验与满意度
  • 垃圾回收之弱引用+实例说明+weakSet+weakMap
  • 硕成C语言4
  • 【Conda 和 虚拟环境详细指南】
  • 飞行汽车与人形机器人,无刷外转子电机与无框力矩电机,接小鹏飞行汽车后续
  • Rust 数据类型详解:Scalar 与 Compound 类型
  • Hot100之二分查找