您现在的位置是:首页 >其他 >强化学习中算法网站首页其他
强化学习中算法
简介强化学习中算法
如何写一个深度学习网络
1.下面写了一个最简单的网络
class Qnet(torch.nn.Module):
''' 只有一层隐藏层的Q网络 '''
def __init__(self, state_dim, hidden_dim, action_dim):
调用super方法对Module进行继承操作
super(Qnet, self).__init__()
#第一个线性,
#假设(1)->(64)
# 全链接网络处理办法
self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
# 二位卷积,batch ,通道,长,宽
self.conv = torch.nn.Conv2d(in_channels=1, out_channels=4, kernel_size=3, padding=1)
self.pool = nn.MaxPool2d(kernel_size=2, stride=2) # 最大池化层
self.conv1 = torch.nn.Conv2d(in_channels=4, out_channels=4, kernel_size=3, padding=1)
self.relu1 = nn.ReLU() # ReLU 激活函数
self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
def forward(self, x):
print(x)
# (-1,4)->(-1,64)
x = self.fc1(x)
# (-1,64)->(-1,1,8,8)
x = x.view(-1, 1, 8, 8)
# (-1, 1, 8, 8)->(-1,4,8,8)
x = self.conv(x)
x = self.relu1(x) # ReLU 激活函数
# (-1,4,8,8)->(-1,4,4,4)
x = self.pool(x)
# (-1,4,4,4)->(-1,4*4*4)
x = x.view(-1, 4*4*4)
# (-1,64)->(-1,2)
x = F.relu(self.fc3(x))
return x
上面主要是用到了
# 卷积Conv2d,道数随机给,padding为1长宽不会变
# 池化MaxPool2d、
# Linear线性操作(全连接网络)
# view变换维度
打印网络
def test_network():
# 初始化网络
model = Qnet(4,64,2)
# 打印模型结构
print("Model Structure:")
print(model)
# 测试单个样本输入
input_data = torch.randn(1, 4) # 生成单个随机输入
output = model(input_data) # 前向传播
print("Output shape:", output.shape)
2.如何训练
loss_fn = nn.CrossEntropyLoss() # 损失函数设置,交叉熵损失
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3) # 学习率设置
epochs = 5 # 训练迭代次数设置
def train(train_dataloader, model, loss_fn, optimizer):
"""
训练网络
输入:
train_dataloader: 训练集的dataloader
model: 网络模型
loss_fn: 损失函数
optimizer: 优化器
"""
# 切换到train模式
model.train()
# 遍历dataloader
for images, labels in train_dataloader:
# 将数据和标签加载到device上
images, labels = images.to(device), labels.to(device)
# 输入数据到模型里得到输出
pred = model(images)
# 计算输出和标签的loss
loss = loss_fn(pred, labels)
# 反向推导
optimizer.zero_grad() #梯度清零。
loss.backward() #求解梯度
# 步进优化器
optimizer.step() #更新参数
主要是包含损失函数的类型,学习率,优化器的类型
**损失函数自己去设计**
**优化器使用的时候需要清零(optimizer.zero_grad),基于本次的loss进行反向传播 (loss.backward),迭代求解更新网络(optimizer.step)**
3.如何进行网络测试
def test(test_dataloader, model, loss_fn):
"""
测试网络
输入:
test_dataloader: 测试集的dataloader
model: 网络模型
loss_fn: 损失函数
"""
# 测试集大小
size = len(test_dataloader.dataset)
# 测试集的batch数量
num_batches = len(test_dataloader)
# 切换到测试模型
model.eval()
# 记录loss和准确率
test_loss, correct = 0, 0
# 梯度截断
with torch.no_grad():
for images, labels in test_dataloader: # 遍历batch
# 加载到device
images, labels = images.to(device), labels.to(device)
# 输入数据到模型里得到输出
pred = model(images)
# 累加loss
test_loss += loss_fn(pred, labels).item()
# 累加正确率
correct += (pred.argmax(1) == labels).sum().item()
# 计算平均loss和准确率
test_loss /= num_batches
correct /= size
print(f"Test Error:
Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f}
")
# 保存模型
torch.save(model.state_dict(), "model.pth")
主要是注意梯度截断。
强化学习代码中的一些设计
1.经验池
''' 经验回放池 '''
class ReplayBuffer:
def __init__(self, capacity):
#这是一个固定队列的定义,容量为capacity
self.buffer = collections.deque(maxlen=capacity) # 队列,先进先出
def add(self, state, action, reward, next_state, done): # 将数据加入buffer
#state: (n, 4), action: (n, ), reward: (n, ), next_state: (n, 4), done: (n, )
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size): # 从buffer中采样数据,数量为batch_size
#random.sample随机采样,返回一个元组
transitions = random.sample(self.buffer, batch_size)
# 拆包元组,其中*transitions表示遍历transitions中的所有元素
state, action, reward, next_state, done = zip(*transitions)
return np.array(state), action, reward, np.array(next_state), done
def size(self): # 目前buffer中数据的数量
return len(self.buffer)
2.状态学习-DQN的TD更新
# transition_dict是一个经验池的数据,对于每个回合里面的每一个步骤都可以更新(当然也可以不进行更新,这个是任意的)
states = torch.tensor(transition_dict['states'],
dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
rewards = torch.tensor(transition_dict['rewards'],
dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'],
dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'],
dtype=torch.float).view(-1, 1).to(self.device)
# 将状态输入到网络里面,输出得到状态
q_values = self.q_net(states).gather(1, actions) # Q值
#基于下一个状态的最大Q值计算bellman误差
q_targets = rewards + self.gamma * max_next_q_values * (1 - dones) # TD误差目标
# 均方误差损失函数
dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
self.optimizer.zero_grad() # PyTorch中默认梯度会累积,这里需要显式将梯度置为0
self.q_net.train()
dqn_loss.backward() # 反向传播更新参数
self.optimizer.step()
if self.count % self.target_update == 0:
self.target_q_net.load_state_dict(
self.q_net.state_dict()) # 更新目标网络
self.count += 1
3.REINFORCE策略学习
# 跟梯度更新的算法必须是整个回合的步数加载一起才能更新
self.optimizer.zero_grad()
for i in reversed(range(len(reward_list))): # 从最后一步算起
reward = reward_list[i]
print("reward的大小:",reward)
# 1个向量,含有4个数字
state = torch.tensor([state_list[i]],
dtype=torch.float).to(self.device)
print("state的维数:",state.size()) #torch.Size([1, 4])
# 1个向量,含有1个数字
action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
print("动作大小:",action_list[i])
print("action的维数:",action.size())
# 输入策略的得到,得到策略。
log_prob = torch.log(self.policy_net(state).gather(1, action))
#这里的Q是蒙特卡洛采样得到的
G = self.gamma * G + reward
loss = -log_prob * G # 每一步的损失函数
loss.backward() # 反向传播计算梯度
print(loss)
# 梯度计算是基于每个步数的
# 但是更新网络是基于一个回合的
self.optimizer.step() # 梯度下降
4.AC网络(有错误需要更新)
##每个回合中的每个步数都可以更新,但是会存在batch扩大的情况,因为batch是由,回车的次数决定的。
def update(self, transition_dict):
# 训练集
states = torch.tensor(transition_dict['states'], dtype=torch.float)
print(states)
actions = torch.tensor(transition_dict['actions']).view(-1, 1)
rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1)
next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float)
dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1)
# 预测的当前时刻的state_value
td_value = self.critic(states)
# 目标的当前时刻的state_value
td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
# 时序差分的误差计算,目标的state_value与预测的state_value之差
td_delta = td_target - td_value
# 对每个状态对应的动作价值用log函数
log_probs = torch.log(self.actor(states).gather(1, actions))
# 策略梯度损失
actor_loss = torch.mean(-log_probs * td_delta.detach())
# 值函数损失,预测值和目标值之间
critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
# 优化器梯度清0
self.actor_optimizer.zero_grad() # 策略梯度网络的优化器
self.critic_optimizer.zero_grad() # 价值网络的优化器
# 反向传播
actor_loss.backward()
critic_loss.backward()
# 参数更新
self.actor_optimizer.step()
self.critic_optimizer.step()
5.学会定义一个环境
5.1首先环境是由gym.Env给定的,因此需要继承:
def init(self):
初始化,需要给出->动作空间,状态空间,观测空间
def seed(self, seed=None):
随机种子(无所谓)
def step(self, action):
核心实现
函数入口是动作,输出是np.array(self.state), reward, done, {}
def reset(self):
随机初始化
def render(self, mode=‘humn’):
整个窗口的显示动画(可以不定义)
def close(self):
关闭动作
high = np.array([self.x_threshold * 2, # 4.8
np.finfo(np.float32).max, # 取float的最大值
self.theta_threshold_radians * 2, # 24°
np.finfo(np.float32).max], # 取float的最大值
dtype=np.float32)
self.action_space = spaces.Discrete(2) # 离散动作定义(2个动作)为0,1
self.observation_space = spaces.Box(-high, high, dtype=np.float32) # 连续状态定义(四种都是连续的)
## 常用的打印
print('观测空间 = {}'.format(env.observation_space))
# 打印维数
print(env.observation_space.shape[0])
# 连续空间
print(type(env.observation_space))
#对于离散的状态空间输出为Discrete(2){离散的取和1}
print('动作空间 = {}'.format(env.action_space))
# 打印维数
print('动作数 = {}'.format(env.action_space.n))
# 动作空间数据类型
print(type(env.action_space))
##
env.reset() ##必须先进行初始化
action = 0
next_state, reward, done, _ = env.step(action)
print(next_state)
print(type(next_state)) # <class 'gym.spaces.discrete.Discrete'>
print(reward)
print(type(reward))
print(done)
print(type(done))
return env
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。





U8W/U8W-Mini使用与常见问题解决
QT多线程的5种用法,通过使用线程解决UI主界面的耗时操作代码,防止界面卡死。...
stm32使用HAL库配置串口中断收发数据(保姆级教程)
分享几个国内免费的ChatGPT镜像网址(亲测有效)
Allegro16.6差分等长设置及走线总结