当前位置: 首页 > article >正文

DQN详解

1️⃣ DQN介绍

前面介绍的表格形式的Q-learning只适用于状态空间 S \mathcal{S} S和动作空间 A \mathcal{A} A都是较小有限离散集合情况。但当状态空间 S \mathcal{S} S和动作空间 A \mathcal{A} A变大时,表格形式不再适用。DQN是神经网络形式的Q学习,使用神经网络近似最优动作价值函数,由DeepMind 团队在2015年提出,可以说是打开了深度强化学习的大门。


2️⃣ 原理分析

具体过程:
1.初始化

  • Q网络:初始化一个神经网络 Q ( s , a ; θ ) Q(s,a;\theta) Q(s,a;θ)来近似最优动作价值函数 Q ⋆ Q^\star Q,参数为 θ \theta θ
  • 目标网络:初始化一个与Q网络结构相同的目标网络 Q t a r g e t ( s , a ; θ ′ ) Q_\mathrm{target}(s,a;\theta') Qtarget(s,a;θ),参数为 θ ′ \theta' θ (参数初始值与Q网络相同),每隔固定步数将Q网络的参数复制给日标网络。
  • 经验回放池:创建一个经验回放池 D D D,用于存储经验数据。

2.训练过程
2.1 采集数据

  • 环境交互:在当前状态 s s s下,根据 ϵ \epsilon ϵ-贪心策略选择动作 a a a,即:
    • 以概率 ϵ \epsilon ϵ随机选择一个动作,进行探索。
    • 以概率 1 − ϵ 1-\epsilon 1ϵ选择当前Q网络估计的最优动作 a = arg ⁡ max ⁡ a ∈ A Q ( s , a ; θ ) a=\arg\max_{a \in A}Q(s,a;\theta) a=argmaxaAQ(s,a;θ),进行利用。
  • 执行动作:执行选择的动作 a a a,得到奖励 r r r和下一个状态 s ′ s^{\prime} s
  • 存储经验:将新获得的经验 ( s , a , r , s ′ ) (s,a,r,s^{\prime}) (s,a,r,s)存入经验回放池 D D D
    在这里插入图片描述

2.2 训练过程

  • 训练数据:从经验回放池 D D D中随机采样batch个数据 ( s , a , r , s ′ ) (s,a,r,s^{\prime}) (s,a,r,s)
    在这里插入图片描述

  • 利用Q网络计算预测值 Q ( s , a ; θ ) Q(s, a; \theta) Q(s,a;θ)

  • 计算TD目标:对于每个数据,基于目标网络计算TD目标

    y ^ t = { r 如果  s ′  是终止状态 r + γ ⋅ max ⁡ a ∈ A Q target ( s ′ , a ; θ ′ ) 如果  s ′  非终止状态 \widehat{y}_t = \begin{cases} r & \text{如果 } s' \text{ 是终止状态} \\ r + \gamma \cdot \max_{a \in A} Q_{\text{target}}(s', a; \theta') & \text{如果 } s' \text{ 非终止状态} \end{cases} y t={rr+γmaxaAQtarget(s,a;θ)如果 s 是终止状态如果 s 非终止状态
    其中 r r r代表在状态 s s s时的奖励, γ \gamma γ代表折扣率, max ⁡ a ∈ A Q target ( s t + 1 , a ; w ) \max_{a \in A} Q_{\text{target}}(s_{t+1}, a; \mathbf{w}) maxaAQtarget(st+1,a;w)代表目标网络中下一个状态 s ′ s' s下所有动作的最大Q值; θ ′ \theta' θ表示目标网络的参数

    注:max中的是目标网络,而不是Q网络,为什么?
    引入目标网络可以降低训练的不稳定性。如果直接使用Q网络计算TD目标,会导致Q网络更新过于频繁,导致网络波动。

  • 计算损失函数
    Loss = 1 2 [ Q ( s , a ; θ ) − y ^ t ] 2 \text{Loss} = \frac{1}{2} [Q(s, a; \theta) - \widehat{y}_t]^2 Loss=21[Q(s,a;θ)y t]2
    使用梯度下降算法最小化损失函数,更新Q网络的参数 θ \theta θ

2.3 同步目标网络
每C步将Q网络 Q ( s , a ; θ ) Q(s,a;\theta) Q(s,a;θ)的参数复制给目标网络 Q t a r g e t ( s , a ; θ ′ ) Q_\mathrm{target}(s,a;\theta') Qtarget(s,a;θ)

3. 循环直至收敛
持续进行上述步骤,直至Q值收敛或达到预设的停止条件

3️⃣ 代码

1.创建一个名为network.py的文件

# 2024.11.7
import collections
import torch
import torch.nn as nn
import numpy as np
import random
import torch.nn.functional as F

class RelayBuffer():
    def __init__(self,capacity):
        # 创建一个双端队列,最大长度maxlen为capacity
        # 当队列达到这个最大长度后,如果再添加新元素,deque会自动移除最早添加的元素,从而保持长度不变
        self.buffer=collections.deque(maxlen=capacity)
    # 将经验打包成元组加入经验回放池
    # buffer最终会变成下面的样子    deque([(),(),()……])
    def add(self,state,action,reward,next_state,done):
        self.buffer.append((state,action,reward,next_state,done))
    # 统计经验回放池的大小
    def size(self):
        return len(self.buffer)
    def sample(self,batch_size):
        # random.sample(self.buffer, batch_size),从 self.buffer(经验池)中随机抽取 batch_size 个样本,抽取的样本是无序的
        # 返回值是一个列表,包含batchsize个经验,每个经验格式为元组(state, action, reward, next_state, done)
        # batch_samples是中包含的数据格式为[(state1, action1, reward1, next_state1, done1), ...]
        batch_samples=random.sample(self.buffer, batch_size)
        # *batch_samples:* 运算符会将batch_samples列表中的每个元组拆分,传入zip函数
        # zip(*batch_samples):zip 将所有的 state、action、reward、next_state和done各自组合到一个新列表中
        # 具体而言
        # state 会包含 batch_size 个状态。
        # action 会包含 batch_size 个动作。
        # reward 会包含 batch_size 个奖励。
        # next_state 会包含 batch_size 个下一状态。
        # done 会包含 batch_size 个布尔值
        state, action, reward, next_state, done = zip(*batch_samples)
        return np.array(state), action, reward, np.array(next_state), done


class Net(nn.Module):
    def __init__(self,n_states,n_hidden,n_actions):
        super().__init__()
        self.fc1=nn.Linear(n_states,n_hidden)
        self.fc2=nn.Linear(n_hidden,n_actions)
    
    def forward(self,x):
        out=self.fc1(x)
        out=self.fc2(out)
        # 返回所有动作的Q值
        return out

class DQN(nn.Module):
    def __init__(self,n_states,n_actions,n_hidden,lr,gamma,epsilon,target_update):
        super().__init__()
        # 状态空间大小
        self.n_states=n_states
        # 动作空间大小
        self.n_actions=n_actions
        # 隐藏层大小
        self.n_hidden=n_hidden
        # 学习率
        self.lr=lr
        # 折扣率
        self.gamma=gamma
        # 贪婪系数
        self.epsilon=epsilon
        # 目标网络更新频率
        self.target_update=target_update
        # update计数器,用于记录q网络更新的次数,每到self.target_update数量,对目标网络更新
        self.count=0

        # 初始化构建Q网络和目标网络
        self.q_net=Net(self.n_states,self.n_hidden,self.n_actions)
        self.target_net=Net(self.n_states, self.n_hidden, self.n_actions)

        # 优化器
        self.optimizer=torch.optim.Adam(self.q_net.parameters(),lr=self.lr)
    
    def take_action(self,state):
        # state是一个NumPy 数组,例如:[-0.04796449, -0.00108455,  0.03372429,  0.01539581],shape为(4,)
        # np.newaxis:这是 NumPy 中用于增加新轴(维度)的操作。state[np.newaxis, :] 会在 state 的第一个轴位置增加一个新维度
        # state[np.newaxis, :]会让state变成(1,4)
        # torch.Tensor():将 NumPy 数组转换为 PyTorch 的张量(Tensor)
        # 最终得到一个 (1, 4) 的 PyTorch 张量
        # 新添加一维的目的适配神经网络模型的输入,网络的输入第一维通常是批量大小
        state=torch.Tensor(state[np.newaxis,:])
        # 采用ε贪婪策略作为行为策略
        # np.random.random()生成一个位于 [0.0, 1.0) 区间的随机浮点数
        # 如果随机数小于ε,则选择估计的“最优的动作”
        if np.random.random()<self.epsilon:
            # self.q_net(state) 会隐式调用 self.q_net.forward(state)
            actions_value=self.q_net(state)
            # actions_value 是一个张量,例如,torch.tensor([0.5, 1.2]),包含当前状态下所有可能动作的 Q 值
            # actions_value.argmax()返回这个张量中最大值的索引,即tensor(1)
            # actions_value.argmax().item(),将tensor转为int
            action=actions_value.argmax().item()
        # 否则就进行探索
        else:
            # np.random.randint(self.n_actions)在[0, 2) 范围内随机选择一个整数
            # 相当与只能返回0或者1
            action=np.random.randint(self.n_actions)
        # 返回action,即0或1
        return action

    def update(self,sample_dict):
        # 取出batchsize个状态
        # torch.tensor():这是 PyTorch 的方法,用于将数据(如 NumPy 数组或列表)转换为 PyTorch 张量(Tensor)
        # dtype=torch.float:神经网络要求浮点型,因此指定数据类型为浮点型
        # batch_state大小为torch.Size([32,4])
        batch_state=torch.tensor(sample_dict['states'],dtype=torch.float)
        # 取出batchsize个动作,因为动作要么0要么1,torch.tensor() 默认会使用 torch.int64
        # torch.tensor(sample_dict['actions'])的大小是torch.Size([32])
        # .view(-1,1)将张量的形状重塑为torch.Size([32,1]),方便后续计算
        # -1 表示自动推断 batch_size,而 1 表示将其转换成列向量
        # batch_action大小为torch.Size([32,1])
        batch_action=torch.tensor(sample_dict['actions']).view(-1,1)
        # 取出batchsize个奖励,大小torch.Size([32,1])
        batch_reward=torch.tensor(sample_dict['rewards'], dtype=torch.float).view(-1,1)
        # 取出batchsize个下一个状态,大小torch.Size([32,4])
        batch_next_state= torch.tensor(sample_dict['next_states'], dtype=torch.float)
        # 取出batchsize个done,大小torch.Size([32,1])
        # sample_dict['dones']是布尔型的,转成float
        batch_done=torch.tensor(sample_dict['dones'],dtype=torch.float).view(-1,1)

        # 当前batchsize个状态的Q值
        # self.q_net(batch_state):将batchsize个 states传入Q网络q_net中,得到Q值,大小是[batchsize=32,2]
        # .gather(1, batch_action):1代表在[32,2]每行中,根据batch_action提取与动作对应的Q值
        # 最终输出torch.Size([32, 1])
        # ----------------------------------------------------
        # 例如
        # self.q_net(batch_state)为:
        # [[1.2, 2.3],
        # [0.5, 1.7],
        # [3.0, 0.9]].
        # batch_action为: [[1], [0], [1]]
        # gather就会在第一行选择2.3,第二行选择0.5,第三行选择0.9
        # 最终输出为[[2.3],[0.5],[0.9]]
        # ----------------------------------------------------
        q_value=self.q_net(batch_state).gather(1,batch_action)

        # batchsize个next state的最大Q值,这个是输入到目标网络中得到的,为了计算TD目标
        # self.target_q_net(batch_next_state):将batchsize个next_states传入目标网络target_net 中,得到Q值
        # self.target_q_net(batch_next_state)返回[32,2]
        # .max(1)返回两个值 ,第一个是每一行的最大值,以及对应的索引
        # ----------------------------------------------------
        # 例如
        # self.target_q_net(batch_next_state)为
        # [[1.2, 2.3], 
        # [0.5, 1.7]
        # [3.2, 1.7]]
        # .max(1)返回
        # values=tensor([2.3,1.7,3.2])
        # indices=tensor([1,1,0])
        # ----------------------------------------------------
        # [0]表示返回values=tensor([2.3,1.7,3.2]),该结果大小是torch.Size([32])
        # .view(-1,1) 将张量的形状重塑为torch.Size([32,1]),方便后续计算
        # -1 表示自动推断 batch_size,而 1 表示将其转换成列向量
        max_next_state_q=self.target_net(batch_next_state).max(1)[0].view(-1,1)

        # TD目标
        # done=1表示终止状态,所以只有1-done才有计算的意义
        # td_target大小为torch.Size([32, 1])
        td_target=batch_reward+self.gamma*max_next_state_q*(1-batch_done)


        # loss函数:均方误差
        # F.mse_loss 是 PyTorch 中的均方误差损失函数
        loss=F.mse_loss(q_value,td_target)
        # 梯度清零
        self.optimizer.zero_grad()
        # 反向传播
        loss.backward()
        # 更新权重
        self.optimizer.step()

        # 更新目标网络
        if self.count%self.target_update==0:
            # self.target_q_net.load_state_dict(...)将参数字典加载到模型中
            # self.q_net.state_dict()获取模型参数
            self.target_net.load_state_dict(self.q_net.state_dict())

        self.count+=1

2.创建一个名为train.py的文件

import torch
import gym
import numpy as np
from network import RelayBuffer,DQN
import matplotlib.pyplot as plt

# ---------------------------------------------------------------------------------
# 初始化变量
# ---------------------------------------------------------------------------------

# 经验池的容量
capacity=500
# 学习率
lr=2e-3
# 折扣率
gamma=0.9
# 贪心系数
epsilon=0.9
# Q网络每200次update,更新一次目标网络
target_update=200 
# 批量大小
batch_size=32
# 隐含层神经元个数
n_hidden=128
# 经验池的最小容量,超过该值后才能训练
min_size=200
# 记录每个回合的回报
return_list=[]

# ---------------------------------------------------------------------------------
# 初始化环境模型
# ---------------------------------------------------------------------------------
env=gym.make("CartPole-v1",render_mode="human")
# 状态空间:小车的位置、速度、杆子的角度和角速度
# ----------------------------------------------------
# env.observation_space会返回:Box([-4.8 -inf -0.41887903 -inf], [ 4.8 inf  0.41887903 inf], (4,), float32)
# Box 是 Gym 中的一种数据结构,用来表示连续的数值空间。在 CartPole-v1 中,Box 表示每个观测值的范围
# 小车位置在 -4.8 到 4.8 之间
# 小车速度和杆子角速度的范围为无穷大(-inf 到 inf),即理论上没有限制
# 杆子的角度范围在 -0.418 到 0.418 弧度(大约 ±24 度)之间
# (4,):表示这是一个四维的空间
# float32:表示观测值的数据类型是 32 位浮点数
# ----------------------------------------------------
# env.observation_space.shape返回(4,)
# env.observation_space.shape[0]返回4,表示状态空间的大小
n_states=env.observation_space.shape[0]
# 动作空间:向左施加力(0),向右施加力(1)
# ----------------------------------------------------
# env.action_space返回Discrete(2),这意味着动作空间是离散的,只有两个可能的动作
# env.action_space.n返回2
n_actions=env.action_space.n 

# 实例化经验回放池
relay_buffer=RelayBuffer(capacity=capacity)
# 实例化DQN
agent=DQN(n_states,n_actions,n_hidden,lr,gamma,epsilon,target_update)


# ---------------------------------------------------------------------------------
# 模型训练
# ---------------------------------------------------------------------------------


# 具体的逻辑如下:
# 设置N个eposide,初始化得到随机的状态state,done为False
#   进入while循环
#       随机状态state→通过take_action()→得到动作action→与环境交互→得到经验(下一状态,reward,done)
#       将经验加入到经验回访池
#       如果经验中done为True,代表游戏结束,跳出while循环
# --------------------------------------------------------------------
# 进入下一个eposide
#   进入while循环
#       重复上述过程
# --------------------------------------------------------------------
# 进入下一个eposide
#   进入while循环
#       ①随机状态state→通过take_action()→得到动作action→与环境交互→得到经验(下一状态,reward,done)
#       ②将经验加入到经验回访池        
#       ③此时经验回放池的长度大于200
#           采样batchsize个经验,训练Q网络
#       重复①②③
#       当①中的done变为True时,这个while循环就结束了
#       即在经验回放池的长度大于200后,每个eposide中,添加几个经验就会更新几次Q网络
# --------------------------------------------------------------------
# 当经验回访池中的长度大于500后,队头的就会被挤出,
for eposide in range(100):
    # 重置环境
    # env.reset()返回(array([-0.04796449, -0.00108455,  0.03372429,  0.01539581], dtype=float32), {})
    # env.reset()[0]初始状态[-0.04796449, -0.00108455,  0.03372429,  0.01539581]
    state=env.reset()[0]
    # 回合是否结束的标志
    done=False
    # 每个eposide的累计汇报
    eposide_return=0
    # 每个eposide新添加到经验回放池的经验数目
    count_per_eposide=0
    # 每个每个eposide更新的次数
    count_update=0

    while True:
        # 智能体根据当前状态选择动作
        # action是0或1,向左施加力(0),向右施加力(1)

        action=agent.take_action(state)
        # env.step(action) 是 Gym 环境中的一个方法,用于让代理在当前状态下执行指定的 action(动作),并推进环境到下一个状态
        # 返回一个包含5个元素的元组:(array([ 3.2732802e-04,  5.7809782e-01, -2.1898283e-03, -8.3598077e-01],dtype=float32), 1.0, False, False, {})
        # 分别是next_state(下一个状态),reward,done(回合是否结束),剩下的两个没用
        next_state,reward,done,_,_=env.step(action)
        # 将经验加入经验回放池
        relay_buffer.add(state,action,reward,next_state,done)
        # 计数器加一
        count_per_eposide+=1
        # 更新状态
        state=next_state
        # 该回合累计的奖励
        eposide_return+=reward
        


        # 当经验池容量大于min_size时,进行网络训练
        if relay_buffer.size()>min_size:
            # 从经验池中随机抽样一个batch作为训练集
            # s:batchsize个state
            # a:batchsize个action
            # r:batchsize个reward
            # ns:batchsize个next_state
            # d:batchsize个done
            s, a, r, ns, d = relay_buffer.sample(batch_size)
            # 构造训练样本的字典形式
            sample_dict = {
                'states': s,
                'actions': a,
                'next_states': ns,
                'rewards': r,
                'dones': d
            }
            # 更新DQN
            agent.update(sample_dict)
            count_update+=1
            print("Q网络已经更新%d次"%agent.count)

        if done:
            break         
    
    return_list.append(eposide_return)
    print("回合:%d,  加入到经验回放池的经验数为:%d,  经验回访池累计经验为:%d"%(eposide+1,count_per_eposide,relay_buffer.size()))
    print('更新的次数%d'%(count_update))
    print('-------------------------------------------------------------------')



# ------------------------------- #
# 绘制训练过程的回报变化曲线
# ------------------------------- #
episodes_list = list(range(len(return_list)))  # 回合索引列表
plt.plot(episodes_list, return_list)           # 绘制回合与回报的关系
plt.xlabel('Episodes')                         # x轴标签:回合数
plt.ylabel('Returns')                          # y轴标签:回报值
plt.title('DQN Returns')                       # 图标题
plt.savefig('D:\code\AI_ladder\\6_DQN\dqn.png') # 保存图表


4️⃣ 总结

  • DQN打开了深度强化学习的大门
  • 注意:计算TD目标的时候用的是目标函数


http://www.kler.cn/a/384720.html

相关文章:

  • Unet++改进3:添加NAMAttention注意力机制
  • 怎么将byte转换为String?
  • itextpdf打印A5的问题
  • 17、论文阅读:VMamba:视觉状态空间模型
  • leetcode字符串(二)-重复的子字符串
  • java list使用基本操作
  • 分享大模型发展进入新阶段,产业应用成为竞争焦点
  • 80后聊架构:架构设计中两个重要指标,延时与吞吐量(Latency vs Throughput) | 架构师之路...
  • Java链表及源码解析
  • 鸿蒙next选择 Flutter 开发跨平台应用的原因
  • 探索 Java 中 String 类的常用方法
  • MySQL分区表(二)
  • 2024-11-07 问AI: [AI面试题] 解释推荐系统的概念
  • WorkFlow源码剖析——Communicator之TCPServer(中)
  • Hive 的数据存储单元结构
  • 存储数据库的传输效率提升-ETLCloud结合HBASE
  • 《安全软件开发框架(SSDF) 1.1:降低软件漏洞风险的建议》解读(四)
  • Java项目实战II基于SpringBoot在线课程管理系统的设计与实现(开发文档+数据库+源码)
  • 特征检测与特征匹配方法笔记+代码分享
  • Supervisor的使用-ubuntu
  • 在OceanBase 中,实现自增列的4种方法
  • 练习题 - Django 4.x HTTP 网络协议使用示例和配置方法
  • OpenSSH 安全漏洞(CVE-2023-38408)解决方案
  • leetcode hot100【LeetCode 78. 子集】java实现
  • 船体平整如镜,玛哈特矫平机为航海安全保驾护航
  • Docker Compose部署Rabbitmq(Dockerfile安装延迟队列)