当前位置: 首页 > article >正文

25/1/7 算法笔记<强化学习> sac_learn代码拆解

昨天我们看了V-REP中一个github项目的环境代码,今天我们来分析下他的强化学习代码。

git链接:

https://github.com/deep-reinforcement-learning-book/Chapter16-Robot-Learning-in-Simulation.

首先导入了库

import math
import random

import gym
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal   #正态分布概率函数

设置多进程启动方式

torch.multiprocessing.set_start_method('forkserver', force=True)

在使用 torTorch.multiprocessing 模块时,启动子进程的方式非常重要,尤其是在使用 CUDA 时。默认的启动方式('fork')可能会导致问题,因为 CUDA 对进程分叉(fork)的处理方式不太友好。

from IPython.display import clear_output  #Ipython显示功能
import matplotlib.pyplot as plt       
from matplotlib import animation
from IPython.display import display   

from sawyer_grasp_env_boundingbox import GraspEnv  #导入昨天的自定义环境
import argparse            #用于解析命令行
import time                #用于时间相关操作
import pickle              #用于序列化和反序列化

import torch.multiprocessing as mp    #多进程处理
from torch.multiprocessing import Process#多进程工具

from multiprocessing import Process, Manager#多进程共享工具
from multiprocessing.managers import BaseManager#创建自定义的共享对象管理器

根据条件选择GPU还是CPU,并打印出当前设备

GPU = True
device_idx = 0
if GPU:
    device = torch.device("cuda:" + str(device_idx) if torch.cuda.is_available() else "cpu")
else:
    device = torch.device("cpu")
print(device)

使用python的argparse模块解析命令行,让用户可以通过命令行指定是进行训练(--train)还是测试(--test)。

parser = argparse.ArgumentParser(description='Train or test neural net motor controller.')
parser.add_argument('--train', dest='train', action='store_true', default=False)
parser.add_argument('--test', dest='test', action='store_true', default=False)  #添加命令行参数

args = parser.parse_args() #解析命令行参数

设置经验回放缓冲区

class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity  #设置缓冲区最大存储量
        self.buffer = []    #开辟空间存储
        self.position = 0
    
    def push(self, state, action, reward, next_state, done): #存放动作
        if len(self.buffer) < self.capacity:    
            self.buffer.append(None)  #这种做法通常用于初始化缓冲区,或者在某些特殊情况下确保缓冲区的长度与容量一致。
        self.buffer[self.position] = (state, action, reward, next_state, done)  #存放状态,动作,奖励,下一个状态,动作有没有结束
        self.position = int((self.position + 1) % self.capacity)   #更新当前位置写入的索引,并确保索引在缓冲区容量范围内循环

sample函数,用于从缓冲区随机抽取一批经验数据,供训练模型使用

def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)   #batch是一个列表,表示一条经验数据
        state, action, reward, next_state, done = map(np.stack, zip(*batch)) # stack for each element
        ''' 
        the * serves as unpack: sum(a,b) <=> batch=(a,b), sum(*batch) ;
        zip: a=[1,2], b=[2,3], zip(a,b) => [(1, 2), (2, 3)] ;
        the map serves as mapping the function on each list element: map(square, [2,3]) => [4,9] ;
        np.stack((1,2)) => array([1, 2])
        '''
        return state, action, reward, next_state, done

设置len和get_length函数

def __len__(self):  # cannot work in multiprocessing case, len(replay_buffer) is not available in proxy of manager!
        return len(self.buffer)

    def get_length(self):
        return len(self.buffer)

标准化函数

class NormalizedActions(gym.ActionWrapper):
    def _action(self, action):
        low  = self.action_space.low
        high = self.action_space.high
        
        action = low + (action + 1.0) * 0.5 * (high - low)
        action = np.clip(action, low, high)
        
        return action

    def _reverse_action(self, action):
        low  = self.action_space.low
        high = self.action_space.high
        
        action = 2 * (action - low) / (high - low) - 1
        action = np.clip(action, low, high)
        
        return action

_action函数将标准化动作映射回原始动作空间

_reverse_action将原始动作映射到标准化动作空间

 为什么这样设计?

  1. 线性映射

    • 线性映射保持了动作的相对比例,不会改变动作的分布特性。

    • 这对于强化学习算法的训练非常重要,因为非线性映射可能会引入额外的复杂性。

  2. 边界对齐

    • 确保原始动作的最小值和最大值分别映射到标准化动作的 -1 和 1

    • 这样可以充分利用标准化动作空间的范围。

  3. 可逆性

    • 标准化和反标准化公式是互逆的,可以方便地在两个空间之间转换。

    • 这对于算法的实现和调试非常有用。

  4. 数值稳定性

    • 标准化后的动作范围是 [-1, 1],避免了过大或过小的数值,提高了数值稳定性。

价值网络设置

表示在状态s下智能体未来可能获得的累计回报的期望值

class ValueNetwork(nn.Module):
    def __init__(self, state_dim, hidden_dim, init_w=3e-3):
        super(ValueNetwork, self).__init__()
        
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, hidden_dim)
        self.linear4 = nn.Linear(hidden_dim, 1)
        # 对权重和偏置初始化,用均匀分布将权重和偏置初始化为范围内的随机值
        self.linear4.weight.data.uniform_(-init_w, init_w)
        self.linear4.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x

为什么需要权重初始化?

权重初始化对神经网络的训练非常重要,原因如下:

  1. 避免梯度消失或爆炸

    • 如果权重初始化过大或过小,可能导致梯度在反向传播时消失或爆炸。

    • 合适的初始化可以缓解这一问题。

  2. 加速收敛

    • 良好的初始化可以使网络更快地收敛到最优解。

  3. 打破对称性

    • 如果所有权重初始化为相同的值,网络中的神经元会学习相同的特征,导致性能下降。

    • 随机初始化可以打破对称性,使每个神经元学习不同的特征。

softQ网络

它的核心思想是在传统的Q-learning思想上引入熵正则化,从而鼓励策略的探索性。

class SoftQNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, init_w=3e-3):
        super(SoftQNetwork, self).__init__()
        
        self.linear1 = nn.Linear(num_inputs + num_actions, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, 1)
        
        self.linear4.weight.data.uniform_(-init_w, init_w)
        self.linear4.bias.data.uniform_(-init_w, init_w)
        
    def forward(self, state, action):
        x = torch.cat([state, action], 1) # the dim 0 is number of samples
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = self.linear4(x)
        return x
        

输入的是状态s和动作a的拼接。

策略网络

策略网络的目标是直接学习策略,即在给定状态s下,智能体应该采取的动作a,策略可以是确定性的或随机性的

class PolicyNetwork(nn.Module):
    def __init__(self, num_inputs, num_actions, hidden_size, action_range=1., init_w=3e-3, log_std_min=-20, log_std_max=2):
        super(PolicyNetwork, self).__init__()
        
        self.log_std_min = log_std_min
        self.log_std_max = log_std_max
        
        self.linear1 = nn.Linear(num_inputs, hidden_size)
        self.linear2 = nn.Linear(hidden_size, hidden_size)
        self.linear3 = nn.Linear(hidden_size, hidden_size)
        self.linear4 = nn.Linear(hidden_size, hidden_size)

        self.mean_linear = nn.Linear(hidden_size, num_actions)
        self.mean_linear.weight.data.uniform_(-init_w, init_w)
        self.mean_linear.bias.data.uniform_(-init_w, init_w)
        
        self.log_std_linear = nn.Linear(hidden_size, num_actions)
        self.log_std_linear.weight.data.uniform_(-init_w, init_w)
        self.log_std_linear.bias.data.uniform_(-init_w, init_w)

        self.action_range = action_range
        self.num_actions = num_actions

        
    def forward(self, state):
        x = F.relu(self.linear1(state))
        x = F.relu(self.linear2(x))
        x = F.relu(self.linear3(x))
        x = F.relu(self.linear4(x))

        mean    = (self.mean_linear(x))
        log_std = self.log_std_linear(x)
        log_std = torch.clamp(log_std, self.log_std_min, self.log_std_max)
        
        return mean, log_std
    

evaluate函数

用来评估策略网络在给定状态下的动作和对数概率。

def evaluate(self, state, epsilon=1e-6):
        '''
        generate sampled action with state as input wrt the policy network;
        '''
        mean, log_std = self.forward(state)
        std = log_std.exp() # no clip in evaluation, clip affects gradients flow
        
        normal = Normal(0, 1)
        z      = normal.sample() 
        action_0 = torch.tanh(mean + std*z.to(device)) # TanhNormal distribution as actions; reparameterization trick
        action = self.action_range*action_0
        log_prob = Normal(mean, std).log_prob(mean+ std*z.to(device)) - torch.log(1. - action_0.pow(2) + epsilon) -  np.log(self.action_range)
        # both dims of normal.log_prob and -log(1-a**2) are (N,dim_of_action); 
        # the Normal.log_prob outputs the same dim of input features instead of 1 dim probability, 
        # needs sum up across the features dim to get 1 dim prob; or else use Multivariate Normal.
        log_prob = log_prob.sum(dim=1, keepdim=True)
        return action, log_prob, z, mean, log_std

选择动作函数

def get_action(self, state, deterministic):
    #将输入的状态s转为pytorch张量
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
    #调用策略网络的前向传播方法,输出动作均值和对数标准差
        mean, log_std = self.forward(state)
        std = log_std.exp() #将对数标准差转化为标准差
         
        normal = Normal(0, 1)  #创建一个标准正态分布
        z      = normal.sample().to(device)#从正态分布中采样一个随即值
        action = self.action_range* torch.tanh(mean + std*z) #计算采样动作,缩放动作
        
        action = self.action_range* torch.tanh(mean).detach().cpu().numpy()[0] if deterministic else action.detach().cpu().numpy()[0]
        return action

随机采取动作函数

从均匀分布中随机采样一个动作

def sample_action(self,):
        a=torch.FloatTensor(self.num_actions).uniform_(-1, 1)
        return self.action_range*a.numpy()

SAC训练函数

核心函数,用SAC方法来训练智能体

def __init__(self, replay_buffer, hidden_dim, action_range):
        self.replay_buffer = replay_buffer #经验回放缓冲区
        #softq网络
        self.soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        self.soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        #目标Q网络
        self.target_soft_q_net1 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        self.target_soft_q_net2 = SoftQNetwork(state_dim, action_dim, hidden_dim).to(device)
        #策略网络
        self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim, action_range).to(device)
        #熵系数,用于自动调整熵正则化的强度
        self.log_alpha = torch.zeros(1, dtype=torch.float32, requires_grad=True, device=device)
        print('Soft Q Network (1,2): ', self.soft_q_net1)
        print('Policy Network: ', self.policy_net)
        
        #初始化目标网络的参数
        for target_param, param in zip(self.target_soft_q_net1.parameters(), self.soft_q_net1.parameters()):
            target_param.data.copy_(param.data)
        for target_param, param in zip(self.target_soft_q_net2.parameters(), self.soft_q_net2.parameters()):
            target_param.data.copy_(param.data)
        
        #损失函数
        self.soft_q_criterion1 = nn.MSELoss()
        self.soft_q_criterion2 = nn.MSELoss()

        #学习率
        soft_q_lr = 3e-4
        policy_lr = 3e-4
        alpha_lr  = 3e-4
        #优化器
        self.soft_q_optimizer1 = optim.Adam(self.soft_q_net1.parameters(), lr=soft_q_lr)
        self.soft_q_optimizer2 = optim.Adam(self.soft_q_net2.parameters(), lr=soft_q_lr)
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=policy_lr)
        self.alpha_optimizer = optim.Adam([self.log_alpha], lr=alpha_lr)

更新函数

def update(self, batch_size, reward_scale=10., auto_entropy=True, use_demons=False, target_entropy=-2, gamma=0.99,soft_tau=1e-2):
        #获取状态动作奖励,下一个动作,是否完成
        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
        
        if use_demons==True:   #从文件中加载预先收集的演示数据。,并于当前数据合并
            data_file=open('./demons_data/demon_data.pickle', "rb")
            demons_data = pickle.load(data_file)
            state_, action_, reward_, next_state_, done_=map(np.stack, zip(*demons_data))
            state = np.concatenate((state, state_), axis=0)
            action = np.concatenate((action, action_), axis=0)
            reward = np.concatenate((reward, reward_), axis=0)
            next_state = np.concatenate((next_state, next_state_), axis=0)
            done = np.concatenate((done, done_), axis=0)

        #将数据转换为PyTorch张量并移动到设备
        state      = torch.FloatTensor(state).to(device)
        next_state = torch.FloatTensor(next_state).to(device)
        action     = torch.FloatTensor(action).to(device)
        reward     = torch.FloatTensor(reward).unsqueeze(1).to(device)  # reward is single value, unsqueeze() to add one dim to be [reward] at the sample dim;
        done       = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(device)

        #计算Q网络的预测值
        predicted_q_value1 = self.soft_q_net1(state, action)
        predicted_q_value2 = self.soft_q_net2(state, action)
        
        #评估策略网络
        new_action, log_prob, z, mean, log_std = self.policy_net.evaluate(state)
        new_next_action, next_log_prob, _, _, _ = self.policy_net.evaluate(next_state)
        #奖励归一化
        reward = reward_scale * (reward - reward.mean(dim=0)) / (reward.std(dim=0) + 1e-6) # normalize with batch mean and std; plus a small number to prevent numerical problem
    # 更新熵系数
        # alpha = 0.0  # trade-off between exploration (max entropy) and exploitation (max Q) 
        if auto_entropy is True:
            alpha_loss = -(self.log_alpha * (log_prob + target_entropy).detach()).mean()
            self.alpha_optimizer.zero_grad()
            alpha_loss.backward()
            self.alpha_optimizer.step()
            self.alpha = self.log_alpha.exp()
        else:
            self.alpha = 1.
            alpha_loss = 0
  • 演示数据通常由专家策略生成,用于引导智能体的学习。

  • 合并数据

    • 将演示数据与当前的经验数据合并,形成更大的数据集。

    • 合并后的数据可以用于训练智能体,从而提高学习效率。

熵系数alpha的作用:

alpha用于控制探索和利用之间的权衡。具体来说alpha时熵正则化项的系数,它决定了策略的随机性在优化目标中的重要性。

SAC的目标函数中引入了熵正则化项:

H(π(⋅∣st​)) 是策略 π 在状态 st​ 下的熵。

alpha是熵正则化系数。

  • 当 α较大时,算法更倾向于探索(高熵策略)。

  • 当 α较小时,算法更倾向于利用(低熵策略)。

Q函数的训练

通过训练Q函数,智能体可以学习到哪些动作在特定状态下更有价值。

# Training Q Function
        #计算目标q值
        #选择两个目标Q值中较小值,以减少过估计问题
        target_q_min = torch.min(self.target_soft_q_net1(next_state, new_next_action),self.target_soft_q_net2(next_state, new_next_action)) - self.alpha * next_log_prob
        target_q_value = reward + (1 - done) * gamma * target_q_min # if done==1, only reward
        #计算Q网络损失,目标 Q 值,使用 .detach() 断开计算图,避免梯度传播到目标网络。
        q_value_loss1 = self.soft_q_criterion1(predicted_q_value1, target_q_value.detach())  # detach: no gradients for the variable
        q_value_loss2 = self.soft_q_criterion2(predicted_q_value2, target_q_value.detach())


        #参数更新,通过反向传播算法,计算损失函数对模型参数的梯度。
        self.soft_q_optimizer1.zero_grad()#清空优化器的梯度缓存
        q_value_loss1.backward()      #计算梯度
        self.soft_q_optimizer1.step() #更新参数
        self.soft_q_optimizer2.zero_grad()
        q_value_loss2.backward()
        self.soft_q_optimizer2.step() 

目标q值:它表示在当前状态 ss 下采取动作 aa 后,智能体未来可能获得的累积回报的期望值。

训练策略网络

 # Training Policy Function
        #选择Q值中小的那个,以减少过估计
        predicted_new_q_value = torch.min(self.soft_q_net1(state, new_action),self.soft_q_net2(state, new_action))
        policy_loss = (self.alpha * log_prob - predicted_new_q_value).mean()

        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        self.policy_optimizer.step()

软更新价值网络

# Soft update the target value net
        for target_param, param in zip(self.target_soft_q_net1.parameters(), self.soft_q_net1.parameters()):
            target_param.data.copy_(  # copy data value into target parameters
                target_param.data * (1.0 - soft_tau) + param.data * soft_tau
            )
        for target_param, param in zip(self.target_soft_q_net2.parameters(), self.soft_q_net2.parameters()):
            target_param.data.copy_(  # copy data value into target parameters
                #核心公式
                target_param.data * (1.0 - soft_tau) + param.data * soft_tau
            )
        return predicted_new_q_value.mean()

SAC中使用两个独立的Q网络来减少过估计问题,并为每个Q网络维护一个对应的目标网

  • 两个 Q 网络可以互相监督,避免单个 Q 网络的偏差影响整体训练。

保存,加载模型函数

def save_model(self, path):
        torch.save(self.soft_q_net1.state_dict(), path+'_q1')  # have to specify different path name here!
        torch.save(self.soft_q_net2.state_dict(), path+'_q2')
        torch.save(self.policy_net.state_dict(), path+'_policy')

    def load_model(self, path):
        #将加载的参数加载到模型中
        self.soft_q_net1.load_state_dict(torch.load(path+'_q1'))
        self.soft_q_net2.load_state_dict(torch.load(path+'_q2'))
        self.policy_net.load_state_dict(torch.load(path+'_policy'))

        self.soft_q_net1.eval()
        self.soft_q_net2.eval()
        self.policy_net.eval()#将模型设置为评估模式,用于推理或测试

用于采样数据和训练模型的worker函数

通过多进程与其他worker并行执行

def worker(id, sac_trainer, rewards_queue, replay_buffer, max_episodes, max_steps, batch_size, explore_steps, \
            update_itr, AUTO_ENTROPY, DETERMINISTIC, USE_DEMONS, hidden_dim, model_path, headless):

    print(sac_trainer, replay_buffer)  # sac_tainer are not the same, but all networks and optimizers in it are the same; replay  buffer is the same one.
    env = GraspEnv(headless=headless)  #自定义的环境类,用于模拟抓取任务

    action_dim = env.action_space.shape[0]
    state_dim  = env.observation_space.shape[0]

    frame_idx=0

    for eps in range(max_episodes):
        
        episode_reward = 0
        state =  env.reset()
    #每间隔20个episode重新初始化环境,避免环境中的问题
        if eps%20==0 and eps>0:
            env.reinit()   
        
        for step in range(max_steps):
            #如果当前部署超过探索步数,则使用策略网络生成动作
            if frame_idx > explore_steps:
                action = sac_trainer.policy_net.get_action(state, deterministic = DETERMINISTIC)
            else:
                action = sac_trainer.policy_net.sample_action()
    
            try:
                next_state, reward, done, _ = env.step(action) 
            except KeyboardInterrupt:
                print('Finished')
                sac_trainer.save_model(model_path)
            #将经验数据存入回放缓冲区
            replay_buffer.push(state, action, reward, next_state, done)
            
            #更新当前状态
            state = next_state
            episode_reward += reward
            frame_idx += 1 #更新总步数
              
            # 如果缓冲区的数据量大小大于批量大小,则开始更新模型
            if replay_buffer.get_length() > batch_size:
                for i in range(update_itr):
                    #更新SAC算法的参数
                    _=sac_trainer.update(batch_size, reward_scale=10., auto_entropy=AUTO_ENTROPY, use_demons=USE_DEMONS, target_entropy=-1.*action_dim)
            #每间隔10个episode保存一次模型
            if eps % 10 == 0 and eps>0:
                sac_trainer.save_model(model_path)
            
            if done:
                break
        print('Episode: ', eps, '| Episode Reward: ', episode_reward)
        rewards_queue.put(episode_reward)

    sac_trainer.save_model(model_path)
    env.shutdown()
            

多进程环境中共享 Adam 优化器的参数函数

在多进程训练中,多个进程可能同时访问和更新优化器的状态(如动量项和平方梯度项)。为了确保这些状态在进程之间保持一致,需要将它们共享到内存中

def ShareParameters(adamoptim):
    ''' share parameters of Adamoptimizers for multiprocessing '''
    for group in adamoptim.param_groups:
        for p in group['params']:
            state = adamoptim.state[p] #获取优化器的状态
            # 优化器的步数
            state['step'] = 0
            state['exp_avg'] = torch.zeros_like(p.data) #动量项
            state['exp_avg_sq'] = torch.zeros_like(p.data)#票房梯度项

            # 将张量共享到内存中,以便多进程可以访问和修改同一份数据。
            state['exp_avg'].share_memory_()
            state['exp_avg_sq'].share_memory_()

绘制曲线并保存为图像的函数

def plot(rewards):
    clear_output(True)
    # plt.figure(figsize=(20,5))
    plt.plot(rewards)
    plt.savefig('sac_multi.png')
    # plt.show()
    plt.clf()

主函数

if __name__ == '__main__':

    replay_buffer_size = 1e6  #设置回访缓冲区大小
    BaseManager.register('ReplayBuffer', ReplayBuffer)#注册回放缓冲区类,使其可以通过管理器创建共享对象。
    manager = BaseManager()   #创建管理器对象,用于管理共享对象
    manager.start()           #启动管理器,使其可以创建和共享对象
    replay_buffer = manager.ReplayBuffer(replay_buffer_size)  #创建共享的回放缓冲区

定义超参数

 # hyper-parameters for RL training, no need for sharing across processes
    max_episodes  = 500000
    max_steps   = 30 
    explore_steps = 0  
    batch_size=128
    update_itr = 1    #每次采样后更新模型的迭代次数
    AUTO_ENTROPY=True #是否自动调整熵系数
    DETERMINISTIC=False #是否使用确定性策略
    USE_DEMONS = False  #是否使用演示数据
    hidden_dim = 512   #神经网络的隐藏层
    model_path = './model/sac_multi'
    num_workers=6  
    headless = True    #是否以无头模式运行环境

创建实例用于训练SAC算法

    sac_trainer=SAC_Trainer(replay_buffer, hidden_dim=hidden_dim, action_range=action_range )
if args.train:
        #sac_trainer.load_model(model_path)  # 选择加载预训练模型
            
        #共享全局参数
        sac_trainer.soft_q_net1.share_memory()
        sac_trainer.soft_q_net2.share_memory()
        sac_trainer.target_soft_q_net1.share_memory()
        sac_trainer.target_soft_q_net2.share_memory()
        sac_trainer.policy_net.share_memory()
        sac_trainer.log_alpha.share_memory_()
        #共享优化器状态
        ShareParameters(sac_trainer.soft_q_optimizer1)
        ShareParameters(sac_trainer.soft_q_optimizer2)
        ShareParameters(sac_trainer.policy_optimizer)
        ShareParameters(sac_trainer.alpha_optimizer)
        
        rewards_queue=mp.Queue()  # 多进程队列,用于存储每个进程的奖励值,主进程可以从队列中获取奖励值并绘制训练曲线
        processes=[]
        rewards=[]

        for i in range(num_workers):
            process = Process(target=worker, args=(i, sac_trainer, rewards_queue, replay_buffer, max_episodes, max_steps, \
            batch_size, explore_steps, update_itr, AUTO_ENTROPY, DETERMINISTIC, USE_DEMONS, hidden_dim, model_path, headless))  
            process.daemon=True   #将进程设置为守护进程,当主进程结束时,所有子进程也会自动结束
            processes.append(process) #将进程对象添加到列表中,方便后续管理

        [p.start() for p in processes #启动所有工作进程
        while True:    #收集奖励
            r = rewards_queue.get()
            if r is not None:
                rewards.append(r)
            else:
                break

            if len(rewards)%20==0 and len(rewards)>0:
                # plot(rewards)
                np.save('reward_log', rewards)


        [p.join() for p in processes]  #等待所有进程结束
        sac_trainer.save_model(model_path)

测试模式

if args.test:
        env = GraspEnv(headless=False, control_mode='joint_velocity') # for visualizing in test
        #加载预训练模型
        trained_model_path1 = './model/trained_model/augmented_dense_reward/sac_multi'  # pre-trained model with augmented dense reward
        trained_model_path2 = './model/trained_model/dense_reward/sac_multi'  # pre-trained model with dense reward
        sac_trainer.load_model(model_path)  # new model after training
        
        for eps in range(30):
            state =  env.reset()
            episode_reward = 0

            for step in range(max_steps):
            #使用策略网络生成动作
                action = sac_trainer.policy_net.get_action(state, deterministic = DETERMINISTIC)
                next_state, reward, done, _ = env.step(action)  
                episode_reward += reward
                state=next_state

            print('Episode: ', eps, '| Episode Reward: ', episode_reward)

        env.shutdown()

以上就是今天的SAC代码分析

SAC的核心关键我认为是

1.引入了熵奖励

2.使用两种独立的神经网络来分别表示A和C,并且这个C(价值网络)由两个Q函数组成,减少了估计偏差

3.有离线策略,可以从经验回放池中采样训练数据,使得学习更稳定。

4.熵系数能够动态调整。


http://www.kler.cn/a/471436.html

相关文章:

  • HarmonyOS开发:传参方式
  • nginx-灰度发布策略(基于cookie)
  • 基于RK3568/RK3588大车360度环视影像主动安全行车辅助系统解决方案,支持ADAS/DMS
  • w~自动驾驶~合集16
  • 概率基本概念 --- 离散型随机变量实例
  • UDP -- 简易聊天室
  • 云安全博客阅读(二)
  • PHP语言的数据库编程
  • 调整Python+Pytest+Allure+Yaml+Pymysql框架中需要执行的用例顺序
  • 自定义字典转换器用于easyExcel 导入导出
  • npm-npm install时rollbackFailedOptional: verb npm-session ce210dc17dd264aa报错
  • 13个热门.Net开源项目
  • 安卓悬浮可跳动自动吸附可设置不同的吸附距离view
  • 【机器学习】机器学习的基本分类-自监督学习-自回归方法(Autoregressive Methods)
  • 计算机网络——数据链路层-流量控制和可靠传输
  • Docker - 6.设置SSH自动启动并使用root登录
  • 【工业场景】用YOLOv8实现工业配电柜开关状态识别
  • 鸿蒙ArkUI实现部门树列表
  • 入门嵌入式(四)——IICOLED
  • 用JAVA 源码角度看 客户端请求服务器流程中地址是域名情况下解析域名流程
  • CSS语言的文件操作
  • excel如何将小数转换为百分比
  • lec1-计算机概述
  • 深度学习:探索人工智能的未来
  • 深入解析 Python 中的函数也是对象及其内存分析
  • springboot+vue使用easyExcel实现导出功能