# fine_tune_tansat2: fine-tuning script for the TanSat-2 Transformer regression model
import torch
import torch.nn as nn
import math
import numpy as np
# 1. Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # Build a max_len x d_model positional-encoding matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)  # even dimensions
        pe[:, 1::2] = torch.cos(position * div_term)  # odd dimensions
        pe = pe.unsqueeze(0)  # add a batch dimension
        self.register_buffer('pe', pe)  # buffer: saved with the model but never trained

    def forward(self, x):
        # Add the positional encoding to x, which has shape (batch_size, seq_len, d_model)
        x = x + self.pe[:, :x.size(1), :]
        return x
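# Quick shape sanity check (hypothetical dimensions, not part of the original
# pipeline): the encoding is added element-wise, so the output shape must
# match the input shape.
_pe = PositionalEncoding(d_model=64)
_x = torch.zeros(2, 10, 64)  # (batch_size, seq_len, d_model)
assert _pe(_x).shape == (2, 10, 64)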
# 2. Multi-Head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, nhead):
        super(MultiHeadAttention, self).__init__()
        assert d_model % nhead == 0, "d_model must be a multiple of nhead"
        self.d_model = d_model
        self.nhead = nhead
        self.d_k = d_model // nhead  # per-head dimension
        # Linear projections for Q, K, V and the output
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

    def attention(self, q, k, v, mask=None):
        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        output = torch.matmul(attn, v)
        return output, attn

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # Project, then split into heads: (batch_size, nhead, seq_len, d_k)
        q = self.w_q(query).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)
        k = self.w_k(key).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)
        v = self.w_v(value).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)
        # Multi-head attention
        attn_output, attn = self.attention(q, k, v, mask)
        # Merge the heads back: (batch_size, seq_len, d_model)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        # Final output projection
        output = self.w_o(attn_output)
        return output
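# Shape sanity check (hypothetical sizes): with d_model=64 and nhead=8 each
# head gets d_k=8, and the output keeps the (batch_size, seq_len, d_model)
# shape of the query.
_mha = MultiHeadAttention(d_model=64, nhead=8)
_q = torch.randn(2, 10, 64)
assert _mha(_q, _q, _q).shape == (2, 10, 64)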
# 3. Feed-Forward Network
class FeedForward(nn.Module):
    def __init__(self, d_model, dim_feedforward, dropout=0.1):
        super(FeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, dim_feedforward)
        self.fc2 = nn.Linear(dim_feedforward, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x
# 4. Transformer Encoder Layer
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, nhead)
        self.feed_forward = FeedForward(d_model, dim_feedforward, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, mask=None):
        # Multi-head self-attention with residual connection (post-norm)
        src2 = self.self_attn(src, src, src, mask)
        src = src + self.dropout(src2)
        src = self.norm1(src)
        # Feed-forward network with residual connection
        src2 = self.feed_forward(src)
        src = src + self.dropout(src2)
        src = self.norm2(src)
        return src
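# One encoder layer is shape-preserving as well (hypothetical sizes).
_layer = TransformerEncoderLayer(d_model=64, nhead=8, dim_feedforward=256)
assert _layer(torch.randn(2, 10, 64)).shape == (2, 10, 64)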
# 5. Transformer Regression Model
class TransformerRegression(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_encoder_layers, dim_feedforward,
                 dropout=0.1, max_len=64, extra_features_dim=0):
        super(TransformerRegression, self).__init__()
        self.input_fc = nn.Linear(input_dim, d_model)
        self.position_encoding = PositionalEncoding(d_model, max_len)
        # Stack of encoder layers
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
            for _ in range(num_encoder_layers)
        ])
        # Width of the auxiliary (extra) feature vector
        self.extra_features_dim = extra_features_dim
        self.norm = nn.LayerNorm(d_model + extra_features_dim)
        self.norm1 = nn.LayerNorm(128)
        self.norm2 = nn.LayerNorm(64)
        self.norm3 = nn.LayerNorm(32)
        self.norm4 = nn.LayerNorm(64)
        self.norm5 = nn.LayerNorm(128)
        # Fully connected regression head
        self.fc1 = nn.Linear(d_model + extra_features_dim, 128)  # layer 1
        self.fc2 = nn.Linear(128, 64)   # layer 2
        self.fc3 = nn.Linear(64, 32)    # layer 3
        self.fc4 = nn.Linear(32, 64)    # layer 4
        self.fc5 = nn.Linear(64, 128)   # layer 5
        self.output_fc = nn.Linear(128, 1)  # output layer

    def forward(self, x, extra_features=None, mask=None):
        # Input projection and positional encoding; unsqueeze(1) turns each
        # sample into a length-1 sequence
        x = self.input_fc(x).unsqueeze(1)
        x = self.position_encoding(x)
        # Pass through the encoder stack
        for layer in self.encoder_layers:
            x = layer(x, mask)
        # Remove the sequence dimension
        x = x.squeeze(1)
        # If auxiliary features are given, rescale the angle columns and
        # concatenate them
        if extra_features is not None:
            # Per-sample min/max of the first 20 columns (the prior features);
            # the angle columns and the encoder output are all mapped into
            # this range so their magnitude tracks the prior
            prior_min = torch.min(extra_features[:, :20], dim=1)[0].unsqueeze(1)
            prior_max = torch.max(extra_features[:, :20], dim=1)[0].unsqueeze(1)
            prior_range = prior_max - prior_min
            # Longitude: cos in [-1, 1], shifted to [0, 1]
            extra_features[:, 20] = ((torch.cos(torch.deg2rad(extra_features[:, 20])).unsqueeze(1) + 1) / 2 * prior_range + prior_min).squeeze(1)
            # Latitude: sin in [-1, 1], shifted to [0, 1]
            extra_features[:, 21] = ((torch.sin(torch.deg2rad(extra_features[:, 21])).unsqueeze(1) + 1) / 2 * prior_range + prior_min).squeeze(1)
            # Sensor & solar zenith: cos in [0, 1]
            extra_features[:, 22:24] = torch.cos(torch.deg2rad(extra_features[:, 22:24])) * prior_range + prior_min
            # Sensor & solar azimuth: cos in [-1, 1], shifted to [0, 1]
            extra_features[:, 24:26] = (torch.cos(torch.deg2rad(extra_features[:, 24:26])) + 1) / 2 * prior_range + prior_min
            # Rescale the encoder output into the same per-sample range
            x = x / torch.norm(x, dim=1, keepdim=True) * prior_range + prior_min
            # Concatenate: (batch_size, d_model + extra_features_dim)
            x = torch.cat([x, extra_features], dim=1)
        # Fully connected head with two skip connections
        x1 = torch.relu(self.fc1(self.norm(x)))         # layer 1, width 128
        x2 = torch.relu(self.fc2(self.norm1(x1)))       # layer 2, width 64
        x3 = torch.relu(self.fc3(self.norm2(x2)))       # layer 3, width 32
        x4 = torch.relu(self.fc4(self.norm3(x3)))       # layer 4, width 64
        x5 = torch.relu(self.fc5(self.norm4(x4 + x2)))  # layer 5, width 128
        # Rescale the penultimate activation into the prior range, then regress
        x_second_last = self.norm5(x5 + x1)
        if extra_features is not None:
            x_second_last = x_second_last / torch.norm(x_second_last, dim=1, keepdim=True) * prior_range + prior_min
        output = self.output_fc(x_second_last)
        return output
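# A minimal construction sketch (hypothetical hyperparameters; the checkpoint
# loaded below defines the real ones): input_dim is the spectrum length
# (1016 in this pipeline) and extra_features_dim the number of auxiliary
# columns. The forward pass expects at least 26 auxiliary columns, since
# columns 20-25 are angle features rescaled against columns 0-19; note that
# it also modifies extra_features in place.
_demo = TransformerRegression(input_dim=1016, d_model=128, nhead=8,
                              num_encoder_layers=2, dim_feedforward=256,
                              extra_features_dim=26)
_out = _demo(torch.randn(4, 1016), extra_features=torch.rand(4, 26))
assert _out.shape == (4, 1)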
# Fine-tuning
from sklearn.model_selection import train_test_split
def load_data_from_csv(file_path):
    # Note: despite the name, the file is a NumPy .npy array, not a CSV.
    # Row 0 of the scaler file holds the per-channel maxima from the 2020
    # training set, row 1 the per-channel minima.
    scaler_file = np.load(r'D:/Transf_learning/Tansat2/fine_tune/scaler_2020_246810m.npy')
    max_num_list = scaler_file[0, :]
    min_num_list = scaler_file[1, :]
    data = np.load(file_path)
    # Columns 0-1015 are spectral features, the remaining columns before the
    # last are auxiliary features, and the last column is the target
    X = data[:, :1016]
    X_ex = data[:, 1016:-1]
    y = data[:, -1]
    # Min-max scale the spectra; the auxiliary features are left unscaled
    X = (X - min_num_list) / (max_num_list - min_num_list)
    return X, X_ex, y
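# A toy check of the per-channel min-max scaling used above (made-up
# numbers, purely illustrative): each column is mapped to [0, 1] by its own
# minimum and maximum.
_toy = np.array([[1.0, 10.0], [3.0, 30.0]])
_mx, _mn = _toy.max(axis=0), _toy.min(axis=0)
assert np.allclose((_toy - _mn) / (_mx - _mn), [[0.0, 0.0], [1.0, 1.0]])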
def create_dataset(X, extra_features, y, test_size=0.1):
    # Split into training and test sets
    X_train, X_test, y_train, y_test, extra_train, extra_test = train_test_split(
        X, y, extra_features, test_size=test_size, random_state=42
    )
    # Convert to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)  # y must be 2-D
    extra_train_tensor = torch.tensor(extra_train, dtype=torch.float32)  # auxiliary features
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)  # y must be 2-D
    extra_test_tensor = torch.tensor(extra_test, dtype=torch.float32)  # auxiliary features
    return X_train_tensor, extra_train_tensor, y_train_tensor, X_test_tensor, extra_test_tensor, y_test_tensor
def test_model(model, X_tensor, extra_tensor, y_tensor):
    # Evaluate the MSE loss on the given split (used for both train and test)
    model.eval()
    criterion = nn.MSELoss()
    with torch.no_grad():
        output = model(X_tensor.cuda(), extra_features=extra_tensor.cuda())
        loss = criterion(output, y_tensor.cuda())
    return loss.item()
X, X_ex, y = load_data_from_csv('out_tansat_0218(1).npy')
X_train_tensor, extra_train_tensor, y_train_tensor, X_test_tensor, extra_test_tensor, y_test_tensor = create_dataset(X, X_ex, y, test_size=0.1)
# Load the pretrained model and fine-tune it with a small learning rate
model = torch.load('model_transformer_change_angle_1116.pth')
learning_rate = 1e-5
criterion = nn.MSELoss()  # mean squared error, for regression
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    # One full-batch gradient step per epoch
    optimizer.zero_grad()
    outputs = model(X_train_tensor.cuda(), extra_train_tensor.cuda())
    loss = criterion(outputs, y_train_tensor.cuda())  # compare against the training split, not the full y
    loss.backward()
    optimizer.step()
    train_loss_ = test_model(model, X_train_tensor, extra_train_tensor, y_train_tensor)
    test_loss_ = test_model(model, X_test_tensor, extra_test_tensor, y_test_tensor)
    print(f"Epoch [{epoch + 1}/{num_epochs}], Train Loss: {train_loss_}")
    print(f"Epoch [{epoch + 1}/{num_epochs}], Test Loss: {test_loss_}")
# Rough results
def out_data(model, X_tensor, extra_tensor, y_tensor):
    # Return model predictions and targets as flat NumPy arrays
    model.eval()
    with torch.no_grad():
        output = model(X_tensor.cuda(), extra_features=extra_tensor.cuda())
    return output.cpu().numpy().squeeze(), y_tensor.numpy().squeeze()
def calc_Rsquare(data1, data2):
    # Squared Pearson correlation coefficient
    R = np.corrcoef(data1, data2)
    return R[0, 1] * R[0, 1]

def calc_RMSE(data1, data2):
    # Root mean squared error
    return np.sqrt(np.mean(np.power(data1 - data2, 2)))

def calc_MAPE(y_true, y_pred):
    # Mean absolute percentage error
    return np.mean(np.abs((y_pred - y_true) / y_true)) * 100

def best_fit_slope_and_intercept(xs, ys):
    # Closed-form least-squares slope and intercept for y = m*x + b
    m = ((np.mean(xs) * np.mean(ys)) - np.mean(xs * ys)) / ((np.mean(xs) * np.mean(xs)) - np.mean(xs * xs))
    b = np.mean(ys) - m * np.mean(xs)
    return m, b
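# best_fit_slope_and_intercept is the closed-form simple linear regression;
# a quick cross-check against NumPy's polyfit on made-up data (illustrative
# values only):
_xs = np.array([0.0, 1.0, 2.0, 3.0])
_ys = 2.0 * _xs + 1.0
_m, _b = best_fit_slope_and_intercept(_xs, _ys)
assert np.allclose([_m, _b], np.polyfit(_xs, _ys, 1))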
y_p, y_t = out_data(model, X_train_tensor, extra_train_tensor, y_train_tensor)
k, b = best_fit_slope_and_intercept(y_t, y_p)
r2 = calc_Rsquare(y_t, y_p)
rmse = calc_RMSE(y_t, y_p)
mape = calc_MAPE(y_t, y_p)
print(f"train curve: y = {k} * x + {b}")
print(f"R2, RMSE, MAPE: {r2}, {rmse}, {mape}")