当前位置: 首页 > article >正文

VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶二

VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶二

本方案基于LSTM神经网络构建多时间尺度股票收益率预测模型,结合VectorBT进行策略回测。核心原理是通过不同时间窗口(5/10/20/30日)捕捉股价的短期、中期、长期模式,使用注意力机制融合多尺度特征,最终生成交易信号。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

Backtest Strategy

本文是进阶指南🚀,推荐先阅读了解基础知识‼️

  • VectorBT:Python量化交易策略开发与回测评估详解 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一 🔥

一、知识点总结

在A股市场量化中低频交易背景下,单窗口和多窗口滑动各有优劣,以下是两者的总结:

单窗口滑动

  • 含义:使用一个固定大小的窗口进行滑动,每次移动一个时间步,利用当前窗口内的数据预测下一个时间步的值。
  • 适用场景
    • 短期波动捕捉:A股市场波动较大时,单窗口滑动可以更及时地捕捉到短期变化,快速做出反应。
    • 短期套利交易:在短期套利机会中,单窗口滑动能够提供更精准的短期价格信息,帮助交易者迅速判断并采取行动。
  • 优缺点
    • 优点:预测结果更准确,能更好地捕捉短期波动和趋势反转;计算成本相对较低,适合高频交易。
    • 缺点:对于多步预测,需要多次调用模型,计算成本较高;可能忽略更长时间尺度上的趋势和特征。

多窗口滑动

  • 含义:同时使用多个不同大小的窗口进行滑动,从不同时间尺度上提取特征。
  • 适用场景
    • 趋势跟踪:在市场呈现出明显趋势时,多窗口滑动能够从不同时间尺度上捕捉到趋势的延续性和稳定性。
    • 风险管理:当风险管理是交易的重点时,多窗口滑动可以提供更全面的市场信息,帮助交易者从多个时间维度评估风险。
  • 优缺点
    • 优点:能够从不同时间尺度上提取特征,使模型对数据的理解更加全面;可以一次性得到多个预测值,减少模型调用次数,提高效率。
    • 缺点:预测的准确性可能不如单窗口滑动,尤其是对于较远的未来值;计算成本较高,适合中低频交易。

综合建议

  • 结合使用:在实际应用中,可以考虑结合单窗口和多窗口滑动策略。例如,在市场波动较大时,以单窗口滑动为主,快速捕捉短期机会;在市场趋势明显时,以多窗口滑动为主,把握长期趋势。
  • 根据交易目标选择:如果交易目标是短期套利或高频交易,单窗口滑动可能更合适;如果是中低频交易或需要全面的风险管理,多窗口滑动则更具优势。

二、具体实现

1. 方案特点

  • 分层特征工程:价格/技术指标/成交量特征分层处理
  • 动态特征选择:互信息+随机森林+SFS三级筛选机制
  • 多尺度建模:并行LSTM处理不同时间窗口数据
  • 自适应策略:波动率调整仓位+动态分位数阈值

架构图

模型结构
特征处理
价格特征
技术指标
成交量
5日窗口LSTM
多尺度LSTM
10日窗口LSTM
20日窗口LSTM
30日窗口LSTM
注意力融合
RobustScaler
分层特征工程
MinMaxScaler
MinMaxScaler
原始数据
动态特征选择
收益率预测
自适应策略
VectorBT回测
绩效分析

2. 关键步骤讲解

2.1 完整流程

  1. 数据预处理:对齐时间序列,计算收益率
  2. 特征工程:生成技术指标+统计特征
  3. 特征选择:三级筛选保留有效特征
  4. 数据标准化:分层处理不同特征类型
  5. 模型训练:多窗口LSTM联合训练
  6. 策略生成:动态阈值+仓位控制
  7. 回测验证:VectorBT模拟交易

2.2 序列图

数据源 特征工程 特征选择 LSTM模型 交易策略 VectorBT 用户 原始OHLCV数据 增强特征矩阵 筛选后特征 多窗口特征融合 loop [训练周期] 收益率预测 交易信号 模拟交易 绩效报告 数据源 特征工程 特征选择 LSTM模型 交易策略 VectorBT 用户

3. 代码实现

3.1 环境设置

import numpy as np  # 导入NumPy库,用于数值计算
import optuna  # 导入Optuna库,用于超参数优化
import pandas as pd  # 导入Pandas库,用于数据处理
import torch  # 导入PyTorch库,用于深度学习
import torch.nn as nn  # 导入PyTorch的神经网络模块
import torch.optim as optim  # 导入PyTorch的优化器模块
import vectorbt as vbt  # 导入VectorBT库,用于金融回测
from sklearn.ensemble import ExtraTreesRegressor, RandomForestRegressor  # 导入Scikit-Learn的集成回归模型
from sklearn.feature_selection import SequentialFeatureSelector, mutual_info_regression  # 导入特征选择工具
from sklearn.preprocessing import MinMaxScaler, RobustScaler  # 导入数据预处理工具
from torch.utils.data import DataLoader, Dataset  # 导入PyTorch的数据加载工具
from tqdm.auto import tqdm  # 导入tqdm库,用于进度条显示

print(f"PyTorch版本: {torch.__version__}")  # 打印PyTorch版本
print(f"VectorBT版本: {vbt.__version__}")  # 打印VectorBT版本

vbt.settings.array_wrapper["freq"] = "D"  # 设置VectorBT的时间频率为日
vbt.settings.plotting["layout"]["template"] = "vbt_dark"  # 设置VectorBT的绘图模板为暗色
vbt.settings.plotting["layout"]["width"] = 1200  # 设置VectorBT绘图宽度为1200
vbt.settings.portfolio["init_cash"] = 100000.0  # 设置初始资金为100000 CNY
vbt.settings.portfolio["fees"] = 0.0025  # 设置交易费用为0.25%
vbt.settings.portfolio["slippage"] = 0.0025  # 设置滑点为0.25%

device = torch.device(
    "cuda"
    if torch.cuda.is_available()  # 如果CUDA可用,使用GPU
    else "mps" if torch.backends.mps.is_available()  # 如果MPS可用,使用MPS
    else "cpu"  # 否则使用CPU
)

3.2 特征工程模块

def feature_engineering(df):
    """特征增强与分层处理"""
    
    # 基础特征
    price_features = ["open", "high", "low", "close"]  # 价格特征
    tech_features = [
        "ma3",
        "ma5",
        "ma10",
        "ma20",
        "ma30",
        "rsi",
        "macd",
        "macdsignal",
        "macdhist",
        "bb_upper",
        "bb_middle",
        "bb_lower",
        "momentum",
        "roc",
        "atr",
    ]  # 技术指标特征
    volume_features = ["vol", "obv"]  # 成交量特征

    # 新增特征(滞后、统计量、技术指标增强)
    for window in [3, 5, 10, 20, 30]:  # 不同窗口大小
        df[f"return_{window}d"] = df["close"].pct_change(window)  # 计算不同窗口的收益率
        df[f"vol_ma_{window}"] = df["vol"].rolling(window).mean()  # 计算成交量的移动平均
        df[f"close_ma_{window}"] = df["close"].rolling(window).mean()  # 计算收盘价的移动平均

    df["ma5_velocity"] = df["ma5"].pct_change(3)  # 计算MA5的速度
    df["vol_zscore"] = (df["vol"] - df["vol"].rolling(20).mean()) / df["vol"].rolling(20).std()  # 计算成交量的Z分数
    df["range_ratio"] = (df["high"] - df["low"]) / df["close"].shift(1)  # 计算波动范围比率
    df["volume_ma_ratio"] = df["vol"] / df["vol"].rolling(5).mean()  # 计算成交量与移动平均的比率
    df["close_velocity"] = df["close"].pct_change(3)  # 计算收盘价的速度
    df["volatility"] = df["close"].rolling(20).std() / df["close"].rolling(20).mean()  # 计算波动率
    df["price_volume_corr"] = df["close"].rolling(10).corr(df["vol"])  # 计算价格与成交量的相关性

    # 新增特征分组
    enhanced_features = [
        "ma5_velocity",
        "vol_zscore",
        "range_ratio",
        "volume_ma_ratio",
        "close_velocity",
        "volatility",
        "price_volume_corr",
    ]
    for window in [3, 5, 10, 20, 30]:
        enhanced_features.extend(
            [f"return_{window}d", f"vol_ma_{window}", f"close_ma_{window}"]
        )

    # 分层标准化
    price_scaler = RobustScaler().fit(df[price_features])  # 价格特征的RobustScaler
    tech_scaler = MinMaxScaler().fit(df[tech_features])  # 技术指标特征的MinMaxScaler
    volume_scaler = MinMaxScaler().fit(df[volume_features])  # 成交量特征的MinMaxScaler
    enhanced_scaler = RobustScaler().fit(df[enhanced_features])  # 新增特征的RobustScaler

    processed_features = np.hstack(
        [
            price_scaler.transform(df[price_features]),  # 标准化价格特征
            tech_scaler.transform(df[tech_features]),  # 标准化技术指标特征
            volume_scaler.transform(df[volume_features]),  # 标准化成交量特征
            enhanced_scaler.transform(df[enhanced_features]),  # 标准化新增特征
        ]
    )

    return pd.DataFrame(processed_features, index=df.index)  # 返回处理后的特征数据框

3.3 动态特征选择

# %%
# 动态特征选择
def feature_selection(X, y):
    """混合特征选择策略"""
    # 第一阶段:互信息筛选
    mi_scores = mutual_info_regression(X, y)  # 计算每个特征与目标变量的互信息
    selected = mi_scores > np.quantile(mi_scores, 0.2)  # 选择互信息大于0.2分位数的特征
    X_mi = X[:, selected].astype(np.float32)  # 将选择的特征转换为float32类型
    print(f"[Stage 1] 互信息筛选后特征数: {X_mi.shape[1]}")  # 打印筛选后的特征数量

    # 第二阶段:随机森林重要性
    rf = RandomForestRegressor(n_estimators=100, n_jobs=-1)  # 初始化随机森林回归模型
    rf.fit(X_mi, y)  # 使用选择的特征训练随机森林模型
    importances = rf.feature_importances_  # 获取特征重要性
    rf_selected = importances > np.mean(importances)  # 选择重要性大于平均值的特征
    X_rf = X_mi[:, rf_selected].astype(np.float32)  # 将选择的特征转换为float32类型
    print(f"[Stage 2] 随机森林筛选后特征数: {X_rf.shape[1]}")  # 打印筛选后的特征数量

    ## 获取筛选后的二维特征矩阵
    X_selected = X_rf.astype(np.float32)  # 将选择的特征转换为float32类型
    y_tensor = y.astype(np.float32).ravel()  # 将目标变量转换为一维数组并转换为float32类型

    # 第三阶段:使用随机森林进行递归特征消除 SFS/RFE
    # estimator = RandomForestRegressor(n_estimators=100, n_jobs=-1)  # 随机森林回归模型
    estimator = ExtraTreesRegressor(n_estimators=100, n_jobs=-1)  # 初始化ExtraTrees回归模型
    n_features_to_select = min(12, X_selected.shape[1])  # 选择最多12个特征
    if n_features_to_select < X_selected.shape[1]:  # 如果选择的特征数小于当前特征数
        sfs = SequentialFeatureSelector(
            estimator,
            n_features_to_select=n_features_to_select,
            direction="forward",
            scoring="neg_mean_squared_error",
            n_jobs=-1,
        )  # 初始化顺序特征选择器
        final_features = sfs.fit_transform(X_selected, y_tensor)  # 进行特征选择
        # rfe = RFE(estimator, n_features_to_select=n_features_to_select)  # RFE特征选择器
        # final_features = rfe.fit_transform(X_selected, y_tensor)  # RFE特征选择
        print(f"[Stage 3] SFS筛选后特征数: {final_features.shape[1]}")  # 打印筛选后的特征数量
    else:
        final_features = X_selected  # 如果不需要进一步选择,直接使用当前特征
        print(f"[Stage 3] SFS未执行,特征数: {final_features.shape[1]}")  # 打印特征数量

    return final_features.astype(np.float32)  # 返回最终选择的特征

3.4 多尺度LSTM模型

class MultiScaleLSTM(nn.Module):
    """多尺度LSTM模型"""

    def __init__(self, input_dim, windows=[5, 10, 20, 30], hidden_dim=128):
        super().__init__()  # 调用父类的初始化方法
        self.lstms = nn.ModuleList(
            [nn.LSTM(input_dim, hidden_dim, batch_first=True) for _ in windows]
        )  # 初始化多个LSTM层,每个窗口一个
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim, 16),  # 线性层,将隐藏维度映射到16
            nn.Tanh(),  # Tanh激活函数
            nn.Linear(16, 1),  # 线性层,将16映射到1
            nn.Softmax(dim=1)  # Softmax激活函数,用于计算注意力权重
        )
        self.fc = nn.Sequential(
            nn.Linear(len(windows) * hidden_dim, 128),  # 全连接层,将多个LSTM输出拼接后映射到128
            nn.ReLU(),  # ReLU激活函数
            nn.Dropout(0.3),  # Dropout层,防止过拟合
            nn.Linear(128, 1)  # 最终全连接层,映射到1
        )

    def forward(self, x_multi):
        """输入为不同窗口长度的数据列表"""
        contexts = []  # 存储每个窗口的上下文向量
        for lstm, x in zip(self.lstms, x_multi):  # 遍历每个LSTM和对应的输入数据
            out, _ = lstm(x)  # 前向传播LSTM
            attn = self.attention(out)  # 计算注意力权重
            context = torch.sum(attn * out, dim=1)  # 计算加权后的上下文向量
            contexts.append(context)  # 将上下文向量添加到列表中
        return self.fc(torch.cat(contexts, dim=1))  # 拼接所有上下文向量并通过全连接层

3.5 数据处理模块

def prepare_data(df):
    """数据处理"""
    # 特征工程
    df_engineered = feature_engineering(df)  # 对输入的DataFrame进行特征工程
    df_engineered.dropna(inplace=True)  # 删除含有NaN值的行

    # 确保索引对齐
    X = df_engineered  # 将特征工程后的DataFrame赋值给X
    y = df.reindex(df_engineered.index)["returns"].values.ravel()  # 重新索引并提取返回值

    # 动态特征选择
    selected_features = feature_selection(X.values, y)  # 对特征进行动态选择

    # 数据标准化
    scaler = MinMaxScaler(feature_range=(-1, 1))  # 初始化MinMaxScaler,范围为-1到1
    scaled_data = scaler.fit_transform(selected_features)  # 对选择后的特征进行标准化

    # 使用固定窗口列表(与MultiScaleLSTM设计一致)
    fixed_windows = [5, 10, 20, 30]  # 定义固定窗口列表

    # 生成每个窗口的独立序列
    def create_window_sequences(data, window):
        sequences = []
        for i in range(window, len(data)):
            seq = data[i - window : i]  # 提取当前窗口的数据
            sequences.append(seq)  # 将窗口数据添加到序列列表中
        return np.array(sequences, dtype=np.float32)  # 返回numpy数组,确保数据类型为float32

    # 为每个窗口生成序列
    X_windows = []
    for w in fixed_windows:
        X_win = create_window_sequences(scaled_data, w)  # 为每个窗口生成序列
        X_windows.append(X_win)  # 将生成的序列添加到X_windows列表中

    # 统一样本数量(以最小窗口样本数为准)
    min_samples = min(len(x) for x in X_windows)  # 找出所有窗口中的最小样本数
    X_windows = [x[-min_samples:] for x in X_windows]  # 对齐尾部数据
    y = y[-min_samples:]  # 对齐y标签

    # 数据分割
    split = int(0.8 * min_samples)  # 计算训练集和测试集的分割点
    X_train = [x[:split] for x in X_windows]  # 分割训练集
    X_test = [x[split:] for x in X_windows]  # 分割测试集
    y_train, y_test = y[:split], y[split:]  # 分割y标签

    print(
        f"训练集维度: 窗口7{X_train[0].shape} 窗口14{X_train[1].shape} 窗口30{X_train[2].shape}"
    )  # 打印训练集维度信息
    return (X_train, y_train), (X_test, y_test), scaler  # 返回训练集、测试集和缩放器

3.6 自定义多窗口数据集

class MultiWindowDataset(Dataset):
    """自定义多窗口数据集"""

    def __init__(self, X_windows, y):
        """
        初始化多窗口数据集
        X_windows: 包含多个窗口数据的列表,每个元素形状为 (num_samples, window_size, num_features)
        y: 标签数据,形状 (num_samples,)
        """
        self.X_windows = [x.astype(np.float32) for x in X_windows]  # 显式转换为float32类型
        self.y = y.astype(np.float32)  # 将标签数据转换为float32类型

    def __len__(self):
        """返回数据集的长度"""
        return len(self.y)  # 返回标签数据的长度

    def __getitem__(self, idx):
        """
        返回一个样本的所有窗口数据和对应标签
        输出格式:([窗口1数据, 窗口2数据,...], 标签)
        """
        # 获取每个窗口的对应样本数据
        window_samples = [
            torch.from_numpy(x[idx]).float() for x in self.X_windows  # 明确指定数据类型
        ]
        label = torch.tensor(self.y[idx], dtype=torch.float32)  # 将标签转换为torch张量
        return window_samples, label  # 返回窗口数据和标签

3.6 自定义批处理函数

def collate_fn(batch):
    """
    处理批次数据:
    输入:batch是包含多个元组的列表,每个元组为 (window_samples, label)
    输出:([窗口1批次数据, 窗口2批次数据,...], 标签批次)
    """
    # 解压批次数据
    windows_data, labels = zip(*batch)  # 解压批次数据为窗口数据和标签
    # 重新组织窗口数据维度
    # 将每个窗口的样本堆叠成 (batch_size, window_size, num_features)
    transposed_windows = list(zip(*windows_data))  # 转置窗口数据
    batched_windows = [
        torch.stack(window_batch, dim=0).float()
        for window_batch in transposed_windows  # 明确指定数据类型
    ]
    # 堆叠标签 (batch_size,)
    batched_labels = torch.stack(labels, dim=0).float()  # 明确指定数据类型
    return batched_windows, batched_labels  # 返回批量窗口数据和标签

3.7 训练模型

def train_model(config, train_data):
    """训练模型"""
    X_train, y_train = train_data  # 解包训练数据

    # 创建多窗口数据集和数据加载器
    train_dataset = MultiWindowDataset(X_train, y_train)  # 初始化多窗口数据集
    train_loader = DataLoader(
        train_dataset,
        batch_size=config["batch_size"],  # 设置批量大小
        shuffle=True,  # 打乱数据
        collate_fn=collate_fn,  # 使用自定义批处理函数
        drop_last=True,  # 丢弃最后一个不完整的批次
    )

    # 初始化模型(输入维度从数据中自动获取)
    input_dim = X_train[0].shape[-1]  # 获取特征维度
    windows = (
        config["windows"]
        if isinstance(config["windows"], (list, tuple))
        else [config["windows"]]
    )  # 确保窗口是一个列表或元组
    hidden_dim = config["hidden_dim"]  # 获取隐藏层维度
    model = MultiScaleLSTM(
        input_dim=input_dim, windows=windows, hidden_dim=hidden_dim
    ).to(device)  # 初始化多尺度LSTM模型并移动到指定设备

    # 配置优化器
    optimizer = optim.AdamW(
        model.parameters(), lr=config["lr"], weight_decay=config["weight_decay"]
    )  # 使用AdamW优化器
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)  # 使用余弦退火学习率调度器
    criterion = nn.HuberLoss()  # 使用Huber损失函数

    # 训练循环
    for epoch in tqdm(range(config["epochs"]), desc="Training"):  # 进行多个epoch的训练
        model.train()  # 将模型设置为训练模式
        for X_batch, y_batch in train_loader:  # 遍历每个批次
            # 将每个窗口的数据移动到设备
            X_batch = [x.to(device) for x in X_batch]  # 将每个窗口的数据移动到指定设备
            y_batch = y_batch.to(device).unsqueeze(1)  # 将标签数据移动到指定设备并增加一个维度

            optimizer.zero_grad()  # 清零梯度
            preds = model(X_batch)  # 前向传播
            loss = criterion(preds, y_batch)  # 计算损失
            loss.backward()  # 反向传播
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # 梯度裁剪
            optimizer.step()  # 更新参数

        scheduler.step()  # 更新学习率

    return model  # 返回训练好的模型

3.8 模型评估

def evaluate_model(model, val_data, device):
    """评估模型"""
    X_val, y_val = val_data  # 解包验证数据

    # 创建多窗口数据集和数据加载器
    val_dataset = MultiWindowDataset(X_val, y_val)  # 初始化多窗口数据集
    val_loader = DataLoader(
        val_dataset,
        batch_size=config["batch_size"],  # 设置批量大小
        shuffle=False,  # 不打乱数据
        collate_fn=collate_fn,  # 使用自定义批处理函数
        drop_last=False,  # 不丢弃最后一个不完整的批次
    )

    model.eval()  # 将模型设置为评估模式
    criterion = nn.HuberLoss()  # 使用Huber损失函数
    total_loss = 0.0  # 初始化总损失
    num_samples = 0  # 初始化样本数量

    with torch.no_grad():  # 关闭梯度计算
        for X_batch, y_batch in val_loader:  # 遍历每个批次
            # 将每个窗口的数据移动到设备
            X_batch = [x.to(device) for x in X_batch]  # 将每个窗口的数据移动到指定设备
            y_batch = y_batch.to(device).unsqueeze(1)  # 将标签数据移动到指定设备并增加一个维度

            preds = model(X_batch)  # 前向传播
            loss = criterion(preds, y_batch)  # 计算损失
            total_loss += loss.item() * len(y_batch)  # 累加当前批次的损失
            num_samples += len(y_batch)  # 累加当前批次的样本数量

    avg_loss = total_loss / num_samples  # 计算平均损失
    return avg_loss  # 返回平均损失

3.9 双均线策略

class DualMovingAverageStrategy:
    """双均线策略"""

    def __init__(self, pred_returns, volatility, params):
        """
        初始化双均线策略
        :param pred_returns: 预测收益率序列
        :param volatility: 波动率序列
        :param params: 策略参数字典
        """
        self.pred_returns = pred_returns  # 预测收益率序列
        self.volatility = volatility.clip(lower=0.01)  # 波动率序列,防止零波动
        self.params = params  # 策略参数字典

    def generate_signals(self):
        """
        生成交易信号
        :return: 交易信号和仓位大小
        """
        # 获取参数
        short_window = self.params.get("short_window", 5)  # 短期窗口,默认5天
        long_window = self.params.get("long_window", 20)  # 长期窗口,默认20天

        # 计算短期和长期的简单移动平均线
        short_mavg = self.pred_returns.rolling(window=short_window, min_periods=1).mean()  # 短期移动平均线
        long_mavg = self.pred_returns.rolling(window=long_window, min_periods=1).mean()  # 长期移动平均线

        # 创建一个空的信号序列
        signals = pd.Series(0, index=self.pred_returns.index)

        # 当短期均线上穿长期均线时,设置买入信号
        signals[(short_mavg > long_mavg) & (short_mavg.shift(1) <= long_mavg.shift(1))] = 1

        # 当短期均线下穿长期均线时,设置卖出信号
        signals[(short_mavg < long_mavg) & (short_mavg.shift(1) >= long_mavg.shift(1))] = -1

        # 波动率调整仓位
        position_size = np.tanh(self.pred_returns.abs() / self.volatility)  # 根据预测收益率和波动率计算仓位大小
        position_size = position_size.clip(0.1, 0.8)  # 限制仓位大小在0.1到0.8之间

        # 打印信号统计信息
        print("Long Conditions:", (signals == 1).sum())  # 打印买入信号数量
        print("Short Conditions:", (signals == -1).sum())  # 打印卖出信号数量
        print("Signals Distribution:\n", signals.value_counts())  # 打印信号分布

        return signals, position_size  # 返回交易信号和仓位大小

3.10 动态阈值策略

class AdaptiveStrategy:
    """动态阈值策略"""

    def __init__(self, pred_returns, volatility, params):
        """
        初始化动态阈值策略
        :param pred_returns: 预测收益率序列
        :param volatility: 波动率序列
        :param params: 策略参数字典
        """
        self.pred_returns = pred_returns  # 预测收益率序列
        self.volatility = volatility.clip(lower=0.01)  # 波动率序列,防止零波动
        self.params = params  # 策略参数字典

    def generate_signals(self):
        """
        生成交易信号
        :return: 交易信号和仓位大小
        """
        # 获取参数
        rolling_window = self.params.get("window", 5)  # 滚动窗口,默认5天

        # 计算动态分位数阈值
        upper_thresh = self.pred_returns.rolling(rolling_window).quantile(0.7)  # 上阈值(70%分位数)
        lower_thresh = self.pred_returns.rolling(rolling_window).quantile(0.3)  # 下阈值(30%分位数)

        # 波动率调整仓位
        position_size = np.tanh(self.pred_returns.abs() / self.volatility)  # 根据预测收益率和波动率计算仓位大小
        position_size = position_size.clip(0.1, 0.8)  # 限制仓位大小在0.1到0.8之间

        # 生成信号
        long_cond = (self.pred_returns > upper_thresh) & (position_size > 0.2)  # 买入条件
        short_cond = (self.pred_returns < lower_thresh) & (position_size > 0.2)  # 卖出条件

        # 创建一个空的信号序列
        signals = pd.Series(0, index=self.pred_returns.index)

        # 设置买入信号
        signals[long_cond] = 1

        # 设置卖出信号
        signals[short_cond] = -1

        # 打印信号统计信息
        print("Long Conditions:", long_cond.sum())  # 打印买入信号数量
        print("Short Conditions:", short_cond.sum())  # 打印卖出信号数量
        print("Signals Distribution:\n", signals.value_counts())  # 打印信号分布

        return signals, position_size  # 返回交易信号和仓位大小

3.11 回测引擎

def backtest_strategy(model, test_data, scaler, df):
    """回测引擎"""
    X_test, y_test = test_data  # 分解测试数据

    # 生成预测
    model.eval()  # 将模型设置为评估模式
    with torch.no_grad():  # 禁用梯度计算
        test_tensor = [torch.FloatTensor(x).to(device) for x in X_test]  # 将测试数据转换为Tensor并移动到设备
        preds = model(test_tensor).cpu().numpy().flatten()  # 生成预测值并转换为NumPy数组

    # 对齐时间索引
    test_dates = df.index[-len(preds) :]  # 获取与预测值对齐的时间索引
    result_df = pd.DataFrame(
        {
            "close": df["close"].values[-len(preds) :],  # 闭盘价
            "pred_returns": preds.flatten(),  # 预测收益率
            "volatility": df["atr"].values[-len(preds) :] / df["close"].values[-len(preds) :],  # 波动率
        },
        index=test_dates,
    )

    # 检查预测值分布
    print("Predicted Returns Stats:", result_df["pred_returns"].describe())  # 打印预测收益率统计信息
    print("Predicted Returns Mean:", result_df["pred_returns"].mean())  # 打印预测收益率均值
    print(
        "Predicted Returns Max/Min:",
        result_df["pred_returns"].max(),
        result_df["pred_returns"].min(),
    )  # 打印预测收益率最大值和最小值
    # 检查波动率计算异常
    print("Volatility Stats:", result_df["volatility"].describe())  # 打印波动率统计信息

    # 动态阈值策略
    # strategy = AdaptiveStrategy(
    #     pred_returns=result_df["pred_returns"],
    #     volatility=result_df["volatility"],
    #     params={"window": 14},
    # )

    # 双均线策略
    strategy = DualMovingAverageStrategy(
        pred_returns=result_df["pred_returns"],  # 预测收益率
        volatility=result_df["volatility"],  # 波动率
        params={"short_window": 5, "long_window": 20},  # 策略参数
    )

    # 生成信号
    signals, position_size = strategy.generate_signals()  # 生成交易信号和仓位大小

    # 构建投资组合
    pf = vbt.Portfolio.from_signals(
        close=result_df["close"],  # 闭盘价
        entries=signals == 1,  # 买入信号
        exits=signals == -1,  # 卖出信号
        size=position_size.abs(),  # 仓位大小
        size_type="percent",  # 仓位类型(百分比)
        freq="D",  # 时间频率(日)
        # 增强参数
        accumulate=False,  # 禁止累积仓位
        log=True,  # 记录交易日志
        call_seq="auto",  # 自动处理订单顺序
    )

    return pf, result_df  # 返回投资组合对象和结果DataFrame

3.12 主程序运行

# 示例数据加载
# 选择要加载的股票代码
ts_code = "600000.SH" # 浦发银行(600000.SH)

# 读取处理后的Parquet文件
df = pd.read_parquet(f"./data/processed_{ts_code}.parquet")

# 检查 close 列的数据类型是否为数值型
print(df["close"].dtype)  # 应为 float64 或 int64

# 检查 close 列是否有缺失值
print(df["close"].isna().sum())  # 应为 0

# 将 trade_date 列转换为 datetime 类型
df["trade_date"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")

# 将 trade_date 列设置为索引
df.set_index("trade_date", inplace=True)

# 计算每日收益率,并将结果向前移动一天
df["returns"] = df["close"].pct_change().shift(-1)

# 删除所有包含 NaN 的行
df.dropna(inplace=True)

# 数据准备
train_data, test_data, scaler = prepare_data(df)  # 调用prepare_data函数进行数据准备

# Optuna超参优化
def objective(trial):
    """定义Optuna的优化目标函数"""
    config = {
        "hidden_dim": trial.suggest_int("hidden_dim", 64, 256),  # 建议隐藏层维度在64到256之间
        "windows": trial.suggest_categorical("windows", [5, 10, 20, 30]),  # 建议窗口大小
        "lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),  # 建议学习率在1e-4到1e-3之间(对数尺度)
        "batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),  # 建议批量大小
        "weight_decay": trial.suggest_float("weight_decay", 1e-6, 1e-4),  # 建议权重衰减在1e-6到1e-4之间
        "epochs": 100,  # 固定训练轮数为100
    }
    model = train_model(config, train_data)  # 训练模型
    val_loss = evaluate_model(model, test_data)  # 评估模型
    return val_loss  # 返回验证损失

# 创建Optuna研究对象,目标是最小化验证损失
study = optuna.create_study(direction="minimize")
# 运行优化,最多10次试验,显示进度条,超时时间为1小时
study.optimize(objective, n_trials=10, show_progress_bar=True, timeout=3600)

# 最佳模型回测
best_config = study.best_params  # 获取最佳参数
best_config["epochs"] = 100  # 设置训练轮数为100
best_model = train_model(best_config, train_data)  # 使用最佳参数训练模型

# 训练完成后保存模型和参数
model_path = "./models/pytorch_lstm_model.pth"  # 模型保存路径
torch.save(
    {
        "model_state_dict": best_model.state_dict(),  # 保存模型状态字典
        "scaler": scaler,  # 保存特征缩放器
        **best_config,  # 保存最佳配置
    },
    model_path,
)
print(f"PyTorch LSTM model and parameters saved to {model_path}")  # 打印保存路径

# 加载最佳模型
checkpoint = torch.load(  # 加载模型检查点
    "./models/pytorch_lstm_model.pth", weights_only=False, map_location=device
)

X_train, y_train = train_data  # 获取训练数据

# 确保 windows 是一个列表或元组
windows = checkpoint.get("windows", [5, 10, 20, 30])  # 获取窗口大小
if not isinstance(windows, (list, tuple)):
    windows = [windows]  # 如果不是列表或元组,则转换为列表
hidden_dim = checkpoint["hidden_dim"]  # 获取隐藏层维度

# 初始化多尺度LSTM模型
best_model = MultiScaleLSTM(
    input_dim=X_train[0].shape[-1], windows=windows, hidden_dim=hidden_dim
).to(device)
best_model.load_state_dict(checkpoint["model_state_dict"])  # 加载模型状态字典

# 回测
pf, result_df = backtest_strategy(best_model, test_data, scaler, df)  # 进行回测

# 绩效分析
print(pf.stats())  # 打印投资组合统计信息
pf.plot().show()  # 显示投资组合图表

# 可视化结果
import plotly.graph_objects as go
from plotly.subplots import make_subplots

fig = make_subplots(specs=[[{"secondary_y": True}]])

# 主坐标轴(价格)
fig.add_trace(
    go.Scatter(
        x=result_df.index,  # 时间索引
        y=result_df["close"],  # 实际价格
        name="Actual Price",  # 图例名称
    ),
    secondary_y=False,  # 主坐标轴
)

# 次坐标轴(收益率)
fig.add_trace(
    go.Bar(
        x=result_df.index,  # 时间索引
        y=result_df["pred_returns"],  # 预测收益率
        name="Predicted Returns",  # 图例名称
        marker=dict(opacity=0.8),  # 设置柱状图透明度
        width=86400000,  # 设置柱宽为1天(毫秒单位)
    ),
    secondary_y=True,  # 次坐标轴
)

# 设置布局
fig.update_layout(
    title="Price vs Predicted Returns",  # 图表标题
    template="vbt_dark",  # 使用vbt_dark模板
    legend=dict(orientation="h", yanchor="bottom", y=1.02),  # 图例设置
    hovermode="x unified",  # 悬停模式
    width=800,  # 图表宽度
)

# 设置坐标轴标签
fig.update_yaxes(title_text="Price", secondary_y=False, showgrid=False, tickprefix="¥")  # 主坐标轴标签
fig.update_yaxes(
    title_text="Returns", secondary_y=True, showgrid=False, tickformat=".2%"  # 次坐标轴标签
)

fig.show()  # 显示图表

4. 关键类和函数说明

MultiScaleLSTM
  • 描述: 多尺度LSTM模型,用于处理不同窗口长度的数据。
  • 参数:
    • input_dim: 输入特征的维度。
    • windows: 不同窗口长度的列表,默认为 [5, 10, 20, 30]
    • hidden_dim: LSTM隐藏层的维度,默认为 128
  • 方法:
    • forward(x_multi): 前向传播函数,输入为不同窗口长度的数据列表。
MultiWindowDataset
  • 描述: 自定义多窗口数据集类,用于生成多窗口数据。
  • 参数:
    • X_windows: 包含多个窗口数据的列表,每个元素形状为 (num_samples, window_size, num_features)
    • y: 标签数据,形状为 (num_samples,)
  • 方法:
    • __len__(): 返回数据集的长度。
    • __getitem__(idx): 返回一个样本的所有窗口数据和对应标签。
DualMovingAverageStrategy
  • 描述: 双均线策略类,用于生成交易信号。
  • 参数:
    • pred_returns: 预测收益率。
    • volatility: 波动率。
    • params: 策略参数字典,包含短期和长期窗口长度。
  • 方法:
    • generate_signals(): 生成买卖信号和仓位大小。
AdaptiveStrategy
  • 描述: 动态阈值策略类,用于生成交易信号。
  • 参数:
    • pred_returns: 预测收益率。
    • volatility: 波动率。
    • params: 策略参数字典,包含滚动窗口长度。
  • 方法:
    • generate_signals(): 生成买卖信号和仓位大小。

函数

feature_engineering(df)
  • 描述: 特征工程模块,对输入数据进行特征增强和分层处理。
  • 参数:
    • df: 输入的DataFrame数据。
  • 返回值: 处理后的特征数据。
feature_selection(X, y)
  • 描述: 动态特征选择策略,通过互信息、随机森林重要性和递归特征消除进行特征选择。
  • 参数:
    • X: 输入特征矩阵。
    • y: 目标变量。
  • 返回值: 选择后的特征矩阵。
prepare_data(df)
  • 描述: 数据处理模块,包括特征工程、特征选择、数据标准化和生成多窗口序列。
  • 参数:
    • df: 输入的DataFrame数据。
  • 返回值: 训练数据、测试数据和特征缩放器。
collate_fn(batch)
  • 描述: 自定义批处理函数,用于处理多窗口数据集的批次数据。
  • 参数:
    • batch: 包含多个元组的列表,每个元组为 (window_samples, label)
  • 返回值: 批次窗口数据和标签。
train_model(config, train_data)
  • 描述: 训练多尺度LSTM模型。
  • 参数:
    • config: 模型配置字典,包含超参数。
    • train_data: 训练数据。
  • 返回值: 训练好的模型。
evaluate_model(model, val_data)
  • 描述: 评估多尺度LSTM模型。
  • 参数:
    • model: 待评估的模型。
    • val_data: 验证数据。
  • 返回值: 验证损失。
backtest_strategy(model, test_data, scaler, df)
  • 描述: 回测引擎,使用训练好的模型进行回测。
  • 参数:
    • model: 训练好的模型。
    • test_data: 测试数据。
    • scaler: 特征缩放器。
    • df: 原始数据。
  • 返回值: 回测结果和预测数据。

5. 优化建议

当前方案
增加另类数据
改进模型架构
强化风险管理
新闻情感分析
Transformer
动态止损策略
方向具体措施
特征工程增加行业因子/市场情绪指标
模型结构引入Transformer替代LSTM
策略优化加入止损机制和仓位再平衡
计算优化实现增量训练和在线学习

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。


http://www.kler.cn/a/613361.html

相关文章:

  • GenBI 中如何引入 LLM 做意图路由,区分查数据还是闲聊
  • Android生态大变革,谷歌调整开源政策,核心开发不再公开
  • MAC环境给docker换源
  • 【力扣hot100题】(010)滑动窗口最大值
  • 项目接入通义千问 api 接口实现步骤详解
  • 基于ssm的养老院综合服务系统
  • 项目-苍穹外卖(十五) Apache ECharts+数据统计
  • Blender绘图——旋转曲线(以LCP与RCP为例)
  • Linux系统编程 | 线程的基本概念
  • FLEXlm如何通过web调用
  • Go服务开发高手课(极客讲堂)
  • 为什么大模型在 OCR 任务上表现不佳?
  • 0.雷达信号
  • 什么是uv 和pip的区别是什么
  • leetcode131-分割回文串
  • 构建大语言模型应用:句子转换器(Sentence Transformers)(第三部分)
  • 手机零售行业的 AI 破局与创新降本实践 | OceanBase DB大咖说
  • 探索:如何构建一个自我的AI辅助的开发环境?
  • 淘宝API+爬虫混合方案:合规采集历史价格与评价数据
  • Axios核心原理