您现在的位置是:首页 >其他 >强化学习中算法网站首页其他

强化学习中算法

勤劳的小dong 2025-12-23 12:01:02
简介强化学习中算法

如何写一个深度学习网络

1.下面写了一个最简单的网络

class Qnet(torch.nn.Module):
    ''' 只有一层隐藏层的Q网络 '''
    def __init__(self, state_dim, hidden_dim, action_dim):
调用super方法对Module进行继承操作
        super(Qnet, self).__init__()
        #第一个线性,
        #假设(1)->(64)
        # 全链接网络处理办法
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        # 二位卷积,batch ,通道,长,宽
        self.conv = torch.nn.Conv2d(in_channels=1, out_channels=4, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)  # 最大池化层
        self.conv1 = torch.nn.Conv2d(in_channels=4, out_channels=4, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()  # ReLU 激活函数
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)
    def forward(self, x):
        print(x)
        # (-1,4)->(-1,64)
        x = self.fc1(x)
        # (-1,64)->(-1,1,8,8)
        x = x.view(-1, 1, 8, 8)
        # (-1, 1, 8, 8)->(-1,4,8,8)
        x = self.conv(x)
        x = self.relu1(x)  # ReLU 激活函数
        # (-1,4,8,8)->(-1,4,4,4)
        x = self.pool(x)
        # (-1,4,4,4)->(-1,4*4*4)
        x = x.view(-1, 4*4*4)
        # (-1,64)->(-1,2)
        x = F.relu(self.fc3(x))
        return x

上面主要是用到了

​ # 卷积Conv2d,道数随机给,padding为1长宽不会变

​ # 池化MaxPool2d、

​ # Linear线性操作(全连接网络)

​ # view变换维度

打印网络

def test_network():
    # 初始化网络
    model = Qnet(4,64,2)

    # 打印模型结构
    print("Model Structure:")
    print(model)

    # 测试单个样本输入
    input_data = torch.randn(1, 4)  # 生成单个随机输入
    output = model(input_data)  # 前向传播
    print("Output shape:", output.shape)

2.如何训练

loss_fn = nn.CrossEntropyLoss()                             # 损失函数设置,交叉熵损失
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)    # 学习率设置
epochs = 5                                                  # 训练迭代次数设置
def train(train_dataloader, model, loss_fn, optimizer):
    """
        训练网络
        输入:
            train_dataloader:   训练集的dataloader
            model:              网络模型
            loss_fn:            损失函数
            optimizer:          优化器
        """
    # 切换到train模式
    model.train()
    # 遍历dataloader
    for images, labels in train_dataloader:
        # 将数据和标签加载到device上
        images, labels = images.to(device), labels.to(device)
        # 输入数据到模型里得到输出
        pred = model(images)
        # 计算输出和标签的loss
        loss = loss_fn(pred, labels)
        # 反向推导
        optimizer.zero_grad() #梯度清零。
        loss.backward()   #求解梯度
        # 步进优化器
        optimizer.step()  #更新参数

主要是包含损失函数的类型,学习率,优化器的类型

	**损失函数自己去设计**
    **优化器使用的时候需要清零(optimizer.zero_grad),基于本次的loss进行反向传播 (loss.backward),迭代求解更新网络(optimizer.step)**

3.如何进行网络测试

def test(test_dataloader, model, loss_fn):
    """
        测试网络
        输入:
            test_dataloader:    测试集的dataloader
            model:              网络模型
            loss_fn:            损失函数
        """
    # 测试集大小
    size = len(test_dataloader.dataset)
    # 测试集的batch数量
    num_batches = len(test_dataloader)
    # 切换到测试模型
    model.eval()
    # 记录loss和准确率
    test_loss, correct = 0, 0
    # 梯度截断
    with torch.no_grad():
        for images, labels in test_dataloader:  # 遍历batch
            # 加载到device
            images, labels = images.to(device), labels.to(device)
            # 输入数据到模型里得到输出
            pred = model(images)
            # 累加loss
            test_loss += loss_fn(pred, labels).item()
            # 累加正确率
            correct += (pred.argmax(1) == labels).sum().item()
    # 计算平均loss和准确率
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: 
 Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} 
")
    # 保存模型
    torch.save(model.state_dict(), "model.pth")

主要是注意梯度截断。

强化学习代码中的一些设计

1.经验池

''' 经验回放池 '''
class ReplayBuffer:
    def __init__(self, capacity):
        #这是一个固定队列的定义,容量为capacity
        self.buffer = collections.deque(maxlen=capacity)  # 队列,先进先出
    def add(self, state, action, reward, next_state, done):  # 将数据加入buffer
        #state: (n, 4), action: (n, ), reward: (n, ), next_state: (n, 4), done: (n, )
        self.buffer.append((state, action, reward, next_state, done))
    def sample(self, batch_size):  # 从buffer中采样数据,数量为batch_size
        #random.sample随机采样,返回一个元组
        transitions = random.sample(self.buffer, batch_size)
        # 拆包元组,其中*transitions表示遍历transitions中的所有元素
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done
    def size(self):  # 目前buffer中数据的数量
        return len(self.buffer)

2.状态学习-DQN的TD更新


# transition_dict是一个经验池的数据,对于每个回合里面的每一个步骤都可以更新(当然也可以不进行更新,这个是任意的)
states = torch.tensor(transition_dict['states'],
                      dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
rewards = torch.tensor(transition_dict['rewards'],
                       dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict['next_states'],
                           dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict['dones'],
                     dtype=torch.float).view(-1, 1).to(self.device)

# 将状态输入到网络里面,输出得到状态
q_values = self.q_net(states).gather(1, actions)  # Q值

#基于下一个状态的最大Q值计算bellman误差
q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)  # TD误差目标

# 均方误差损失函数
dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
self.optimizer.zero_grad()  # PyTorch中默认梯度会累积,这里需要显式将梯度置为0
self.q_net.train()
dqn_loss.backward()  # 反向传播更新参数
self.optimizer.step()

if self.count % self.target_update == 0:
    self.target_q_net.load_state_dict(
        self.q_net.state_dict())  # 更新目标网络
self.count += 1

3.REINFORCE策略学习

# 跟梯度更新的算法必须是整个回合的步数加载一起才能更新
self.optimizer.zero_grad()
for i in reversed(range(len(reward_list))):  # 从最后一步算起
    reward = reward_list[i]
    print("reward的大小:",reward)
    # 1个向量,含有4个数字
    state = torch.tensor([state_list[i]],
                         dtype=torch.float).to(self.device)
    print("state的维数:",state.size()) #torch.Size([1, 4])

    # 1个向量,含有1个数字
    action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
    print("动作大小:",action_list[i])
    print("action的维数:",action.size())

    # 输入策略的得到,得到策略。
    log_prob = torch.log(self.policy_net(state).gather(1, action))
    
     #这里的Q是蒙特卡洛采样得到的
    G = self.gamma * G + reward
    loss = -log_prob * G  # 每一步的损失函数
    loss.backward()  # 反向传播计算梯度
    print(loss)

# 梯度计算是基于每个步数的
# 但是更新网络是基于一个回合的
self.optimizer.step()  # 梯度下降

4.AC网络(有错误需要更新)

##每个回合中的每个步数都可以更新,但是会存在batch扩大的情况,因为batch是由,回车的次数决定的。
def update(self, transition_dict):
    # 训练集
    states = torch.tensor(transition_dict['states'], dtype=torch.float)
    print(states)
    actions = torch.tensor(transition_dict['actions']).view(-1, 1)
    rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1)
    next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float)
    dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1)

    # 预测的当前时刻的state_value
    td_value = self.critic(states)
    # 目标的当前时刻的state_value
    td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
    # 时序差分的误差计算,目标的state_value与预测的state_value之差
    td_delta = td_target - td_value

    # 对每个状态对应的动作价值用log函数
    log_probs = torch.log(self.actor(states).gather(1, actions))
    # 策略梯度损失
    actor_loss = torch.mean(-log_probs * td_delta.detach())
    # 值函数损失,预测值和目标值之间
    critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))

    # 优化器梯度清0
    self.actor_optimizer.zero_grad()  # 策略梯度网络的优化器
    self.critic_optimizer.zero_grad()  # 价值网络的优化器
    # 反向传播
    actor_loss.backward()
    critic_loss.backward()
    # 参数更新
    self.actor_optimizer.step()
    self.critic_optimizer.step()

5.学会定义一个环境

5.1首先环境是由gym.Env给定的,因此需要继承:

def init(self):

​ 初始化,需要给出->动作空间,状态空间,观测空间

def seed(self, seed=None):

​ 随机种子(无所谓)

def step(self, action):

​ 核心实现

​ 函数入口是动作,输出是np.array(self.state), reward, done, {}

def reset(self):

​ 随机初始化

def render(self, mode=‘humn’):

​ 整个窗口的显示动画(可以不定义)

def close(self):

​ 关闭动作

    high = np.array([self.x_threshold * 2,  # 4.8
                     np.finfo(np.float32).max,  # 取float的最大值
                     self.theta_threshold_radians * 2,  # 24°
                     np.finfo(np.float32).max],  # 取float的最大值
                    dtype=np.float32)

    self.action_space = spaces.Discrete(2)  # 离散动作定义(2个动作)为0,1

    self.observation_space = spaces.Box(-high, high, dtype=np.float32)  # 连续状态定义(四种都是连续的)
    
    
    ## 常用的打印     
    print('观测空间 = {}'.format(env.observation_space))
    # 打印维数
    print(env.observation_space.shape[0])
    # 连续空间
    print(type(env.observation_space))

    
    #对于离散的状态空间输出为Discrete(2){离散的取和1}
    print('动作空间 = {}'.format(env.action_space))
    # 打印维数
    print('动作数 = {}'.format(env.action_space.n))
    # 动作空间数据类型
    print(type(env.action_space))
       
    ##
    env.reset()  ##必须先进行初始化
    action = 0
    next_state, reward, done, _ = env.step(action)
    print(next_state)
    print(type(next_state)) # <class 'gym.spaces.discrete.Discrete'>
    print(reward)
    print(type(reward))
    print(done)
    print(type(done))
    return env
    

        
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。