当前位置: 首页 > article >正文

基于“动手学强化学习”的知识点(六):第 19 章 目标导向的强化学习(gym版本 >= 0.26)

第 19 章 目标导向的强化学习(gym版本 >= 0.26)

  • 摘要

摘要

本系列知识点讲解基于动手学强化学习中的内容进行详细的疑难点分析!具体内容请阅读动手学强化学习!


对应动手学强化学习——目标导向的强化学习


import torch
import torch.nn.functional as F
import numpy as np
import random
from tqdm import tqdm
import collections
import matplotlib.pyplot as plt


class WorldEnv:
    """
    WorldEnv 类实现了一个简单的二维移动环境,主要特点包括:
    - 状态包含当前坐标和目标坐标(共四维向量)。
    - 环境在 reset() 时随机生成目标位置,初始状态固定在原点 [0,0]。
    - 在 step() 中,智能体根据动作改变当前位置,并计算与目标的距离,
      给出奖励(距离大于阈值则惩罚 -1,否则 0),当达到目标(距离小于等于阈值)或步数达到50时结束回合。
    """
    def __init__(self):
        self.distance_threshold = 0.15
        '''
        作用:
        - 设置动作的界限,即智能体每个动作分量的取值范围为 [−1,1]。
        数值示例:
        - 若动作计算结果超出 1 或 -1,将被截断到 1 或 -1。
        '''
        self.action_bound = 1

    def reset(self):  # 重置环境
        """定义环境重置函数,初始化当前状态和目标,并返回环境的初始观测。"""
        # 生成一个目标状态, 坐标范围是[3.5~4.5, 3.5~4.5]
        '''
        生成目标状态。
        - random.uniform(-0.5, 0.5) 生成一个在 -0.5 到 0.5 之间的随机数。
        - 分别加到 4 上,使得目标坐标在 [3.5,4.5] 范围内。
        '''
        self.goal = np.array([4 + random.uniform(-0.5, 0.5), 4 + random.uniform(-0.5, 0.5)])
        '''将智能体初始状态固定为 [0,0]。'''
        self.state = np.array([0, 0])  # 初始状态
        '''初始化步数计数器,用于跟踪当前回合步数。'''
        self.count = 0
        '''
        作用:
        - 返回初始观测,使用 np.hstack 将当前状态和目标状态水平拼接。
        数值示例:
        - 如果 state=[0,0] 且 goal=[3.7,4.4],则返回 [0, 0, 3.7, 4.4]。
        '''
        return np.hstack((self.state, self.goal))

    def step(self, action):
        """定义环境一步交互函数,输入动作,输出新的观测、奖励和 done 标志。"""
        '''
        将输入动作限制在 [−action_bound,action_bound] 内。
        数值示例:
        - 若输入 action=[1.5, -2.0],而 action_bound=1,则 np.clip 结果为 [1, -1].
        '''
        action = np.clip(action, -self.action_bound, self.action_bound)
        '''
        作用:
        - 更新 x 坐标:
          - 将当前 x 坐标加上动作中的第一个分量,确保结果在 [0,5] 内。
        数值示例:
        - 假设当前 state[0]=0,action[0]=0.8,则 x = max(0, min(5, 0+0.8)) = 0.8;
        - 如果 state[0]=4.9,action[0]=0.5,则 4.9+0.5=5.4,min(5,5.4)=5,x=max(0,5)=5。
        '''
        x = max(0, min(5, self.state[0] + action[0]))
        y = max(0, min(5, self.state[1] + action[1]))
        self.state = np.array([x, y])
        self.count += 1
        '''
        计算当前状态与目标状态之间的欧氏距离。
        '''
        dis = np.sqrt(np.sum(np.square(self.state - self.goal)))
        # if dis > self.distance_threshold:
        #     reward = -1.0
        # else:
        #     reward = 0.0
        reward = -1.0 if dis > self.distance_threshold else 0
        if dis <= self.distance_threshold or self.count == 50:
            done = True
        else:
            done = False

        return np.hstack((self.state, self.goal)), reward, done
    
    
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
        self.action_bound = action_bound  # action_bound是环境可以接受的动作最大值

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.fc3(x)
        '''用 tanh 激活函数限制输出在 (-1,1) 范围内。'''
        x = torch.tanh(x)
        x = x * self.action_bound
        return x
        # x = F.relu(self.fc2(F.relu(self.fc1(x))))
        # return torch.tanh(self.fc3(x)) * self.action_bound


class QValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(QValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x, a):
        cat = torch.cat([x, a], dim=1)  # 拼接状态和动作
        x = self.fc1(cat)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        x = self.fc3(x)  
        return x
        # x = F.relu(self.fc2(F.relu(self.fc1(cat))))
        # return self.fc3(x)   
    
    
class DDPG:
    """
    意义与作用
    DDPG(Deep Deterministic Policy Gradient)是一种 off-policy 强化学习算法,适用于连续动作控制。
    它主要包括两个网络:
    - Actor(策略网络):给定状态产生确定性动作(在训练时加上探索噪声)。
    - Critic(Q 网络):估计状态-动作对的价值。
    同时,为了训练稳定性,引入了目标网络(target_actor 和 target_critic),并采用软更新机制。
    另外,DDPG 在执行时会在 actor 输出的动作上加入高斯噪声以促进探索。
    输入
    - 状态维度、动作维度、动作上界、各个学习率、折扣因子 gamma、软更新系数 tau、高斯噪声标准差 sigma、设备 device 等。
    输出
    - 更新后的 actor 与 critic 网络,使得策略在连续动作任务中表现更优。
    """
    ''' DDPG算法 '''
    def __init__(self, state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device):
        self.action_dim = action_dim
        '''作用:实例化一个策略网络 PolicyNet,用于产生确定性动作,并将其移到指定设备。'''
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
        '''作用:实例化一个 Q 网络,用于评估 (state, action) 对的价值。'''
        self.critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)
        '''作用:构造目标策略网络,用于计算下一状态的动作,保证训练稳定性。'''
        self.target_actor = PolicyNet(state_dim, hidden_dim, action_dim, action_bound).to(device)
        '''作用:构造目标 Q 网络,与 critic 网络结构相同。'''
        self.target_critic = QValueNet(state_dim, hidden_dim, action_dim).to(device)
        # 初始化目标价值网络并使其参数和价值网络一样
        self.target_critic.load_state_dict(self.critic.state_dict())
        # 初始化目标策略网络并使其参数和策略网络一样
        self.target_actor.load_state_dict(self.actor.state_dict())
        
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        
        self.gamma = gamma
        self.sigma = sigma  # 高斯噪声的标准差,均值直接设为0
        self.tau = tau  # 目标网络软更新参数
        self.action_bound = action_bound
        self.device = device

    def take_action(self, state):
        '''定义在给定状态下如何选择动作,主要用于与环境交互。'''
        if isinstance(state, tuple):
            state = state[0]
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        action = self.actor(state).detach().cpu().numpy()[0]
        # 给动作添加噪声,增加探索
        '''
        给生成的动作加上高斯噪声,
        噪声标准差为 sigma,np.random.randn(self.action_dim) 生成与动作维度相同的正态分布样本。
        '''
        action = action + self.sigma * np.random.randn(self.action_dim)
        return action

    def soft_update(self, net, target_net):
        for param_target, param in zip(target_net.parameters(), net.parameters()):
            param_target.data.copy_(param_target.data * (1.0 - self.tau) + param.data * self.tau)

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        '''
        作用:
        - 对 next_states 使用 target_actor 生成下一动作。
        - 将 next_states 与生成的动作输入 target_critic 计算 Q 值。
        '''
        next_q_values = self.target_critic(next_states, self.target_actor(next_states))
        '''
        计算 TD 目标 
                                        y=r+γQ(s′,a′)(1−d)
        '''
        q_targets = rewards + self.gamma * next_q_values * (1 - dones)
        # MSE损失函数
        '''计算 critic 网络输出与 TD 目标之间的 MSE 损失,并对批次取均值。'''
        critic_loss = torch.mean(F.mse_loss(self.critic(states, actions), q_targets))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # 策略网络就是为了使Q值最大化
        '''
        计算 actor 损失。目标是使 critic 对 (state, actor(state)) 评估的 Q 值最大化,
        因此 actor_loss 取负号。
        '''
        actor_loss = -torch.mean(self.critic(states, self.actor(states)))
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self.soft_update(self.actor, self.target_actor)  # 软更新策略网络
        self.soft_update(self.critic, self.target_critic)  # 软更新价值网络   
    
    
class Trajectory:
    """用于记录一条完整轨迹,保存轨迹中的状态、动作、奖励、done 标志,以及轨迹长度。"""
    ''' 用来记录一条完整轨迹 '''
    def __init__(self, init_state):
        self.states = [init_state]
        self.actions = []
        self.rewards = []
        self.dones = []
        self.length = 0

    def store_step(self, action, state, reward, done):
        self.actions.append(action)
        self.states.append(state)
        self.rewards.append(reward)
        self.dones.append(done)
        self.length += 1


class ReplayBuffer_Trajectory:
    """
    作用:
    - 用于存储多条轨迹,每条轨迹由 Trajectory 类记录。
      提供添加轨迹、获取缓冲区大小以及采样批量轨迹并支持 HER(事后经验回放)功能。
    输入:
    - 在初始化时,设定缓冲区最大容量 capacity。
    - 通过 add_trajectory 添加 Trajectory 实例。
    输出:
    - sample() 方法返回一个批次的轨迹数据,
      经过 HER 处理后转换为标准 (state, action, reward, next_state, done) 格式。
    """
    ''' 存储轨迹的经验回放池 '''
    def __init__(self, capacity):
        '''
        使用 collections.deque 创建一个双端队列,并设置最大长度为 capacity。
        当超过容量时,新添加的轨迹会覆盖最旧的轨迹。
        '''
        self.buffer = collections.deque(maxlen=capacity)

    def add_trajectory(self, trajectory):
        '''定义方法 add_trajectory,用于将一个 Trajectory 实例添加到缓冲区中。
        trajectory:一个 Trajectory 类的实例。'''
        self.buffer.append(trajectory)

    def size(self):
        return len(self.buffer)

    def sample(self, batch_size, use_her, dis_threshold=0.15, her_ratio=0.8):
        """定义 sample 方法,从存储的轨迹中采样一个批次的数据,并可选择使用 HER 对目标进行重设。"""
        '''
        作用:
        - 初始化一个字典 batch,用于存储采样后的数据,每个键对应一个列表,后续将存储轨迹中选取的转换数据。
        数值例子:
        - 初始时 batch 为 { 'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': [] }。
        '''
        batch = dict(states=[], actions=[], next_states=[], rewards=[], dones=[])
        for _ in range(batch_size):
            '''
            作用:
            - 从缓冲区中随机采样一条轨迹(返回的是列表,所以取第一个元素)。
            数值例子:
            - 如果缓冲区中有 5000 条轨迹,则随机选择其中一条,如 traj1。
            '''
            traj = random.sample(self.buffer, 1)[0]
            '''作用:在该轨迹中随机选择一个步数索引,范围在 [0, traj.length)。'''
            step_state = np.random.randint(traj.length)
            '''取出轨迹中第 step_state 步的状态。'''
            state = traj.states[step_state]
            '''取出轨迹中第 step_state+1 步的状态,即当前步之后的状态。'''
            next_state = traj.states[step_state + 1]
            '''取出轨迹中第 step_state 步的动作。'''
            action = traj.actions[step_state]
            '''取出轨迹中第 step_state 步的奖励。'''
            reward = traj.rewards[step_state]
            '''取出轨迹中第 step_state 步的 done 标志。'''
            done = traj.dones[step_state]
            '''
            判断是否采用 HER(事后经验回放)。
            - 如果 use_her 为 True 且随机数在 [0,1] 小于等于 her_ratio,则使用 HER。
            '''
            if use_her and np.random.uniform() <= her_ratio:
                '''在当前步之后随机选取一个目标步索引,用于 HER 策略的“未来”目标。'''
                step_goal = np.random.randint(step_state + 1, traj.length + 1)
                '''从选定的目标步中取出状态的前两个元素作为新的目标(通常前两个元素代表位置)。'''
                goal = traj.states[step_goal][:2]  # 使用HER算法的future方案设置目标
                '''计算当前 next_state 前两个元素(位置)与新目标 goal 之间的欧氏距离。'''
                dis = np.sqrt(np.sum(np.square(next_state[:2] - goal)))
                '''根据新的距离重新设定奖励:如果距离大于 dis_threshold,则奖励为 -1,否则为 0。'''
                reward = -1.0 if dis > dis_threshold else 0
                '''
                根据新的距离设定 done 标志:
                - 如果距离大于阈值,则 done=False(还未达到目标);
                - 否则 done=True(目标达成)。
                '''
                done = False if dis > dis_threshold else True
                '''
                作用:
                - 更新状态:将原状态前两个元素(位置)与新目标拼接,形成新的状态信息。
                  - 这通常用于 HER,目标被重新设定后状态信息中包含新目标。
                数值例子:
                - 如果原 state[:2] = [0.15, 0.25],goal=[0.3, 0.4],则新 state = [0.15, 0.25, 0.3, 0.4].
                '''
                state = np.hstack((state[:2], goal))
                '''
                作用:
                - 同样更新下一状态,将其前两个元素与新目标拼接。
                数值例子:
                - 如果 next_state[:2] = [0.2, 0.3],goal=[0.3, 0.4],则 new next_state = [0.2, 0.3, 0.3, 0.4].
                '''
                next_state = np.hstack((next_state[:2], goal))

            batch['states'].append(state)
            batch['next_states'].append(next_state)
            batch['actions'].append(action)
            batch['rewards'].append(reward)
            batch['dones'].append(done)

        batch['states'] = np.array(batch['states'])
        batch['next_states'] = np.array(batch['next_states'])
        batch['actions'] = np.array(batch['actions'])
        return batch    
    
    
    
actor_lr = 1e-3
critic_lr = 1e-3
hidden_dim = 128
state_dim = 4
action_dim = 2
action_bound = 1
sigma = 0.1
tau = 0.005
gamma = 0.98
num_episodes = 2000
n_train = 20
batch_size = 256
minimal_episodes = 200
buffer_size = 10000
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
env = WorldEnv()
replay_buffer = ReplayBuffer_Trajectory(buffer_size)
agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            '''
            重置环境,获得初始观测。
            - reset() 返回的观测通常为状态和目标拼接的向量,例如 [0, 0, goal_x, goal_y]。
            '''
            state = env.reset()
            '''
            创建一条轨迹记录,传入初始状态。
            - Trajectory 对象会记录整个回合中的状态、动作、奖励和 done 标志。
            '''
            traj = Trajectory(state)
            done = False
            while not done:
                action = agent.take_action(state)
                state, reward, done = env.step(action)
                episode_return += reward
                traj.store_step(action, state, reward, done)
            replay_buffer.add_trajectory(traj)
            return_list.append(episode_return)
            '''判断回放池中是否有足够轨迹(至少 minimal_episodes 条)后再进行策略更新。'''
            if replay_buffer.size() >= minimal_episodes:
                for _ in range(n_train):
                    transition_dict = replay_buffer.sample(batch_size, True)
                    agent.update(transition_dict)
            '''每 10 个回合更新一次进度条后缀,显示当前回合编号和最近 10 个回合的平均回报。'''
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DDPG with HER on {}'.format('GridWorld'))
plt.show()    
    
    
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
env = WorldEnv()
replay_buffer = ReplayBuffer_Trajectory(buffer_size)
agent = DDPG(state_dim, hidden_dim, action_dim, action_bound, actor_lr, critic_lr, sigma, tau, gamma, device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            state = env.reset()
            traj = Trajectory(state)
            done = False
            while not done:
                action = agent.take_action(state)
                state, reward, done = env.step(action)
                episode_return += reward
                traj.store_step(action, state, reward, done)
            replay_buffer.add_trajectory(traj)
            return_list.append(episode_return)
            if replay_buffer.size() >= minimal_episodes:
                for _ in range(n_train):
                    # 和使用HER训练的唯一区别
                    transition_dict = replay_buffer.sample(batch_size, False)
                    agent.update(transition_dict)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode':
                    '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return':
                    '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DDPG without HER on {}'.format('GridWorld'))
plt.show()   

http://www.kler.cn/a/586717.html

相关文章:

  • C++之OOP
  • 蓝桥杯嵌入式赛道复习笔记1(led点亮)
  • 【Python机器学习】2.4. K均值聚类(KMeans Analysis)实战(进阶)
  • SOME/IP:用Python实现协议订阅、Offer、订阅ACK与报文接收
  • 【Unity网络同步框架 - Nakama研究(二)】
  • Spark 中创建 DataFrame 的2种方式对比
  • 【最后203篇系列】015 几种消息队列的思考
  • docker后台运行,便于后期用命令行进入它的终端
  • 【vscode-03】AUTOSAR CP 插件配置
  • DataWhale 速通AI编程开发:(进阶篇)第3章 提示词(Prompts)配置项
  • AI与人的智能,改变一生的思维模型【7】易得性偏差
  • 空调acwing二进制差分
  • C++移动语义与右值引用:从理论到实践的深度解析引言
  • python:数据类构建器
  • 《DeepSeek深度使用教程:开启智能交互新体验》Deepseek深度使用教程
  • 【大模型基础_毛玉仁】2.4 基于 Encoder-Decoder 架构的大语言模型
  • AtCoder Beginner Contest 003(A - 社の給料、B -トランプ、C -プログラミング講座、D - 社の冬 )题目讲解
  • 【PHP】新版本特性记录(持续更新)
  • java 的标记接口RandomAccess使用方法
  • vulnhub靶场之stapler靶机