当前位置: 首页 > article >正文

【pytorch】Mujoco + 常用强化学习算法(持续更新)

持续更新常用的强化学习算法,每个算法均采用单个 Python 文件实现,简单易读。

  • 2024.11.09 更新:PPO(GAE)、SAC。
"PPO"
import copy
import time
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

import gymnasium as gym
import matplotlib.pyplot as plt

from tqdm import trange
from torch.distributions import Normal

class Actor(nn.Module):
    """Policy network producing the parameters of a per-dimension Normal.

    Two ReLU hidden layers (256 and 128 units) feed two linear heads: one
    for the mean (squashed by tanh) and one for the standard deviation
    (made positive by softplus).
    """

    def __init__(self, state_size, action_size):
        super().__init__()

        hidden_a, hidden_b = 256, 128

        # Creation order is kept so that seeded initialisation is unchanged.
        self.fc1 = nn.Linear(state_size, hidden_a)
        self.fc2 = nn.Linear(hidden_a, hidden_b)
        self.mu = nn.Linear(hidden_b, action_size)
        self.sigma = nn.Linear(hidden_b, action_size)

    def forward(self, x):
        """Return ``(mu, sigma)`` for the input state(s) ``x``."""
        features = x
        for layer in (self.fc1, self.fc2):
            features = F.relu(layer(features))

        mean = F.tanh(self.mu(features))
        std = F.softplus(self.sigma(features))

        return mean, std

class Critic(nn.Module):
    """State-value network: two ReLU hidden layers followed by a scalar head."""

    def __init__(self, state_size):
        super().__init__()

        hidden_a, hidden_b = 256, 128

        # Creation order is kept so that seeded initialisation is unchanged.
        self.fc1 = nn.Linear(state_size, hidden_a)
        self.fc2 = nn.Linear(hidden_a, hidden_b)
        self.fc3 = nn.Linear(hidden_b, 1)

    def forward(self, x):
        """Return the value estimate for the input state(s) ``x``."""
        features = x
        for layer in (self.fc1, self.fc2):
            features = F.relu(layer(features))

        value = self.fc3(features)
        return value

def ppo_training(trajectory, actor, critic, actor_optimizer, critic_optimizer,
                clip=0.2, k_epochs=10, gamma=0.99, lam=0.95, device='cpu', T=1e-2):
    """Run a clipped-surrogate PPO update with GAE advantages on one rollout.

    Args:
        trajectory: Sequence of transitions
            ``[state, action, log_prob, reward, next_state, done]`` in the
            order they were collected. ``state``/``action``/``log_prob``/
            ``next_state`` are NumPy arrays; ``done`` marks the end of an
            episode. The rollout may span several episodes.
        actor: Callable returning ``(mu, sigma)`` for a batch of states.
        critic: Callable returning a ``(N, 1)`` value estimate for a batch
            of states.
        actor_optimizer: Optimizer over the actor's parameters.
        critic_optimizer: Optimizer over the critic's parameters.
        clip: PPO clipping range for the probability ratio.
        k_epochs: Number of gradient steps taken on this rollout.
        gamma: Discount factor.
        lam: GAE lambda.
        device: Device the batch tensors are moved to.
        T: Weight of the entropy bonus in the actor loss.

    Returns:
        None. The actor and critic are updated in place through their
        optimizers. An empty ``trajectory`` is a no-op.
    """
    if not trajectory:
        # Nothing was collected, so there is no batch to build.
        return

    states, actions, log_probs, rewards, next_states, dones = map(
        lambda x: torch.from_numpy(np.array(x)).to(device),
        zip(*trajectory)
    )
    n_steps = states.shape[0]

    states = states.float()
    next_states = next_states.float()
    rewards = rewards.view(-1, 1).float()
    dones = dones.view(-1, 1).float()
    not_done = 1.0 - dones

    with torch.no_grad():

        next_values = critic(next_states)
        td_target = rewards + gamma * next_values * not_done
        td_delta = td_target - critic(states)

        # Generalized Advantage Estimation, accumulated backwards in time.
        # The running sum is zeroed wherever an episode ended so that the
        # advantage of one episode never leaks into the previous one.
        advantages = torch.zeros_like(td_delta)
        running = torch.zeros_like(td_delta[0])
        for t in reversed(range(n_steps)):
            running = td_delta[t] + gamma * lam * not_done[t] * running
            advantages[t] = running

        # Normalisation is undefined for a single sample (std is NaN); the
        # epsilon protects against a rollout whose advantages are constant.
        if n_steps > 1:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Joint log-probability of the whole action vector under the policy
        # that collected the data.
        old_log_probs = log_probs.reshape(n_steps, -1).sum(dim=1, keepdim=True)

    for _ in range(k_epochs):

        mu, sigma = actor(states)
        dist = Normal(mu, sigma)

        # Sum over action dimensions so the ratio is that of the joint action,
        # matching the (N, 1) shape of the advantages.
        new_log_probs = dist.log_prob(actions).reshape(n_steps, -1).sum(dim=1, keepdim=True)
        entropy = dist.entropy()

        ratio = torch.exp(new_log_probs - old_log_probs)

        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1.0 - clip, 1.0 + clip) * advantages

        actor_loss = - torch.min(surr1, surr2).mean() - entropy.mean() * T
        critic_loss = F.mse_loss(critic(states), td_target)

        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()

        actor_loss.backward()
        actor_optimizer.step()

        critic_loss.backward()
        critic_optimizer.step()

if __name__ == '__main__':

    # Script entry point: train PPO on the Gymnasium 'Walker2d' task on CPU.
    device = torch.device("cpu")
    env = gym.make('Walker2d')

    episodes = 1000
    train_timesteps = 1024  # a PPO update runs every this many env steps

    clip = 0.2       # PPO ratio clipping range
    k_epochs = 40    # gradient steps per update
    gamma = 0.9      # discount factor
    lam = 0.95       # GAE lambda
    T = 1e-2         # entropy bonus weight
    lr = 1e-4

    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
    critic = Critic(env.observation_space.shape[0]).to(device)

    actor_optimizer = torch.optim.Adam(actor.parameters(), lr=lr)
    critic_optimizer = torch.optim.Adam(critic.parameters(), lr=lr)

    trajectory = []
    timestep = 0

    pbar = trange(1, episodes+1)

    score_list = []

    for e in pbar:

        state, _ = env.reset()
        scores = 0.0

        while True:

            timestep += 1

            s = torch.from_numpy(state).float().to(device)

            # Rollout collection needs no gradients; skip building the graph.
            with torch.no_grad():
                mu, sigma = actor(s)
                dist = Normal(mu, sigma)
                a = dist.sample()
                log_prob = dist.log_prob(a).cpu().numpy()

            action = a.cpu().numpy()

            next_state, reward, terminated, truncated, _ = env.step(action)
            scores += reward

            # Only a true termination is stored as `done`: a time-limit
            # truncation still has a meaningful next-state value to
            # bootstrap from.
            trajectory.append([state, action, log_prob, reward, next_state, terminated])

            if timestep % train_timesteps == 0:

                ppo_training(
                    trajectory,
                    actor,
                    critic,
                    actor_optimizer,
                    critic_optimizer,
                    clip,
                    k_epochs,
                    gamma,
                    lam,
                    device,
                    T
                )
                trajectory = []

            state = copy.deepcopy(next_state)

            # The episode is over on either signal; the environment must be
            # reset before it is stepped again.
            if terminated or truncated:
                break

        score_list.append(scores)
        pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}".format(e, episodes, scores, timestep))

    env.close()

"SAC"
from torch.distributions import Normal
from collections import deque
from tqdm import trange

import torch
import torch.nn as nn
import torch.nn.functional as F

import copy
import time
import random
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

class ActorNetwork(nn.Module):
    """Policy network returning an unsquashed mean and a positive std.

    Two ReLU hidden layers (256 and 128 units) feed a linear mean head and
    a softplus-activated standard-deviation head.
    """

    def __init__(self, state_size, action_size):
        super().__init__()

        hidden_a, hidden_b = 256, 128

        # Creation order is kept so that seeded initialisation is unchanged.
        self.fc1 = nn.Linear(state_size, hidden_a)
        self.fc2 = nn.Linear(hidden_a, hidden_b)
        self.mu = nn.Linear(hidden_b, action_size)
        self.sigma = nn.Linear(hidden_b, action_size)

    def forward(self, x):
        """Return ``(mu, sigma)`` for the input state(s) ``x``."""
        features = x
        for layer in (self.fc1, self.fc2):
            features = F.relu(layer(features))

        mean = self.mu(features)
        std = F.softplus(self.sigma(features))

        return mean, std

class QNetwork(nn.Module):
    """Action-value network scoring a concatenated (state, action) pair."""

    def __init__(self, state_size, action_size):
        super().__init__()

        hidden_a, hidden_b = 256, 128
        input_size = state_size + action_size

        # Creation order is kept so that seeded initialisation is unchanged.
        self.fc1 = nn.Linear(input_size, hidden_a)
        self.fc2 = nn.Linear(hidden_a, hidden_b)
        self.fc3 = nn.Linear(hidden_b, 1)

    def forward(self, s, a):
        """Return the Q-value for state(s) ``s`` and action(s) ``a``."""
        # State and action are joined along the last (feature) axis.
        features = torch.cat([s, a], -1)
        for layer in (self.fc1, self.fc2):
            features = F.relu(layer(features))

        q_value = self.fc3(features)
        return q_value

class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with `maxlen` drops the oldest transition once it is full.
        self.memory = deque(maxlen=capacity)

    def __len__(self):
        """Return the number of transitions currently stored."""
        stored = len(self.memory)
        return stored

    def save_memory(self, state, action, reward, next_state, done):
        """Append one transition as ``[state, action, reward, next_state, done]``."""
        transition = [state, action, reward, next_state, done]
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return up to ``batch_size`` distinct transitions chosen at random."""
        # Never ask for more items than are stored.
        count = min(len(self), batch_size)
        return random.sample(self.memory, count)

def soft_update(target, source, tau=0.05):
    """Blend ``source`` parameters into ``target`` in place.

    Each target parameter becomes ``tau * source + (1 - tau) * target``;
    ``source`` is left untouched.
    """
    source_params = source.parameters()
    target_params = target.parameters()
    for src, dst in zip(source_params, target_params):
        blended = tau * src.data + (1 - tau) * dst.data
        dst.data.copy_(blended)

def choice_action(actor, state):
    """Sample a tanh-squashed action and its log-probability from the policy.

    Args:
        actor: Callable returning ``(mu, sigma)`` for ``state``.
        state: Input passed straight to ``actor``.

    Returns:
        ``(action, log_prob)``. ``action`` is ``tanh(u)`` with
        ``u ~ Normal(mu, sigma)`` drawn by reparameterisation, so gradients
        reach the actor. ``log_prob`` is the log-density of ``action``,
        summed over the last axis (which is kept with size 1).
    """
    mu, sigma = actor(state)

    # A scale of exactly zero (softplus underflow) is rejected by Normal and
    # would make the log-density infinite.
    sigma = sigma.clamp_min(1e-6)

    dist = Normal(mu, sigma)
    pre_tanh = dist.rsample()

    action = torch.tanh(pre_tanh)

    # The density must be evaluated under Normal(mu, sigma), not under the
    # standard normal of the noise: the latter drops the -log(sigma) term.
    log_prob = dist.log_prob(pre_tanh)
    # Change of variables for the tanh squashing; 1e-6 keeps the log finite
    # when the action saturates at +/-1.
    log_prob = log_prob - torch.log(1 - action.pow(2) + 1e-6)
    log_prob = log_prob.sum(-1, keepdim=True)

    return action, log_prob

def training(gamma, replay_buffer, models, log_alpha, target_entropy, optimizers, batch_size, tau):
    """Run one SAC update: both critics, the actor, the temperature, the targets.

    Args:
        gamma: Discount factor.
        replay_buffer: Object whose ``sample(batch_size)`` returns a list of
            ``[state, action, reward, next_state, done]`` transitions.
        models: ``(actor, q1_net, target_q1_net, q2_net, target_q2_net)``.
        log_alpha: Scalar tensor holding the log of the entropy temperature;
            the batch is placed on this tensor's device.
        target_entropy: Entropy target used in the temperature loss.
        optimizers: ``(actor_optimizer, q1_optimizer, q2_optimizer,
            alpha_optimizer)``.
        batch_size: Number of transitions requested from the buffer.
        tau: Interpolation factor for the soft target update.

    Returns:
        None. All networks and ``log_alpha`` are updated in place. Nothing
        happens when the buffer yields no transitions.
    """
    actor, q1_net, target_q1_net, q2_net, target_q2_net = models
    actor_optimizer, q1_optimizer, q2_optimizer, alpha_optimizer = optimizers

    batch_data = replay_buffer.sample(batch_size)
    if not batch_data:
        # An empty buffer gives nothing to unpack or learn from.
        return

    # Take the device from log_alpha instead of relying on a module-level
    # global, so the function also works when it is imported.
    device = log_alpha.device

    states, actions, rewards, next_states, dones = (
        torch.from_numpy(np.array(column)).float().to(device)
        for column in zip(*batch_data)
    )
    rewards = rewards.view(-1, 1)
    dones = dones.view(-1, 1)

    # The temperature is a constant for the critic and actor losses.
    alpha = log_alpha.exp().detach()

    with torch.no_grad():

        next_state_actions, next_state_log_probs = choice_action(actor, next_states)
        target_q1_next = target_q1_net(next_states, next_state_actions)
        target_q2_next = target_q2_net(next_states, next_state_actions)
        # Minimum of the two target critics plus the entropy term.
        min_q_next_target = torch.min(target_q1_next, target_q2_next) - alpha * next_state_log_probs
        td_target_value = rewards + (1 - dones) * gamma * min_q_next_target

    q1 = q1_net(states, actions)
    q2 = q2_net(states, actions)

    q1_loss = F.mse_loss(q1, td_target_value)
    q2_loss = F.mse_loss(q2, td_target_value)

    q1_optimizer.zero_grad()
    q2_optimizer.zero_grad()

    q1_loss.backward()
    q2_loss.backward()

    q1_optimizer.step()
    q2_optimizer.step()

    # Actor step: fresh actions so gradients flow through the sample into
    # the actor. The critics also receive gradients here, but these are
    # cleared by zero_grad() before their next step.
    state_actions, state_log_probs = choice_action(actor, states)

    q = torch.min(q1_net(states, state_actions), q2_net(states, state_actions))

    actor_loss = torch.mean((alpha * state_log_probs) - q)
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    # Temperature step uses log-probabilities from the just-updated actor,
    # treated as constants.
    with torch.no_grad():
        _, log_prob = choice_action(actor, states)

    alpha_loss = torch.mean(- log_alpha.exp() * (log_prob + target_entropy))
    alpha_optimizer.zero_grad()
    alpha_loss.backward()
    alpha_optimizer.step()

    soft_update(target_q1_net, q1_net, tau)
    soft_update(target_q2_net, q2_net, tau)


if __name__ == '__main__':

    # Script entry point: train SAC on the Gymnasium 'Walker2d' task on CPU.
    device = torch.device("cpu")
    env = gym.make('Walker2d')

    episodes = 1000
    train_timesteps = 4  # one SAC update every this many env steps

    policy_lr = 1e-4
    q_lr = 1e-4
    alpha_lr = 1e-2

    tau = 0.05  # soft target update factor

    buffer_capacity = int(1e6)
    batch_size = 64
    gamma = 0.9

    state_size = env.observation_space.shape[0]
    action_size = env.action_space.shape[0]

    # SAC's usual entropy target is minus the number of ACTION dimensions;
    # the observation dimension is unrelated to the policy's entropy.
    target_entropy = - float(action_size)

    actor = ActorNetwork(state_size, action_size).to(device)
    q1_net = QNetwork(state_size, action_size).to(device)
    target_q1_net = QNetwork(state_size, action_size).to(device)
    q2_net = QNetwork(state_size, action_size).to(device)
    target_q2_net = QNetwork(state_size, action_size).to(device)

    # Target critics start as exact copies of the online critics.
    target_q1_net.load_state_dict(q1_net.state_dict())
    target_q2_net.load_state_dict(q2_net.state_dict())

    # Learnable log-temperature, starting at alpha = 1.
    log_alpha = torch.tensor(0.0, requires_grad=True, device=device)

    actor_optimizer = torch.optim.Adam(actor.parameters(), lr=policy_lr)
    q1_optimizer = torch.optim.Adam(q1_net.parameters(), lr=q_lr)
    q2_optimizer = torch.optim.Adam(q2_net.parameters(), lr=q_lr)
    alpha_optimizer = torch.optim.Adam([log_alpha], lr=alpha_lr)

    replay_buffer = ReplayBuffer(buffer_capacity)

    pbar = trange(1, episodes+1)
    timestep = 0
    score_list = []

    for episode in pbar:

        state, _ = env.reset()
        scores = 0.0

        while True:

            timestep += 1

            if timestep % train_timesteps == 0:
                training(
                    gamma,
                    replay_buffer,
                    (
                        actor,
                        q1_net,
                        target_q1_net,
                        q2_net,
                        target_q2_net
                    ),
                    log_alpha,
                    target_entropy,
                    (
                        actor_optimizer,
                        q1_optimizer,
                        q2_optimizer,
                        alpha_optimizer
                    ),
                    batch_size,
                    tau
                )

            # Acting needs no gradients; skip building the graph.
            with torch.no_grad():
                action, _ = choice_action(actor, torch.from_numpy(state).float().to(device))
            action = action.cpu().numpy()

            next_state, reward, terminated, truncated, _ = env.step(action)
            scores += reward

            # Only a true termination is stored as `done`: after a time-limit
            # truncation the next state still has value to bootstrap from.
            replay_buffer.save_memory(state, action, reward, next_state, terminated)

            state = copy.deepcopy(next_state)

            # The episode is over on either signal; the environment must be
            # reset before it is stepped again.
            if terminated or truncated:
                break

        score_list.append(scores)
        pbar.set_description("Episode {}/{}: Score: {:.2f}, Timesteps: {}, Log Alpha: {:.2f}".format(
            episode, episodes, scores, timestep, log_alpha.item()))

    env.close()


http://www.kler.cn/a/385905.html

相关文章:

  • SpringMVC数据校验、数据格式化处理、国际化设置
  • python面向对象基础入门
  • TypeORM在Node.js中的高级应用
  • SobarQube实现PDF报告导出
  • Android 6年经验面试总结 2024.11.15
  • vue2/vue3中使用的富文本编辑器vue-quill
  • npm安装记录
  • 羽星股份引领连锁业数智化转型,厦门羽星科技公司逆势增长剑指纳斯达克
  • HCIP-HarmonyOS Application Developer 习题(二十)
  • Java实现营业执照OCR识别API接口
  • Spring Mvc中拦截器Interceptor详解
  • Dify 本地部署指南
  • 解决方案 | 部署更快,自动化程度高!TOSUN同星线控底盘解决方案
  • spring boot 项目配置https服务
  • Terraform-阿里云- ECS实验
  • 微服务设计模式 - 事件溯源模式(Event Sourcing Pattern)
  • [Redis] Redis哨兵机制
  • python安装selenium,geckodriver,chromedriver,Selenium IDE
  • 学习笔记:黑马程序员JavaWeb开发教程(2024.11.8)
  • flink 内存配置(一):设置Flink进程内存
  • 设计模式小结一观察者(Observer)模式
  • 如何在微服务架构中优化微信 Access Token 管理:解决频率限制与过期问题的最佳实践
  • 哈夫曼树(HuffmanTree)
  • c++:模板和STL
  • 自动驾驶---“火热的”时空联合规划
  • Unity3D 包体裁剪与优化详解