Detailed explanation of the evolutionary (EA) part of the Evolutionary Reinforcement Learning (ERL) algorithm based on Genetic Algorithms (GA)
I recently read the code of RACE, a multi-agent evolutionary reinforcement learning algorithm (https://github.com/yeshenpy/RACE), and the evolutionary part of the code was not entirely clear to me. These notes record my reading to help make sense of it.
1. Where the evolutionary algorithm is applied
Every time the RL agent rolls out an epoch, each individual in the population is first evaluated (without action noise), the evaluation experience is added to the replay buffer, and the step counter is incremented. The individual with the best evaluation is later evaluated several more times together with the RL individual to obtain best_score.
if self.args.EA and self.rl_agent_frames >= 10000:
    self.evo_times += 1
    random_num_num = random.random()
    if random_num_num < self.args.theta:
        # Real fitness: average return over num_evals rollouts per individual
        for i, net in enumerate(self.pop):
            for _ in range(self.args.num_evals):
                episode = self.evaluate(net, self.rl_agent.state_embeddings, is_render=False,
                                        is_action_noise=False, net_index=i)
                real_rewards[i] += episode['reward']
        real_rewards /= self.args.num_evals
        all_fitness = real_rewards
    else:
        # Surrogate fitness: n-step discounted return estimated with the PeVFA critic
        for i, net in enumerate(self.pop):
            episode = self.evaluate(net, self.rl_agent.state_embeddings, is_render=False, is_action_noise=False,
                                    net_index=i, use_n_step_return=True, PeVFA=self.rl_agent.PVN)
            fake_rewards[i] += episode['n_step_discount_reward']
            MC_n_steps_rewards[i] += episode['reward']
        all_fitness = fake_rewards
else:
    all_fitness = np.zeros(len(self.pop))
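To isolate the gating logic above: with probability args.theta the population is scored by real rollouts (accumulated and then averaged over num_evals episodes), otherwise the PeVFA network provides a cheaper surrogate score. A minimal sketch with hypothetical stand-in values:

import random
import numpy as np

theta, num_evals, pop_size = 0.3, 3, 5           # hypothetical config values
real_rewards = np.zeros(pop_size)

if random.random() < theta:
    for i in range(pop_size):
        for _ in range(num_evals):
            real_rewards[i] += random.uniform(0, 100)   # stand-in for episode['reward']
    all_fitness = real_rewards / num_evals              # average over the evaluations
else:
    # stand-in for the PeVFA n-step surrogate returns
    all_fitness = np.random.uniform(0, 100, pop_size)
print(all_fitness)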
Next, several EA epochs are run. The number of EA epochs equals the number of agents, because each epoch operates only on one specific agent (agent_index); every epoch uses the fitness values computed above.
if self.args.EA:
    if random.random() <= self.args.agent_level_prob:
        for agent_index in range(self.args.n_agents):
            elite_index = self.evolver.epoch(self.pop, all_fitness, agent_index, agent_level=True)
    else:
        for agent_index in range(self.args.n_agents):
            elite_index = self.evolver.epoch(self.pop, all_fitness, agent_index, agent_level=False)
else:
    elite_index = 0
After the EA epochs finish, the RL individual's parameters are copied into the weakest individual in the population every args.rl_to_ea_synch_period iterations. When the weakest individual's index coincides with the elite's (for example, when all fitness values are identical and argmin returns the elite), the index is advanced by 1 and wrapped around with a modulo so the elite is not overwritten.
if self.args.EA and self.args.RL and self.rl_agent_frames >= 10000:
    # if self.args.EA and self.args.RL and self.num_frames > 50e3:
    if self.iterations % self.args.rl_to_ea_synch_period == 0:
        # Replace any index different from the new elite
        replace_index = np.argmin(all_fitness)
        if replace_index == elite_index:
            replace_index = (replace_index + 1) % len(self.pop)
        for index in range(self.args.n_agents):
            self.rl_to_evo(self.rl_agent, self.pop[replace_index], index)
        self.evolver.rl_policy = replace_index
        print('Sync from RL --> Nevo')
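A toy check of this replacement rule, showing how the wraparound steps past the elite when every fitness is identical (the values here are hypothetical):

import numpy as np

pop_size = 5
all_fitness = np.zeros(pop_size)    # all individuals tied, e.g. early in training
elite_index = 0

replace_index = np.argmin(all_fitness)   # argmin of all zeros returns 0, the elite
if replace_index == elite_index:
    replace_index = (replace_index + 1) % pop_size
print(replace_index)   # 1: the RL weights overwrite individual 1, sparing the elite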
2. Logic of an EA epoch
1. Tournament selection:
The function receives index_rank (a list of individual indices sorted by fitness from high to low; position 0 is the best individual), num_offsprings (the number of offspring to select), and tournament_size (the number of candidates competing in each tournament).
def selection_tournament(self, index_rank, num_offsprings, tournament_size):
    total_choices = len(index_rank)
    offsprings = []
    for i in range(num_offsprings):
        winner = np.min(np.random.randint(total_choices, size=tournament_size))
        offsprings.append(index_rank[winner])
    offsprings = list(set(offsprings))  # Find unique offsprings
    if len(offsprings) % 2 != 0:  # Number of offsprings should be even
        offsprings.append(offsprings[fastrand.pcg32bounded(len(offsprings))])
    return offsprings
1. Loop: draw tournament_size random positions in index_rank; the smallest position wins, i.e. the candidate with the best fitness, and its index is appended to offsprings.
2. Deduplication: offsprings is passed through a set so no individual appears twice.
3. Even offspring count: if the number of selected offspring is odd, one member of offsprings is drawn at random and appended again, making the count even for the subsequent crossover.
Individuals that are neither offspring nor elites (the code keeps 1 elite) are then treated as unselects.
This selection process is highly stochastic, which reflects the exploratory side of EA; the toy run below makes the mechanics concrete.
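A self-contained sketch of the tournament, with fastrand.pcg32bounded swapped for random.randrange (a stand-in that plays the same role here):

import random
import numpy as np

def selection_tournament(index_rank, num_offsprings, tournament_size):
    total_choices = len(index_rank)
    offsprings = []
    for _ in range(num_offsprings):
        # Positions early in index_rank hold fitter individuals, so the
        # minimum of the random positions picks the fittest candidate.
        winner = np.min(np.random.randint(total_choices, size=tournament_size))
        offsprings.append(index_rank[winner])
    offsprings = list(set(offsprings))
    if len(offsprings) % 2 != 0:
        offsprings.append(offsprings[random.randrange(len(offsprings))])
    return offsprings

index_rank = [3, 0, 4, 1, 2]   # individual 3 has the highest fitness
print(selection_tournament(index_rank, num_offsprings=4, tournament_size=3))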
2. Elite preservation:
for i in elitist_index:
    try: replacee = unselects.pop(0)
    except: replacee = offsprings.pop(0)
    new_elitists.append(replacee)
    self.clone(master=pop[i], replacee=pop[replacee], agent_index=agent_index)
Each elite's parameters overwrite an individual taken from unselects (or from offsprings when unselects is empty), and new_elitists records the index of the individual that was overwritten.
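The clone helper itself is not quoted in this post; a minimal sketch of what it presumably does, assuming each GeneticAgent holds one actor network per agent (gene.actors[agent_index], as in the crossover code below):

def clone(self, master, replacee, agent_index):
    # Copy the master's actor parameters for this agent into the replacee
    for target_p, source_p in zip(replacee.actors[agent_index].parameters(),
                                  master.actors[agent_index].parameters()):
        target_p.data.copy_(source_p.data)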
3. Crossover
if len(unselects) % 2 != 0:  # Number of unselects left should be even
    unselects.append(unselects[fastrand.pcg32bounded(len(unselects))])
for i, j in zip(unselects[0::2], unselects[1::2]):
    off_i = random.choice(new_elitists)  # pick an elite from new_elitists
    off_j = random.choice(offsprings)    # pick a parent from offsprings
    # Hand the elite's and the offspring's parameters to the two unselected individuals
    self.clone(master=pop[off_i], replacee=pop[i], agent_index=agent_index)
    self.clone(master=pop[off_j], replacee=pop[j], agent_index=agent_index)
    if agent_level:
        # Copy one parent's parameters over wholesale
        if random.random() < 0.5:
            self.clone(master=pop[i], replacee=pop[j], agent_index=agent_index)
        else:
            self.clone(master=pop[j], replacee=pop[i], agent_index=agent_index)
    else:
        # Crossover
        self.crossover_inplace(pop[i], pop[j], agent_index)
The crossover code:
def crossover_inplace(self, gene1: GeneticAgent, gene2: GeneticAgent, agent_index: int):
    # First pass: record references to the 1-D (bias) parameters;
    # after this loop b_1/b_2 point to the last bias encountered
    b_1 = None
    b_2 = None
    for param1, param2 in zip(gene1.actors[agent_index].parameters(), gene2.actors[agent_index].parameters()):
        # References to the variable tensors
        W1 = param1.data
        W2 = param2.data
        if len(W1.shape) == 1:
            b_1 = W1
            b_2 = W2
    # Second pass: swap whole rows (and the matching bias entries) between the parents
    for param1, param2 in zip(gene1.actors[agent_index].parameters(), gene2.actors[agent_index].parameters()):
        # References to the variable tensors
        W1 = param1.data
        W2 = param2.data
        if len(W1.shape) == 2:  # Weights, no bias
            num_variables = W1.shape[0]
            # Crossover operation [indexed by row]
            num_cross_overs = fastrand.pcg32bounded(num_variables * 2)  # Lower bounded on full swaps
            for i in range(num_cross_overs):
                receiver_choice = random.random()  # Choose which gene receives the row
                if receiver_choice < 0.5:
                    ind_cr = fastrand.pcg32bounded(W1.shape[0])
                    W1[ind_cr, :] = W2[ind_cr, :]
                    b_1[ind_cr] = b_2[ind_cr]
                else:
                    ind_cr = fastrand.pcg32bounded(W1.shape[0])
                    W2[ind_cr, :] = W1[ind_cr, :]
                    b_2[ind_cr] = b_1[ind_cr]
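A toy illustration of the row-level swap that crossover_inplace performs per layer, on two hypothetical 3x2 weight matrices:

import torch

W1 = torch.zeros(3, 2)   # parent 1's weights
W2 = torch.ones(3, 2)    # parent 2's weights
ind_cr = 1
W1[ind_cr, :] = W2[ind_cr, :]   # parent 1 inherits row 1 from parent 2
print(W1)   # rows: [0., 0.], [1., 1.], [0., 0.]

Swapping a full row exchanges an entire output neuron between the parents, which keeps each transferred unit intact rather than mixing arbitrary single weights.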
4. Mutation
for i in range(self.population_size):
    if i not in new_elitists:  # Spare the new elitists
        if random.random() < self.args.mutation_prob:
            if self.args.proximal_mut:
                self.proximal_mutate(pop[i], mag=self.args.mutation_mag)
            else:
                self.mutate_inplace(pop[i], agent_index=agent_index, agent_level=agent_level)
Every individual except the new elitists is mutated with probability mutation_prob (the new elitists are spared).
The mutation code:
def mutate_inplace(self, gene: GeneticAgent, agent_index, agent_level=False):
    trials = 5
    mut_strength = 0.1
    num_mutation_frac = 0.1
    super_mut_strength = 10
    super_mut_prob = self.prob_reset_and_sup
    reset_prob = super_mut_prob + self.prob_reset_and_sup

    num_params = len(list(gene.actors[agent_index].parameters()))
    ssne_probabilities = np.random.uniform(0, 1, num_params) * 2
    model_params = gene.actors[agent_index].state_dict()

    for i, key in enumerate(model_params):  # Mutate each param
        if is_lnorm_key(key):
            continue
        # References to the variable keys
        W = model_params[key]
        if len(W.shape) == 2:  # Weights, no bias
            if agent_level:
                ssne_prob = ssne_probabilities[i]
                action_prob = 1.0
            else:
                ssne_prob = 1.0
                action_prob = ssne_probabilities[i]
            if random.random() < ssne_prob:
                num_variables = W.shape[0]
                # Mutation operation [indexed by row]
                for index in range(num_variables):
                    random_num_num = random.random()
                    if random_num_num <= action_prob:
                        index_list = random.sample(range(W.shape[1]), int(W.shape[1] * self.frac))
                        random_num = random.random()
                        if random_num < super_mut_prob:  # Super mutation probability
                            for ind in index_list:
                                W[index, ind] += random.gauss(0, super_mut_strength * W[index, ind])
                        elif random_num < reset_prob:  # Reset probability
                            for ind in index_list:
                                W[index, ind] = random.gauss(0, 1)
                        else:  # Normal mutation
                            for ind in index_list:
                                W[index, ind] += random.gauss(0, mut_strength * W[index, ind])
                        # Regularization hard limit
                        W[index, :] = np.clip(W[index, :], a_min=-1000000, a_max=1000000)
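The three mutation branches partition [0, 1): with prob_reset_and_sup = p, a draw in [0, p) triggers a super mutation, [p, 2p) a reset, and [2p, 1) a normal Gaussian perturbation. A toy walkthrough, assuming a hypothetical p = 0.05:

import random

prob_reset_and_sup = 0.05                          # hypothetical value
super_mut_prob = prob_reset_and_sup                # 0.05
reset_prob = super_mut_prob + prob_reset_and_sup   # 0.10

random_num = random.random()
if random_num < super_mut_prob:
    branch = "super mutation: w += random.gauss(0, 10 * w)"
elif random_num < reset_prob:
    branch = "reset: w = random.gauss(0, 1)"
else:
    branch = "normal mutation: w += random.gauss(0, 0.1 * w)"
print(branch)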
About agent_level: agent_level means crossover and mutation are performed at the agent level. If agent_level is true, crossover directly copies an entire agent's parameters, and each weight matrix is mutated only with probability ssne_probabilities[i] (but once a matrix is chosen, every row is eligible, since action_prob = 1.0). Otherwise, regular row-wise crossover is used, every weight matrix enters mutation (ssne_prob = 1.0), and each row is mutated with probability ssne_probabilities[i].
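A compact sketch of the two gating modes, using one hypothetical per-parameter probability (note the gates are drawn from U(0, 1) * 2, so a value above 1 always fires):

import random

ssne_probabilities_i = 0.8   # hypothetical gate for parameter tensor i

for agent_level in (True, False):
    if agent_level:
        ssne_prob, action_prob = ssne_probabilities_i, 1.0   # gate whole matrices
    else:
        ssne_prob, action_prob = 1.0, ssne_probabilities_i   # gate individual rows
    mutate_matrix = random.random() < ssne_prob
    print(agent_level, "matrix gate fired:", mutate_matrix, "row gate:", action_prob)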