当前位置: 首页 > article >正文

【最后203篇系列】005 -QTV200 Online

说明

借着春节休假,把这部分完工,然后2025年将正式的把量化研究的成果进行产品化输出。

首先,我会将策略的执行从脚本挪到服务。做法是将策略的逻辑放在微服务里,作为一个接口,而由sniffer来触发策略执行。我想这样策略不会因为任务太多而搞丢或者搞乱。之前QTV100的代码,我已经不知道放在哪了,在运行,但是我都无法去维护。而无数个微服务,哪怕是老版的,我也可以很容易找出来。也就是从"拉"式转为"推"式。

其次,使用数据库来支持策略的断点保存。程序的执行将按时间不断推进,直到最新时间或者指定时间,只需要为程序维持时间轴即可。策略在产生新的变化时,才会发起数据库存储需求。存储分为元数据和数据两部分,以日志的方式存储,这样在回退的时候,可以确保是干净的。

最后,使用mattermost的webhook作为事件消息的载体。当策略产生行动(买/卖),或者是巡检评估产生结论时,通过webhook发送即时消息,同时也将事件消息备份在kafka中。之后,也可考虑基于kafka的消息进行统一信息发送。

内容

1 简单测试一下FastAPI

引入基础模块,定义异步接口,然后进行简单测试。

from fastapi import FastAPI, HTTPException

class Test(BaseModel):
    """
    用于文档:这是一个测试模型,用于演示 FastAPI 的自动文档功能。
    """
    pass 

@app.post('/test/')
async def test(the_test:Test):
    """
    这是一个测试接口。

    - **the_test**: 接收一个 `Test` 模型作为输入。
    - **返回值**: 返回一个包含状态和消息的字典。

    示例:
    - 输入: `{}`
    - 输出: `{"status": true, "msg": "这是一个测试"}`
    """
    webmsg1 = WebMsg1('test')
    webmsg1.status = True 
    webmsg1.msg = '这是一个测试'
    return webmsg1.to_dict()
    

测试

import httpx 

input_dict  ={}
resp = httpx.post('http://localhost:24541/test/', json = input_dict).json()
{'name': 'test', 'status': True, 'msg': '这是一个测试', 'duration': 0, 'data': []}

fastapi自带的自动接口文档还是挺好的

在这里插入图片描述

2 核心接口

主要逻辑如下:

  • 1 元数据对象StrategyTrigger。

    • 包含了策略名(name)和标的名(code),这个用来标志一个唯一的可执行策略
    • 限定了策略的区间,如果不设定终止区间(end_dt),那么代表持续运行(每次取最新时间)
    • 明确了每次的执行最大步长(step)
    • 步长的时间单位(slot_unit)
    • 最新一个(已处理)数据时间(last_dt)
    • 当前开放的订单列表(open_orders):存在这里(redis)的原因是不会很多,而且频繁读取会比较快一些
    • 策略指定的金额(cash)
    • 是否忽略缓存(is_force)
    • 每次买入的比例(buy_cap_pct)
  • 2 元数据载入

    • cache_name: 存放StrategyTrigger元数据的内存变量
    • cache_name_data: 存放历史数据,如capital和close_orders的地方。这些数据之后会被存在mongo中,按照checkpoint(create_time)的方式存放。少量必要的元数据仍然存在次变量中,也是某种形式的redis or mongo。
    • 尝试从redis中恢复数据,如果没有数据,则制造默认的数据格式,在一次迭代执行后保存
  • 3 开始时间

    • 这个时间是用来进行数据端过滤筛选的时间
    • 当数据有last_dt时,说明已经是运行中的某一次迭代,使用这个时间
    • 如果没有last_dt,那么使用 start_dt,这个是策略定义的开始时间
  • 4 构造数据筛选时间段

    • 1 通过确定好的起始时间,转化为utc时间格式(the_utc_dt)
    • 2 最大时间由3个时间点决定:取最小值(stop_dt)
      • 1 策略指定的最大时间
      • 2 当前世界的最新时间
      • 3 步长指定的区间终点
  • 5 发起查询

    • 1 通过时间段、标的来获取对应的数据段(含信号特征)
  • 6 执行回测:遍历每一条时隙记录

    • 1 根据open_orders的数量和当前资金的状况决定是否可以买(is_ok_to_buy)/卖(is_ok_to_sell)
    • 2 买卖事件:在同一时刻,只会发生一种事件。事件触发后,即形成checkpoint,数据必须要保存。
      • 买:当收到买信号时
        • 1 查询资金池,看能买多少。
        • 2 如果能买,那么做一个订单,附加到StrategyTrigger元数据。
      • 卖:当收到卖信号
        • 1 遍历每一个卖单,重新初始化订单对象
        • 2 执行sell,然后将交易(close order)数据附加到trade_orders中
        • 3 清空open_orders列表

3 效果

随机挑了21只ETF实验,每笔按固定金额买入,总共发生466次交易。
效果还是比我预期的要好很多,近10年来只有2018和2023年亏损了,其他年份都明显大幅胜出,整体盈亏比肯定在2以上了。
在这里插入图片描述
资金占用上,看起来也还算好,理论上最高是420k,实际峰值大约360k。如果按天进行滑动,还是可以进一步利用(对冲交易)。
在这里插入图片描述

更有意思的是,2024.10 ~2024.12的持仓量是降低的。现在看来,这是比较明智的。
在这里插入图片描述

总体上,这比我预期的要好,而且是在合理的范围之内。我会用来作为基准策略。

4 技术点

为了简便起见,尽量采用了少量的工具。

  • 1 Mongo[1]: 用于存储原始的数据
  • 2 Influxdb: 用于存储生成的特征(这个还是为了将来考虑,否则可以只使用Mongo)
  • 3 FastAPI: 用来进行策略的回测与持续执行
  • 4 Mongo[2]:用于策略执行状态缓存、交易订单持久化存储。 交易单在之后应该转到clickhouse,可以更快的进行分析。
  • 5 Webhook+Mattermost: 交易事件的消息通知
  • 6 Prefect 、APScheduler: 用于周期性执行程序,包括爬取、特征生成和持续交易。

这样数据链条是完整的:1 数据- 2 特征-3 策略-4 回测- 5 消息,而且由于低频,所以手工操作完全没压力。

还有一些后续需要做的:

  • 1 前端可视化。计划使用Dash来做表格,简单点的话可能先和Gradio或者Streamlit结合,快速上线;或者晚些可以整合到Flask的蓝图框架下。这部分的功能在于快速的进行交易操作,KPI观察和分析。
  • 2 数据流监控。主要通过Kafka方式收集各流程数据,辅以大模型解释和质量抽检。确保:及时性(数据链条在周期内的活动)、完整性(必要字段是否齐全)、正确性(逻辑判断、多数据源交叉检验)。
  • 3 架构功能监控。考虑使用ELK方式,确保程序逻辑性错误得以实时发现、资源会产生预警(不过我一般都是给到非常充裕的硬件资源)比肝功能采集错误样本供快速调试。
  • 4 异步定时任务调度系统。基于Celery协程的方式,来完成数据的获取和流转。所有非高计算负载的都会走这种方式。
  • 5 分布式计算系统。基于Ray和Dask,来完成大量的分布式计算。现在的计算摊在脚本中零星计算,后续要考虑非常大的空间计算。

5 部署

采用docker镜像方式部署。

6 下一次迭代

假期还有一段时间,我应该可以做的更完善一些。

业务角度

现在的基准策略标的是之前根据规模随机挑的,我需要根据业务的对冲性,调整结构,使之表现更加稳定。

例如:国债类ETF、银行类ETF、原油类ETF、黄金类ETF,总体上应该使得整体策略结构更加稳定。

需要完成的工程任务:

  • 1 将现有的本地定时任务项目进行修改,使之更灵活。(git自动同步,只要修改一个txt就可以改变执行项目)
  • 2 将新增的etf类融入,并完成独立和合并测试。最终明确第一版的投资组合。

非结构化数据源

以事件和情感为中心,获取并提炼时事件和情感价值,后续会使用。

增强策略

在基准策略之上进行简单的分类模型,进行二次增强。其执行区间是在基准策略之内的:标的的基准策略买入,增强策略才实行;基准策略卖出,增强策略也清空。

算法类

可以开始考虑如何使用强化学习对策略进行自动优化,做原理性探索和思考。可以先做一个poc。


http://www.kler.cn/a/525241.html

相关文章:

  • Helm Chart 实战指南
  • STM32标准库移植RT-Thread nano
  • mysql.sock.lock 导致mysql重启失败
  • 设计模式-建造者模式、原型模式
  • SpringBoot或SpringAI对接DeekSeek大模型
  • stack 和 queue容器的介绍和使用
  • 【llm对话系统】大模型源码分析之 LLaMA 模型的 Masked Attention
  • 春节主题c语言代码
  • 关于产品和技术架构的思索
  • LCR 139.训练计划 I
  • 使用Java提取Word文档表格数据
  • 论文阅读(十四):贝叶斯网络在全基因组DNA甲基化研究中的应用
  • java 正则表达式匹配Matcher 类
  • C# Dynamic关键字
  • 东方博宜25年1月-B组(才俊)- 农田作物
  • Kafka的内部通信协议
  • 什么是心跳
  • 怎么样控制API的访问速率,防止API被滥用?
  • 动态规划DP 最长上升子序列模型 最长上升子序列(题目分析+C++完整代码)
  • Android NDK
  • “AI视频智能分析系统:让每一帧视频都充满智慧
  • 寻找旋转数组中的最小元素:C语言实现与分析
  • SSM开发(七) MyBatis解决实体类(model)的字段名和数据库表的列名不一致方法总结(四种方法)
  • Baklib引领企业内容中台建设的新思路与应用案例
  • 更新被联想限制更新的intel集成显卡UHD 630驱动,想让老显卡也支持到4K显示器
  • pandas(一)创建文件、写入数据