A Little Reinforcement Learning Every Day (2)
Modifying the imitation learning code from 《动手学强化学习》 (Hands-on RL)
Because of differences between gym versions, a few parts of the imitation learning code need to be modified; the changed lines are marked with comments below.
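The root cause is the gym API change: in older releases env.reset() returns only the observation and env.step() returns a 4-tuple (obs, reward, done, info), while in gym >= 0.26 reset() returns (observation, info) and step() returns a 5-tuple with separate terminated and truncated flags. A minimal sketch of the new-style calls (my own illustration, assuming a gym >= 0.26 installation):

import gym

env = gym.make('CartPole-v0')

# new-style reset(): returns (observation, info)
state, info = env.reset()

# new-style step(): returns (obs, reward, terminated, truncated, info)
next_state, reward, terminated, truncated, info = env.step(env.action_space.sample())
done = terminated or truncated  # an episode ends on either flag

The modified functions below follow this convention.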
def test_agent(agent, env, n_episode):
    return_list = []
    for episode in range(n_episode):
        episode_return = 0
        state = env.reset()[0]  # modified: new-style reset() returns (observation, info)
        done = False
        while not done:
            action = agent.take_action(state)
            next_state, reward, done, truncated, info = env.step(action)  # modified: new-style 5-tuple step()
            done = done or truncated  # also end the episode when it is truncated by the time limit
            state = next_state
            episode_return += reward
        return_list.append(episode_return)
    return np.mean(return_list)
class BehaviorClone:
    def __init__(self, state_dim, hidden_dim, action_dim, lr):
        self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)

    def learn(self, states, actions):
        states = torch.tensor(states, dtype=torch.float).to(device)
        actions = torch.tensor(actions, dtype=torch.long).view(-1, 1).to(device)  # modified: actions must be cast to torch.long for gather()
        log_probs = torch.log(self.policy(states).gather(1, actions))
        bc_loss = torch.mean(-log_probs)  # maximum-likelihood loss
        self.optimizer.zero_grad()
        bc_loss.backward()
        self.optimizer.step()
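The bc_loss above is the negative log-likelihood of the expert actions under the current policy, so behavior cloning is just maximum-likelihood classification on the expert's (state, action) pairs. A small self-contained sketch (my own illustration, not from the book) showing that the gather-based loss equals the same loss written with F.nll_loss:

import torch
import torch.nn.functional as F

# toy tensors standing in for a batch of policy outputs and expert actions
probs = torch.tensor([[0.7, 0.3], [0.2, 0.8]])  # pi(a | s) for 2 states, 2 actions
expert_actions = torch.tensor([0, 1])           # actions chosen by the expert

# the gather-based loss used in BehaviorClone.learn
loss_gather = -torch.log(probs.gather(1, expert_actions.view(-1, 1))).mean()

# the same maximum-likelihood loss via F.nll_loss on log-probabilities
loss_nll = F.nll_loss(torch.log(probs), expert_actions)

print(loss_gather.item(), loss_nll.item())  # both print the same value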
The modified rl_utils code:
from tqdm import tqdm
import numpy as np
import torch
import collections
import random


class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):
        return len(self.buffer)
def moving_average(a, window_size):
    cumulative_sum = np.cumsum(np.insert(a, 0, 0))
    middle = (cumulative_sum[window_size:] - cumulative_sum[:-window_size]) / window_size
    r = np.arange(1, window_size - 1, 2)
    begin = np.cumsum(a[:window_size - 1])[::2] / r
    end = (np.cumsum(a[:-window_size:-1])[::2] / r)[::-1]
    return np.concatenate((begin, middle, end))
def train_on_policy_agent(env, agent, num_episodes):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
                state = env.reset()[0]  # modified: new-style reset() returns (observation, info)
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done, truncated, info = env.step(action)  # modified: new-style 5-tuple step()
                    done = done or truncated  # also end the episode when it is truncated by the time limit
                    transition_dict['states'].append(state)
                    transition_dict['actions'].append(action)
                    transition_dict['next_states'].append(next_state)
                    transition_dict['rewards'].append(reward)
                    transition_dict['dones'].append(done)
                    state = next_state
                    episode_return += reward
                return_list.append(episode_return)
                agent.update(transition_dict)  # one update per trajectory; each update reuses the same data for `epochs` passes, which repeats a lot of computation
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list
def train_off_policy_agent(env, agent, num_episodes, replay_buffer, minimal_size, batch_size):
    return_list = []
    for i in range(10):
        with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
            for i_episode in range(int(num_episodes / 10)):
                episode_return = 0
                state = env.reset()[0]  # modified: new-style reset() returns (observation, info)
                done = False
                while not done:
                    action = agent.take_action(state)
                    next_state, reward, done, truncated, info = env.step(action)  # modified: new-style 5-tuple step()
                    done = done or truncated  # also end the episode when it is truncated by the time limit
                    replay_buffer.add(state, action, reward, next_state, done)
                    state = next_state
                    episode_return += reward
                    if replay_buffer.size() > minimal_size:
                        b_s, b_a, b_r, b_ns, b_d = replay_buffer.sample(batch_size)
                        transition_dict = {'states': b_s, 'actions': b_a, 'next_states': b_ns, 'rewards': b_r, 'dones': b_d}
                        agent.update(transition_dict)
                return_list.append(episode_return)
                if (i_episode + 1) % 10 == 0:
                    pbar.set_postfix({'episode': '%d' % (num_episodes / 10 * i + i_episode + 1), 'return': '%.3f' % np.mean(return_list[-10:])})
                pbar.update(1)
    return return_list
def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)
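For reference, compute_advantage implements generalized advantage estimation (GAE): it folds the TD errors backwards through the recursion A_t = delta_t + gamma * lmbda * A_{t+1}. A small usage sketch with toy numbers (my own illustration, assuming the compute_advantage defined above is in scope):

import torch

# toy TD errors for a 3-step trajectory
td_delta = torch.tensor([0.5, -0.2, 1.0])

# with gamma=0.9 and lmbda=0.95 the advantages are computed backwards:
# A_2 = 1.0
# A_1 = -0.2 + 0.9 * 0.95 * 1.0   = 0.655
# A_0 =  0.5 + 0.9 * 0.95 * 0.655 ≈ 1.060
print(compute_advantage(gamma=0.9, lmbda=0.95, td_delta=td_delta))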
The complete imitation learning code (behavior cloning):
import gym
import torch
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import random
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)
class PPO:
    ''' PPO with the clipped objective '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  # number of training passes over one trajectory
        self.eps = eps  # clipping range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.long).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # clipped surrogate
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 50
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
env_name = 'CartPole-v0'
env = gym.make(env_name)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
ppo_agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda,
                epochs, eps, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, ppo_agent, num_episodes)
def sample_expert_data(n_episode):
    states = []
    actions = []
    for episode in range(n_episode):
        state = env.reset()[0]  # modified: new-style reset() returns (observation, info)
        done = False
        while not done:
            action = ppo_agent.take_action(state)
            states.append(state)
            actions.append(action)
            next_state, reward, done, truncated, info = env.step(action)  # modified: new-style 5-tuple step()
            done = done or truncated  # also end the episode when it is truncated by the time limit
            state = next_state
    return np.array(states), np.array(actions)
torch.manual_seed(0)
random.seed(0)
n_episode = 1
expert_s, expert_a = sample_expert_data(n_episode)
n_samples = 30  # subsample 30 expert transitions
random_index = random.sample(range(expert_s.shape[0]), n_samples)
expert_s = expert_s[random_index]
expert_a = expert_a[random_index]
class BehaviorClone:
    def __init__(self, state_dim, hidden_dim, action_dim, lr):
        self.policy = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)

    def learn(self, states, actions):
        states = torch.tensor(states, dtype=torch.float).to(device)
        actions = torch.tensor(actions, dtype=torch.long).view(-1, 1).to(device)  # modified: cast actions to torch.long for gather()
        log_probs = torch.log(self.policy(states).gather(1, actions))
        bc_loss = torch.mean(-log_probs)  # maximum-likelihood loss
        self.optimizer.zero_grad()
        bc_loss.backward()
        self.optimizer.step()

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(device)
        probs = self.policy(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()
def test_agent(agent, env, n_episode):
    return_list = []
    for episode in range(n_episode):
        episode_return = 0
        state = env.reset()[0]  # modified: new-style reset() returns (observation, info)
        done = False
        while not done:
            action = agent.take_action(state)
            next_state, reward, done, truncated, info = env.step(action)  # modified: new-style 5-tuple step()
            done = done or truncated  # also end the episode when it is truncated by the time limit
            state = next_state
            episode_return += reward
        return_list.append(episode_return)
    return np.mean(return_list)
torch.manual_seed(0)
np.random.seed(0)
lr = 1e-3
bc_agent = BehaviorClone(state_dim, hidden_dim, action_dim, lr)
n_iterations = 1000
batch_size = 64
test_returns = []
with tqdm(total=n_iterations, desc="Progress") as pbar:
    for i in range(n_iterations):
        sample_indices = np.random.randint(low=0, high=expert_s.shape[0], size=batch_size)
        bc_agent.learn(expert_s[sample_indices], expert_a[sample_indices])  # the actions come from the expert data
        current_return = test_agent(bc_agent, env, 5)
        test_returns.append(current_return)
        if (i + 1) % 10 == 0:
            pbar.set_postfix({'return': '%.3f' % np.mean(test_returns[-10:])})
        pbar.update(1)
iteration_list = list(range(len(test_returns)))
plt.plot(iteration_list, test_returns)
plt.xlabel('Iterations')
plt.ylabel('Returns')
plt.title('BC on {}'.format(env_name))
plt.show()
Reinforcement learning resources
- 动手学强化学习 (Hands-on RL): the code is simple and easy to follow
- 强化学习的数学原理 (Mathematical Foundations of Reinforcement Learning): explains the theory behind RL; worth revisiting again and again
- Spinning Up: a good entry point for beginners, with reference code