A continuously updated collection of common reinforcement learning algorithms, each implemented in a single Python file for simplicity and readability.
- 2024.11.09 update: PPO (with GAE) and SAC.
"PPO"
import copy
import time
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym
import matplotlib.pyplot as plt
from tqdm import trange
from torch.distributions import Normal
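# Actor: Gaussian policy; the mean is tanh-squashed to [-1, 1] and the std comes from a softplus head.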
class Actor(nn.Module):
def __init__(self, state_size, action_size):
super().__init__()
self.fc1 = nn.Linear(state_size, 256)
self.fc2 = nn.Linear(256, 128)
self.mu = nn.Linear(128, action_size)
self.sigma = nn.Linear(128, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
        mu = torch.tanh(self.mu(x))
sigma = F.softplus(self.sigma(x))
return mu, sigma
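# Critic: state-value network V(s), used for the TD targets and GAE.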
class Critic(nn.Module):
def __init__(self, state_size):
super().__init__()
self.fc1 = nn.Linear(state_size, 256)
self.fc2 = nn.Linear(256, 128)
self.fc3 = nn.Linear(128, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
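# One PPO update: compute GAE advantages from the collected rollout, then run k_epochs of
# clipped-surrogate updates for the actor and MSE regression to the TD target for the critic.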
def ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,
clip=0.2, k_epochs=10, gamma=0.99, lam=0.95, device='cpu', T=1e-2):
states, actions, log_probs, rewards, next_states, dones = map(
lambda x: torch.from_numpy(np.array(x)).to(device),
zip(*trajectory)
)
rewards = rewards.view(-1, 1)
dones = dones.view(-1, 1).int()
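    # one-step TD target and TD error (delta) under the current value function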
with torch.no_grad():
next_values = critic(next_states.float())
td_target = rewards + gamma * next_values * (1 - dones)
td_value = critic(states.float())
td_delta = td_target - td_value
    # GAE: accumulate discounted TD errors backwards, resetting the running sum at episode boundaries
    td_delta = td_delta.detach().cpu().numpy()
    done_flags = dones.cpu().numpy()
    adv = 0.0
    advantages = []
    for delta, done in zip(td_delta[::-1], done_flags[::-1]):
        adv = gamma * lam * adv * (1.0 - done) + delta
        advantages.append(adv)
    advantages.reverse()
    advantages = torch.from_numpy(np.array(advantages)).float().to(device)
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
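    # k_epochs passes of the clipped surrogate objective with an entropy bonus weighted by T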
for k in range(k_epochs):
mu, sigma = actor(states.float())
dist = Normal(mu, sigma)
        # joint log-probability of the action vector (summed over action dimensions)
        new_log_probs = dist.log_prob(actions.float()).sum(-1, keepdim=True)
        entropy = dist.entropy().sum(-1, keepdim=True)
        ratio = torch.exp(new_log_probs - log_probs.view(-1, 1).detach())
surr1 = ratio * advantages
surr2 = torch.clamp(ratio, 1.0 - clip, 1 + clip) * advantages
actor_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * T
critic_loss = F.mse_loss(critic(states.float()), td_target.float().detach())
actor_optimizer.zero_grad()
critic_optimizer.zero_grad()
actor_loss.backward()
actor_optimizer.step()
critic_loss.backward()
critic_optimizer.step()
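# Rollout loop: interact with the environment and, every train_timesteps steps, run one PPO update
# on the collected (possibly multi-episode) trajectory before discarding it (on-policy).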
if __name__ == '__main__':
device = torch.device("cpu")
env = gym.make('Walker2d')
episodes = 1000
train_timesteps = 1024
clip = 0.2
k_epochs = 40
gamma = 0.9
lam = 0.95
T = 1e-2
lr = 1e-4
actor = Actor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
critic = Critic(env.observation_space.shape[0]).to(device)
actor_optimizer = torch.optim.Adam(actor.parameters(), lr=lr)
critic_optimizer = torch.optim.Adam(critic.parameters(), lr=lr)
trajectory = []
timestep = 0
pbar = trange(1, episodes+1)
score_list = []
for e in pbar:
state, _ = env.reset()
scores = 0.0
while True:
timestep += 1
s = torch.from_numpy(state).float().to(device)
mu, sigma = actor(s)
dist = Normal(mu, sigma)
a = dist.sample()
            log_prob = dist.log_prob(a).sum().detach().cpu().numpy()
action = a.detach().cpu().numpy()
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            scores += reward
            # store only true terminals so the TD target still bootstraps through time-limit truncation
            trajectory.append([state, action, log_prob, reward, next_state, terminated])
if timestep % train_timesteps == 0:
ppo_training(
trajectory,
actor,
critic,
actor_optimizer,
critic_optimizer,
clip,
k_epochs,
gamma,
lam,
device,
T
)
trajectory = []
state = copy.deepcopy(next_state)
if done: break
score_list.append(scores)
pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}".format(e, episodes, scores, timestep))
"SAC"
from torch.distributions import Normal
from collections import deque
from tqdm import trange
import torch
import torch.nn as nn
import torch.nn.functional as F
import copy
import time
import random
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
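# Policy network: outputs the mean and softplus std of a diagonal Gaussian; the sample is tanh-squashed in choice_action.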
class ActorNetwork(nn.Module):
def __init__(self, state_size, action_size):
super().__init__()
self.fc1 = nn.Linear(state_size, 256)
self.fc2 = nn.Linear(256, 128)
self.mu = nn.Linear(128, action_size)
self.sigma = nn.Linear(128, action_size)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
mu = self.mu(x)
sigma = F.softplus(self.sigma(x))
return mu, sigma
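# Soft Q-network Q(s, a): consumes the concatenated state-action pair and returns a scalar value.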
class QNetwork(nn.Module):
def __init__(self, state_size, action_size):
super().__init__()
self.fc1 = nn.Linear(state_size + action_size, 256)
self.fc2 = nn.Linear(256, 128)
self.fc3 = nn.Linear(128, 1)
def forward(self, s, a):
x = torch.cat((s, a), dim=-1)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
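# Fixed-capacity replay buffer with uniform sampling, backed by a deque.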
class ReplayBuffer:
def __init__(self, capacity):
self.memory = deque(maxlen=capacity)
def __len__(self):
return len(self.memory)
def save_memory(self, state, action, reward, next_state, done):
self.memory.append([state, action, reward, next_state, done])
def sample(self, batch_size):
sample_size = min(len(self), batch_size)
experiences = random.sample(self.memory, sample_size)
return experiences
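# Polyak averaging of the target critics: theta_target <- tau * theta + (1 - tau) * theta_target.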
def soft_update(target, source, tau=0.05):
for param, target_param in zip(source.parameters(), target.parameters()):
target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
def choice_action(actor, state):
    # squashed-Gaussian policy: sample with the reparameterization trick, then apply tanh
    mu, sigma = actor(state)
    dist = Normal(mu, sigma)
    x = dist.rsample()
    action = torch.tanh(x)
    # log-prob of the pre-tanh sample, corrected for the tanh change of variables
    log_prob = dist.log_prob(x)
    log_prob -= torch.log(1 - action.pow(2) + 1e-6)
    log_prob = log_prob.sum(-1, keepdim=True)
    return action, log_prob
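# One SAC update: regress both critics to the soft Bellman target, update the actor with the
# reparameterized min-Q objective, adapt the temperature alpha toward target_entropy, then soft-update the target critics.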
def training(gamma, replay_buffer, models, log_alpha, target_entropy, optimizers, batch_size, tau):
(
actor,
q1_net,
target_q1_net,
q2_net,
target_q2_net
) = models
(
actor_optimizer,
q1_optimizer,
q2_optimizer,
alpha_optimizer
) = optimizers
batch_data = replay_buffer.sample(batch_size)
states, actions, rewards, next_states, dones = map(
lambda x: torch.from_numpy(np.array(x)).float().to(device),
zip(*batch_data)
)
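    # soft Bellman target: y = r + gamma * (1 - done) * (min_i Q_target_i(s', a') - alpha * log pi(a'|s'))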
with torch.no_grad():
alpha = torch.exp(log_alpha)
with torch.no_grad():
next_state_actions, next_state_log_probs = choice_action(actor, next_states)
target_q1_next = target_q1_net(next_states, next_state_actions)
target_q2_next = target_q2_net(next_states, next_state_actions)
min_q_next_target = torch.min(target_q1_next, target_q2_next) - alpha * next_state_log_probs
td_target_value = rewards.view(-1, 1) + (1 - dones.view(-1, 1)) * gamma * min_q_next_target
q1 = q1_net(states, actions)
q2 = q2_net(states, actions)
q1_loss = F.mse_loss(q1, td_target_value)
q2_loss = F.mse_loss(q2, td_target_value)
q1_optimizer.zero_grad()
q2_optimizer.zero_grad()
q1_loss.backward()
q2_loss.backward()
q1_optimizer.step()
q2_optimizer.step()
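    # actor update: maximize the min-Q value minus the alpha-weighted log-probability (reparameterized gradient)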
state_actions, state_log_probs = choice_action(actor, states)
q = torch.min(q1_net(states, state_actions), q2_net(states, state_actions))
actor_loss = torch.mean((alpha * state_log_probs) - q)
actor_optimizer.zero_grad()
actor_loss.backward()
actor_optimizer.step()
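    # temperature update: adjust alpha so the policy's entropy tracks target_entropy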
with torch.no_grad():
_, log_prob = choice_action(actor, states)
alpha_loss = torch.mean(- log_alpha.exp() * (log_prob + target_entropy))
alpha_optimizer.zero_grad()
alpha_loss.backward()
alpha_optimizer.step()
soft_update(target_q1_net, q1_net, tau)
soft_update(target_q2_net, q2_net, tau)
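# Off-policy training loop: act with the current policy, store transitions, and run one SAC update every train_timesteps environment steps.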
if __name__ == '__main__':
device = torch.device("cpu")
env = gym.make('Walker2d')
episodes = 1000
train_timesteps = 4
policy_lr = 1e-4
q_lr = 1e-4
alpha_lr = 1e-2
tau = 0.05
buffer_capacity = int(1e6)
batch_size = 64
gamma = 0.9
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
    # SAC heuristic: target entropy = -dim(action space)
    target_entropy = -torch.prod(torch.tensor(env.action_space.shape, dtype=torch.float32, device=device))
actor = ActorNetwork(state_size, action_size).to(device)
q1_net = QNetwork(state_size, action_size).to(device)
target_q1_net = QNetwork(state_size, action_size).to(device)
q2_net = QNetwork(state_size, action_size).to(device)
target_q2_net = QNetwork(state_size, action_size).to(device)
target_q1_net.load_state_dict(q1_net.state_dict())
target_q2_net.load_state_dict(q2_net.state_dict())
log_alpha = torch.tensor(0.0, requires_grad=True, device=device)
actor_optimizer = torch.optim.Adam(actor.parameters(), lr=policy_lr)
q1_optimizer = torch.optim.Adam(q1_net.parameters(), lr=q_lr)
q2_optimizer = torch.optim.Adam(q2_net.parameters(), lr=q_lr)
alpha_optimizer = torch.optim.Adam([log_alpha], lr=alpha_lr)
replay_buffer = ReplayBuffer(buffer_capacity)
pbar = trange(1, episodes+1)
timestep = 0
score_list = []
for episode in pbar:
state, _ = env.reset()
scores = 0.0
while True:
timestep += 1
if timestep % train_timesteps == 0:
training(
gamma,
replay_buffer,
(
actor,
q1_net,
target_q1_net,
q2_net,
target_q2_net
),
log_alpha,
target_entropy,
(
actor_optimizer,
q1_optimizer,
q2_optimizer,
alpha_optimizer
),
batch_size,
tau
)
            with torch.no_grad():
                action, _ = choice_action(actor, torch.from_numpy(state).float().to(device))
            action = action.cpu().numpy()
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            scores += reward
            # store only true terminals so the Bellman target still bootstraps through time-limit truncation
            replay_buffer.save_memory(state, action, reward, next_state, terminated)
state = copy.deepcopy(next_state)
if done: break
score_list.append(scores)
pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Log Alpha: {:.2f}".format(
episode, episodes, scores, timestep, log_alpha.item()))