
Deep Learning Model Predictive Control in Python: Building the Prediction Model

Link: Deep Learning Model Predictive Control (if you find it useful, please give the project a star on GitHub). Updates are ongoing.

Link: WangXiaoMingo/TensorDL-MPC: DL-MPC (deep learning model predictive control) is a software toolkit developed based on the Python and TensorFlow frameworks, designed to enhance the performance of traditional Model Predictive Control (MPC) through deep learning technology. This toolkit provides core functionalities such as model training, simulation, and parameter optimization. https://github.com/WangXiaoMingo/TensorDL-MPC

This document is part of a series on how to use deep learning model predictive control.

Using TensorDL-MPC for MPC control involves initializing the system, training the model, running the MPC controller, and simulating the system dynamics.

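1. Building the model

Method 1: load a ready-made model directly and train the network.

Method 2: build a custom model yourself. This works much like building a model with TensorFlow's Keras API, which offers the Sequential model, the Functional API, and the subclassing approach. Each of these covers the whole path from model construction to training. The sections below show each approach in turn.

1.1 Using the Sequential model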

import tensorflow as tf
from tensorflow.keras import layers, models

# Build the model with the Sequential API
model = models.Sequential([
    layers.Dense(128, activation='relu', input_shape=(784,)),  # first hidden layer; input_shape declares the 784-dim input
    layers.Dense(64, activation='relu'),  # hidden layer
    layers.Dense(10, activation='softmax')  # output layer
])

# Compile the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()

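1.2 The Functional API

The Functional API can express more complex model structures, such as models with multiple inputs, multiple outputs, or shared layers.

from tensorflow.keras import Input, Model, layers

# Build the model with the Functional API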
inputs = Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)

# Compile the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model.summary()
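
The prediction models later in this article take two inputs, a window of past states x and a window of control inputs u, so the multi-input case is worth sketching on its own. The following is a minimal illustration only: the shapes (3 time steps of 3 state variables, 1 step of 2 control inputs), the layer sizes, and the name `two_input_model` are assumptions, not part of TensorDL-MPC.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import Input, Model, layers

# Illustrative shapes (assumptions): 3 past steps of 3 state variables,
# and 1 step of 2 control inputs.
input_x = Input(shape=(3, 3), name="x_window")
input_u = Input(shape=(1, 2), name="u_window")

# Flatten each window and join them into a single feature vector.
flat_x = layers.Flatten()(input_x)                # (batch, 9)
flat_u = layers.Flatten()(input_u)                # (batch, 2)
merged = layers.Concatenate()([flat_x, flat_u])   # (batch, 11)

hidden = layers.Dense(32, activation="relu")(merged)
output = layers.Dense(1, activation="linear")(hidden)  # one-step prediction

two_input_model = Model(inputs=[input_x, input_u], outputs=output,
                        name="two_input_sketch")
two_input_model.compile(optimizer="adam", loss="mse")

# Forward pass on random data to confirm that the shapes line up.
x_batch = tf.constant(np.random.rand(4, 3, 3), dtype=tf.float32)
u_batch = tf.constant(np.random.rand(4, 1, 2), dtype=tf.float32)
y_hat = two_input_model([x_batch, u_batch]).numpy()
print(y_hat.shape)
```

Because the output is a regression value rather than a class probability, the sketch uses a linear output layer and an MSE loss instead of softmax and cross-entropy.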

The model can also be wrapped in a custom class, for example:

# dlmpc/models/BPMM.py

import os
import random
import sys

import numpy as np
import tensorflow as tf
from tensorflow import keras

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from layers.normlization import MinMaxNormalization


# Fix the random seeds for reproducibility
random_seed = 42
random.seed(random_seed)  # Python's built-in RNG
np.random.seed(random_seed)  # NumPy RNG
tf.random.set_seed(random_seed)  # TensorFlow RNG
os.environ['TF_DETERMINISTIC_OPS'] = '1'  # ask TensorFlow for deterministic op implementations (e.g. on GPU)
# warnings.filterwarnings("ignore")



class MultiBPNet():
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                     use_mask=False):
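        """
        Initialize the model.

        Args:
        hidden_blocks (int): number of hidden layers, default 3. Must be an integer.
        dim_u (int): dimension of the control input u, default 1. Must be an integer.
        dim_x (int): dimension of the state input x, default 3. Must be an integer.
        dim_y (int): dimension of the output y, default 1. Must be an integer.
        feature_range (tuple): target range for normalized features, default (0, 1). Must be a tuple of length 2.
        min_val (list or tuple): minimum values as [min of x, min of u], default None. If given, must be a list or tuple of length 2.
        max_val (list or tuple): maximum values as [max of x, max of u], default None. If given, must be a list or tuple of length 2.
        use_mask (bool): whether to enable masking, default False.
        """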

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        super().__init__()
        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x, max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u, max_val=self.max_val_u)

    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None
    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def build(self, units=32, time_step = 3 ,u_steps = 1, data_type='1D'):
        # Build the model with two inputs, x and u
        input_x = keras.Input(shape=(time_step,self.dim_x))
        input_u = keras.Input(shape=(u_steps,self.dim_u))

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(units,norm_x, norm_u, data_type)

        for j in range(self.nblocks):
            x = keras.layers.Dense(units, activation='linear')(inputs)
            inputs = x
        out = keras.layers.Dense(self.dim_y, activation='linear')(x)
        out = out[:, -1]
        model = keras.models.Model(inputs=[input_x, input_u], outputs=out, name='BP')
        return model
    def _concatenate_inputs(self, units,norm_x, norm_u, data_type):
        if data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]  # 0.0002  0.0133  0.0105  0.9896  0.9915  0.0272
            return tf.concat([tf.reshape(norm_x,[-1,1,_dim_1x * self.dim_x]), tf.reshape(norm_u,[-1,1,_dim_1u * self.dim_u])], axis=2)
        elif data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = keras.layers.Dense(units, activation='linear')(norm_x)
            u = keras.layers.Dense(units, activation='linear')(norm_u)
            # return tf.concat([x, u], axis=1)          #  0.0034  0.0581  0.0464  0.8209  0.8454  0.1254; 0.0027  0.0521  0.0407  0.7753  0.7776  0.1065
            # return x @ tf.transpose(u,[0,2,1])        # 0.0003  0.0161  0.0127  0.9833  0.9858  0.0328, 0.0001  0.0118  0.0097  0.9916  0.9917  0.0262
            # return u @ tf.transpose(x, [0, 2, 1])    # 0.0004  0.0197  0.0152  0.9732  0.9768  0.0382; 0.0002  0.0126  0.0103  0.9904  0.9905  0.028
            return tf.reshape(tf.concat([x, u], axis=1),[-1,1,(_dim_1x+_dim_1u)*units]) # 0.0001  0.0117  0.0093  0.9919  0.9919  0.0248; 0.0002  0.0145  0.0118  0.9877  0.9904  0.031


        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")


if __name__ == '__main__':
    min_val_x = tf.constant([0.,0,0])  # manually specified minimum of x
    max_val_x = tf.constant([10,10,10.])  # manually specified maximum of x
    min_val_u = tf.constant([0.,0])  # manually specified minimum of u
    max_val_u = tf.constant([10.,10.])  # manually specified maximum of u
    min_val = [min_val_x,min_val_u]
    max_val = [max_val_x, max_val_u]
    # dim_u=2 matches the two-element bounds given for u
    model = MultiBPNet(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val,
                       use_mask=True).build(units=32, data_type='1D')
    model.compile(optimizer='adam', loss='mse')
    model.summary()
    print(model.name)
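
The two `data_type` branches of `_concatenate_inputs` differ only in how the x and u windows are merged. The shape bookkeeping can be traced with plain NumPy. In this sketch the batch size, the window lengths, and the random matrices standing in for the `Dense` projections are assumptions made for illustration.

```python
import numpy as np

batch, time_step, dim_x = 4, 3, 3   # window of past states
u_steps, dim_u = 1, 2               # window of control inputs
units = 32

rng = np.random.default_rng(0)
x = rng.random((batch, time_step, dim_x))
u = rng.random((batch, u_steps, dim_u))

# '1D': flatten each window, then concatenate along the feature axis.
merged_1d = np.concatenate(
    [x.reshape(-1, 1, time_step * dim_x), u.reshape(-1, 1, u_steps * dim_u)],
    axis=2,
)

# '2D': project both windows to `units` features (random matrices stand in
# for the Dense layers), stack along the time axis, then flatten.
w_x = rng.random((dim_x, units))
w_u = rng.random((dim_u, units))
stacked = np.concatenate([x @ w_x, u @ w_u], axis=1)  # (batch, time_step + u_steps, units)
merged_2d = stacked.reshape(-1, 1, (time_step + u_steps) * units)

print(merged_1d.shape)  # (4, 1, 11)
print(merged_2d.shape)  # (4, 1, 128)
```

In both cases the result keeps a singleton middle axis, which is why the model later takes `out[:, -1]` to drop it.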

1.3 Using the subclassing method

Subclassing gives the most control over the model's structure and its forward pass.

class MyModel(tf.keras.Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.dense1 = layers.Dense(128, activation='relu')
        self.dense2 = layers.Dense(64, activation='relu')
        self.dense3 = layers.Dense(10, activation='softmax')

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        return self.dense3(x)

model = MyModel()

# Compile the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
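
A subclassed model creates its weights only when it first sees data, so depending on the TensorFlow version `model.summary()` either fails or reports no parameters until the model has been called (or built) once. A minimal sketch of that step, assuming 784-dimensional inputs as in the earlier examples; the class name `SmallModel` is illustrative:

```python
import tensorflow as tf
from tensorflow.keras import layers

class SmallModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = layers.Dense(128, activation="relu")
        self.dense2 = layers.Dense(10, activation="softmax")

    def call(self, inputs):
        return self.dense2(self.dense1(inputs))

small_model = SmallModel()

# One forward pass on a dummy batch creates the weights.
probs = small_model(tf.zeros((2, 784)))
small_model.summary()

# 784*128 + 128 weights in the first layer, 128*10 + 10 in the second.
n_params = small_model.count_params()
```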

For more complex functionality, the same approach can be used as follows:

import tensorflow as tf
from tensorflow import keras

class MinMaxNormalization(keras.layers.Layer):
    def __init__(self, feature_range=(0, 1), min_val=None, max_val=None, **kwargs):
        super(MinMaxNormalization, self).__init__(**kwargs)
        self.feature_range = feature_range
        self.min_val = min_val
        self.max_val = max_val

    def call(self, inputs):
        if self.min_val is not None and self.max_val is not None:
            scaled = (inputs - self.min_val) / (self.max_val - self.min_val)
            return scaled * (self.feature_range[1] - self.feature_range[0]) + self.feature_range[0]
        return inputs


class BPNeuralNetwork(keras.Model):
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                 use_mask=False):
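        """
        Initialize the model.

        Args:
        hidden_blocks (int): number of hidden layers, default 3. Must be an integer.
        dim_u (int): dimension of the control input u, default 1. Must be an integer.
        dim_x (int): dimension of the state input x, default 3. Must be an integer.
        dim_y (int): dimension of the output y, default 1. Must be an integer.
        feature_range (tuple): target range for normalized features, default (0, 1). Must be a tuple of length 2.
        min_val (list or tuple): minimum values as [min of x, min of u], default None. If given, must be a list or tuple of length 2.
        max_val (list or tuple): maximum values as [max of x, max of u], default None. If given, must be a list or tuple of length 2.
        use_mask (bool): whether to enable masking, default False.
        """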
        super(BPNeuralNetwork, self).__init__()

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
                                                         max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
                                                         max_val=self.max_val_u)

        self.dense_layers = [keras.layers.Dense(32, activation='linear') for _ in range(self.nblocks)]
        self.output_layer = keras.layers.Dense(self.dim_y, activation='linear')

        # if data_type == '2D':
        self.dense_x = keras.layers.Dense(32, activation='linear')
        self.dense_u = keras.layers.Dense(32, activation='linear')

    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None

    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def _concatenate_inputs(self, norm_x, norm_u, data_type):
        if data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
                              tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
        elif data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = self.dense_x(norm_x)
            u = self.dense_u(norm_u)
            # return x @ tf.transpose(u,[0,2,1])   
            return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * 32])
        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")

    def call(self, inputs, data_type='1D'):
        input_x, input_u = inputs

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(norm_x, norm_u, data_type)

        for j in range(self.nblocks):
            x = self.dense_layers[j](inputs)
            inputs = x
        out = self.output_layer(x)
        out = out[:, -1]
        return out

if __name__ == '__main__':
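    min_val_x = tf.constant([0., 0, 0])  # manually specified minimum of x
    max_val_x = tf.constant([10, 10, 10.])  # manually specified maximum of x
    min_val_u = tf.constant([0., 0])  # manually specified minimum of u
    max_val_u = tf.constant([10., 10.])  # manually specified maximum of u
    min_val = [min_val_x, min_val_u]
    max_val = [max_val_x, max_val_u]

    # dim_u=2 matches the two-element bounds given for u
    model = BPNeuralNetwork(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)

    # model.compile(optimizer='adam', loss='mse')
    # A subclassed model has no weights until it is called once, so run a dummy batch
    # (3 time steps of x, 1 step of u) before asking for the summary.
    model([tf.zeros((1, 3, 3)), tf.zeros((1, 1, 2))])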
    model.summary()
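
The `MinMaxNormalization` layer above applies the usual min-max formula, scaled = (x - min) / (max - min) * (b - a) + a for a feature range (a, b). A small NumPy check of that arithmetic, using made-up values and a hypothetical helper `min_max_scale`, shows how per-variable bounds broadcast over a (batch, time, feature) array:

```python
import numpy as np

def min_max_scale(values, min_val, max_val, feature_range=(0.0, 1.0)):
    """Same arithmetic as the layer's call(), written with NumPy."""
    low, high = feature_range
    scaled = (values - min_val) / (max_val - min_val)
    return scaled * (high - low) + low

# One sample, two time steps, three variables (illustrative numbers).
x = np.array([[[0.0, 5.0, 10.0],
               [2.5, 7.5, 1.0]]])
min_val = np.array([0.0, 0.0, 0.0])
max_val = np.array([10.0, 10.0, 10.0])

scaled = min_max_scale(x, min_val, max_val)
print(scaled[0, 0])  # first time step maps to 0, 0.5 and 1
print(scaled[0, 1])  # second time step maps to 0.25, 0.75 and 0.1

# A different target range only rescales and shifts the result.
scaled_sym = min_max_scale(x, min_val, max_val, feature_range=(-1.0, 1.0))
```

The bounds must have one entry per variable on the last axis; a length mismatch either raises a broadcasting error or, for a size-1 axis, silently widens the array.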

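Each of the three approaches has its strengths and weaknesses:

  • Sequential API: the simplest, but only suitable for models that are a linear stack of layers.
  • Functional API: flexible enough for complex model structures while staying concise.
  • Model Subclassing: the most flexible but also the most involved; suited to custom layers or complex logic.

The three approaches can also be combined to build more complex models.

1.4 Building more complex models with the Functional API

Method 1: Model Subclassing + Functional API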

# dlmpc/models/BPNN1.py 

import tensorflow as tf
from tensorflow import keras

class MinMaxNormalization(keras.layers.Layer):
    def __init__(self, feature_range=(0, 1), min_val=None, max_val=None, **kwargs):
        super(MinMaxNormalization, self).__init__(**kwargs)
        self.feature_range = feature_range
        self.min_val = min_val
        self.max_val = max_val

    def call(self, inputs):
        if self.min_val is not None and self.max_val is not None:
            scaled = (inputs - self.min_val) / (self.max_val - self.min_val)
            return scaled * (self.feature_range[1] - self.feature_range[0]) + self.feature_range[0]
        return inputs


class BPNeuralNetwork1(keras.Model):
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                 use_mask=False):
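        """
        Initialize the model.

        Args:
        hidden_blocks (int): number of hidden layers, default 3. Must be an integer.
        dim_u (int): dimension of the control input u, default 1. Must be an integer.
        dim_x (int): dimension of the state input x, default 3. Must be an integer.
        dim_y (int): dimension of the output y, default 1. Must be an integer.
        feature_range (tuple): target range for normalized features, default (0, 1). Must be a tuple of length 2.
        min_val (list or tuple): minimum values as [min of x, min of u], default None. If given, must be a list or tuple of length 2.
        max_val (list or tuple): maximum values as [max of x, max of u], default None. If given, must be a list or tuple of length 2.
        use_mask (bool): whether to enable masking, default False.
        """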
        super(BPNeuralNetwork1, self).__init__()

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
                                                         max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
                                                         max_val=self.max_val_u)

        self.dense_layers = [keras.layers.Dense(32, activation='linear') for _ in range(self.nblocks)]
        self.output_layer = keras.layers.Dense(self.dim_y, activation='linear')

        # if data_type == '2D':
        self.dense_x = keras.layers.Dense(32, activation='linear')
        self.dense_u = keras.layers.Dense(32, activation='linear')

    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None

    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def _concatenate_inputs(self, norm_x, norm_u, data_type):
        if data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
                              tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
        elif data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = self.dense_x(norm_x)
            u = self.dense_u(norm_u)
            # return x @ tf.transpose(u,[0,2,1])   
            return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * 32])
        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")

    def call(self, inputs, data_type='1D'):
        input_x, input_u = inputs

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(norm_x, norm_u, data_type)

        for j in range(self.nblocks):
            x = self.dense_layers[j](inputs)
            inputs = x
        out = self.output_layer(x)
        out = out[:, -1]
        return out

if __name__ == '__main__':
    min_val_x = tf.constant([0., 0, 0])  # manually specified minimum of x
    max_val_x = tf.constant([10, 10, 10.])  # manually specified maximum of x
    min_val_u = tf.constant([0., 0])  # manually specified minimum of u
    max_val_u = tf.constant([10., 10.])  # manually specified maximum of u
    min_val = [min_val_x, min_val_u]
    max_val = [max_val_x, max_val_u]    

    input_x = keras.layers.Input(shape=(3, 3))
    input_u = keras.layers.Input(shape=(1, 2))
    model = BPNeuralNetwork1(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
    output = model([input_x, input_u], data_type='1D')
    model = keras.models.Model(inputs=[input_x, input_u], outputs=output, name='BP')

Method 2: Model Subclassing + Functional API (the functional model is created inside the class's build method)

# dlmpc/models/BPNN.py

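import tensorflow as tf
from tensorflow import keras
# MinMaxNormalization is the custom layer used in the listings above; the import
# path follows BPMM.py and may need adjusting to your project layout.
from layers.normlization import MinMaxNormalization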


class BPNeuralNetwork(keras.Model):
    def __init__(self, hidden_blocks=3, dim_u=1, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=None, max_val=None,
                 use_mask=False):
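        """
        Initialize the model.

        Args:
        hidden_blocks (int): number of hidden layers, default 3. Must be an integer.
        dim_u (int): dimension of the control input u, default 1. Must be an integer.
        dim_x (int): dimension of the state input x, default 3. Must be an integer.
        dim_y (int): dimension of the output y, default 1. Must be an integer.
        feature_range (tuple): target range for normalized features, default (0, 1). Must be a tuple of length 2.
        min_val (list or tuple): minimum values as [min of x, min of u], default None. If given, must be a list or tuple of length 2.
        max_val (list or tuple): maximum values as [max of x, max of u], default None. If given, must be a list or tuple of length 2.
        use_mask (bool): whether to enable masking, default False.
        """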
        super(BPNeuralNetwork, self).__init__()

        if not isinstance(hidden_blocks, int):
            raise ValueError("hidden_blocks must be an integer")
        if not isinstance(dim_u, int):
            raise ValueError("dim_u must be an integer")
        if not isinstance(dim_x, int):
            raise ValueError("dim_x must be an integer")

        self.nblocks = hidden_blocks
        self.dim_u = dim_u
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.feature_range = feature_range
        self.use_mask = use_mask

        self.min_val_x, self.min_val_u = self._validate_values(min_val)
        self.max_val_x, self.max_val_u = self._validate_values(max_val)

        self.normalization_layer_x = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_x,
                                                         max_val=self.max_val_x)
        self.normalization_layer_u = MinMaxNormalization(feature_range=self.feature_range, min_val=self.min_val_u,
                                                         max_val=self.max_val_u)
    def _validate_values(self, values):
        if values is not None:
            if not (isinstance(values, (list, tuple)) and len(values) == 2):
                raise ValueError("values must be a list or tuple of length 2")
            return values[0], values[1]
        return None, None

    def _apply_normalization(self, input_tensor, normalization_layer, min_val, max_val):
        if min_val is not None and max_val is not None:
            return normalization_layer(input_tensor)
        return input_tensor

    def _concatenate_inputs(self, norm_x, norm_u):
        if self.data_type == '1D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            return tf.concat([tf.reshape(norm_x, [-1, 1, _dim_1x * self.dim_x]),
                              tf.reshape(norm_u, [-1, 1, _dim_1u * self.dim_u])], axis=2)
        elif self.data_type == '2D':
            _dim_1x = tf.shape(norm_x)[1]
            _dim_1u = tf.shape(norm_u)[1]
            x = keras.layers.Dense(self.units)(norm_x)
            u = keras.layers.Dense(self.units)(norm_u)
            # return tf.concat([x, u], axis=1)          #  0.0034  0.0581  0.0464  0.8209  0.8454  0.1254; 0.0027  0.0521  0.0407  0.7753  0.7776  0.1065
            # return x @ tf.transpose(u,[0,2,1])        # 0.0003  0.0161  0.0127  0.9833  0.9858  0.0328, 0.0001  0.0118  0.0097  0.9916  0.9917  0.0262
            # return u @ tf.transpose(x, [0, 2, 1])    # 0.0004  0.0197  0.0152  0.9732  0.9768  0.0382; 0.0002  0.0126  0.0103  0.9904  0.9905  0.028
            return tf.reshape(tf.concat([x, u], axis=1), [-1, 1, (_dim_1x + _dim_1u) * self.units])
        else:
            raise ValueError("Unsupported data_type. Choose from '1D', '2D'")

    def call(self, inputs):
        input_x, input_u = inputs

        if self.use_mask:
            input_x = keras.layers.Masking(mask_value=0.0)(input_x)
            input_u = keras.layers.Masking(mask_value=0.0)(input_u)

        norm_x = self._apply_normalization(input_x, self.normalization_layer_x, self.min_val_x, self.max_val_x)
        norm_u = self._apply_normalization(input_u, self.normalization_layer_u, self.min_val_u, self.max_val_u)

        inputs = self._concatenate_inputs(norm_x, norm_u)

        for j in range(self.nblocks):
            x = keras.layers.Dense(self.units, activation='relu')(inputs)
            inputs = x
        out = keras.layers.Dense(self.dim_y, activation='linear')(x)
        out = out[:, -1]
        return out

    def build(self, units=32, time_step=3, u_steps=1, data_type='1D'):
        self.units = units
        self.data_type = data_type
        input_x = keras.Input(shape=(time_step, self.dim_x))
        input_u = keras.Input(shape=(u_steps, self.dim_u))
        model = keras.models.Model(inputs=[input_x, input_u], outputs=self.call([input_x, input_u]), name='BP')
        return model

if __name__ == '__main__':
    min_val_x = tf.constant([0., 0, 0])  # manually specified minimum of x
    max_val_x = tf.constant([10, 10, 10.])  # manually specified maximum of x
    min_val_u = tf.constant([0., 0])  # manually specified minimum of u
    max_val_u = tf.constant([10., 10.])  # manually specified maximum of u
    min_val = [min_val_x, min_val_u]
    max_val = [max_val_x, max_val_u]

    model = BPNeuralNetwork(hidden_blocks=3, dim_u=2, dim_x=3, dim_y=1, feature_range=(0, 1), min_val=min_val, max_val=max_val, use_mask=True)
    model = model.build(units=32, time_step=3, u_steps=1, data_type='1D')
    model.summary()
    print(model.name)

2. Training the model


if __name__ == '__main__':
    from src.dlmpc import Dataset
    from src.dlmpc import WindowGeneratorMIMO
    from src.dlmpc import DataLoader
    from src.dlmpc import MultiBPNet,BPNeuralNetwork1, BPNeuralNetwork
    from src.dlmpc import TrainModel
    from src.dlmpc import MinMaxNormalization
    from src.dlmpc import Calculate_Regression_metrics
    from src.dlmpc import plot_line
    from src.dlmpc import loadpack
    import seaborn as sns
    from tensorflow import keras
    import tensorflow as tf
    from tensorflow.keras.models import load_model
    import matplotlib.pyplot as plt
    import numpy as np

    train = False

    # TODO: 1. load model and generate data, (u,y)
    plant = Dataset(plant_name='MISO',noise_amplitude=0.01,sine_wave=True)
    data = plant.preprocess(num=1000)
    print(np.array(data[['u1','u2']]))

    # TODO: 2. generate Window data
    data = {
        'u': np.array(data[['u1','u2']]),
        'y': np.array(data[['y1']])
    }
    print(data['u'])

    window_generator = WindowGeneratorMIMO(data, input_dy=[3], input_du=[3, 3], input_shift_y=[1], input_shift_u=[2, 2],
                                 output_predict_steps=1, u_steps=1)
    x_sequences, u_sequences, y_sequences = window_generator.generate_sequences()
    print(np.shape(x_sequences))

    # TODO 3. generate data for train, valid, test
    loader = DataLoader((x_sequences, u_sequences, y_sequences))
    split_seed = [0.8, 0.1, 0.1]
    (train_data, valid_data, test_data) =  loader.load_data(split_seed)
    # print(train_data['train_x_sequences'].shape)

    '''
    train_x_sequences, train_u_sequences, train_y_sequences
    valid_x_sequences, valid_u_sequences, valid_y_sequences
    test_x_sequences, test_u_sequences, test_y_sequences
    '''

    # TODO: 4. train model and save model
    dim_x = np.shape(x_sequences)[2]      # state variable number
    dim_u = np.shape(u_sequences)[2]      # input variable number
    dim_y = np.shape(y_sequences)[2]

    # Per-variable minimum and maximum of x_sequences
    x_min = np.amin(x_sequences, axis=(0, 1))
    x_max = np.amax(x_sequences, axis=(0, 1))

    # Per-variable minimum and maximum of u_sequences
    u_min = np.amin(u_sequences, axis=(0, 1))
    u_max = np.amax(u_sequences, axis=(0, 1))

    min_val = [tf.constant(x_min,dtype=tf.float32), tf.constant(u_min,dtype=tf.float32)]
    max_val = [tf.constant(x_max,dtype=tf.float32), tf.constant(u_max,dtype=tf.float32)]

    # Method 1: BPMM
    my_model = MultiBPNet(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
    # my_model = MultiBPNet(hidden_blocks=1, dim_u=dim_u, dim_x=dim_x, dim_y=dim_y,use_mask=True)
    model = my_model.build(units=32,time_step = 3 ,u_steps = 1, data_type='1D')

    # Method 2: BPNN1
    # input_x = keras.layers.Input(shape=(3, 3))
    # input_u = keras.layers.Input(shape=(1, 2))
    # model = BPNeuralNetwork1(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
    # output = model([input_x, input_u], data_type='1D')
    # model = keras.models.Model(inputs=[input_x, input_u], outputs=output, name='BP1')

    # # Method 3: BPNN
    # model = BPNeuralNetwork(hidden_blocks=1, dim_u = dim_u, dim_x = dim_x, dim_y = dim_y,feature_range=(0, 1), min_val= min_val, max_val=max_val,use_mask=True)
    # model = model.build(units=32, time_step=3, u_steps=1, data_type='1D')


    '''data_type can be '1D' or '2D'; the model classes listed above raise ValueError for any other value'''

    model_name = model.name
    print(model_name)

    # train model and load best model
    if train:
        TrainModel(model,lr = 0.01,epoch=200).train_model(train_data,valid_data,show_loss=True)
        model.summary()
        # For method 2, comment out the following two lines
        model = load_model(f'models_save/{model_name}.h5',custom_objects={'MinMaxNormalization': MinMaxNormalization})
        model.save(f'models_save/{model_name}_predictor.h5')

    else:
        model = load_model(f'models_save/{model_name}_predictor.h5',custom_objects={'MinMaxNormalization': MinMaxNormalization})

    model.summary()
    # TODO: predict and plot
    keras.utils.plot_model(model, to_file=f'model_fig/{model_name}.png', show_shapes=True,
                           show_layer_names=True)
    y_pred_train = model.predict([train_data['train_x_sequences'], train_data['train_u_sequences']])
    y_pred_test = model.predict([test_data['test_x_sequences'], test_data['test_u_sequences']])
    print(model.evaluate([test_data['test_x_sequences'], test_data['test_u_sequences']], test_data['test_y_sequences']))

    train_result = Calculate_Regression_metrics(y_pred_train.flatten(), train_data['train_y_sequences'].reshape(-1, 1),
                                                 label=f'{model_name}_train')
    test_result = Calculate_Regression_metrics(y_pred_test.flatten(), test_data['test_y_sequences'].reshape(-1, 1),
                                                label=f'{model_name}_test')
    figure_property = {'title': model_name, 'X_label': 'Prediction set samples', 'Y_label': 'Prediction Value'}
    plot_line(y_pred_test.flatten(), test_data['test_y_sequences'].reshape(-1, 1), figure_property)

    print('train\n ', train_result)

    print('test:\n', test_result)
    # Set the Seaborn style
    sns.set_style("whitegrid")
    # Create a Matplotlib figure
    fig, ax = plt.subplots(figsize=(12, 2))
    # Hide the axes
    ax.axis('off')
    ax.set_title(f'{model_name}_predictor test result.png', fontsize=16, pad=2)  # pad adjusts the gap between the title and the table
    # Render the DataFrame as a table
    tab = ax.table(cellText=test_result.values, colLabels=test_result.columns, rowLabels=test_result.index, loc='center')
    # Style the table
    tab.auto_set_font_size(True)
    tab.set_fontsize(12)
    tab.scale(1.0, 1.0)
    # Save the figure
    plt.savefig(f'model_result/{model_name}_predictor_test_result.png', bbox_inches='tight', dpi=500)
    # Show the figure
    plt.show()
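
The windowing step in the script turns the raw (u, y) series into supervised samples. The internals of `WindowGeneratorMIMO` and `DataLoader` are not listed in this article, so the following is only a generic sliding-window sketch in NumPy. It assumes each sample uses the last `n_lags` outputs and the current input to predict the next output, and that the split is chronological; the function name `make_windows` and its arguments are hypothetical.

```python
import numpy as np

def make_windows(u, y, n_lags=3):
    """Build samples: x holds y[k-n_lags+1 .. k], u holds u[k], target is y[k+1]."""
    xs, us, ys = [], [], []
    for k in range(n_lags - 1, len(y) - 1):
        xs.append(y[k - n_lags + 1 : k + 1])   # (n_lags, dim_y)
        us.append(u[k : k + 1])                # (1, dim_u)
        ys.append(y[k + 1 : k + 2])            # (1, dim_y)
    return np.array(xs), np.array(us), np.array(ys)

# Toy series: 103 steps, 2 inputs, 1 output.
u = np.arange(206, dtype=float).reshape(103, 2)
y = np.arange(103, dtype=float).reshape(103, 1)

x_seq, u_seq, y_seq = make_windows(u, y, n_lags=3)
print(x_seq.shape, u_seq.shape, y_seq.shape)  # (100, 3, 1) (100, 1, 2) (100, 1, 1)

# Chronological 80/10/10 split, mirroring the ratios used in the script.
n = len(x_seq)
n_train, n_valid = n * 8 // 10, n // 10
train_x, valid_x, test_x = np.split(x_seq, [n_train, n_train + n_valid])
print(len(train_x), len(valid_x), len(test_x))  # 80 10 10
```

Whatever windowing is used, the third axis of the x and u arrays is what the script reads back as `dim_x` and `dim_u` when it constructs the model.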

Comparison of the model-building approaches, 2D input, units=32

Method             mse      rmse     mae      r2       mape
Method 1 (BPMM)    0.0002   0.0125   0.0098   0.9909   0.0268
Method 2 (BPNN1)   0.0002   0.014    0.0115   0.9911   0.0309
Method 3 (BPNN)    0.0001   0.0107   0.0088   0.9933   0.0246

Comparison of the model-building approaches, 1D input, units=32

Method             mse      rmse     mae      r2       mape
Method 1 (BPMM)    0.0002   0.0139   0.011    0.9886   0.029
Method 2 (BPNN1)   0.0002   0.0144   0.0115   0.9877   0.0302
Method 3 (BPNN)    0.0001   0.0113   0.0091   0.9925   0.0251

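In these runs the 2D input gives slightly better results than the 1D input for all three methods, and method 3 (BPNN) achieves the highest accuracy.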
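
The columns reported above are standard regression metrics. `Calculate_Regression_metrics` is not listed in this article, so the sketch below only shows the textbook definitions in NumPy. The helper name `regression_metrics` is hypothetical, and the toolkit's exact conventions (for example how MAPE treats zero targets) may differ.

```python
import numpy as np

def regression_metrics(y_true, y_pred):
    """Textbook MSE, RMSE, MAE, R^2 and MAPE for 1-D targets."""
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    err = y_true - y_pred
    mse = np.mean(err ** 2)
    ss_res = np.sum(err ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    return {
        "mse": mse,
        "rmse": np.sqrt(mse),
        "mae": np.mean(np.abs(err)),
        "r2": 1.0 - ss_res / ss_tot,
        "mape": np.mean(np.abs(err / y_true)),  # assumes no zero targets
    }

metrics = regression_metrics([1.0, 2.0, 4.0], [1.0, 2.5, 3.5])
for name, value in metrics.items():
    print(f"{name}: {value:.4f}")
```

Note that the ordering of models can differ between metrics, as it does for methods 1 and 2 in the 2D results (method 2 has the higher r2 but the larger rmse), so it helps to compare on more than one column.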

