Python 实现深度学习模型预测控制--预测模型构建
链接:深度学习模型预测控制 (如果认为有用,动动小手为我点亮github小星星哦),持续更新中……
链接:WangXiaoMingo/TensorDL-MPC: DL-MPC(deep learning model predictive control) is a software toolkit developed based on the Python and TensorFlow frameworks, designed to enhance the performance of traditional Model Predictive Control (MPC) through deep learning technology. This toolkit provides core functionalities such as model training, simulation, parameter optimization. (github.com)https://github.com/WangXiaoMingo/TensorDL-MPC此文档为如何使用深度学习模型预测控制系列文档。
使用 TensorDL-MPC 进行 MPC 控制,包括初始化系统、训练模型、执行 MPC 控制、模拟系统动力学等。
1. 构建模型
方法1:可以直接加载模型进行网络训练。
方法2:同样允许我们自己构建特定模型。使用方法与使用TensorFlow的Keras API构建方式类似,包括Sequential模型、函数式API(Functional API)、以及子类化(Subclassing)方法。这些方法都可以完成从模型构建到训练的整个过程。下面将分别展示这些方式的具体实现。
1.1 使用Sequential模型
import tensorflow as tf
from tensorflow.keras import layers, models
# 使用Sequential API 构建模型
model = models.Sequential([
layers.Dense(128, activation='relu', input_shape=(784,)), # 输入层
layers.Dense(64, activation='relu'), # 隐藏层
layers.Dense(10, activation='softmax') # 输出层
])
# 编译模型
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
model.summary()
1.2 函数式API(Functional API)
函数式API允许创建更复杂的模型结构,例如多输入、多输出或共享层的模型。
from tensorflow.keras import Input, Model
# 使用Functional API 构建模型
inputs = Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = Model(inputs=inputs, outputs=outputs)
# 编译模型
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
model.summary()
也可以自定义类实现模型封装,如:
# dlmpc/models/BPMM.py
import tensorflow as tf
from tensorflow import keras
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from layers.normlization import MinMaxNormalization
import numpy as np
import random
import os
# 固定随机数种子
random_seed = 42
random.seed(random_seed) # set random seed for python
np.random.seed(random_seed) # set random seed for numpy
tf.random.set_seed(random_seed) # set random seed for tensorflow-cpu
os.environ['TF_DETERMINISTIC_OPS'] = '1' # set random seed for tensorflow-gpu
# warnings.filterwarnings("ignore")
class MultiBPNet():
def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
use_mask=False):
"""
初始化函数
参数:
hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
dim_u (int): 输入的维度,默认为1。必须为整数。
dim_x (int): 输出的维度,默认为3。必须为整数。
dim_y (int): 输出的维度,默认为1。必须为整数。
feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
use_mask (bool): 是否启用掩码,默认为False。
"""
if not isinstance(hidden_blocks, int):
raise ValueError("hidden_blocks must be an integer")
if not isinstance(dim_u, int):
raise ValueError("dim_u must be an integer")
if not isinstance(dim_x, int):
raise ValueError("dim_x must be an integer")
super().__init__()
self.nblocks = hidden_blocks
self.dim_u = dim_u
self.dim_x = dim_x
self.dim_y = dim_y
self.feature_range = feature_range
self.use_mask = use_mask
self.min_val_x, self.min_val_u = self._validate_values(min_val)
self.max_val_x, self.max_val_u = self._validate_values(max_val)
self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x, max_val=self.max_val_x)
self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u, max_val=self.max_val_u)
def _validate_values(self, values):
if values is not None:
if not (isinstance(values, (list, tuple)) and len(values) == 2):
raise ValueError("values must be a list or tuple of length 2")
return values[0], values[1]
return None, None
def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
if min_val is not None and max_val is not None:
return normalization_layer(input_tensor)
return input_tensor
def build(self, units=32, time_step = 3 ,u_steps = 1, data_type='1D'):
# 构建模型,2个输入,分别x和u
input_x = keras.Input(shape=(time_step,self.dim_x))
input_u = keras.Input(shape=(u_steps,self.dim_u))
if self.use_mask:
input_x = keras.layers.Masking(mask_value=0.0)(input_x)
input_u = keras.layers.Masking(mask_value=0.0)(input_u)
norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)
inputs = self._concatenate_inputs(units,norm_x, norm_u, data_type)
for j in range(self.nblocks):
x = keras.layers.Dense(units, activation='linear')(inputs)
inputs = x
out = keras.layers.Dense(self.dim_y, activation='linear')(x)
out = out[:, -1]
model = keras.models.Model(inputs=[input_x, input_u], outputs=out, name='BP')
return model
def _concatenate_inputs(self, units,norm_x, norm_u, data_type):
if data_type == '1D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1] # 0.0002 0.0133 0.0105 0.9896 0.9915 0.0272
return tf.concat([tf.reshape(norm_x,[-1,1,_dim_1x * self.dim_x]), tf.reshape(norm_u,[-1,1,_dim_1u * self.dim_u])], axis=2)
elif data_type == '2D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
x = keras.layers.Dense(units, activation='linear')(norm_x)
u = keras.layers.Dense(units, activation='linear')(norm_u)
# return tf.concat([x, u], axis=1) # 0.0034 0.0581 0.0464 0.8209 0.8454 0.1254; 0.0027 0.0521 0.0407 0.7753 0.7776 0.1065
# return x @ tf.transpose(u,[0,2,1]) # 0.0003 0.0161 0.0127 0.9833 0.9858 0.0328, 0.0001 0.0118 0.0097 0.9916 0.9917 0.0262
# return u @ tf.transpose(x, [0, 2, 1]) # 0.0004 0.0197 0.0152 0.9732 0.9768 0.0382; 0.0002 0.0126 0.0103 0.9904 0.9905 0.028
return tf.reshape(tf.concat([x, u], axis=1),[-1,1,(_dim_1x+_dim_1u)*units]) # 0.0001 0.0117 0.0093 0.9919 0.9919 0.0248; 0.0002 0.0145 0.0118 0.9877 0.9904 0.031
else:
raise ValueError("Unsupported data_type. Choose from '1D', '2D'")
if __name__ == '__main__':
min_val_x = tf.constant([0.,0,0]) # 手动指定的最小值
max_val_x = tf.constant([10,10,10.]) # 手动指定的最大值
min_val_u = tf.constant([0.,0]) # 手动指定的最小值
max_val_u = tf.constant([10.,10.]) # 手动指定的最大值
min_val = [min_val_x,min_val_u]
max_val = [max_val_x, max_val_u]
model = MultiBPNet(hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val,
use_mask=True).build(units=32, data_type='1D')
model.compile(optimizer='adam', loss='mse')
model.summary()
print(model.name)
1.3 使用子类化(Subclassing)方法
子类化方法允许对模型的结构和前向传播过程进行最大程度的控制。
class MyModel(tf.keras.Model):
def __init__(self):
super(MyModel, self).__init__()
self.dense1 = layers.Dense(128, activation='relu')
self.dense2 = layers.Dense(64, activation='relu')
self.dense3 = layers.Dense(10, activation='softmax')
def call(self, inputs):
x = self.dense1(inputs)
x = self.dense2(x)
return self.dense3(x)
model = MyModel()
# 编译模型
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
对于更复杂的功能,可以如下:
import tensorflow as tf
from tensorflow import keras
class MinMaxNormalization(keras.layers.Layer):
def __init__(self, feature_range=(0, 1), min_val=None, max_val=None, **kwargs):
super(MinMaxNormalization, self).__init__(**kwargs)
self.feature_range = feature_range
self.min_val = min_val
self.max_val = max_val
def call(self, inputs):
if self.min_val is not None and self.max_val is not None:
scaled = (inputs - self.min_val) / (self.max_val - self.min_val)
return scaled * (self.feature_range[1] - self.feature_range[0]) + self.feature_range[0]
return inputs
class BPNeuralNetwork(keras.Model):
def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
use_mask=False):
"""
初始化函数
参数:
hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
dim_u (int): 输入的维度,默认为1。必须为整数。
dim_x (int): 输出的维度,默认为3。必须为整数。
dim_y (int): 输出的维度,默认为1。必须为整数。
feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
use_mask (bool): 是否启用掩码,默认为False。
"""
super(BPNeuralNetwork, self).__init__()
if not isinstance(hidden_blocks, int):
raise ValueError("hidden_blocks must be an integer")
if not isinstance(dim_u, int):
raise ValueError("dim_u must be an integer")
if not isinstance(dim_x, int):
raise ValueError("dim_x must be an integer")
self.nblocks = hidden_blocks
self.dim_u = dim_u
self.dim_x = dim_x
self.dim_y = dim_y
self.feature_range = feature_range
self.use_mask = use_mask
self.min_val_x, self.min_val_u = self._validate_values(min_val)
self.max_val_x, self.max_val_u = self._validate_values(max_val)
self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
max_val=self.max_val_x)
self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
max_val=self.max_val_u)
self.dense_layers = [keras.layers.Dense(32, activation='linear') for _ in range(self.nblocks)]
self.output_layer = keras.layers.Dense(self.dim_y, activation='linear')
# if data_type == '2D':
self.dense_x = keras.layers.Dense(32, activation='linear')
self.dense_u = keras.layers.Dense(32, activation='linear')
def _validate_values(self, values):
if values is not None:
if not (isinstance(values, (list, tuple)) and len(values) == 2):
raise ValueError("values must be a list or tuple of length 2")
return values[0], values[1]
return None, None
def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
if min_val is not None and max_val is not None:
return normalization_layer(input_tensor)
return input_tensor
def _concatenate_inputs(self, norm_x, norm_u, data_type):
if data_type == '1D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
elif data_type == '2D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
x = self.dense_x(norm_x)
u = self.dense_u(norm_u)
# return x @ tf.transpose(u,[0,2,1])
return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * 32])
else:
raise ValueError("Unsupported data_type. Choose from '1D', '2D'")
def call(self, inputs, data_type='1D'):
input_x, input_u = inputs
if self.use_mask:
input_x = keras.layers.Masking(mask_value=0.0)(input_x)
input_u = keras.layers.Masking(mask_value=0.0)(input_u)
norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)
inputs = self._concatenate_inputs(norm_x, norm_u, data_type)
for j in range(self.nblocks):
x = self.dense_layers[j](inputs)
inputs = x
out = self.output_layer(x)
out = out[:, -1]
return out
if __name__ == '__main__':
min_val_x = tf.constant([0., 0, 0]) # 手动指定的最小值
max_val_x = tf.constant([10, 10, 10.]) # 手动指定的最大值
min_val_u = tf.constant([0., 0]) # 手动指定的最小值
max_val_u = tf.constant([10., 10.]) # 手动指定的最大值
min_val = [min_val_x, min_val_u]
max_val = [max_val_x, max_val_u]
model = BPNeuralNetwork(hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
# model.compile(optimizer='adam', loss='mse')
# model.build([(None, 3, 3), (None, 1, 1)])
model.summary()
这三种方法各有优缺点:
- Sequential API: 最简单,但只适用于线性堆叠的模型。
- Functional API: 灵活性好,可以处理复杂模型结构,同时保持了简洁性。
- Model Subclassing: 最灵活,但相对复杂,适合需要自定义层或复杂逻辑的情况。
当然,我们也可以利用三种方式的组合实现更复杂的模型构建
1.4 使用函数式API(Functional API)构建更复杂模型
方法1: Model Subclassing+Functional API
# dlmpc/models/BPNN1.py
import tensorflow as tf
from tensorflow import keras
class MinMaxNormalization(keras.layers.Layer):
def __init__(self, feature_range=(0, 1), min_val=None, max_val=None, **kwargs):
super(MinMaxNormalization, self).__init__(**kwargs)
self.feature_range = feature_range
self.min_val = min_val
self.max_val = max_val
def call(self, inputs):
if self.min_val is not None and self.max_val is not None:
scaled = (inputs - self.min_val) / (self.max_val - self.min_val)
return scaled * (self.feature_range[1] - self.feature_range[0]) + self.feature_range[0]
return inputs
class BPNeuralNetwork1(keras.Model):
def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
use_mask=False):
"""
初始化函数
参数:
hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
dim_u (int): 输入的维度,默认为1。必须为整数。
dim_x (int): 输出的维度,默认为3。必须为整数。
dim_y (int): 输出的维度,默认为1。必须为整数。
feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
use_mask (bool): 是否启用掩码,默认为False。
"""
super(BPNeuralNetwork1, self).__init__()
if not isinstance(hidden_blocks, int):
raise ValueError("hidden_blocks must be an integer")
if not isinstance(dim_u, int):
raise ValueError("dim_u must be an integer")
if not isinstance(dim_x, int):
raise ValueError("dim_x must be an integer")
self.nblocks = hidden_blocks
self.dim_u = dim_u
self.dim_x = dim_x
self.dim_y = dim_y
self.feature_range = feature_range
self.use_mask = use_mask
self.min_val_x, self.min_val_u = self._validate_values(min_val)
self.max_val_x, self.max_val_u = self._validate_values(max_val)
self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
max_val=self.max_val_x)
self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
max_val=self.max_val_u)
self.dense_layers = [keras.layers.Dense(32, activation='linear') for _ in range(self.nblocks)]
self.output_layer = keras.layers.Dense(self.dim_y, activation='linear')
# if data_type == '2D':
self.dense_x = keras.layers.Dense(32, activation='linear')
self.dense_u = keras.layers.Dense(32, activation='linear')
def _validate_values(self, values):
if values is not None:
if not (isinstance(values, (list, tuple)) and len(values) == 2):
raise ValueError("values must be a list or tuple of length 2")
return values[0], values[1]
return None, None
def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
if min_val is not None and max_val is not None:
return normalization_layer(input_tensor)
return input_tensor
def _concatenate_inputs(self, norm_x, norm_u, data_type):
if data_type == '1D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
elif data_type == '2D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
x = self.dense_x(norm_x)
u = self.dense_u(norm_u)
# return x @ tf.transpose(u,[0,2,1])
return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * 32])
else:
raise ValueError("Unsupported data_type. Choose from '1D', '2D'")
def call(self, inputs, data_type='1D'):
input_x, input_u = inputs
if self.use_mask:
input_x = keras.layers.Masking(mask_value=0.0)(input_x)
input_u = keras.layers.Masking(mask_value=0.0)(input_u)
norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)
inputs = self._concatenate_inputs(norm_x, norm_u, data_type)
for j in range(self.nblocks):
x = self.dense_layers[j](inputs)
inputs = x
out = self.output_layer(x)
out = out[:, -1]
return out
if __name__ == '__main__':
min_val_x = tf.constant([0., 0, 0]) # 手动指定的最小值
max_val_x = tf.constant([10, 10, 10.]) # 手动指定的最大值
min_val_u = tf.constant([0., 0]) # 手动指定的最小值
max_val_u = tf.constant([10., 10.]) # 手动指定的最大值
min_val = [min_val_x, min_val_u]
max_val = [max_val_x, max_val_u]
input_x = keras.layers.Input(shape=(3, 3))
input_u = keras.layers.Input(shape=(1, 2))
model = BPNeuralNetwork1(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
output = model([input_x, input_u], data_type='1D')
model = keras.models.Model(inputs=[input_x, input_u], outputs=output, name='BP')
方法2: Model Subclassing+Functional API
# dlmpc/models/BPNN.py
import tensorflow as tf
from tensorflow import keras
class BPNeuralNetwork(keras.Model):
def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
use_mask=False):
"""
初始化函数
参数:
hidden_blocks (int): 隐藏层的数量,默认为3。必须为整数。
dim_u (int): 输入的维度,默认为1。必须为整数。
dim_x (int): 输出的维度,默认为3。必须为整数。
dim_y (int): 输出的维度,默认为1。必须为整数。
feature_range (tuple): 特征的范围,默认为(0, 1)。必须为长度为2的元组。
min_val (list or tuple): 输入和输出的最小值,默认为None。如果提供,必须为长度为2的列表或元组。
max_val (list or tuple): 输入和输出的最大值,默认为None。如果提供,必须为长度为2的列表或元组。
use_mask (bool): 是否启用掩码,默认为False。
"""
super(BPNeuralNetwork, self).__init__()
if not isinstance(hidden_blocks, int):
raise ValueError("hidden_blocks must be an integer")
if not isinstance(dim_u, int):
raise ValueError("dim_u must be an integer")
if not isinstance(dim_x, int):
raise ValueError("dim_x must be an integer")
self.nblocks = hidden_blocks
self.dim_u = dim_u
self.dim_x = dim_x
self.dim_y = dim_y
self.feature_range = feature_range
self.use_mask = use_mask
self.min_val_x, self.min_val_u = self._validate_values(min_val)
self.max_val_x, self.max_val_u = self._validate_values(max_val)
self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
max_val=self.max_val_x)
self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
max_val=self.max_val_u)
def _validate_values(self, values):
if values is not None:
if not (isinstance(values, (list, tuple)) and len(values) == 2):
raise ValueError("values must be a list or tuple of length 2")
return values[0], values[1]
return None, None
def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
if min_val is not None and max_val is not None:
return normalization_layer(input_tensor)
return input_tensor
def _concatenate_inputs(self, norm_x, norm_u):
if self.data_type == '1D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
elif self.data_type == '2D':
_dim_1x = tf.shape(norm_x)[1]
_dim_1u = tf.shape(norm_u)[1]
x = keras.layers.Dense(self.units)(norm_x)
u = keras.layers.Dense(self.units)(norm_u)
# return tf.concat([x, u], axis=1) # 0.0034 0.0581 0.0464 0.8209 0.8454 0.1254; 0.0027 0.0521 0.0407 0.7753 0.7776 0.1065
# return x @ tf.transpose(u,[0,2,1]) # 0.0003 0.0161 0.0127 0.9833 0.9858 0.0328, 0.0001 0.0118 0.0097 0.9916 0.9917 0.0262
# return u @ tf.transpose(x, [0, 2, 1]) # 0.0004 0.0197 0.0152 0.9732 0.9768 0.0382; 0.0002 0.0126 0.0103 0.9904 0.9905 0.028
return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * self.units])
else:
raise ValueError("Unsupported data_type. Choose from '1D', '2D'")
def call(self, inputs):
input_x, input_u = inputs
if self.use_mask:
input_x = keras.layers.Masking(mask_value=0.0)(input_x)
input_u = keras.layers.Masking(mask_value=0.0)(input_u)
norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)
inputs = self._concatenate_inputs(norm_x, norm_u)
for j in range(self.nblocks):
x = keras.layers.Dense(self.units, activation='relu')(inputs)
inputs = x
out = keras.layers.Dense(self.dim_y, activation='linear')(x)
out = out[:, -1]
return out
def build(self, units=32, time_step=3, u_steps=1, data_type='1D'):
self.units = units
self.data_type = data_type
input_x = keras.Input(shape=(time_step, self.dim_x))
input_u = keras.Input(shape=(u_steps, self.dim_u))
model = keras.models.Model(inputs=[input_x, input_u], outputs=self.call([input_x, input_u]), name='BP')
return model
if __name__ == '__main__':
min_val_x = tf.constant([0., 0, 0]) # 手动指定的最小值
max_val_x = tf.constant([10, 10, 10.]) # 手动指定的最大值
min_val_u = tf.constant([0., 0]) # 手动指定的最小值
max_val_u = tf.constant([10., 10.]) # 手动指定的最大值
min_val = [min_val_x, min_val_u]
max_val = [max_val_x, max_val_u]
model = BPNeuralNetwork(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
model = model.build(units=32, time_step=3, u_steps=1, data_type='1D')
model.summary()
print(model.name)
2. 模型训练
if __name__ == '__main__':
from src.dlmpc import Dataset
from src.dlmpc import WindowGeneratorMIMO
from src.dlmpc import DataLoader
from src.dlmpc import MultiBPNet,BPNeuralNetwork1, BPNeuralNetwork
from src.dlmpc import TrainModel
from src.dlmpc import MinMaxNormalization
from src.dlmpc import Calculate_Regression_metrics
from src.dlmpc import plot_line
from src.dlmpc import loadpack
import seaborn as sns
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import numpy as np
train = False
# TODO: 1. load model and generate data, (u,y)
plant = Dataset(plant_name='MISO',noise_amplitude=0.01,sine_wave=True)
data = plant.preprocess(num=1000)
print(np.array(data[['u1','u2']]))
# TODO: 2. generate Window data
data = {
'u': np.array(data[['u1','u2']]),
'y': np.array(data[['y1']])
}
print(data['u'])
window_generator = WindowGeneratorMIMO(data, input_dy=[3], input_du=[3, 3], input_shift_y=[1], input_shift_u=[2, 2],
output_predict_steps=1, u_steps=1)
x_sequences, u_sequences, y_sequences = window_generator.generate_sequences()
print(np.shape(x_sequences))
# TODO 3. generate data for train, valid, test
loader = DataLoader((x_sequences, u_sequences, y_sequences))
split_seed = [0.8, 0.1, 0.1]
(train_data, valid_data, test_data) = loader.load_data(split_seed)
# print(train_data['train_x_sequences'].shape)
'''
train_x_sequences, train_u_sequences, train_y_sequences
valid_x_sequences, valid_u_sequences, valid_y_sequences
test_x_sequences, test_u_sequences, test_y_sequences
'''
# TODO: 4. train model and save model
dim_x = np.shape(x_sequences)[2] # state variable number
dim_u = np.shape(u_sequences)[2] # input variable number
dim_y = np.shape(y_sequences)[2]
# 找到 x_sequences 每个变量的最小值和最大值
x_min = np.amin(x_sequences, axis=(0, 1))
x_max = np.amax(x_sequences, axis=(0, 1))
# 找到 u_sequences 每个变量的最小值和最大值
u_min = np.amin(u_sequences, axis=(0, 1))
u_max = np.amax(u_sequences, axis=(0, 1))
min_val = [tf.constant(x_min,dtype=tf.float32), tf.constant(u_min,dtype=tf.float32)]
max_val = [tf.constant(x_max,dtype=tf.float32), tf.constant(u_max,dtype=tf.float32)]
# 方法1:BPMM
my_model = MultiBPNet(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
# my_model = MultiBPNet(hidden_blocks=1, dim_u=dim_u, dim_x=dim_x, dim_y=dim_y,use_mask=True)
model = my_model.build(units=32,time_step = 3 ,u_steps = 1, data_type='1D')
# 方法2:BPNN1
# input_x = keras.layers.Input(shape=(3, 3))
# input_u = keras.layers.Input(shape=(1, 2))
# model = BPNeuralNetwork1(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
# output = model([input_x, input_u], data_type='1D')
# model = keras.models.Model(inputs=[input_x, input_u], outputs=output, name='BP1')
# # 方法3:BPNN1
# model = BPNeuralNetwork(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
# model = model.build(units=32, time_step=3, u_steps=1, data_type='1D')
'''data_type can select: 1D, 2D, 2DT'''
model_name = model.name
print(model_name)
# train model and load best model
if train:
TrainModel(model,lr = 0.01,epoch=200).train_model(train_data,valid_data,show_loss=True)
model.summary()
# 方法2注释以下两行
model = load_model(f'models_save/{model_name}.h5',custom_objects={'MinMaxNormalization': MinMaxNormalization})
model.save(f'models_save/{model_name}_predictor.h5')
else:
model = load_model(f'models_save/{model_name}_predictor.h5',custom_objects={'MinMaxNormalization': MinMaxNormalization})
model.summary()
# TODO: predict and plot
keras.utils.plot_model(model, to_file=f'model_fig/{model_name}.png', show_shapes=True,
show_layer_names=True)
y_pred_train = model.predict([train_data['train_x_sequences'], train_data['train_u_sequences']])
y_pred_test = model.predict([test_data['test_x_sequences'], test_data['test_u_sequences']])
print(model.evaluate([test_data['test_x_sequences'], test_data['test_u_sequences']], test_data['test_y_sequences']))
train_result = Calculate_Regression_metrics(y_pred_train.flatten(), train_data['train_y_sequences'].reshape(-1, 1),
label=f'{model_name}_train')
test_result = Calculate_Regression_metrics(y_pred_test.flatten(), test_data['test_y_sequences'].reshape(-1, 1),
label=f'{model_name}_test')
figure_property = {'title': model_name, 'X_label': 'Prediction set samples', 'Y_label': 'Prediction Value'}
plot_line(y_pred_test.flatten(), test_data['test_y_sequences'].reshape(-1, 1), figure_property)
print('train\n ', train_result)
print('test:\n', test_result)
# 设置Seaborn样式
sns.set_style("whitegrid")
# 创建一个Matplotlib图
fig, ax = plt.subplots(figsize=(12, 2))
# 移除坐标轴
ax.axis('off')
ax.set_title(f'{model_name}_predictor test result.png', fontsize=16, pad=2) # pad参数可以调整标题与表格之间的距离
# 将DataFrame转换为表格
tab = ax.table(cellText=test_result.values, colLabels=test_result.columns, rowLabels=test_result.index, loc='center')
# 可以为表格添加样式
tab.auto_set_font_size(True)
tab.set_fontsize(12)
tab.scale(1.0, 1.0)
# 保存图片
plt.savefig(f'model_result/{model_name}_predictor_test_result.png', bbox_inches='tight', dpi=500)
# 显示图片
plt.show()
mse | rmse | mae | r2 | mape | |
方法1(BPMM) | 0.0002 | 0.0125 | 0.0098 | 0.9909 | 0.0268 |
方法2(BPNN1) | 0.0002 | 0.014 | 0.0115 | 0.9911 | 0.0309 |
方法3(BPNN) | 0.0001 | 0.0107 | 0.0088 | 0.9933 | 0.0246 |
mse | rmse | mae | r2 | mape | |
方法1(BPMM) | 0.0002 | 0.0139 | 0.011 | 0.9886 | 0.029 |
方法2(BPNN1) | 0.0002 | 0.0144 | 0.0115 | 0.9877 | 0.0302 |
方法3(BPNN) | 0.0001 | 0.0113 | 0.0091 | 0.9925 | 0.0251 |
2D相较于1D输入较好,方法3模型精度更高。