【最后203篇系列】004 -Smarklink
说明
这个用来替代nginx。
最初是希望用nginx进行故障检测和负载均衡,花了很多时间,大致的结论是:nginx可以实现,但是是在商业版里。非得要找替代肯定可以搞出来,但是太麻烦了(即使是nginx本身的配置也很烦)。更重要的是,我突然意识到其实我对这方面的需求大幅提升了,nginx无法实现我最核心的功能。
于是我打算自己做一个网关。以下是两者的差别:
我并不是要做一个新的产品来替代nginx,只是需要对我的应用服务进行非常精细的控制,然后我问了下大模型,是否可行:
使用 Tornado 构建自定义应用网关 是一个非常有挑战性但也非常灵活的选择。通过 Tornado 的异步特性和强大的编程能力,你可以实现非常精细的控制,满足特定的业务需求。以下是实现这一目标的思路和建议:
1. Tornado 作为应用网关的核心功能
你可以利用 Tornado 实现以下网关功能:
反向代理
- 使用 Tornado 的
AsyncHTTPClient
或httpclient
模块,将客户端请求转发到后端服务。 - 示例:
from tornado.httpclient import AsyncHTTPClient class ProxyHandler(tornado.web.RequestHandler): async def get(self): http_client = AsyncHTTPClient() response = await http_client.fetch("http://backend-service" + self.request.uri) self.write(response.body)
负载均衡
- 实现自定义的负载均衡算法(如轮询、加权轮询、最小连接数等)。
- 示例:
backend_servers = ["http://backend1", "http://backend2", "http://backend3"] current_server = 0 class LoadBalancerHandler(tornado.web.RequestHandler): async def get(self): global current_server backend_url = backend_servers[current_server] current_server = (current_server + 1) % len(backend_servers) http_client = AsyncHTTPClient() response = await http_client.fetch(backend_url + self.request.uri) self.write(response.body)
动态路由
- 根据请求的路径、头部或其他条件,将请求路由到不同的后端服务。
- 示例:
class RouterHandler(tornado.web.RequestHandler): async def get(self): if self.request.path.startswith("/api/v1"): backend_url = "http://api-v1-service" elif self.request.path.startswith("/api/v2"): backend_url = "http://api-v2-service" else: backend_url = "http://default-service" http_client = AsyncHTTPClient() response = await http_client.fetch(backend_url + self.request.uri) self.write(response.body)
限流和限速
- 使用 Tornado 的异步特性实现请求速率限制。
- 示例:
from tornado.ioloop import IOLoop from tornado.web import RequestHandler from tornado.queues import Queue request_queue = Queue(maxsize=100) # 限制并发请求数 class RateLimitHandler(RequestHandler): async def get(self): await request_queue.put(None) try: # 处理请求 self.write("Hello, world!") finally: request_queue.get_nowait()
身份验证和授权
- 在网关层实现统一的身份验证(如 JWT 验证、OAuth 等)。
- 示例:
import jwt class AuthHandler(tornado.web.RequestHandler): def prepare(self): token = self.request.headers.get("Authorization") if not token: self.set_status(401) self.finish("Unauthorized") try: payload = jwt.decode(token, "secret_key", algorithms=["HS256"]) self.current_user = payload["user"] except jwt.InvalidTokenError: self.set_status(401) self.finish("Invalid token")
日志和监控
- 在网关层记录请求日志,并集成监控工具(如 Prometheus)。
- 示例:
class LoggingHandler(tornado.web.RequestHandler): def on_finish(self): print(f"Request: {self.request.method} {self.request.uri} - Status: {self.get_status()}")
2. Tornado 网关的优势
- 高度定制化:你可以完全控制网关的行为,实现特定的业务逻辑。
- 灵活性:Tornado 的异步特性使其能够高效处理高并发请求。
- 与 Python 生态无缝集成:可以轻松集成其他 Python 库(如数据库驱动、消息队列等)。
3. Tornado 网关的挑战
- 开发成本高:需要从头实现许多功能(如负载均衡、健康检查、限流等)。
- 性能瓶颈:Tornado 是 Python 编写的,性能可能不如 Nginx 这样的 C 语言实现。
- 维护复杂性:自定义网关需要更多的测试和维护工作。
4. 建议的架构
如果你决定使用 Tornado 构建网关,建议采用以下架构:
-
Nginx 作为前置网关:
- 处理 SSL/TLS 终止、静态文件缓存、DDoS 防护等。
- 将动态请求转发到 Tornado 网关。
-
Tornado 作为应用网关:
- 实现精细的流量控制、动态路由、身份验证等功能。
- 将请求转发到后端的微服务或应用服务器。
-
监控和日志:
- 集成 Prometheus、Grafana 等工具,实时监控网关性能。
- 使用 ELK(Elasticsearch、Logstash、Kibana)堆栈进行日志分析。
5. 示例代码
以下是一个简单的 Tornado 网关示例:
import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClient
class ProxyHandler(tornado.web.RequestHandler):
async def get(self):
backend_url = "http://backend-service" + self.request.uri
http_client = AsyncHTTPClient()
response = await http_client.fetch(backend_url)
self.write(response.body)
def make_app():
return tornado.web.Application([
(r"/.*", ProxyHandler),
])
if __name__ == "__main__":
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
总结
使用 Tornado 构建自定义应用网关是可行的,尤其适合需要高度定制化的场景。你可以实现反向代理、负载均衡、动态路由、限流、身份验证等功能,但需要注意开发成本和性能瓶颈。如果可能,建议将 Nginx 作为前置网关,Tornado 作为应用网关,结合两者的优势,构建高性能、高可用的系统。
看看性能是不是ok:
所以,总体上tornado在性能上可能会比nginx低一个量级:
- 1 简单的基础服务可以继续走nginx,或者直连,这部分假设无问题。
- 2 复杂的、核心的服务走smartlink。这部分的处理本身就非常“昂贵”,比如用大模型处理数据,天然的TPS就比较低,smartlink额外的开销比起来不算啥,是可行的(或者说现在的计算资源太多了,费点没啥)。即使是较低配的生产机也应该有8个核,花1个核做网关在我看来是值得的。
smartlink 可以带来什么:
- 1 权限控制。过去我一般不去设置权限,这样开发和使用比较方便,但这一步肯定是要做的。通过smartlink,仅开放一个对外端口,访问其他服务时必须提供对应口令,在smartlink完成鉴权。
- 2 流量及质量。现在有一类服务要求比较高,按照固定周期统计可能会有些滞后。通过smartlink(结合redis), 可以实时的对流量进行控制,获取请求的响应头,将成功(2xx)、失败(5xx)等不同类型的错误进行实时统计和通知。另外可以统计实时的影响速度,由别的算法应用读取,来进行分发优化。
- 3 优化与强化。这个问题是最为核心且重要的,当我们花费成千上万的人民币在进行数据的处理时,我们知道做的不够好。通常来说,算法很难在第一版达到完美。然而当服务进入生产状态时,我们又很难去停下来再次改进;即使我们离线改好了新版算法,又担心上线时出bug。这样的最终结果是:不出问题不改进。smartlink在增加cpu开销时,也给了足够强大的控制逻辑,可以解决这些问题。 这里不讨论细节,只看机制上的可行性。当请求到达时,smarklink会按照实时加载的策略(通过redis)决定请求是否可以多发。比如除了将请求实时转向对应的稳定版服务,也发到kafka。收到请求的处理结果后,除了返回用户,也发一份到kafka。这个过程会增加2次写kafka的操作,对于秒级级的请求来说是可以接受的(假设请求30秒超时)。数据到了kafka之后,可以有多个消费者准实时的执行新版本服务的请求,然后和当前服务的结果一并进行效果评估与比对。首先会排除的问题是bug引起的请求失败,然后新版本服务可以准实时的进行评估、打标(RLHF)。评估结束后,可以采用手动/自动的方法,修改redis中的数据,从而使请求平滑切换到新的服务。这个服务是经过了效果评估和上线测试的,不会有低级错误。
内容
本次的目标先实现几个基本功能
功能:
- 1 转发
- 2 health-check
- 3 负载均衡
1 测试样例
- 1 转发
在一台主机上,启动一个微服务(实体识别),通过smartlink转发服务到此位置
- 2 health-check
当请求报连接错误时,smartlink会将对应服务记为不可用;当服务恢复时,会将其标记为可用。
- 3 负载均衡(least time)
再启动一个服务,加入并在服务中进行时间扰动。要求smartlink在将服务转发时会依据其响应时间动态分配。
2 实验
我发现今天可能写不完全部了,内容有点多,我先把完成的有部分整理上。
2.1 基础配置部分 base_config.py
这里先手动配置了一个测试服务。假设每个服务的可用资源是一个字典, 并且认为一定有一个默认的服务版本(base)
...
base_config.server_dict = {}
base_config.server_dict['test_server'] = {}
base_config.server_dict['test_server']['base'] = "http://IP:PORT/get_lng_lat/"
2.2 数据模型 data_model.py
约定(建议性质)了模型的输入和输出。本来是想强制的,但那样难度太高了,而且作为一个网关,目前的主要任务还是能够进行有效转发。未来可以尽量往这个标准靠。
# 服务输入
class StandardInput(BaseModel):
server_name : str # 要请求的服务名
params : dict = {} # 请求参数
data : list = []# 请求数据 - 一般以lod的方式输入
# 服务输出
class StandardOutput(BaseModel):
server_name: str = 'default'
status : bool = True
msg : str = 'ok'
data : list = [] # 返回数据 - 以lod的方式返回
duration : int = 0 # ms
2.3 服务 server.py
2.3.1 基本部分导入部分
import tornado.httpserver # http服务器
import tornado.ioloop # ?
import tornado.options # 指定服务端口和路径解析
import tornado.web # web模块
from tornado.options import define, options
import os.path # 获取和生成template文件路径
import tornado.httpclient
from Basefuncs import *
from base_config import base_config
from data_model import *
# 配置日志
logger = get_base_logger()
import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, datetime):
return obj.__str__()
if isinstance(obj, dd.timedelta):
return obj.__str__()
else:
return super(MyEncoder, self).default(obj)
2.3.2 基础转发
# 基础转发处理器
class BaseForwardHandler(tornado.web.RequestHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "Content-Type")
self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")
def options(self):
self.set_status(204)
self.finish()
# 被转发的请求
async def forward_request(self, target_url, body=None, headers=None):
# 获取原始请求的 body 和 headers
# 如果没有传递 body 和 headers,则使用原始请求的 body 和 headers
if body is None:
body = self.request.body
if headers is None:
headers = self.request.headers
# 使用 Tornado 的异步 HTTP 客户端转发请求
http_client = tornado.httpclient.AsyncHTTPClient()
try:
logger.info(f"Forwarding request to {target_url}")
response = await http_client.fetch(
target_url,
method="POST",
headers=headers,
body=body
)
self.write(response.body)
except tornado.httpclient.HTTPError as e:
logger.error(f"HTTP Error: {e.code} - {str(e)}")
self.set_status(e.code)
self.write(json.dumps({"error": str(e)}))
# 触发告警通知
await self.trigger_alert(e)
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
self.set_status(500)
self.write(json.dumps({"error": "Internal Server Error"}))
# 触发告警通知
await self.trigger_alert(e)
async def trigger_alert(self, error):
"""
触发告警通知的逻辑
"""
alert_message = f"Service Error: {str(error)}"
logger.error(f"Sending alert: {alert_message}")
# 示例:发送告警到内部系统(如邮件、Slack、Webhook 等)
try:
# 假设我们有一个发送告警的函数
await self.send_alert_to_internal_system(alert_message)
except Exception as e:
logger.error(f"Failed to send alert: {str(e)}")
async def send_alert_to_internal_system(self, message):
"""
发送告警到内部系统的逻辑
"""
# 示例:使用 HTTP 客户端发送告警到 Webhook
http_client = tornado.httpclient.AsyncHTTPClient()
alert_url = "http://internal-alert-system/endpoint"
body = json.dumps({"message": message})
try:
await http_client.fetch(
alert_url,
method="POST",
headers={"Content-Type": "application/json"},
body=body
)
logger.info("Alert sent successfully")
except Exception as e:
logger.error(f"Failed to send alert: {str(e)}")
raise e
2.3.3 应用的转发逻辑
import time
class AIServer(BaseForwardHandler):
print('AIServer Working ', get_time_str1())
async def post(self):
standard_output = StandardOutput()
print('解析请求体')
# 1. 解析请求体
try:
request_data = json.loads(self.request.body)
print("Parsed request data:", request_data)
except json.JSONDecodeError:
self.set_status(400)
standard_output.status = False
standard_output.msg = "Invalid JSON format"
self.write(json.dumps(standard_output.dict()))
return
# 2. 验证请求格式
try:
standard_input = StandardInput(**request_data)
except (TypeError, ValueError) as e:
# 如果 request_data 不符合 StandardInput 的要求,捕获 TypeError 或 ValueError
self.set_status(400)
standard_output.status = False
standard_output.msg = f"Invalid input data: {str(e)}"
self.write(json.dumps(standard_output.dict()))
return
# 3.转发请求
target_server_dict = base_config.server_dict.get(standard_input.server_name)
if target_server_dict is None:
self.set_status(400)
standard_output.status = False
standard_output.msg = f"Invalid input data: No Such Service {standard_input.server_name}"
self.write(json.dumps(standard_output.dict()))
return
# 这里控制版本
await self.forward_request(target_server_dict['base'])
然后进行调用
请求端:
import httpx
data_dict = {'server_name':'test_server', 'address': '上海九六广场'}
resp = httpx.post('http://localhost:9001/ai_server/' ,json = data_dict)
In [31]: resp.json()
Out[31]:
{'name': 'get_lng_lat',
'status': True,
'msg': 'ok',
'duration': 86,
'data': {'status': 0,
'lng': 121.5312514401146,
'lat': 31.23340608690243,
'precise': 1,
'confidence': 75,
'comprehension': 100,
'level': '购物'}}
服务端:
...
解析请求体
Parsed request data: {'server_name': 'test_server', 'address': '上海九六广场'}
2025-02-02 01:09:04 - my_logger - INFO - Forwarding request to http://IP/get_lng_lat/
2025-02-02 01:09:04 - tornado.access - INFO - 200 POST /ai_server/ (127.0.0.1) 158.67ms
...
可以看到转发成功了,原始的请求大约80ms,因为转发带来的额外延时大约也是80ms。所以如果对于实时性要求特别高的可能还需要注意。后续再加上其他的额外处理,单个请求最终的延时可能到达200ms,甚至500ms。对于类似大模型之类的调用来说,一般会花费数秒到数十秒来处理一个请求,这种开销就可以接受。