您现在的位置是:首页 >技术杂谈 >基于遗传算法(GA)的进化强化(ERL)算法中进化(EA)部分详解网站首页技术杂谈

基于遗传算法(GA)的进化强化(ERL)算法中进化(EA)部分详解

addiction z 2025-12-23 12:01:02
简介基于遗传算法(GA)的进化强化(ERL)算法中进化(EA)部分详解

最近阅读了多智能体进化强化学习的算法RACE论文中的代码https://github.com/yeshenpy/RACE,之前对代码的进化部分还不是很理解。因此做一笔记来记录,帮助理解。


1.进化算法应用部分

每当RL智能体rollout一个epoch时,首先对每个智能体进行评估(无噪声),将评估经验加入缓冲区,并且增加step。评估最好的个体会在后续和RL个体一同评估多次得到best_score。

 if self.args.EA and self.rl_agent_frames >= 10000:
            self.evo_times += 1
            random_num_num = random.random()
            if random_num_num < self.args.theta:
                for i, net in enumerate(self.pop):
                    for _ in range(self.args.num_evals):
                        episode = self.evaluate(net, self.rl_agent.state_embeddings, is_render=False,
                                                is_action_noise=False, net_index=i)
                        real_rewards[i] += episode['reward']
                real_rewards /= self.args.num_evals
                all_fitness = real_rewards
            else:
                for i, net in enumerate(self.pop):
                    episode = self.evaluate(net, self.rl_agent.state_embeddings, is_render=False, is_action_noise=False,
                                            net_index=i, use_n_step_return=True, PeVFA=self.rl_agent.PVN)
                    fake_rewards[i] += episode['n_step_discount_reward']
                    MC_n_steps_rewards[i] += episode['reward']
                all_fitness = fake_rewards

        else:
            all_fitness = np.zeros(len(self.pop))

随后启用多个EA epoch,此处启用的EA epoch个数和智能体个数相同,因为每个epoch只对某个特定的智能体(agent_index)进行操作,这里每个epoch都是根据之前得来的fitness进行的。

        if self.args.EA:
            if random.random() <= self.args.agent_level_prob:

                for agent_index in range(self.args.n_agents):
                    elite_index = self.evolver.epoch(self.pop, all_fitness, agent_index, agent_level=True)
            else:
                for agent_index in range(self.args.n_agents):
                    elite_index = self.evolver.epoch(self.pop, all_fitness, agent_index, agent_level=False)
        else:
            elite_index = 0

EA epoch迭代完后,根据args.rl_to_ea_synch_period,将RL个体的参数复制到种群最弱的个体中,当最弱个体和精英个体索引相同时(step<10000,所有个体fitness是0),对索引进行+1并整除。

if self.args.EA and self.args.RL and self.rl_agent_frames >= 10000:
    # if self.args.EA and self.args.RL and self.num_frames> 50e3:
    if self.iterations % self.args.rl_to_ea_synch_period == 0:
        # Replace any index different from the new elite
        replace_index = np.argmin(all_fitness)
        if replace_index == elite_index:
            replace_index = (replace_index + 1) % len(self.pop)
        for index in range(self.args.n_agents):
            self.rl_to_evo(self.rl_agent, self.pop[replace_index], index)
        self.evolver.rl_policy = replace_index
        print('Sync from RL --> Nevo')

2.EA epoch逻辑

1.锦标赛规则:

传入index_rank(基于适应度排序后的个体索引列表(从高到低,0是最优个体)),num_offsprings(需要选择的后代数量),和tournament_size(参与竞争的后代数量)。

    def selection_tournament(self, index_rank, num_offsprings, tournament_size):
        total_choices = len(index_rank)
        offsprings = []
        for i in range(num_offsprings):
            winner = np.min(np.random.randint(total_choices, size=tournament_size))
            offsprings.append(index_rank[winner])

        offsprings = list(set(offsprings))  # Find unique offsprings
        if len(offsprings) % 2 != 0:  # Number of offsprings should be even
            offsprings.append(offsprings[fastrand.pcg32bounded(len(offsprings))])
        return offsprings

1.循环:从 index_rank中随机选择 tournament_size 个个体作为候选人。从这 tournament_size 个候选人中,选择适应度最好的一个,加入offsprings。

2.去重:用集合对offsprings去重保证没有重复。

3.保证偶数个后代:如果选择出的后代个体数量是奇数,就从offsprings中随机选择一个个体,并将其加入到 offsprings中,使得后代个体的数量变为偶数,以便后续交叉。

之后将既不是offsprings也不是精英(代码中为1个)的个体视作unselects。

这个选取过程中有较大的随机性,体现了EA的探索性。

2.精英保留步骤

        for i in elitist_index:
            try: replacee = unselects.pop(0)
            except: replacee = offsprings.pop(0)
            new_elitists.append(replacee)
            self.clone(master=pop[i], replacee=pop[replacee], agent_index=agent_index)

用精英个体的参数替换unselects或offsprings(当unselects不存在时)的个体,并用new_elitists列表来记录这个被替换的个体。

3.交叉

            if len(unselects) % 2 != 0:  # Number of unselects left should be even
                unselects.append(unselects[fastrand.pcg32bounded(len(unselects))])

            for i, j in zip(unselects[0::2], unselects[1::2]):
                off_i = random.choice(new_elitists)#从new_elitists中选取精英个体
                off_j = random.choice(offsprings)#从offsprings中选取被繁殖个体
                #将精英个体和offspring个体的参数传递给unselects中的个体
                self.clone(master=pop[off_i], replacee=pop[i],agent_index= agent_index)
                self.clone(master=pop[off_j], replacee=pop[j],agent_index = agent_index)
                
                if agent_level:
                    #直接对父代的参数进行复制

                    if random.random() < 0.5:
                        self.clone(master=pop[i], replacee=pop[j], agent_index=agent_index)
                    else :
                        self.clone(master=pop[j], replacee=pop[i], agent_index=agent_index)
                else :
                    #交叉
                    self.crossover_inplace(pop[i], pop[j],agent_index)

交叉的代码:

    def crossover_inplace(self, gene1: GeneticAgent, gene2: GeneticAgent, agent_index:int):
        # Evaluate the parents


        b_1 = None 
        b_2 = None 
        for param1, param2 in zip(gene1.actors[agent_index].parameters(), gene2.actors[agent_index].parameters()):
            # References to the variable tensors
            W1 = param1.data
            W2 = param2.data 
            if len(W1.shape) == 1:
                b_1 = W1
                b_2 = W2

        for param1, param2 in zip(gene1.actors[agent_index].parameters(), gene2.actors[agent_index].parameters()):
            # References to the variable tensors
            W1 = param1.data
            W2 = param2.data

            if len(W1.shape) == 2: #Weights no bias
                num_variables = W1.shape[0]
                # Crossover opertation [Indexed by row]
                num_cross_overs = fastrand.pcg32bounded(num_variables * 2)  # Lower bounded on full swaps
                for i in range(num_cross_overs):
                    receiver_choice = random.random()  # Choose which gene to receive the perturbation
                    if receiver_choice < 0.5:
                        ind_cr = fastrand.pcg32bounded(W1.shape[0])  #
                        W1[ind_cr,:] = W2[ind_cr,:]
                        b_1[ind_cr] = b_2[ind_cr]
                    else:
                        ind_cr = fastrand.pcg32bounded(W1.shape[0])  #
                        W2[ind_cr,:] = W1[ind_cr,:]
                        b_2[ind_cr] = b_1[ind_cr]

4.变异

        for i in range(self.population_size):
            if i not in new_elitists:  # Spare the new elitists
                if random.random() < self.args.mutation_prob:
                    if self.args.proximal_mut:
                        self.proximal_mutate(pop[i], mag=self.args.mutation_mag)
                    else:
                        self.mutate_inplace(pop[i],agent_index=agent_index,agent_level=agent_level)

一定概率对new_elitists的个体进行变异。

以下是变异代码:

 def mutate_inplace(self, gene: GeneticAgent, agent_index, agent_level= False):
        trials = 5

        mut_strength = 0.1
        num_mutation_frac = 0.1
        super_mut_strength = 10
        super_mut_prob = self.prob_reset_and_sup
        reset_prob = super_mut_prob + self.prob_reset_and_sup

        num_params = len(list(gene.actors[agent_index].parameters()))
        ssne_probabilities = np.random.uniform(0, 1, num_params) * 2
        model_params = gene.actors[agent_index].state_dict()

        for i, key in enumerate(model_params): #Mutate each param

            if is_lnorm_key(key):
                continue

            # References to the variable keys
            W = model_params[key]
            if len(W.shape) == 2: #Weights, no bias
                if agent_level:
                    ssne_prob = ssne_probabilities[i]
                    action_prob = 1.0
                else :
                    ssne_prob = 1.0
                    action_prob = ssne_probabilities[i]

                if random.random() < ssne_prob:
                    num_variables = W.shape[0]
                    # Crossover opertation [Indexed by row]
                    for index in range(num_variables):
                        random_num_num = random.random()
                        if random_num_num <= action_prob :
                            #print(W)
                            index_list = random.sample(range(W.shape[1]), int(W.shape[1] * self.frac))
                            random_num = random.random()
                            if random_num < super_mut_prob:  # Super Mutation probability
                                for ind in index_list:
                                    W[index, ind] += random.gauss(0, super_mut_strength * W[index, ind])
                            elif random_num < reset_prob:  # Reset probability
                                for ind in index_list:
                                    W[index, ind] = random.gauss(0, 1)
                            else:  # mutation even normal
                                for ind in index_list:
                                    W[index, ind] += random.gauss(0, mut_strength * W[index, ind])

                            # Regularization hard limit
                            W[index, :] = np.clip(W[index, :], a_min=-1000000, a_max=1000000)

关于agent_level:agent_level代表在智能体层面执行交叉和变异,如果agent_leval为true,则交叉直接复制整个智能体的参数,变异的概率为ssne_probabilities,否则采用常规交叉的方式,并且变异的概率为1.

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。