当前位置: 首页 > article >正文

Dyna-Q 算法_笔记_20241023

文章目录

  • 前言
  • 一、Dyna-Q 的原理
    • 1. 基本构架
    • 2. 算法流程
    • 3. 优点
    • 4. 挑战
  • 二、实践
    • **悬崖漫步的环境代码**
    • **DynaQ算法函数**
    • **Dyna-Q 算法在悬崖漫步环境中的训练函数**
    • 绘图
    • **结果**
  • 总结


前言

引用资源:Dyna-Q 算法
在这里插入图片描述


一、Dyna-Q 的原理

在这里插入图片描述

Dyna-Q 是一种结合了强化学习模型学习的方法,旨在提高学习的效率和效果。其基本思想是通过在强化学习中集成环境模型,使用模拟的经验来加速学习过程。以下是 Dyna-Q 的详细原理和组成部分:

1. 基本构架

Dyna-Q 的工作流程可以分为以下几个步骤:

  1. 学习模型

    • 在每次与环境交互后,智能体不仅更新价值函数或策略,还通过观察状态转移和奖励来构建或更新环境模型
      显然,Dyna-Q的特殊性在更新环境模型,环境模型可以加快学习模型的快速收敛。
    • 环境模型通常是一个近似函数,能够根据当前状态 s s s 和所采取的动作 a a a 预测下一个状态 s ′ s' s 及其对应的奖励 r r r
  2. 选择动作

    • 智能体在实际环境中选择动作并执行,根据得到的奖励和状态转移更新其价值函数(如 Q 值)。
  3. 规划步骤

    • 利用学习到的环境模型,智能体可以进行多次“虚拟”体验,模拟从当前状态开始采取不同动作的结果。
    • 这些虚拟体验使得智能体能够在不与真实环境交互的情况下,更新其价值函数。

2. 算法流程

在这里插入图片描述
在这里插入图片描述

Dyna-Q 的具体步骤如下:

  1. 初始化

    • 初始化 Q 值、状态转移模型和奖励模型。
  2. 与环境交互

    • 在每个时间步骤 t t t
      • 选择一个动作 a t a_t at(通常使用某种探索策略,如 ϵ \epsilon ϵ-贪心策略)。
      • 执行动作 a t a_t at,观察得到的奖励 r t r_t rt 和下一个状态 s t + 1 s_{t+1} st+1
      • 更新 Q 值:
        Q ( s t , a t ) ← Q ( s t , a t ) + α [ r t + γ max ⁡ a ′ Q ( s t + 1 , a ′ ) − Q ( s t , a t ) ] Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_t + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \right] Q(st,at)Q(st,at)+α[rt+γamaxQ(st+1,a)Q(st,at)]
  3. 更新模型

    • 更新状态转移模型 P ( s ′ ∣ s , a ) P(s'|s, a) P(ss,a) 和奖励模型 R ( s , a ) R(s, a) R(s,a)
  4. 进行规划

    • 使用模型进行模拟体验:
      • 随机选择多个状态-动作对,从模型中获取虚拟的状态转移和奖励。
      • 更新 Q 值基于这些模拟的经验。
  5. 重复步骤 2 至 4,直到达到收敛或满足某种停止条件。

3. 优点

  • 样本效率:通过利用模型进行规划,Dyna-Q 能够在有限的真实经验基础上进行更多的学习。
  • 加速学习:智能体可以通过模拟快速地探索状态空间,找到更优的策略。
  • 灵活性:可以在不同的环境下快速适应,通过不断更新模型来应对动态变化的环境。

4. 挑战

  • 模型学习的准确性:模型的准确性对 Dyna-Q 的性能至关重要。如果模型不准确,可能会导致学习效果的降低。
  • 计算复杂度:进行大量模拟可能会增加计算开销,尤其是在状态空间较大的情况下。

Dyna-Q 将模型学习与经典的强化学习方法结合,使得智能体在学习过程中更为高效,为解决实际问题提供了良好的框架。

二、实践

悬崖漫步的环境代码

import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import random
import time


class CliffWalkingEnv:
    def __init__(self, ncol, nrow):
        self.nrow = nrow
        self.ncol = ncol
        self.x = 0  # 记录当前智能体位置的横坐标
        self.y = self.nrow - 1  # 记录当前智能体位置的纵坐标

    def step(self, action):  # 外部调用这个函数来改变当前位置
        # 4种动作, change[0]:上, change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # 定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        self.x = min(self.ncol - 1, max(0, self.x + change[action][0]))
        self.y = min(self.nrow - 1, max(0, self.y + change[action][1]))
        next_state = self.y * self.ncol + self.x
        reward = -1
        done = False
        if self.y == self.nrow - 1 and self.x > 0:  # 下一个位置在悬崖或者目标
            done = True
            if self.x != self.ncol - 1:
                reward = -100
        return next_state, reward, done

    def reset(self):  # 回归初始状态,起点在左上角
        self.x = 0
        self.y = self.nrow - 1
        return self.y * self.ncol + self.x

 self.x = min(self.ncol - 1, max(0, self.x + change[action][0]))

目的是限制 self.x的取值范围,[0,self.ncol - 1],实际取值为当前位置(x)加上选取的动作和集中的x坐标[change[action][0]]。


next_state = self.y * self.ncol + self.x

类似十进制,只是这里是以一整行为进制,即列坐标乘以列数加上横坐标可以得到与坐标对应的唯一值。


 if self.y == self.nrow - 1 and self.x > 0:  # 下一个位置在悬崖或者目标
            done = True

在这里插入图片描述
定义悬崖(cliff)范围


DynaQ算法函数

class DynaQ:
    """ Dyna-Q算法 """
    def __init__(self,
                 ncol,
                 nrow,
                 epsilon,
                 alpha,
                 gamma,
                 n_planning,
                 n_action=4):
        self.Q_table = np.zeros([nrow * ncol, n_action])  # 初始化Q(s,a)表格
        self.n_action = n_action  # 动作个数
        self.alpha = alpha  # 学习率
        self.gamma = gamma  # 折扣因子
        self.epsilon = epsilon  # epsilon-贪婪策略中的参数

        self.n_planning = n_planning  #执行Q-planning的次数, 对应1次Q-learning
        self.model = dict()  # 环境模型

    def take_action(self, state):  # 选取下一步的操作
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.n_action)
        else:
            action = np.argmax(self.Q_table[state])
        return action

    def q_learning(self, s0, a0, r, s1):
        td_error = r + self.gamma * self.Q_table[s1].max(
        ) - self.Q_table[s0, a0]
        self.Q_table[s0, a0] += self.alpha * td_error

    def update(self, s0, a0, r, s1):
        self.q_learning(s0, a0, r, s1)
        self.model[(s0, a0)] = r, s1  # 将数据添加到模型中
        for _ in range(self.n_planning):  # Q-planning循环
            # 随机选择曾经遇到过的状态动作对
            (s, a), (r, s_) = random.choice(list(self.model.items()))
            self.q_learning(s, a, r, s_)

 def take_action(self, state):  # 选取下一步的操作
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.n_action)
        else:
            action = np.argmax(self.Q_table[state])
        return action

np.random.random()随机生成一个介于0到1之间的浮点数。
np.random.randint(self.n_action)随机生成一个从 0 到 self.n_action - 1的整数,并赋值给self.n_action


 def q_learning(self, s0, a0, r, s1):
        td_error = r + self.gamma * self.Q_table[s1].max(
        ) - self.Q_table[s0, a0]
        self.Q_table[s0, a0] += self.alpha * td_error

在这里插入图片描述


def update(self, s0, a0, r, s1):
        self.q_learning(s0, a0, r, s1)
        self.model[(s0, a0)] = r, s1  # 将数据添加到模型中
        for _ in range(self.n_planning):  # Q-planning循环
            # 随机选择曾经遇到过的状态动作对
            (s, a), (r, s_) = random.choice(list(self.model.items()))
            self.q_learning(s, a, r, s_)

self.q_learning来更新Q
(s, a), (r, s_) = random.choice(list(self.model.items()))为储存在模型中的状态动作对和其对应的奖励和新状态
def update(self, s0, a0, r, s1):利用以往的经验进行模拟更新


Dyna-Q 算法在悬崖漫步环境中的训练函数

def DynaQ_CliffWalking(n_planning):
    ncol = 12
    nrow = 4
    env = CliffWalkingEnv(ncol, nrow)
    epsilon = 0.01
    alpha = 0.1
    gamma = 0.9
    agent = DynaQ(ncol, nrow, epsilon, alpha, gamma, n_planning)
    num_episodes = 300  # 智能体在环境中运行多少条序列

    return_list = []  # 记录每一条序列的回报
    for i in range(10):  # 显示10个进度条
        # tqdm的进度条功能
        with tqdm(total=int(num_episodes / 10),
                  desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):  # 每个进度条的序列数
                episode_return = 0
                state = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done = env.step(action)
                    episode_return += reward  # 这里回报的计算不进行折扣因子衰减
                    agent.update(state, action, reward, next_state)
                    state = next_state
                return_list.append(episode_return)
                if (i_episode + 1) % 10 == 0:  # 每10条序列打印一下这10条序列的平均回报
                    pbar.set_postfix({
                        'episode':
                        '%d' % (num_episodes / 10 * i + i_episode + 1),
                        'return':
                        '%.3f' % np.mean(return_list[-10:])
                    })
                pbar.update(1)
    return return_list

 for i in range(10):  # 显示10个进度条
        # tqdm的进度条功能
        with tqdm(total=int(num_episodes / 10),
                  desc='Iteration %d' % i) as pbar:

for i in range(10):  # 显示10个进度条`
  • 作用:外部循环迭代次数为 10,将整个训练过程分成10个部分,每个部分显示一个进度条。
  • 目的:通过分阶段显示进度,便于观察智能体的学习过程,而不是在训练结束时才查看结果。

 with tqdm(total=int(num_episodes / 10),
                  desc='Iteration %d' % i) as pbar:
  • tqdm:一个用于显示循环进度条的Python库,适合长时间运行的任务。
  • total=int(num_episodes / 10):设置进度条的总长度。
    • num_episodes 是训练的总序列数,例如 300。
    • int(num_episodes / 10) 表示每个进度条显示的序列数,这里是 30 条。
  • desc='Iteration %d' % i:设置进度条的描述文字。
    • 使用字符串格式化,将当前迭代次数 i 显示在进度条上。例如,第一次迭代时显示 Iteration 0,第二次迭代时显示 Iteration 1,依此类推。
  • as pbar:创建进度条对象,并命名为 pbar。这个对象可以在 with 块内使用来更新进度条状态。

上下文管理器 with

  • 上下文管理:with tqdm(...) as pbar: 创建一个上下文管理器,用于管理进度条的生命周期。
    • 进入 with 块时:进度条 pbar 被初始化并创建。
    • 退出 with 块时:无论是否发生异常,进度条都会正确关闭,确保资源管理和输出整洁。

            for i_episode in range(int(num_episodes / 10)):  # 每个进度条的序列数
                episode_return = 0
                state = env.reset()
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done = env.step(action)
                    episode_return += reward  # 这里回报的计算不进行折扣因子衰减
                    agent.update(state, action, reward, next_state)
                    state = next_state
                return_list.append(episode_return)

在这里插入图片描述


绘图

np.random.seed(0)
random.seed(0)
n_planning_list = [0, 2, 20]
for n_planning in n_planning_list:
    print('Q-planning步数为:%d' % n_planning)
    time.sleep(0.5)
    return_list = DynaQ_CliffWalking(n_planning)
    episodes_list = list(range(len(return_list)))
    plt.plot(episodes_list,
             return_list,
             label=str(n_planning) + ' planning steps')
plt.legend()
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Dyna-Q on {}'.format('Cliff Walking'))
plt.show()

结果

在这里插入图片描述

在这里插入图片描述


总结


http://www.kler.cn/news/368680.html

相关文章:

  • 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-24
  • spring final修饰
  • 【gRPC】什么是RPC——介绍一下RPC
  • python构建flask服务用于视频文件的处理后返回
  • 【自然语言处理】BERT模型
  • 清华大学《2022年+2021年822自动控制原理真题》 (完整版)
  • 微信小程序-获取头像和昵称
  • CSS中的!important和空格选择器深入解析
  • 安全运营 -- 监控linux命令history
  • Python量子生成对抗网络QGAN神经网络药物发现、多方法乳腺癌药物筛选应用
  • 开放式耳机哪个品牌音质好?音质最好的开放式耳机推荐!
  • linux上离线安装python环境以及机器学习环境
  • 什么是DDoS脉冲攻击?怎么防御?
  • vue项目常见模块的安装及使用
  • 【Qt】系统相关——多线程、Qt多线程介绍、常用函数、线程安全、网络、UDP Socket、TCP Socket
  • Axios与Java Spring交互:RESTful API设计与实现全解析
  • 图像变换的知识
  • Bash 中的 ${} 和 $() 有什么区别 ?
  • 使用田口实验获取优化算法的参数最优组合
  • 【系统面试篇】进程和线程的区别
  • GEE app:全球地表水使用 ui.root.onResize() 演示移动友好型用户界面
  • 2025选题推荐|基于SpringBoot+Vue实现大学生双创竟赛项目申报与路演管理系统
  • Unity DOTS
  • 利用ADPF性能提示优化Android应用体验
  • Flink Python作业快速入门
  • 百度搜索推广和信息流推广的区别,分别适用于什么场景!