性能测试 - Locust WebSocket client
Max.Bai
2024.10
0. 背景
Locust 是性能测试工具,但默认只支持 HTTP 协议,即默认只提供 HTTP 的 client;要测试其他协议,必须自己扩展对应的 client,比如下面的 WebSocket client。
1. WebSocket test Client
"""
Max.Bai
Websocket Client
"""
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optional
import websocket
from locust import events
logger = logging.getLogger(__name__)
class WebSocketClient:
    """WebSocket client for Locust tests, backed by ``websocket.WebSocketApp``.

    The connection runs in a daemon thread. Connection, close and (optionally)
    per-message results are reported to Locust through ``events.request.fire``
    with ``request_type="ws_client"``.

    Public switches (plain attributes, all default to ``False`` except
    ``log_messages`` which is taken from the constructor):

    * ``log_messages`` - keep every received message in ``messages`` and every
      sent message in an internal list. Nothing trims these lists except
      ``clear()``, so they grow for as long as the switch is on.
    * ``count_recv_type`` - parse every received message as JSON and report it
      to Locust as ``recv <method> <code>``.
    * ``heartbeat_auto_respond`` - answer any message containing
      ``public/heartbeat`` by sending it back with that text replaced by
      ``public/respond-heartbeat``.
    """

    # Methods whose replies are counted but never timed.
    _UNTIMED_METHODS = ("public/heartbeat", "private/set-cancel-on-disconnect")

    def __init__(self, host: str, log_messages: bool = False):
        """Create a client for ``host`` (a ws:// or wss:// URL); does not connect."""
        self._host: str = host
        self._id: str = secrets.token_hex(8)
        self._alias: Optional[str] = None
        self._ws: Optional[websocket.WebSocketApp] = None
        # Set by _on_open and cleared by _on_close; lets connect() return as
        # soon as the handshake completes instead of sleeping blindly.
        self._opened = threading.Event()
        self.log_messages = log_messages
        self.count_recv_type = False
        self.heartbeat_auto_respond = False
        self._recv_messages: list = []
        self.messages: list = []
        self._sent_messages: list = []

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()

    @property
    def tag(self) -> str:
        """Label used in log lines: ``<host> <<id>>`` plus ``(<alias>)`` when set."""
        tag = f"{self._host} <{self._id}>"
        if self._alias:
            tag += f"({self._alias})"
        return tag

    def connect(
        self,
        alias: Optional[str] = None,
        headers: Optional[dict] = None,
        on_message: Optional[Callable] = None,
        timeout: float = 3,
    ):
        """Open the connection in a background daemon thread.

        Args:
            alias: Optional label appended to ``tag`` while connected.
            headers: Optional handshake headers passed to ``WebSocketApp``.
            on_message: Optional ``(ws, message)`` callback that replaces the
                built-in ``_on_message`` handler.
            timeout: Maximum number of seconds to wait for the handshake.
                Returns earlier as soon as the connection is open. Use
                ``is_connected()`` afterwards to see whether it succeeded.
        """
        if self._ws:
            logger.warning("An active WebSocket connection is already established.")
            return

        self._alias = alias
        self._opened.clear()
        self._ws = websocket.WebSocketApp(
            url=self._host,
            header=headers,
            on_open=self._on_open,
            on_message=on_message if on_message else self._on_message,
            on_close=self._on_close,
        )
        thread = threading.Thread(target=self._ws.run_forever, daemon=True)
        thread.start()
        self._opened.wait(timeout=timeout)

    def is_connected(self) -> bool:
        """Return True only once the open callback has fired and no close followed."""
        return self._ws is not None and self._opened.is_set()

    def disconnect(self):
        """Close the connection if there is one, otherwise log a warning."""
        # Work on a local reference: _on_close runs on the socket thread and
        # may reset self._ws between the check and the call.
        ws = self._ws
        if ws:
            ws.close()
            self._alias = None
        else:
            logger.warning("No active WebSocket connection established.")

    def _on_open(self, ws):
        """WebSocketApp open callback: unblock connect() and report to Locust."""
        self._opened.set()
        logger.debug("[WebSocket] %s connected.", self.tag)
        events.request.fire(
            request_type="ws_client",
            name="connect",
            response_time=0,
            response_length=0,
        )

    def _on_message(self, ws, message):
        """Default message callback: record, auto-answer heartbeats, report."""
        recv_time = time.time()
        logger.debug("[WebSocket] %s message received: %s", self.tag, message)
        if self.log_messages:
            self._recv_messages.append(message)
            self.messages.append(message)
        # public/respond-heartbeat
        if self.heartbeat_auto_respond and "public/heartbeat" in message:
            self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))
        if self.count_recv_type:
            self._report_received(message, recv_time)

    def _report_received(self, message, recv_time: float):
        """Parse one received JSON message and fire the matching Locust event."""
        # Size of the raw payload; also valid when the payload is not JSON.
        size = len(message)
        try:
            payload = json.loads(message)
            method = payload.get("method", "unknown")
            code = payload.get("code", "unknown")
            error = payload.get("message", "unknown")
            resp_time = self._response_time_ms(payload.get("id", 0), recv_time)
        except (ValueError, TypeError, AttributeError) as exc:
            # Not JSON, or JSON that is not an object.
            events.request.fire(
                request_type="ws_client",
                name="recv error",
                response_time=0,
                response_length=size,
                exception=str(exc),
            )
            return

        report = {"request_type": "ws_client", "response_length": size}
        if method in self._UNTIMED_METHODS:
            report.update(name=f"recv {method}", response_time=0)
        else:
            report.update(name=f"recv {method} {code}", response_time=resp_time)
            if code != 0:
                # Any code other than 0 is reported as a failure.
                report["exception"] = error
        events.request.fire(**report)

    @staticmethod
    def _response_time_ms(msg_id, recv_time: float):
        """Derive a response time in milliseconds from a message id.

        The id is interpreted as the epoch timestamp of the request: exactly
        16 digits means microseconds, otherwise the first 13 characters are
        read as milliseconds. Any other id yields 0.
        """
        text = str(msg_id)
        if len(text) == 16 and text.isdecimal():
            recv_time_us = int(recv_time * 1000000)
            return (recv_time_us - int(text)) / 1000
        if len(text) >= 13 and text[:13].isdecimal():
            recv_time_ms = int(recv_time * 1000)
            return recv_time_ms - int(text[:13])
        return 0

    def _on_close(self, ws, close_status_code, close_msg):
        """WebSocketApp close callback: reset state and report to Locust."""
        logger.debug("[WebSocket] %s closed.", self.tag)
        self._opened.clear()
        self._ws = None
        events.request.fire(
            request_type="ws_client",
            name="close",
            response_time=0,
            response_length=0,
        )

    def set_on_message(self, on_message: Callable):
        """Replace the message callback of the current connection.

        Raises:
            ConnectionError: If there is no connection to attach the callback to.
        """
        ws = self._ws
        if ws is None:
            logger.warning("No active [WebSocket] %s connection established.", self.tag)
            raise ConnectionError("No active [WebSocket] connection established.")
        ws.on_message = on_message

    def send(self, message: str):
        """Send ``message`` over the current connection.

        Raises:
            ConnectionError: If there is no connection.
        """
        ws = self._ws
        if ws is None:
            logger.warning("No active [WebSocket] %s connection established.", self.tag)
            raise ConnectionError("No active [WebSocket] connection established.")
        ws.send(data=message)
        if self.log_messages:
            self._sent_messages.append(message)
        logger.debug("[WebSocket] %s message sent: %s", self.tag, message)

    def clear(self):
        """Forget all recorded sent and received messages."""
        self._recv_messages = []
        self._sent_messages = []
        self.messages = []

    def expect_messages(
        self,
        matcher: Callable[..., bool],
        count: int = 1,
        timeout: int = 10,
        interval: int = 1,
    ) -> list:
        """Expect to receive one or more filtered messages.

        Only recorded messages are examined, so ``log_messages`` must be on.
        The buffer is always checked at least once, even with ``timeout=0``.

        Args:
            matcher (Callable): A matcher function used to filter the received messages.
            count (int, optional): Number of messages to be expected before timeout. Defaults to 1.
            timeout (int, optional): Timeout in seconds. Defaults to 10.
            interval (int, optional): Interval in seconds. Defaults to 1.

        Returns:
            list: A list of messages filtered by the matcher (possibly fewer
            than ``count`` when the timeout expires; a warning is logged then).
        """
        deadline = time.monotonic() + timeout
        result: list = []  # messages filtered by the matcher
        source = self._recv_messages
        cursor = 0  # number of messages in `source` already shown to the matcher
        while True:
            current = self._recv_messages
            if current is not source:
                # clear() swapped in a fresh list; start over on it.
                source, cursor = current, 0
            fresh = current[cursor:]
            cursor += len(fresh)
            result.extend(filter(matcher, fresh))
            if len(result) >= count:
                break
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))
        if len(result) < count:
            logger.warning(
                f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages."
            )
        return result
2. 如何使用
class PrivateWsUser(User):
    """Example Locust user that drives one WebSocketClient connection.

    ``User`` and ``task`` come from the ``locust`` package and must be
    imported in the locustfile that defines this class.
    """

    def on_start(self):
        """Create the client and open the connection when the user starts."""
        # The constructor keyword is `log_messages` (plural).
        self.ws_client = WebSocketClient("wss://abc.pp.com/chat", log_messages=True)
        self.ws_client.connect()

    def on_stop(self):
        """Close the connection when the user stops."""
        self.ws_client.disconnect()

    @task
    def send_hello(self):
        """Send one text message over the connection."""
        self.ws_client.send("hello world")
3. 扩展
可自行扩展 on_message 方法:通过 connect() 的 on_message 参数或 set_on_message() 传入自己的回调即可;上面默认的 _on_message 方法处理的是 JSON 格式的消息。