当前位置: 首页 > article >正文

LSTM卫星轨道预测(一)

一.多文件预测

代码详细解析


1. 文件读取与数据处理

功能
  • .sp3 文件中读取卫星轨迹数据。
  • 提取包括 Satellite_ID, X, Y, Z 等字段的信息。
  • 计算派生特征(如速度和加速度),便于后续建模使用。
主要函数:extract_sp3_data(file_path)
  • 正则表达式提取数据

    • 使用 re.match() 提取每行中有关卫星编号、轨迹坐标和其他列的信息。
    • 匹配模式:^PG(\d{2})\s+([-]?\d+\.\d+)\s+([-]?\d+\.\d+)\s+([-]?\d+\.\d+)\s+([-]?\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)
      • PG(\d{2}): 匹配卫星编号。
      • ([-]?\d+\.\d+): 匹配 X, Y, Z 坐标值。
      • 其他字段:数值或整数类型的列。
  • 计算时间特征

    • 周期性特征 sin_timecos_time
      data_df['sin_time'] = np.sin(2 * np.pi * np.arange(num_rows) / num_rows)
      data_df['cos_time'] = np.cos(2 * np.pi * np.arange(num_rows) / num_rows)
      
      • 模拟时间周期的特性,有助于模型捕捉卫星轨迹的周期性变化。
  • 计算速度和加速度

    • 速度 Speed:基于坐标的欧几里得距离差分计算:
      speed = np.sqrt(np.diff(data_df['X'])**2 + np.diff(data_df['Y'])**2 + np.diff(data_df['Z'])**2)
      
    • 加速度 Accel_X, Accel_Y, Accel_Z:坐标的二阶差分。

数据合并
  • 将所有 .sp3 文件解析后的数据合并为一个完整的 DataFrame
    all_data_df = pd.concat([all_data_df, data_df], ignore_index=True)
    
保存数据
  • 将合并数据保存为 CSV 文件:
    all_data_df.to_csv(output_csv_path, index=False)
    

2. 数据加载与选择

加载数据
  • 使用 @st.cache_data 缓存处理后的 CSV 数据:
    data_df = load_data()
    
用户选择
  • 用户通过 Streamlit 的 selectbox 选择感兴趣的卫星编号:
    satellite_id = st.selectbox("选择卫星编号", satellite_ids)
    satellite_data = data_df[data_df['Satellite_ID'] == satellite_id]
    
特征完整性检查
  • 确保所有特征列(如 X, Y, Z, Speed, sin_time, cos_time)完整,否则报错:
    missing_features = [feature for feature in features if feature not in satellite_data.columns]
    if missing_features:
        st.error(f"缺少以下特征列:{', '.join(missing_features)}")
    
归一化处理
  • 使用 MinMaxScaler 将特征归一化为 [0, 1] 区间,便于训练模型:
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(satellite_data[features])
    

3. 序列数据创建

功能
  • 将归一化后的数据转换为时间序列形式,方便 LSTM 模型处理。
主要代码
  • 滑动窗口生成序列

    def create_sequences(data, seq_length=20):
        X, y = [], []
        for i in range(len(data) - seq_length):
            X.append(data[i:i+seq_length])
            y.append(data[i + seq_length][:3])  # 仅预测 X, Y, Z
        return np.array(X), np.array(y)
    
  • 生成特征和目标数据

    • 特征:时间序列的多维特征。
    • 目标:序列末尾的 X, Y, Z 坐标。

4. 模型定义与训练

模型架构
  • LSTM 模型

    • 两层 LSTM 层(num_layers=2),具有 64 维隐藏层。
    • 输出层是一个全连接层,用于预测 X, Y, Z 坐标。
  • 代码实现

    class LSTMModel(nn.Module):
        def __init__(self, input_size, hidden_size, output_size, num_layers=2):
            super(LSTMModel, self).__init__()
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=0.2)
            self.fc = nn.Linear(hidden_size, output_size)
    
        def forward(self, x):
            lstm_out, (h_n, c_n) = self.lstm(x)
            out = self.fc(lstm_out[:, -1, :])  # 输出序列最后一个时刻的预测值
            return out
    

训练过程
  • 损失函数与优化器

    • 使用均方误差(MSELoss)作为损失函数。
    • 使用 AdamW 优化器更新权重。
    • 使用 ReduceLROnPlateau 动态调整学习率。
  • 训练逻辑

    • 每个 epoch 记录训练损失值(loss_values),用于可视化。
    • 保存当前最佳模型参数:
      if avg_loss < best_loss:
          best_loss = avg_loss
          torch.save(model.state_dict(), 'best_lstm_model.pth')
      
损失值可视化
  • 使用 Matplotlib 绘制训练损失曲线:
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, num_epochs + 1), loss_values, marker='o', linestyle='-', color='blue', label='Training Loss')
    plt.title("Training Loss over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.grid()
    plt.legend()
    st.pyplot(plt)
    

5. 预测与误差评估

预测
  • 加载最佳模型权重:
    model.load_state_dict(torch.load('best_lstm_model.pth'))
    model.eval()
    
  • 使用模型预测坐标并反归一化,得到实际坐标。
误差评估
  • 使用 mean_squared_error, mean_absolute_error, 和 r2_score 等指标评估模型性能:
    mse = mean_squared_error(true_coordinates, predicted_df[['X', 'Y', 'Z']].values)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(true_coordinates, predicted_df[['X', 'Y', 'Z']].values)
    r2 = r2_score(true_coordinates, predicted_df[['X', 'Y', 'Z']].values)
    

6. 轨迹可视化

功能
  • 使用 Plotly 绘制 3D 图形,直观展示历史轨迹与预测轨迹的对比。
代码实现
  • 历史轨迹(青色线条)

    fig.add_trace(go.Scatter3d(
        x=satellite_data['X'], y=satellite_data['Y'], z=satellite_data['Z'],
        mode='lines',
        line=dict(color='cyan', width=4)
    ))
    
  • 预测轨迹(红色线条)

    fig.add_trace(go.Scatter3d(
        x=predicted_coordinates[:, 0], y=predicted_coordinates[:, 1], z=predicted_coordinates[:, 2],
        mode='lines',
        line=dict(color='red', width=4)
    ))
    
  • 更新布局
    设置图形标题和坐标轴标签:

    fig.update_layout(
        title="历史轨迹与预测轨迹对比",
        scene=dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z'),
        margin=dict(r=0, l=0, b=0, t=0)
    )
    

关键输出

  1. 训练损失曲线
    • 展示损失值随训练迭代的变化趋势。
  2. 预测结果表
    • 显示预测的 X, Y, Z 坐标值。
  3. 误差评估指标
    • 输出 MSE、RMSE、MAE 和 R² 值。
  4. 轨迹对比图
    • 3D 轨迹图对比历史
      在这里插入图片描述
      在这里插入图片描述
      在这里插入图片描述

精度评定:
均方误差 (MSE):150749.9629
均方根误差 (RMSE): 388.2653
平均绝对误差 (MAE): 286.9628
R²:0.9994
在这里插入图片描述
在这里插入图片描述

完整代码

import re
import streamlit as st
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import plotly.graph_objects as go

# 文件路径
sp3_file_paths = ['igs16370.sp3', 'igs16371.sp3', 'igs16372.sp3']
output_csv_path = 'output_data.csv'

# 解析 SP3 文件,提取所有数据列
def extract_sp3_data(file_path):
    satellite_data_pattern = r"^PG(\d{2})\s+([-]?\d+\.\d+)\s+([-]?\d+\.\d+)\s+([-]?\d+\.\d+)\s+([-]?\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)"
    satellite_ids, timestamps, x_coords, y_coords, z_coords = [], [], [], [], []
    col_5, col_6, col_7, col_8 = [], [], [], []  # 额外四列

    with open(file_path, 'r') as file:
        for line in file:
            match = re.match(satellite_data_pattern, line)
            if match:
                satellite_id = match.group(1)
                x = float(match.group(2))
                y = float(match.group(3))
                z = float(match.group(4))
                col_5_value = float(match.group(5))
                col_6_value = int(match.group(6))
                col_7_value = int(match.group(7))
                col_8_value = int(match.group(8))

                satellite_ids.append(satellite_id)
                x_coords.append(x)
                y_coords.append(y)
                z_coords.append(z)
                col_5.append(col_5_value)
                col_6.append(col_6_value)
                col_7.append(col_7_value)
                col_8.append(col_8_value)

    data_df = pd.DataFrame({
        "Satellite_ID": satellite_ids,
        "X": x_coords,
        "Y": y_coords,
        "Z": z_coords,
        "Col5": col_5,
        "Col6": col_6,
        "Col7": col_7,
        "Col8": col_8,
    })

    # 计算周期性时间特征
    num_rows = len(data_df)
    data_df['sin_time'] = np.sin(2 * np.pi * np.arange(num_rows) / num_rows)
    data_df['cos_time'] = np.cos(2 * np.pi * np.arange(num_rows) / num_rows)

    # 计算速度(X, Y, Z 差分)
    speed = np.sqrt(np.diff(data_df['X'])**2 + np.diff(data_df['Y'])**2 + np.diff(data_df['Z'])**2)
    speed = np.append(speed, np.nan)  # 确保列长度一致
    data_df['Speed'] = speed

    # 计算加速度(X, Y, Z 的二阶差分)
    acc_x = np.diff(data_df['X'], n=2)
    acc_y = np.diff(data_df['Y'], n=2)
    acc_z = np.diff(data_df['Z'], n=2)
    acc_x = np.append(acc_x, [np.nan, np.nan])  # 补充 NaN 保持长度一致
    acc_y = np.append(acc_y, [np.nan, np.nan])
    acc_z = np.append(acc_z, [np.nan, np.nan])
    data_df['Accel_X'] = acc_x
    data_df['Accel_Y'] = acc_y
    data_df['Accel_Z'] = acc_z

    return data_df

# 提取所有文件的数据并合并
all_data_df = pd.DataFrame()
for file_path in sp3_file_paths:
    data_df = extract_sp3_data(file_path)
    all_data_df = pd.concat([all_data_df, data_df], ignore_index=True)

# 保存合并后的数据到 CSV
all_data_df.to_csv(output_csv_path, index=False)
print("数据已提取并保存到 CSV 文件。")

# 加载数据
@st.cache_data
def load_data():
    data_path = output_csv_path
    data_df = pd.read_csv(data_path)
    return data_df

# 加载并筛选数据
data_df = load_data()
satellite_ids = data_df['Satellite_ID'].unique()
st.title("卫星轨道预测与轨迹展示")
satellite_id = st.selectbox("选择卫星编号", satellite_ids)
satellite_data = data_df[data_df['Satellite_ID'] == satellite_id]

# 使用历史数据作为真实轨迹数据进行对比
real_satellite_data = satellite_data

# 特征列表
features = ['X', 'Y', 'Z', 'sin_time', 'cos_time', 'Speed', 'Accel_X', 'Accel_Y', 'Accel_Z']

missing_features = [feature for feature in features if feature not in satellite_data.columns]
if missing_features:
    st.error(f"缺少以下特征列:{', '.join(missing_features)}")
else:
    # 数据预处理
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(satellite_data[features])

    # 创建训练序列
    def create_sequences(data, seq_length=20):
        X, y = [], []
        for i in range(len(data) - seq_length):
            X.append(data[i:i+seq_length])
            y.append(data[i + seq_length][:3])  # 仅预测 X, Y, Z
        return np.array(X), np.array(y)

    seq_length = 20
    X, y = create_sequences(scaled_data, seq_length)

    # 转换为 PyTorch 张量
    X_tensor = torch.tensor(X, dtype=torch.float32)
    y_tensor = torch.tensor(y, dtype=torch.float32)

    # 创建 DataLoader
    batch_size = 32
    dataset = TensorDataset(X_tensor, y_tensor)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # 定义 PyTorch LSTM 模型
    class LSTMModel(nn.Module):
        def __init__(self, input_size, hidden_size, output_size, num_layers=2):
            super(LSTMModel, self).__init__()
            self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=0.2)
            self.fc = nn.Linear(hidden_size, output_size)

        def forward(self, x):
            lstm_out, (h_n, c_n) = self.lstm(x)
            out = self.fc(lstm_out[:, -1, :])
            return out

    # 初始化模型
    input_size = X.shape[2]
    hidden_size = 64
    output_size = 3
    model = LSTMModel(input_size, hidden_size, output_size)

    # 损失函数与优化器
    criterion = nn.MSELoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

    # 学习率调度器
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10, verbose=True)

    # 存储每个 epoch 的损失值
    loss_values = []

    # 训练模型
    st.write("训练模型中,请稍等...")
    num_epochs = 50
    best_loss = float('inf')
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            output = model(batch_X)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        loss_values.append(avg_loss)  # 保存每个 epoch 的平均损失值
        scheduler.step(avg_loss)

        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), 'best_lstm_model.pth')

        if (epoch + 1) % 10 == 0:
            st.write(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_loss:.4f}")

    # 绘制损失值的可视化图
    st.subheader("训练损失值变化图:")
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))
    plt.plot(range(1, num_epochs + 1), loss_values, marker='o', linestyle='-', color='blue', label='Training Loss')
    plt.title("Training Loss over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.grid()
    plt.legend()
    st.pyplot(plt)


    # 最佳模型
    model.load_state_dict(torch.load('best_lstm_model.pth'))
    model.eval()

    # 预测结果
    with torch.no_grad():
        predicted_scaled = model(X_tensor).numpy()

    # 反标准化预测结果
    predicted_df = pd.DataFrame(predicted_scaled, columns=['X_scaled', 'Y_scaled', 'Z_scaled'])

    # 拼接其他特征
    predicted_sin_time = np.tile(satellite_data['sin_time'].iloc[-1], (predicted_df.shape[0], 1))
    predicted_cos_time = np.tile(satellite_data['cos_time'].iloc[-1], (predicted_df.shape[0], 1))
    predicted_speed = np.tile(satellite_data['Speed'].iloc[-1], (predicted_df.shape[0], 1))
    predicted_accel_x = np.tile(satellite_data['Accel_X'].iloc[-1], (predicted_df.shape[0], 1))
    predicted_accel_y = np.tile(satellite_data['Accel_Y'].iloc[-1], (predicted_df.shape[0], 1))
    predicted_accel_z = np.tile(satellite_data['Accel_Z'].iloc[-1], (predicted_df.shape[0], 1))

    predicted_full_features = np.hstack((
        predicted_df[['X_scaled', 'Y_scaled', 'Z_scaled']].values,
        predicted_sin_time,
        predicted_cos_time,
        predicted_speed,
        predicted_accel_x,
        predicted_accel_y,
        predicted_accel_z
    ))

    predicted_full_features_inverse = scaler.inverse_transform(predicted_full_features)

    predicted_coordinates = predicted_full_features_inverse[:, :3]
    predicted_df[['X', 'Y', 'Z']] = predicted_coordinates

    st.subheader("预测结果:")
    st.write(predicted_df[['X', 'Y', 'Z']])

    # 计算误差
    true_coordinates = real_satellite_data[['X', 'Y', 'Z']].iloc[seq_length:].values  # 使用真实的坐标数据

    mse = mean_squared_error(true_coordinates, predicted_df[['X', 'Y', 'Z']].values)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(true_coordinates, predicted_df[['X', 'Y', 'Z']].values)
    r2 = r2_score(true_coordinates, predicted_df[['X', 'Y', 'Z']].values)

    st.subheader("精度评定:")
    st.write(f"均方误差 (MSE): {mse:.4f}")
    st.write(f"均方根误差 (RMSE): {rmse:.4f}")
    st.write(f"平均绝对误差 (MAE): {mae:.4f}")
    st.write(f"R²: {r2:.4f}")

    # 可视化预测轨迹与历史数据对比
    fig = go.Figure()

    # 绘制历史轨迹,仅有一条轨迹
    fig.add_trace(go.Scatter3d(
        x=satellite_data['X'], y=satellite_data['Y'], z=satellite_data['Z'],
        mode='lines',
        line=dict(color='cyan', width=4)
    ))

    fig.add_trace(go.Scatter3d(
        x=predicted_coordinates[:, 0], y=predicted_coordinates[:, 1], z=predicted_coordinates[:, 2],
        mode='lines',
        line=dict(color='red', width=4)
    ))

    # 更新布局,设置标题和坐标轴标签
    fig.update_layout(
        title="历史轨迹与预测轨迹对比",
        scene=dict(xaxis_title='X', yaxis_title='Y', zaxis_title='Z'),
        margin=dict(r=0, l=0, b=0, t=0)
    )

    # 显示图表
    st.plotly_chart(fig)


http://www.kler.cn/a/415070.html

相关文章:

  • 人脸识别API解锁智能生活、C++人脸识别接口软文
  • 快速排序hoare版本和挖坑法(代码注释版)
  • SpringBoot 集成MQTT实现消息订阅
  • Flask项目入门—会话技术Cookie和Session
  • 初试无监督学习 - K均值聚类算法
  • Dockerfile打包部署
  • 【HarmonyOS开发模板/组件分享 – 用户隐私政策弹窗】
  • 贪心算法-Huffman树 不等式 推公式
  • iscsi服务器
  • [问题记录] Android裁剪Provision应用后无法打开开发者选项
  • 基于Linux操作系统的DNS服务器实验
  • python网络爬虫进阶
  • 全面解析LLM业务落地:RAG技术的创新应用、ReAct的智能化实践及基于业务场景的评估框架设计
  • 开发一套ERP 第七弹 RUst 操作数据库
  • 全国1000米分辨率逐月植被覆盖度(FVC)数据集(2000-2024)
  • 网络安全——--网络安全的基本概念--病毒防护--入侵检测技术与防火墙--虚拟专用网
  • C#里怎么样使用继承实现不同的功能,以及调用基类函数?
  • 在Linux中备份msyql数据库和表的详细操作
  • 【ChatGPT大模型开发调用】如何获得 OpenAl API Key?
  • Linux系统管理基础指南--习题
  • Python3 爬虫 Scrapy的安装
  • Docker容器ping不通外网问题排查及解决
  • 【uniapp】轮播图
  • 力扣整理版十:动态规划(待更新)
  • 【CLIP】3: semantic-text2image-search允许局域网访问
  • 卷积神经网络实现图像分类