基于蒙特卡洛方法的网格世界求解
一、概述
该代码主要实现了在一个简单的网格世界环境中,使用蒙特卡洛方法来求解最优值函数和最优策略。网格世界由一个固定大小的方格组成,智能体可以在其中执行特定的动作(上、下、左、右),并根据状态转移规则和奖励函数获得相应的奖励。蒙特卡洛方法通过采样多个情节(episode),根据每个情节中状态的回报来更新值函数,并通过策略改进逐步得到最优策略。
二、依赖库
numpy
:用于数值计算,包括数组操作、求均值等。matplotlib.pyplot
:用于数据可视化,绘制值函数和最优策略的图形。
三、代码详解
1. 环境参数定义
python
# 定义网格世界的大小
GRID_SIZE = 5
ACTIONS = ['up', 'down', 'left', 'right'] # 可能的动作
REWARD_END = 10 # 到达终点的奖励
REWARD_STEP = -1 # 每步的惩罚
GAMMA = 0.9 # 折扣因子
# 定义网格世界的起点和终点
START = (0, 0)
END = (GRID_SIZE - 1, GRID_SIZE - 1)
GRID_SIZE
:表示网格世界的边长,这里设置为 5,即网格世界是一个 5x5 的方格。ACTIONS
:列出了智能体在网格世界中可以采取的四个动作,分别是向上、向下、向左、向右。REWARD_END
:当智能体到达终点时所获得的奖励值,设定为 10。REWARD_STEP
:智能体在每一步移动时所受到的惩罚值,为 - 1。GAMMA
:折扣因子,用于计算未来奖励的现值,取值为 0.9。START
:定义了智能体的起始状态,坐标为 (0, 0),即网格世界的左上角方格。END
:定义了目标状态,坐标为 (GRID_SIZE - 1, GRID_SIZE - 1),即网格世界的右下角方格。
2. 状态转移函数
python
def transition(state, action):
"""
根据当前状态和动作,返回下一个状态。
:param state: 当前状态 (x, y)
:param action: 动作 ('up', 'down', 'left', 'right')
:return: 下一个状态 (x, y)
"""
x, y = state
if action == 'up':
x = max(x - 1, 0)
elif action == 'down':
x = min(x + 1, GRID_SIZE - 1)
elif action == 'left':
y = max(y - 1, 0)
elif action == 'right':
y = min(y + 1, GRID_SIZE - 1)
return (x, y)
该函数根据输入的当前状态state
(一个坐标元组 (x, y))和动作action
,计算并返回下一个状态。通过对当前状态的坐标进行相应的增减操作来实现状态转移,同时确保坐标值不会超出网格世界的边界范围。
3. 奖励函数
python
def reward(state, next_state):
"""
根据当前状态和下一个状态,返回奖励。
:param state: 当前状态 (x, y)
:param next_state: 下一个状态 (x, y)
:return: 奖励值
"""
if next_state == END:
return REWARD_END
else:
return REWARD_STEP
此函数根据当前状态state
和下一个状态next_state
,返回相应的奖励值。如果下一个状态是目标状态END
,则返回到达终点的奖励REWARD_END
;否则返回每步的惩罚值REWARD_STEP
。
4. 初始化值函数和策略
python
def initialize():
"""
初始化值函数和策略。
:return: 值函数和策略
"""
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
V = {state: 0 for state in states} # 值函数
policy = {state: np.random.choice(ACTIONS) for state in states} # 随机策略
policy[END] = None # 终点没有动作
return V, policy
- 首先生成网格世界中的所有状态,存储在
states
列表中。 - 初始化值函数
V
,将每个状态的值初始化为 0。 - 初始化策略
policy
,为每个状态随机选择一个动作(从ACTIONS
中随机选取),但终点状态END
设置为None
,表示在终点没有动作可执行。 - 最后返回初始化的值函数
V
和策略policy
。
5. 采样一个情节(episode)
python
def generate_episode(policy):
"""
根据当前策略采样一个 episode。
:param policy: 策略,字典形式,键为状态,值为动作
:return: episode,包含状态、动作和奖励的列表
"""
episode = []
state = START
while state != END:
action = policy[state]
next_state = transition(state, action)
r = reward(state, next_state)
episode.append((state, action, r))
state = next_state
return episode
- 该函数根据输入的策略
policy
,从起始状态START
开始,不断执行动作并转移状态,直到到达终点状态END
。 - 在每次循环中,获取当前状态对应的动作
action
,计算下一个状态next_state
和奖励r
,并将当前状态、动作和奖励以元组的形式添加到episode
列表中。 - 最终返回一个包含状态、动作和奖励的列表
episode
,表示一个完整的情节。
6. 蒙特卡洛法:更新值函数和策略
python
def monte_carlo(V, policy, num_episodes=1000):
"""
使用蒙特卡洛法更新值函数和策略。
:param V: 值函数,字典形式,键为状态,值为值函数值
:param policy: 策略,字典形式,键为状态,值为动作
:param num_episodes: 采样的 episode 数量
:return: 更新后的值函数和策略
"""
returns = {state: [] for state in V.keys()} # 记录每个状态的回报
for _ in range(num_episodes):
episode = generate_episode(policy)
G = 0 # 累积回报
# 从后向前遍历 episode
for t in reversed(range(len(episode))):
state, action, r = episode[t]
G = r + GAMMA * G
# 首次访问蒙特卡洛法
if state not in [x[0] for x in episode[:t]]:
returns[state].append(G)
V[state] = np.mean(returns[state])
# 策略改进
best_action = None
best_value = -np.inf
for a in ACTIONS:
next_state = transition(state, a)
value = reward(state, next_state) + GAMMA * V[next_state]
if value > best_value:
best_value = value
best_action = a
policy[state] = best_action
return V, policy
- 参数说明:
V
:当前的值函数,是一个字典,键为状态,值为对应状态的值函数值。policy
:当前的策略,也是一个字典,键为状态,值为对应状态的动作。num_episodes
:要采样的情节数量,默认为 1000。
- 函数逻辑:
- 初始化
returns
字典,用于记录每个状态的回报列表。 - 循环采样
num_episodes
个情节:- 生成一个情节
episode
。 - 初始化累积回报
G
为 0,从后向前遍历情节中的每个步骤:- 计算当前步骤的回报
G
,通过即时奖励r
和折扣因子GAMMA
更新累积回报。 - 如果当前状态是首次访问(即之前在该情节中未出现过),将当前回报
G
添加到returns[state]
中,并更新值函数V[state]
为该状态所有回报的均值。 - 进行策略改进:遍历所有可能的动作
a
,计算采取该动作后的下一个状态的价值value
,选择使价值最大的动作作为当前状态的新策略policy[state]
。
- 计算当前步骤的回报
- 生成一个情节
- 最后返回更新后的值函数
V
和策略policy
。
- 初始化
7. 生成所有状态
python
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
使用列表推导式生成一个包含网格世界中所有状态的列表,每个状态由一个坐标元组(x, y)
表示。
8. 初始化值函数和策略
python
V, policy = initialize()
调用initialize
函数,初始化值函数V
和策略policy
。
9. 使用蒙特卡洛法更新值函数和策略
python
V, policy = monte_carlo(V, policy, num_episodes=1000)
调用monte_carlo
函数,传入初始化的值函数V
、策略policy
和情节数量 1000,对值函数和策略进行更新。
10. 可视化值函数
python
# 可视化值函数
grid_value = np.zeros((GRID_SIZE, GRID_SIZE))
for state in states:
grid_value[state] = V[state]
plt.imshow(grid_value, cmap='viridis')
plt.colorbar(label='Value')
plt.title('State Value Function (Monte Carlo)')
plt.show()
将更新后的值函数V
转换为一个二维数组grid_value
,其中每个元素对应网格世界中一个状态的值函数。使用matplotlib.pyplot
的imshow
函数绘制该二维数组的热图,并添加颜色条和标题,以可视化值函数。
11. 可视化最优策略
python
# 可视化最优策略
grid_policy = np.empty((GRID_SIZE, GRID_SIZE), dtype=object)
for state in states:
grid_policy[state] = policy[state]
fig, ax = plt.subplots()
ax.imshow(np.ones((GRID_SIZE, GRID_SIZE)), cmap='gray')
for state in states:
if state == END:
ax.text(state[1], state[0], 'END', ha='center', va='center')
continue
action = policy[state]
if action == 'up':
ax.arrow(state[1], state[0], 0, -0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
elif action == 'down':
ax.arrow(state[1], state[0], 0, 0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
elif action == 'left':
ax.arrow(state[1], state[0], -0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
elif action == 'right':
ax.arrow(state[1], state[0], 0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
ax.set_title('Optimal Policy (Monte Carlo)')
plt.show()
将更新后的最优策略policy
转换为一个二维数组grid_policy
,其中每个元素对应网格世界中一个状态的最优动作。使用matplotlib.pyplot
绘制一个灰色背景的图形,并在每个状态上绘制箭头表示最优动作(如果是目标状态,则显示'END'
),以可视化最优策略。
四、注意事项
- 代码中的蒙特卡洛方法采用了首次访问蒙特卡洛法,这意味着每个情节中每个状态只在首次出现时用于更新值函数。
- 网格世界的定义和参数(如大小、奖励、折扣因子等)可以根据具体问题进行调整。
- 可视化部分使用了
matplotlib
库,确保该库已经正确安装并配置。如果在运行代码时遇到图形显示问题,可以检查matplotlib
的设置或尝试其他可视化工具。 - 蒙特卡洛方法的性能和收敛速度可能受到采样情节数量的影响,可根据实际情况调整
num_episodes
参数。
完整代码
import numpy as np
import matplotlib.pyplot as plt
# 定义网格世界的大小
GRID_SIZE = 5
ACTIONS = ['up', 'down', 'left', 'right'] # 可能的动作
REWARD_END = 10 # 到达终点的奖励
REWARD_STEP = -1 # 每步的惩罚
GAMMA = 0.9 # 折扣因子
# 定义网格世界的起点和终点
START = (0, 0)
END = (GRID_SIZE - 1, GRID_SIZE - 1)
# 定义状态转移函数
def transition(state, action):
"""
根据当前状态和动作,返回下一个状态。
:param state: 当前状态 (x, y)
:param action: 动作 ('up', 'down', 'left', 'right')
:return: 下一个状态 (x, y)
"""
x, y = state
if action == 'up':
x = max(x - 1, 0)
elif action == 'down':
x = min(x + 1, GRID_SIZE - 1)
elif action == 'left':
y = max(y - 1, 0)
elif action == 'right':
y = min(y + 1, GRID_SIZE - 1)
return (x, y)
# 定义奖励函数
def reward(state, next_state):
"""
根据当前状态和下一个状态,返回奖励。
:param state: 当前状态 (x, y)
:param next_state: 下一个状态 (x, y)
:return: 奖励值
"""
if next_state == END:
return REWARD_END
else:
return REWARD_STEP
# 初始化值函数和策略
def initialize():
"""
初始化值函数和策略。
:return: 值函数和策略
"""
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
V = {state: 0 for state in states} # 值函数
policy = {state: np.random.choice(ACTIONS) for state in states} # 随机策略
policy[END] = None # 终点没有动作
return V, policy
# 采样一个 episode
def generate_episode(policy):
"""
根据当前策略采样一个 episode。
:param policy: 策略,字典形式,键为状态,值为动作
:return: episode,包含状态、动作和奖励的列表
"""
episode = []
state = START
while state != END:
action = policy[state]
next_state = transition(state, action)
r = reward(state, next_state)
episode.append((state, action, r))
state = next_state
return episode
# 蒙特卡洛法:更新值函数和策略
def monte_carlo(V, policy, num_episodes=1000):
"""
使用蒙特卡洛法更新值函数和策略。
:param V: 值函数,字典形式,键为状态,值为值函数值
:param policy: 策略,字典形式,键为状态,值为动作
:param num_episodes: 采样的 episode 数量
:return: 更新后的值函数和策略
"""
returns = {state: [] for state in V.keys()} # 记录每个状态的回报
for _ in range(num_episodes):
episode = generate_episode(policy)
G = 0 # 累积回报
# 从后向前遍历 episode
for t in reversed(range(len(episode))):
state, action, r = episode[t]
G = r + GAMMA * G
# 首次访问蒙特卡洛法
if state not in [x[0] for x in episode[:t]]:
returns[state].append(G)
V[state] = np.mean(returns[state])
# 策略改进
best_action = None
best_value = -np.inf
for a in ACTIONS:
next_state = transition(state, a)
value = reward(state, next_state) + GAMMA * V[next_state]
if value > best_value:
best_value = value
best_action = a
policy[state] = best_action
return V, policy
# 生成所有状态
states = [(x, y) for x in range(GRID_SIZE) for y in range(GRID_SIZE)]
# 初始化值函数和策略
V, policy = initialize()
# 使用蒙特卡洛法更新值函数和策略
V, policy = monte_carlo(V, policy, num_episodes=1000)
# 可视化值函数
grid_value = np.zeros((GRID_SIZE, GRID_SIZE))
for state in states:
grid_value[state] = V[state]
plt.imshow(grid_value, cmap='viridis')
plt.colorbar(label='Value')
plt.title('State Value Function (Monte Carlo)')
plt.show()
# 可视化最优策略
grid_policy = np.empty((GRID_SIZE, GRID_SIZE), dtype=object)
for state in states:
grid_policy[state] = policy[state]
fig, ax = plt.subplots()
ax.imshow(np.ones((GRID_SIZE, GRID_SIZE)), cmap='gray')
for state in states:
if state == END:
ax.text(state[1], state[0], 'END', ha='center', va='center')
continue
action = policy[state]
if action == 'up':
ax.arrow(state[1], state[0], 0, -0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
elif action == 'down':
ax.arrow(state[1], state[0], 0, 0.3, head_width=0.2, head_length=0.1, fc='r', ec='r')
elif action == 'left':
ax.arrow(state[1], state[0], -0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
elif action == 'right':
ax.arrow(state[1], state[0], 0.3, 0, head_width=0.2, head_length=0.1, fc='r', ec='r')
ax.set_title('Optimal Policy (Monte Carlo)')
plt.show()