当前位置: 首页 > article >正文

基于 PyTorch 的树叶分类任务:从数据准备到模型训练与测试

基于 PyTorch 的树叶分类任务:从数据准备到模型训练与测试


1. 引言

在计算机视觉领域,图像分类是一个经典的任务。本文将详细介绍如何使用 PyTorch 实现一个树叶分类任务。我们将从数据准备开始,逐步构建模型、训练模型,并在测试集上进行预测,最终生成提交文件。
在这里插入图片描述


2. 环境准备

首先,确保已安装以下 Python 库:

pip install torch torchvision pandas d2l
  • torch:PyTorch 核心库。
  • torchvision:提供计算机视觉相关的工具。
  • pandas:用于处理 CSV 文件。
  • d2l:深度学习工具库,提供辅助函数。

3. 数据准备

竞赛链接:https://www.kaggle.com/competitions/classify-leaves/leaderboard?tab=public

3.1 数据集结构

假设数据集位于 classify-leaves 目录下,包含以下文件:

classify-leaves/
├── train.csv
├── test.csv
├── images/
    ├── image1.jpg
    ├── image2.jpg
    ...
  • train.csv:包含训练图像的路径和标签。
  • test.csv:包含测试图像的路径。

3.2 数据加载与预处理

import os
import pandas as pd
import random

imgpath = "classify-leaves"
trainlist = pd.read_csv(f"{imgpath}/train.csv")
num2name = list(trainlist["label"].value_counts().index)
random.shuffle(num2name)
name2num = {}
for i in range(len(num2name)):
    name2num[num2name[i]] = i
  • num2name:获取所有类别标签,并按类别数量排序。
  • name2num:将类别名称映射到数字编号。

4. 自定义数据集类

为了加载数据,我们需要定义一个自定义数据集类 Leaf_data

from torch.utils.data import Dataset
from d2l import torch as d2l

class Leaf_data(Dataset):
    def __init__(self, path, train, transform=lambda x: x):
        super().__init__()
        self.path = path
        self.transform = transform
        self.train = train
        if train:
            self.datalist = pd.read_csv(f"{path}/train.csv")
        else:
            self.datalist = pd.read_csv(f"{path}/test.csv")

    def __getitem__(self, index):
        res = ()
        tmplist = self.datalist.iloc[index, :]
        for i in tmplist.index:
            if i == "image":
                res += (self.transform(d2l.Image.open(f"{self.path}/{tmplist[i]}")),)
            else:
                res += (name2num[tmplist[i]],)
        if len(res) < 2:
            res += (tmplist[i],)
        return res

    def __len__(self):
        return len(self.datalist)
  • __getitem__:根据索引返回一个样本,包括图像和标签。
  • __len__:返回数据集的长度。

5. 模型定义与初始化

我们使用预训练的 ResNet34 模型,并修改最后一层以适应分类任务:

import torch
import torchvision
from torch import nn

def init_weight(m):
    if type(m) in [nn.Linear, nn.Conv2d]:
        nn.init.xavier_normal_(m.weight)

net = torchvision.models.resnet34(weights=torchvision.models.ResNet34_Weights.IMAGENET1K_V1)
net.fc = nn.Linear(in_features=512, out_features=len(name2num), bias=True)
net.fc.apply(init_weight)
net.to(try_gpu())
  • init_weight:使用 Xavier 初始化方法初始化全连接层的权重。
  • net:加载预训练的 ResNet34 模型,并修改最后一层全连接层。

6. 训练过程

6.1 优化器与损失函数

lr = 1e-4
parames = [parame for name, parame in net.named_parameters() if name not in ["fc.weight", "fc.bias"]]
trainer = torch.optim.Adam([{"params": parames}, {"params": net.fc.parameters(), "lr": lr * 10}], lr=lr)
LR_con = torch.optim.lr_scheduler.CosineAnnealingLR(trainer, 1, 0)
loss = nn.CrossEntropyLoss(reduction='none')
  • trainer:使用 Adam 优化器,对全连接层使用更高的学习率。
  • LR_con:使用余弦退火学习率调度器。
  • loss:使用交叉熵损失函数。

6.2 训练函数


def train_batch(features, labels, net, loss, trainer, device):
    # 将数据移动到指定设备(如 GPU)
    features, labels = features.to(device), labels.to(device)
    
    # 前向传播
    outputs = net(features)
    l = loss(outputs, labels).mean()  # 计算损失
    
    # 反向传播和优化
    trainer.zero_grad()  # 梯度清零
    l.backward()         # 反向传播
    trainer.step()      # 更新参数
    
    # 计算准确率
    acc = (outputs.argmax(dim=1) == labels).float().mean()
    
    return l.item(), acc.item()

def train(train_data, test_data, net, loss, trainer, num_epochs, device=try_gpu()):
    best_acc = 0
    timer = d2l.Timer()
    plot = d2l.Animator(xlabel="epoch", xlim=[1, num_epochs], legend=['train loss', 'train acc', 'test loss'], ylim=[0, 1])
    for epoch in range(num_epochs):
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_data):
            timer.start()
            l, acc = train_batch(features, labels, net, loss, trainer, device)
            metric.add(l, acc, labels.shape[0], labels.numel())
            timer.stop()
        test_acc = d2l.evaluate_accuracy_gpu(net, test_data, device=device)
        if test_acc > best_acc:
            save_model(net)
            best_acc = test_acc
        plot.add(epoch + 1, (metric[0] / metric[2], metric[1] / metric[3], test_acc))
        print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on {str(device)}')
    print(f"best acc {best_acc}")
    return metric[0] / metric[2], metric[1] / metric[3], test_acc
  • train:训练模型,记录损失和准确率,并在验证集上评估模型。

7. 测试与结果保存

在测试集上进行预测,并保存结果到 CSV 文件:

net.load_state_dict(torch.load(model_path))
augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(224),
    torchvision.transforms.ToTensor(), norm
])
test_data = Leaf_data(imgpath, False, augs)
test_dataloader = Data.DataLoader(test_data, batch_size=64, shuffle=False)
res = pd.DataFrame(columns=["image", "label"], index=range(len(test_data)))
net = net.cpu()
count = 0
for X, y in test_dataloader:
    preds = net(X).detach().argmax(dim=-1).numpy()
    preds = pd.DataFrame(y, index=map(lambda x: num2name[x], preds))
    preds.loc[:, 1] = preds.index
    preds.index = range(count, count + len(y))
    res.iloc[preds.index] = preds
    count += len(y)
    print(f"loaded {count}/{len(test_data)} datas")
res.to_csv('./submission.csv', index=False)
  • test_dataloader:加载测试数据。
  • res:保存预测结果到 CSV 文件。

8. 总结

本文详细介绍了如何使用 PyTorch 实现一个树叶分类任务,包括数据准备、模型定义、训练、验证和测试。通过本文,您可以掌握以下技能:

  1. 自定义数据集类的实现。
  2. 使用预训练模型进行迁移学习。
  3. 训练模型并保存最佳模型。
  4. 在测试集上进行预测并生成提交文件。

希望本文对您有所帮助!如果有任何问题,欢迎在评论区留言讨论。😊

完整代码

import os
import torch
from torch.utils import data as Data
import torchvision
from torch import nn
from d2l import torch as d2l
import pandas as pd
import random

# 数据准备
imgpath = "classify-leaves"
trainlist = pd.read_csv(f"{imgpath}/train.csv")
num2name = list(trainlist["label"].value_counts().index)
random.shuffle(num2name)
name2num = {}
for i in range(len(num2name)):
    name2num[num2name[i]] = i

# GPU 检查
def try_gpu():
    if torch.cuda.device_count() > 0:
        return torch.device('cuda')
    return torch.device('cpu')

# 模型保存路径
model_dir = './models'
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model_path = os.path.join(model_dir, 'pre_res_model.ckpt')

def save_model(net):
    torch.save(net.state_dict(), model_path)

# 自定义数据集类
class Leaf_data(Data.Dataset):
    def __init__(self, path, train, transform=lambda x: x):
        super().__init__()
        self.path = path
        self.transform = transform
        self.train = train
        if train:
            self.datalist = pd.read_csv(f"{path}/train.csv")
        else:
            self.datalist = pd.read_csv(f"{path}/test.csv")

    def __getitem__(self, index):
        res = ()
        tmplist = self.datalist.iloc[index, :]
        for i in tmplist.index:
            if i == "image":
                res += (self.transform(d2l.Image.open(f"{self.path}/{tmplist[i]}")),)
            else:
                res += (name2num[tmplist[i]],)
        if len(res) < 2:
            res += (tmplist[i],)
        return res

    def __len__(self):
        return len(self.datalist)

def train_batch(features, labels, net, loss, trainer, device):
    # 将数据移动到指定设备(如 GPU)
    features, labels = features.to(device), labels.to(device)
    
    # 前向传播
    outputs = net(features)
    l = loss(outputs, labels).mean()  # 计算损失
    
    # 反向传播和优化
    trainer.zero_grad()  # 梯度清零
    l.backward()         # 反向传播
    trainer.step()      # 更新参数
    
    # 计算准确率
    acc = (outputs.argmax(dim=1) == labels).float().mean()
    
    return l.item(), acc.item()


# 训练函数
def train(train_data, test_data, net, loss, trainer, num_epochs, device=try_gpu()):
    best_acc = 0
    timer = d2l.Timer()
    plot = d2l.Animator(xlabel="epoch", xlim=[1, num_epochs], legend=['train loss', 'train acc', 'test loss'], ylim=[0, 1])
    for epoch in range(num_epochs):
        metric = d2l.Accumulator(4)
        for i, (features, labels) in enumerate(train_data):
            timer.start()
            l, acc = train_batch(features, labels, net, loss, trainer, device)
            metric.add(l, acc, labels.shape[0], labels.numel())
            timer.stop()
        test_acc = d2l.evaluate_accuracy_gpu(net, test_data, device=device)
        if test_acc > best_acc:
            save_model(net)
            best_acc = test_acc
        plot.add(epoch + 1, (metric[0] / metric[2], metric[1] / metric[3], test_acc))
        print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'loss {metric[0] / metric[2]:.3f}, train acc {metric[1] / metric[3]:.3f}, test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec on {str(device)}')
    print(f"best acc {best_acc}")
    return metric[0] / metric[2], metric[1] / metric[3], test_acc

# 模型初始化
def init_weight(m):
    if type(m) in [nn.Linear, nn.Conv2d]:
        nn.init.xavier_normal_(m.weight)

net = torchvision.models.resnet34(weights=torchvision.models.ResNet34_Weights.IMAGENET1K_V1)
net.fc = nn.Linear(in_features=512, out_features=len(name2num), bias=True)
net.fc.apply(init_weight)
net.to(try_gpu())

# 优化器和损失函数
lr = 1e-4
parames = [parame for name, parame in net.named_parameters() if name not in ["fc.weight", "fc.bias"]]
trainer = torch.optim.Adam([{"params": parames}, {"params": net.fc.parameters(), "lr": lr * 10}], lr=lr)
LR_con = torch.optim.lr_scheduler.CosineAnnealingLR(trainer, 1, 0)
loss = nn.CrossEntropyLoss(reduction='none')

# 数据增强和数据加载
batch = 64
num_epochs = 10
norm = torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(224),
    torchvision.transforms.RandomHorizontalFlip(p=0.5),
    torchvision.transforms.ToTensor(), norm
])
train_data, valid_data = Data.random_split(
    dataset=Leaf_data(imgpath, True, augs),
    lengths=[0.8, 0.2]
)
train_dataloder = Data.DataLoader(train_data, batch, True)
valid_dataloder = Data.DataLoader(valid_data, batch, True)

# 训练模型
train(train_dataloder, valid_dataloder, net, loss, trainer, num_epochs)

# 测试模型
net.load_state_dict(torch.load(model_path))
augs = torchvision.transforms.Compose([
    torchvision.transforms.Resize(224),
    torchvision.transforms.ToTensor(), norm
])
test_data = Leaf_data(imgpath, False, augs)
test_dataloader = Data.DataLoader(test_data, batch_size=64, shuffle=False)
res = pd.DataFrame(columns=["image", "label"], index=range(len(test_data)))
net = net.cpu()
count = 0
for X, y in test_dataloader:
    preds = net(X).detach().argmax(dim=-1).numpy()
    preds = pd.DataFrame(y, index=map(lambda x: num2name[x], preds))
    preds.loc[:, 1] = preds.index
    preds.index = range(count, count + len(y))
    res.iloc[preds.index] = preds
    count += len(y)
    print(f"loaded {count}/{len(test_data)} datas")
res.to_csv('./submission.csv', index=False)

参考链接:

  • PyTorch 官方文档
  • torchvision 官方文档
  • d2l 深度学习工具库

http://www.kler.cn/a/542446.html

相关文章:

  • [7] 游戏机项目说明
  • 问题:通过策略模式+工厂模式+模板方法模式实现ifelse优化
  • 30~32.ppt
  • 生信云服务器:让生物信息学分析更高效、更简单【附带西柚云优惠码】
  • deepseek+“D-id”或“即梦AI”快速生成短视频
  • 基于Python的人工智能驱动基因组变异算法:设计与应用(下)
  • 25考研电子信息复试面试常见核心问题真题汇总,电子信息考研复试没有项目怎么办?电子信息考研复试到底该如何准备?
  • 进阶版MATLAB 3D柱状图
  • 【系统架构设计师】操作系统 - 进程管理 ① ( 进程概念 | 进程组成 | 进程 与 程序 | 进程 与 线程 | 线程 可共享的资源 - ☆考点 )
  • 工具模块新增JSON格式化打印工具类
  • 什么是容器化,它有什么好处,对后端开发有什么影响?
  • 【2025-ICLR-未中】教授多模态大语言模型理解心电图图像
  • 【C#零基础从入门到精通】(五)——C# {n:format} 占位符
  • C#调用Python的函数(编译为pyd,避免源码泄露)
  • 朝天椒USB服务器:破解银企直连中Ukey管理难题
  • 算法-反转链表
  • MATLAB电机四阶轨迹规划考虑jerk、Djerk
  • MarsCode AI插件在IntelliJ IDEA中使用
  • 2025最新版Node.js下载安装~保姆级教程
  • Bash (Bourne-Again Shell)、Zsh (Z Shell)
  • 【AI-28】RAG的深入浅出
  • OpenCV2D 特征框架 (19)目标检测类cv::CascadeClassifier的使用
  • 基于Win XDMA的PCIE 2.0 X8速率不足原因分析与解决方案
  • 深度学习中的梯度相关问题
  • Python----PyQt开发(PyQt基础,环境搭建,Pycharm中PyQttools工具配置,第一个PyQt程序)
  • 记录docker 卡住不动了