对接马来西亚、印度、韩国、越南等全球金融数据示例
Python对接StockTV全球金融数据API的封装实现及使用教程:
import requests
import websockets
import asyncio
from typing import Dict, List, Optional, Union
from datetime import datetime
class StockTVClient:
"""
StockTV全球金融数据API客户端
支持股票、外汇、期货、加密货币等市场数据
"""
BASE_URL = "https://api.stocktv.top"
WS_URL = "wss://ws-api.stocktv.top/connect"
def __init__(self, api_key: str):
"""
初始化客户端
:param api_key: API密钥,需通过官方渠道获取
"""
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({"User-Agent": "StockTV-PythonClient/1.0"})
def _handle_response(self, response: requests.Response) -> Union[Dict, List]:
"""统一处理API响应"""
if response.status_code != 200:
raise Exception(f"API请求失败,状态码:{response.status_code},响应:{response.text}")
return response.json()
# ------------------ 股票市场接口 ------------------
def get_stock_markets(
self,
country_id: int,
page: int = 1,
page_size: int = 10
) -> Dict:
"""
获取股票市场列表
:param country_id: 国家ID(例如14代表印度)
:param page: 页码
:param page_size: 每页数量
"""
endpoint = "/stock/stocks"
params = {
"countryId": country_id,
"page": page,
"pageSize": page_size,
"key": self.api_key
}
response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
return self._handle_response(response)
def get_stock_kline(
self,
pid: int,
interval: str = "PT15M",
start_time: Optional[int] = None,
end_time: Optional[int] = None
) -> List[Dict]:
"""
获取股票K线数据
:param pid: 产品ID
:param interval: 时间间隔(PT5M, PT15M, PT1H等)
:param start_time: 开始时间戳(可选)
:param end_time: 结束时间戳(可选)
"""
endpoint = "/stock/kline"
params = {
"pid": pid,
"interval": interval,
"key": self.api_key
}
if start_time:
params["startTime"] = start_time
if end_time:
params["endTime"] = end_time
response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
return self._handle_response(response)
# ------------------ 外汇接口 ------------------
def get_forex_rates(self, base_currency: str = "USD") -> Dict:
"""获取实时外汇汇率"""
endpoint = "/market/currencyList"
params = {"key": self.api_key}
response = self.session.get(f"{self.BASE_URL}{endpoint}", params=params)
data = self._handle_response(response)
return data.get("conversions", {}).get(base_currency, {})
# ------------------ WebSocket实时数据 ------------------
async def websocket_client(self, callback):
"""
WebSocket实时数据客户端
:param callback: 数据处理回调函数
"""
url = f"{self.WS_URL}?key={self.api_key}"
async with websockets.connect(url) as ws:
while True:
try:
data = await ws.recv()
await callback(json.loads(data))
# 发送心跳保持连接
await asyncio.sleep(30)
await ws.send("ping")
except Exception as e:
print(f"WebSocket错误: {str(e)}")
break
# ================== 使用示例 ==================
if __name__ == "__main__":
API_KEY = "YOUR_API_KEY" # 替换为实际API密钥
# 初始化客户端
client = StockTVClient(API_KEY)
# 示例1:获取印度股票市场列表
india_stocks = client.get_stock_markets(country_id=14)
print(f"印度股票市场数据:{india_stocks['data']['records'][0]}")
# 示例2:获取股票K线数据
kline_data = client.get_stock_kline(pid=7310, interval="PT1H")
print(f"最新K线数据:{kline_data['data'][-1]}")
# 示例3:WebSocket实时数据
async def handle_realtime_data(data):
"""处理实时数据回调函数"""
print(f"实时更新:{data}")
async def main():
await client.websocket_client(handle_realtime_data)
# 运行WebSocket客户端
asyncio.get_event_loop().run_until_complete(main())
关键功能说明:
- 模块化设计:
- 使用面向对象封装,方便扩展和维护
- 分离不同市场接口(股票、外汇等)
- 统一响应处理机制
- 类型提示:
- 参数和返回值均使用Python类型提示
- 提高代码可读性和IDE支持
- 错误处理:
- 统一HTTP响应处理
- WebSocket自动重连机制
- 异常捕获和提示
- 高级功能:
- 支持同步HTTP请求和异步WebSocket
- 灵活的时间参数处理(支持时间戳)
- 可配置的分页参数
- 最佳实践:
- 使用requests.Session保持连接池
- 自定义User-Agent标识
- 完善的文档字符串
- 符合PEP8编码规范
扩展建议:
- 缓存机制:
from functools import lru_cache
class StockTVClient:
@lru_cache(maxsize=128)
def get_stock_info(self, pid: int):
"""带缓存的股票信息查询"""
# 实现代码...
- 异步HTTP请求:
import aiohttp
async def async_get_stock_markets(self, country_id: int):
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
return await response.json()
- 数据转换工具:
def convert_kline_to_dataframe(kline_data: List) -> pd.DataFrame:
"""将K线数据转换为Pandas DataFrame"""
return pd.DataFrame(
kline_data,
columns=["timestamp", "open", "high", "low", "close", "volume", "vo"]
).set_index('timestamp')
使用注意事项:
- 申请并妥善保管API密钥
- 遵守API调用频率限制
- 处理时区转换(所有时间戳为UTC)
- 使用try/except块捕获潜在异常
- 生产环境建议添加重试机制
完整项目应包含:
- 单元测试
- 日志记录
- 配置文件管理
- 更完善的类型定义
- API文档生成(使用Sphinx等)
该实现为开发者提供了可靠的数据接入基础,可根据具体需求进一步扩展功能模块。建议配合官方文档使用,及时关注API更新。