当前位置: 首页 > article >正文

基于蒙特卡洛方法的网格世界求解

一、概述

该代码主要实现了在一个简单的网格世界环境中,使用蒙特卡洛方法来求解最优值函数和最优策略。网格世界由一个固定大小的方格组成,智能体可以在其中执行特定的动作(上、下、左、右),并根据状态转移规则和奖励函数获得相应的奖励。蒙特卡洛方法通过采样多个情节(episode),根据每个情节中状态的回报来更新值函数,并通过策略改进逐步得到最优策略。

二、依赖库

  1. numpy:用于数值计算,包括数组操作、求均值等。
  2. matplotlib.pyplot:用于数据可视化,绘制值函数和最优策略的图形。

三、代码详解

1. 环境参数定义

python

# 定义网格世界的大小
GRID_SIZE = 5
ACTIONS = ['up', 'down', 'left', 'right']  # 可能的动作
REWARD_END = 10  # 到达终点的奖励
REWARD_STEP = -1  # 每步的惩罚
GAMMA = 0.9  # 折扣因子

# 定义网格世界的起点和终点
START = (0, 0)
END = (GRID_SIZE - 1, GRID_SIZE - 1)

  • GRID_SIZE:表示网格世界的边长,这里设置为 5,即网格世界是一个 5x5 的方格。
  • ACTIONS:列出了智能体在网格世界中可以采取的四个动作,分别是向上、向下、向左、向右。
  • REWARD_END:当智能体到达终点时所获得的奖励值,设定为 10。
  • REWARD_STEP:智能体在每一步移动时所受到的惩罚值,为 - 1。
  • GAMMA:折扣因子,用于计算未来奖励的现值,取值为 0.9。
  • START:定义了智能体的起始状态,坐标为 (0, 0),即网格世界的左上角方格。
  • END:定义了目标状态,坐标为 (GRID_SIZE - 1, GRID_SIZE - 1),即网格世界的右下角方格。

2. 状态转移函数

python

def transition(state, action):
    """
    根据当前状态和动作,返回下一个状态。
    :param state: 当前状态 (x, y)
    :param action: 动作 ('up', 'down', 'left', 'right')
    :return: 下一个状态 (x, y)
    """
    x, y = state
    if action == 'up':
        x = max(x - 1, 0)
    elif action == 'down':
        x = min(x + 1, GRID_SIZE - 1)
    elif action == 'left':
        y = max(y - 1, 0)
    elif action == 'right':
        y = min(y + 1, GRID_SIZE - 1)
    return (x, y)

该函数根据输入的当前状态state(一个坐标元组 (x, y))和动作action,计算并返回下一个状态。通过对当前状态的坐标进行相应的增减操作来实现状态转移,同时确保坐标值不会超出网格世界的边界范围。

3. 奖励函数

python

def reward(state, next_state):
    """
    根据当前状态和下一个状态,返回奖励。
    :param state: 当前状态 (x, y)
    :param next_state: 下一个状态 (x, y)
    :return: 奖励值
    """
    if next_state == END:
        return REWARD_END
    else:
        return REWARD_STEP

此函数根据当前状态state和下一个状态next_state,返回相应的奖励值。如果下一个状态是目标状态END,则返回到达终点的奖励REWARD_END;否则返回每步的惩罚值REWARD_STEP

4. 初始化值函数和策略

python

def initialize():
    """
    初始化值函数和策略。
    :return: 值函数和策略
    """
    states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
    V = {state: 0 for state in states}  # 值函数
    policy = {state: np.random.choice(ACTIONS) for state in states}  # 随机策略
    policy[END] = None  # 终点没有动作
    return V, policy

  • 首先生成网格世界中的所有状态,存储在states列表中。
  • 初始化值函数V,将每个状态的值初始化为 0。
  • 初始化策略policy,为每个状态随机选择一个动作(从ACTIONS中随机选取),但终点状态END设置为None,表示在终点没有动作可执行。
  • 最后返回初始化的值函数V和策略policy

5. 采样一个情节(episode)

python

def generate_episode(policy):
    """
    根据当前策略采样一个 episode。
    :param policy: 策略,字典形式,键为状态,值为动作
    :return: episode,包含状态、动作和奖励的列表
    """
    episode = []
    state = START
    while state != END:
        action = policy[state]
        next_state = transition(state, action)
        r = reward(state, next_state)
        episode.append((state, action, r))
        state = next_state
    return episode

  • 该函数根据输入的策略policy,从起始状态START开始,不断执行动作并转移状态,直到到达终点状态END
  • 在每次循环中,获取当前状态对应的动作action,计算下一个状态next_state和奖励r,并将当前状态、动作和奖励以元组的形式添加到episode列表中。
  • 最终返回一个包含状态、动作和奖励的列表episode,表示一个完整的情节。

6. 蒙特卡洛法:更新值函数和策略

python

def monte_carlo(V, policy, num_episodes=1000):
    """
    使用蒙特卡洛法更新值函数和策略。
    :param V: 值函数,字典形式,键为状态,值为值函数值
    :param policy: 策略,字典形式,键为状态,值为动作
    :param num_episodes: 采样的 episode 数量
    :return: 更新后的值函数和策略
    """
    returns = {state: [] for state in V.keys()}  # 记录每个状态的回报
    for _ in range(num_episodes):
        episode = generate_episode(policy)
        G = 0  # 累积回报
        # 从后向前遍历 episode
        for t in reversed(range(len(episode))):
            state, action, r = episode[t]
            G = r + GAMMA * G
            # 首次访问蒙特卡洛法
            if state not in [x[0] for x in episode[:t]]:
                returns[state].append(G)
                V[state] = np.mean(returns[state])
                # 策略改进
                best_action = None
                best_value = -np.inf
                for a in ACTIONS:
                    next_state = transition(state, a)
                    value = reward(state, next_state) + GAMMA * V[next_state]
                    if value > best_value:
                        best_value = value
                        best_action = a
                policy[state] = best_action
    return V, policy

  • 参数说明
    • V:当前的值函数,是一个字典,键为状态,值为对应状态的值函数值。
    • policy:当前的策略,也是一个字典,键为状态,值为对应状态的动作。
    • num_episodes:要采样的情节数量,默认为 1000。
  • 函数逻辑
    • 初始化returns字典,用于记录每个状态的回报列表。
    • 循环采样num_episodes个情节:
      • 生成一个情节episode
      • 初始化累积回报G为 0,从后向前遍历情节中的每个步骤:
        • 计算当前步骤的回报G,通过即时奖励r和折扣因子GAMMA更新累积回报。
        • 如果当前状态是首次访问(即之前在该情节中未出现过),将当前回报G添加到returns[state]中,并更新值函数V[state]为该状态所有回报的均值。
        • 进行策略改进:遍历所有可能的动作a,计算采取该动作后的下一个状态的价值value,选择使价值最大的动作作为当前状态的新策略policy[state]
    • 最后返回更新后的值函数V和策略policy

7. 生成所有状态

python

states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]

使用列表推导式生成一个包含网格世界中所有状态的列表,每个状态由一个坐标元组(x, y)表示。

8. 初始化值函数和策略

python

V, policy = initialize()

调用initialize函数,初始化值函数V和策略policy

9. 使用蒙特卡洛法更新值函数和策略

python

V, policy = monte_carlo(V, policy, num_episodes=1000)

调用monte_carlo函数,传入初始化的值函数V、策略policy和情节数量 1000,对值函数和策略进行更新。

10. 可视化值函数

python

# 可视化值函数
grid_value = np.zeros((GRID_SIZE, GRID_SIZE))
for state in states:
    grid_value[state] = V[state]

plt.imshow(grid_value, cmap='viridis')
plt.colorbar(label='Value')
plt.title('State Value Function (Monte Carlo)')
plt.show()

将更新后的值函数V转换为一个二维数组grid_value,其中每个元素对应网格世界中一个状态的值函数。使用matplotlib.pyplotimshow函数绘制该二维数组的热图,并添加颜色条和标题,以可视化值函数。

11. 可视化最优策略

python

# 可视化最优策略
grid_policy = np.empty((GRID_SIZE, GRID_SIZE), dtype=object)
for state in states:
    grid_policy[state] = policy[state]

fig, ax = plt.subplots()
ax.imshow(np.ones((GRID_SIZE, GRID_SIZE)), cmap='gray')
for state in states:
    if state == END:
        ax.text(state[1], state[0], 'END', ha='center', va='center')
        continue
    action = policy[state]
    if action == 'up':
        ax.arrow(state[1], state[0], 0, -0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
    elif action == 'down':
        ax.arrow(state[1], state[0], 0, 0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
    elif action == 'left':
        ax.arrow(state[1], state[0], -0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
    elif action == 'right':
        ax.arrow(state[1], state[0], 0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
ax.set_title('Optimal Policy (Monte Carlo)')
plt.show()

将更新后的最优策略policy转换为一个二维数组grid_policy,其中每个元素对应网格世界中一个状态的最优动作。使用matplotlib.pyplot绘制一个灰色背景的图形,并在每个状态上绘制箭头表示最优动作(如果是目标状态,则显示'END'),以可视化最优策略。

四、注意事项

  1. 代码中的蒙特卡洛方法采用了首次访问蒙特卡洛法,这意味着每个情节中每个状态只在首次出现时用于更新值函数。
  2. 网格世界的定义和参数(如大小、奖励、折扣因子等)可以根据具体问题进行调整。
  3. 可视化部分使用了matplotlib库,确保该库已经正确安装并配置。如果在运行代码时遇到图形显示问题,可以检查matplotlib的设置或尝试其他可视化工具。
  4. 蒙特卡洛方法的性能和收敛速度可能受到采样情节数量的影响,可根据实际情况调整num_episodes参数。

完整代码

import numpy as np
import matplotlib.pyplot as plt

# 定义网格世界的大小
GRID_SIZE = 5
ACTIONS = ['up', 'down', 'left', 'right']  # 可能的动作
REWARD_END = 10  # 到达终点的奖励
REWARD_STEP = -1  # 每步的惩罚
GAMMA = 0.9  # 折扣因子

# 定义网格世界的起点和终点
START = (0, 0)
END = (GRID_SIZE - 1, GRID_SIZE - 1)

# 定义状态转移函数
def transition(state, action):
    """
    根据当前状态和动作,返回下一个状态。
    :param state: 当前状态 (x, y)
    :param action: 动作 ('up', 'down', 'left', 'right')
    :return: 下一个状态 (x, y)
    """
    x, y = state
    if action == 'up':
        x = max(x - 1, 0)
    elif action == 'down':
        x = min(x + 1, GRID_SIZE - 1)
    elif action == 'left':
        y = max(y - 1, 0)
    elif action == 'right':
        y = min(y + 1, GRID_SIZE - 1)
    return (x, y)

# 定义奖励函数
def reward(state, next_state):
    """
    根据当前状态和下一个状态,返回奖励。
    :param state: 当前状态 (x, y)
    :param next_state: 下一个状态 (x, y)
    :return: 奖励值
    """
    if next_state == END:
        return REWARD_END
    else:
        return REWARD_STEP

# 初始化值函数和策略
def initialize():
    """
    初始化值函数和策略。
    :return: 值函数和策略
    """
    states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
    V = {state: 0 for state in states}  # 值函数
    policy = {state: np.random.choice(ACTIONS) for state in states}  # 随机策略
    policy[END] = None  # 终点没有动作
    return V, policy

# 采样一个 episode
def generate_episode(policy):
    """
    根据当前策略采样一个 episode。
    :param policy: 策略,字典形式,键为状态,值为动作
    :return: episode,包含状态、动作和奖励的列表
    """
    episode = []
    state = START
    while state != END:
        action = policy[state]
        next_state = transition(state, action)
        r = reward(state, next_state)
        episode.append((state, action, r))
        state = next_state
    return episode

# 蒙特卡洛法:更新值函数和策略
def monte_carlo(V, policy, num_episodes=1000):
    """
    使用蒙特卡洛法更新值函数和策略。
    :param V: 值函数,字典形式,键为状态,值为值函数值
    :param policy: 策略,字典形式,键为状态,值为动作
    :param num_episodes: 采样的 episode 数量
    :return: 更新后的值函数和策略
    """
    returns = {state: [] for state in V.keys()}  # 记录每个状态的回报
    for _ in range(num_episodes):
        episode = generate_episode(policy)
        G = 0  # 累积回报
        # 从后向前遍历 episode
        for t in reversed(range(len(episode))):
            state, action, r = episode[t]
            G = r + GAMMA * G
            # 首次访问蒙特卡洛法
            if state not in [x[0] for x in episode[:t]]:
                returns[state].append(G)
                V[state] = np.mean(returns[state])
                # 策略改进
                best_action = None
                best_value = -np.inf
                for a in ACTIONS:
                    next_state = transition(state, a)
                    value = reward(state, next_state) + GAMMA * V[next_state]
                    if value > best_value:
                        best_value = value
                        best_action = a
                policy[state] = best_action
    return V, policy

# 生成所有状态
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]

# 初始化值函数和策略
V, policy = initialize()

# 使用蒙特卡洛法更新值函数和策略
V, policy = monte_carlo(V, policy, num_episodes=1000)

# 可视化值函数
grid_value = np.zeros((GRID_SIZE, GRID_SIZE))
for state in states:
    grid_value[state] = V[state]

plt.imshow(grid_value, cmap='viridis')
plt.colorbar(label='Value')
plt.title('State Value Function (Monte Carlo)')
plt.show()

# 可视化最优策略
grid_policy = np.empty((GRID_SIZE, GRID_SIZE), dtype=object)
for state in states:
    grid_policy[state] = policy[state]

fig, ax = plt.subplots()
ax.imshow(np.ones((GRID_SIZE, GRID_SIZE)), cmap='gray')
for state in states:
    if state == END:
        ax.text(state[1], state[0], 'END', ha='center', va='center')
        continue
    action = policy[state]
    if action == 'up':
        ax.arrow(state[1], state[0], 0, -0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
    elif action == 'down':
        ax.arrow(state[1], state[0], 0, 0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
    elif action == 'left':
        ax.arrow(state[1], state[0], -0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
    elif action == 'right':
        ax.arrow(state[1], state[0], 0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
ax.set_title('Optimal Policy (Monte Carlo)')
plt.show()


http://www.kler.cn/a/596769.html

相关文章:

  • 使用netDxf扩充LaserGRBL使它支持Dxf文件格式
  • 在刀刃上发力:如何精准把握计划关键节点
  • uniapp 微信小程序 手机号快速验证组件 解密 encryptedData 获取手机号
  • docker速通
  • OAuth 2.0认证
  • doris:负载均衡
  • 【数据预测】基于遗传算法GA的LSTM光伏功率预测 GA-LSTM光伏功率预测【Matlab代码#91】
  • OpenHarmony 开源硬件学习全指南:从入门到实战
  • JAVA - OJ沙箱(JAVA默认模板沙箱,JAVA操作dokcer沙箱)
  • MacOS安装 nextcloud 的 Virtual File System
  • LangChain组件Tools/Toolkits详解(6)——特殊类型注解Annotations
  • llama源码学习·model.py[4]Attention注意力(2)源码分析
  • 洛谷 [语言月赛 202503] 题解(C++)
  • (滑动窗口)算法训练篇11--力扣3.无重复字符的最长字串(难度中等)
  • ROM(只读存储器) 、SRAM(静态随机存储器) 和 Flash(闪存) 的详细解析
  • Centos编译升级libcurl
  • DeepSeek自学手册:《从理论(模型训练)到实践(模型应用)》|73页|附PPT下载方法
  • NVM 多版本node.js管理工具
  • Linux用户管理实操指南
  • 【 <二> 丹方改良:Spring 时代的 JavaWeb】之 Spring Boot 中的异常处理:全局异常与自定义异常