Pytorch реализует алгоритм DDPG

искусственный интеллект обучение с подкреплением

Мало знаний, большой вызов! Эта статья участвует в "Необходимые знания для программистов«Творческая деятельность.

предисловие

DDPG — классический алгоритм обучения с подкреплением. Я подробно описал принцип работы алгоритма в предыдущей статье:Введение в обучение с подкреплением 8 - Глубокое понимание DDPG).

В туториале по обучению Мо Фаня Бог Мо Фань использовал DDPG, реализованный с помощью tensorflow. Потому что pytorch обычно используется больше, а версия tensorflow, используемая богами в то время, тоже низкая, поэтому я использую pytorch для воспроизведения DDPG.

Примечание: эта статья относится к коду tf-версии Мо Фаня и указывает путь ?modelfriends.com/tutorials/no…

Обзор DDPG

在这里插入图片描述

DDPG расшифровывается как Глубокий детерминированный градиент политики, алгоритм детерминированного градиента политики. Его структура основана на принципе «актор-критик» в сочетании с идеей алгоритма DQN, поэтому он может решать не только проблемы с дискретным действием, но и проблемы с непрерывным действием.

выполнить

Нечего сказать, сразу к коду

Во-первых, определить две сети: Актер и Критик.

В сочетании с приведенной выше диаграммойActorВход — это текущее состояние, а выход — детерминированное действие.

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, action_bound):
        super(Actor,self).__init__()
        self.action_bound = torch.FloatTensor(action_bound)
        # layer
        self.layer_1 = nn.Linear(state_dim, 30)
        nn.init.normal_(self.layer_1.weight, 0., 0.3) # 初始化权重
        nn.init.constant_(self.layer_1.bias, 0.1)
        
				self.output = nn.Linear(30, action_dim)
        self.output.weight.data.normal_(0.,0.3) # 初始化权重
        self.output.bias.data.fill_(0.1)

    def forward(self, s):
        a = torch.relu(self.layer_1(s))
        a = torch.tanh(self.output(a))
        # 对action进行放缩,实际上a in [-1,1]
        scaled_a = a * self.action_bound
        return scaled_a

В выходных данных действия добавьтеaction boundВо, всего лишь для ограничения дальности действия.

CriticВ дополнение к текущему состоянию сеть вводит действие, выводимое Актером, а затем выводит Q-значение, то естьQ(s,a)

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic,self).__init__()
        n_layer = 30
        # layer
        self.layer_1 = nn.Linear(state_dim, n_layer)
        nn.init.normal_(self.layer_1.weight, 0., 0.1)
        nn.init.constant_(self.layer_1.bias, 0.1)
        
        self.layer_2 = nn.Linear(action_dim, n_layer)
        nn.init.normal_(self.layer_2.weight, 0., 0.1)
        nn.init.constant_(self.layer_2.bias, 0.1)
        
        self.output = nn.Linear(n_layer, 1)

    def forward(self,s,a):
        s = self.layer_1(s)
        a = self.layer_2(a)
        q_val = self.output(torch.relu(s+a))
        return q_val

Определение сетевой структуры относительно простое. НижеКак DDPG учатсяЭта ключевая часть.

Мы знаем, что DDPG имеет банк памяти, и данные для каждого обучения случайным образом выбираются из банка памяти. Эта часть реализации относительно проста.

Следует отметить, что в процессе обучения ошибки двух сетей различны. Поскольку DDPG опирается на идею DQN, в дополнение к двум сетям Актера и Критика он также определяет свою собственную целевую сеть. Роль целевой сети состоит в том, чтобы вычислить значение Q в следующий момент, чтобы облегчить обновление TD-ошибки. Определение TD-ошибки:

td_error=MSE(Q(s,a),r+γQtarget(s',a'))td\_error=MSE(Q(s,a), r+\gamma Q_{target}(s',a'))

Критическая сеть обновляется в соответствии с TD-ошибкой.

# 训练critic
a_ = self.actor_target(bs_)
q_ = self.critic_target(bs_, a_)
q_target = br + self.gamma * q_
q_eval = self.critic(bs, ba)
td_error = self.mse_loss(q_target,q_eval)

Сеть Актера обновляется в соответствии со значением Q следующим образом:

# 训练Actor
a = self.actor(bs)
q = self.critic(bs, a)
a_loss = -torch.mean(q)

Полный код этой части выглядит следующим образом:

class DDPG(object):
    def __init__(self, state_dim, action_dim, action_bound, replacement,memory_capacticy=1000,gamma=0.9,lr_a=0.001, lr_c=0.002,batch_size=32) :
        super(DDPG,self).__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.memory_capacticy = memory_capacticy
        self.replacement = replacement
        self.t_replace_counter = 0
        self.gamma = gamma
        self.lr_a = lr_a
        self.lr_c = lr_c
        self.batch_size = batch_size
        # 记忆库
        self.memory = np.zeros((memory_capacticy, state_dim * 2 + action_dim + 1))
        self.pointer = 0
        # 初始化 Actor 网络
        self.actor = Actor(state_dim, action_dim, action_bound)
        self.actor_target = Actor(state_dim, action_dim, action_bound)
        # 初始化 Critic 网络
        self.critic = Critic(state_dim,action_dim)
        self.critic_target = Critic(state_dim,action_dim)
        # 定义优化器
        self.aopt = torch.optim.Adam(self.actor.parameters(), lr=lr_a)
        self.copt = torch.optim.Adam(self.critic.parameters(), lr=lr_c)
        # 选取损失函数
        self.mse_loss = nn.MSELoss()
    
		# 从记忆库中随机采样    
    def sample(self):
        indices = np.random.choice(self.memory_capacticy, size=self.batch_size)
        return self.memory[indices, :] 

    def choose_action(self, s):
        s = torch.FloatTensor(s)
        action = self.actor(s)
        return action.detach().numpy()

    def learn(self):

        # soft replacement and hard replacement
        # 用于更新target网络的参数
        if self.replacement['name'] == 'soft':
            # soft的意思是每次learn的时候更新部分参数
            tau = self.replacement['tau']
            a_layers = self.actor_target.named_children()
            c_layers = self.critic_target.named_children()
            for al in a_layers:
                a = self.actor.state_dict()[al[0]+'.weight']
                al[1].weight.data.mul_((1-tau))
                al[1].weight.data.add_(tau * self.actor.state_dict()[al[0]+'.weight'])
                al[1].bias.data.mul_((1-tau))
                al[1].bias.data.add_(tau * self.actor.state_dict()[al[0]+'.bias'])
            for cl in c_layers:
                cl[1].weight.data.mul_((1-tau))
                cl[1].weight.data.add_(tau * self.critic.state_dict()[cl[0]+'.weight'])
                cl[1].bias.data.mul_((1-tau))
                cl[1].bias.data.add_(tau * self.critic.state_dict()[cl[0]+'.bias'])
            
        else:
            # hard的意思是每隔一定的步数才更新全部参数
            if self.t_replace_counter % self.replacement['rep_iter'] == 0:
                self.t_replace_counter = 0
                a_layers = self.actor_target.named_children()
                c_layers = self.critic_target.named_children()
                for al in a_layers:
                    al[1].weight.data = self.actor.state_dict()[al[0]+'.weight']
                    al[1].bias.data = self.actor.state_dict()[al[0]+'.bias']
                for cl in c_layers:
                    cl[1].weight.data = self.critic.state_dict()[cl[0]+'.weight']
                    cl[1].bias.data = self.critic.state_dict()[cl[0]+'.bias']
            
            self.t_replace_counter += 1

        # 从记忆库中采样bacth data
        bm = self.sample()
        bs = torch.FloatTensor(bm[:, :self.state_dim])
        ba = torch.FloatTensor(bm[:, self.state_dim:self.state_dim + self.action_dim])
        br = torch.FloatTensor(bm[:, -self.state_dim - 1: -self.state_dim])
        bs_ = torch.FloatTensor(bm[:,-self.state_dim:])
        
        # 训练Actor
        a = self.actor(bs)
        q = self.critic(bs, a)
        a_loss = -torch.mean(q)
        self.aopt.zero_grad()
        a_loss.backward(retain_graph=True)
        self.aopt.step()
        
        # 训练critic
        a_ = self.actor_target(bs_)
        q_ = self.critic_target(bs_, a_)
        q_target = br + self.gamma * q_
        q_eval = self.critic(bs, ba)
        td_error = self.mse_loss(q_target,q_eval)
        self.copt.zero_grad()
        td_error.backward()
        self.copt.step()

    # 存储序列数据
    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s,a,[r],s_))
        index = self.pointer % self.memory_capacticy
        self.memory[index, :] = transition
        self.pointer += 1

некоторые трюки

soft-replacement

В процессе реализации наша целевая сеть также нуждается в обновлении параметров.На самом деле обновление целевых параметров происходит из сети оценки (то есть исходной сети Актера и Критика). Существует два метода обновления параметров:

  • hard replacementВсе параметры обновляются через определенное количество шагов, то есть все параметры расчетной сети заменяются на целевую сеть.
  • soft replacementОн обновляется на каждом шаге, но обновляется только часть параметров.

add noise to action

Добавлен выбор стратегии ДействияПроводить исследования.

На самом деле, это увеличивает исследование некоторого пространства действия, добавляя случайный шум. Реализация заключается в корректировке дисперсии.


Наконец, полный код был помещен вgithub.