当前位置: 首页 > article >正文

去中心化联邦学习-Python实现的2个案例

去中心化联邦学习python案例

  • 介绍
  • 原理
  • 实现案例1
  • 实现案例2
  • 结论

介绍

随着人工智能技术的发展,越来越多的数据被收集和使用。然而,这些数据通常分散在不同的设备上,例如移动设备、传感器等。联邦学习(Federated Learning)是一种解决这个问题的方法,它允许在不将数据传输到中央服务器的情况下进行模型训练。

但是,联邦学习也存在一些问题,例如数据隐私保护、通信效率等,并且有些应用场景需要更高的安全性和去中心化的特性。因此,出现了去中心化的联邦学习(Decentralized Federated Learning)。

本文将简要介绍去中心化联邦学习的原理和实现方式,并提供2个Python代码案例。

原理

去中心化联邦学习的主要思想是将一个大的联邦学习系统分解成多个小的联邦学习系统,每个小系统只包含一部分设备或节点。这样,每个设备或节点只需要与周围的几个设备或节点通信,就可以完成模型的训练和更新,从而保证了安全性和通信效率。

具体地,去中心化联邦学习可以分为两个阶段:选举和训练。在选举阶段,每个设备或节点都会向周围的几个设备或节点发送选票,选出一个领袖设备或节点,作为本次训练的中心节点。在训练阶段,领袖节点将模型参数广播给周围的设备或节点,这些设备或节点根据本地数据计算梯度并返回给领袖节点,领袖节点根据所有梯度更新模型参数。这个过程会循环迭代,直到模型收敛为止。


实现案例1

下面我们将演示如何使用Python实现一个简单的去中心化联邦学习系统,该系统包括三个设备或节点。每个设备或节点都有自己的数据集,我们将使用MNIST手写数字数据集作为例子。

首先,我们需要导入必要的库和模块:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random
import copy

然后,我们定义一个设备或节点类,其中包含数据集、模型、优化器等信息:

class Device:
    def __init__(self, device_id, train_data, test_data):
        self.id = device_id
        self.train_data = train_data
        self.test_data = test_data
        self.model = nn.Sequential(
            nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
            nn.LogSoftmax(dim=1)
        )
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)

接下来,我们定义一个简单的选举算法,选出一个领袖设备或节点:

def elect_leader(devices):
    leaders = []
    for device in devices:
        if random.random() < 0.5:
            leaders.append(device)
    if len(leaders) == 0:
        return None
    else:
        return max(leaders, key=lambda x: x.id)

然后,我们定义一个用于计算梯度的函数:

def calculate_gradients(device, model):
    loss_fn = nn.NLLLoss()
    data_loader = DataLoader(device.train_data, batch_size=32, shuffle=True)
    device_gradients = []
    for inputs, labels in data_loader:
        outputs = model(inputs.view(-1, 784))
        loss = loss_fn(outputs, labels)
        device.optimizer.zero_grad()
        loss.backward()
        device_gradients.append(copy.deepcopy(device.model.state_dict()))
	return device_gradients

最后,我们定义一个训练函数,用于每个节点或设备的本地训练和参数更新:

def train_device(device, leader, num_rounds):
    for i in range(num_rounds):
        if leader is None or leader.id == device.id:
            leader = elect_leader(devices)
        device.model.load_state_dict(leader.model.state_dict())
        gradients = calculate_gradients(device, device.model)
        if leader.id != device.id:
            leader_gradients = leader.receive_gradients(gradients)
            device.model.load_state_dict(leader.model.state_dict())
        else:
            leader_gradients = gradients
        average_gradients = {}
        for key in leader_gradients[0].keys():
            average_gradients[key] = torch.mean(torch.stack([gradients[key] for gradients in leader_gradients]), dim=0)
        device.model.load_state_dict({key: value - 0.01 * average_gradients[key] for key, value in device.model.state_dict().items()})
    return device

在整个系统中,每个节点或设备之间都需要通信,因此我们还需要为设备类添加一个接收梯度的方法:

class Device:
    def __init__(self, device_id, train_data, test_data):
        self.id = device_id
        self.train_data = train_data
        self.test_data = test_data
        self.model = nn.Sequential(
            nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
            nn.LogSoftmax(dim=1)
        )
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)

    def receive_gradients(self, gradients):
        return gradients

现在,我们可以创建三个设备或节点,并使用MNIST数据集进行训练:

train_set = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())
test_set = datasets.MNIST(root='./data', train=False, download=True, transform=transforms.ToTensor())
train_size = len(train_set) // 3
test_size = len(test_set) // 3
device1 = Device(1, train_set[:train_size], test_set[:test_size])
device2 = Device(2, train_set[train_size:2*train_size], test_set[test_size:2*test_size])
device3 = Device(3, train_set[2*train_size:], test_set[2*test_size:])
devices = [device1, device2, device3]

for device in devices:
    train_device(device, None, 10)

test_loader = DataLoader(test_set, batch_size=32, shuffle=True)
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = device1.model(inputs.view(-1, 784))
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the test set: %d %%' % (100 * correct / total))

这个代码会创建三个设备或节点,并将MNIST数据集分成三份分配给它们。然后,每个设备或节点将执行10轮的本地训练和参数更新,最后计算所有设备或节点的平均准确率。

注意,在这个实现中,我们使用了简单的选举算法和梯度平均算法,实际中可能需要更复杂和更安全的算法,以保证系统的可靠性和安全性。


实现案例2

CIFAR-10数据集进行去中心化联邦学习训练

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

class Device:
    def __init__(self, device_id, train_data, test_data):
        self.id = device_id
        self.train_data = train_data
        self.test_data = test_data
        self.model = nn.Sequential(
            nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10)
        )
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)

    def receive_gradients(self, gradients):
        return gradients

def calculate_gradients(device, model):
    device.model.load_state_dict(model.state_dict())
    device.optimizer.zero_grad()
    for inputs, labels in device.train_data:
        outputs = device.model(inputs)
        loss = nn.CrossEntropyLoss()(outputs, labels)
        loss.backward()
    gradients = copy.deepcopy(device.model.state_dict())
    device.optimizer.step()
    return gradients

def elect_leader(devices):
    return devices[torch.randint(0, len(devices), (1,)).item()]

def train_device(device, leader, num_rounds):
    for i in range(num_rounds):
        if leader is None or leader.id == device.id:
            leader = elect_leader(devices)
        device.model.load_state_dict(leader.model.state_dict())
        gradients = calculate_gradients(device, device.model)
        if leader.id != device.id:
            leader_gradients = leader.receive_gradients(gradients)
            device.model.load_state_dict(leader.model.state_dict())
        else:
            leader_gradients = gradients
        average_gradients = {}
        for key in leader_gradients.keys():
            average_gradients[key] = torch.mean(torch.stack([gradients[key] for gradients in leader_gradients]), dim=0)
        device.model.load_state_dict({key: value - 0.01 * average_gradients[key] for key, value in device.model.state_dict().items()})
    return device

class CIFAR10Dataset(Dataset):
    def __init__(self, data, labels):
        super().__init__()
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        img, label = self.data[index], self.labels[index]
        img = transforms.ToTensor()(img)
        return img, label

if __name__ == "__main__":
    transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

    train_size = len(train_set) // 3
    test_size = len(test_set) // 3

    device1 = Device(1, DataLoader(CIFAR10Dataset(train_set.data[:train_size], train_set.targets[:train_size]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[:test_size], test_set.targets[:test_size]), batch_size=16))
    device2 = Device(2, DataLoader(CIFAR10Dataset(train_set.data[train_size:2*train_size], train_set.targets[train_size:2*train_size]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[test_size:2*test_size], test_set.targets[test_size:2*test_size]), batch_size=16))
    device3 = Device(3, DataLoader(CIFAR10Dataset(train_set.data[2*train_size:], train_set.targets[2*train_size:]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[2*test_size:], test_set.targets[2*test_size:]), batch_size=16))

    devices = [device1, device2, device3]

    for device in devices:
        train_device(device, None, 10)
    
    # 测试模型并输出评估结果
	for device in devices:
    	device.model.eval()
    	correct = 0
    	total = 0
    	with torch.no_grad():
        	for data in device.test_data:
            	images, labels = data
            	outputs = device.model(images)
            	_, predicted = torch.max(outputs.data, 1)
            	total += labels.size(0)
            	correct += (predicted == labels).sum().item()
    	print(f"Device {device.id}: Accuracy on test set: {100 * correct / total}%")
    
    # 汇总所有设备的评估结果并输出平均准确率
    total_correct = sum(d.correct for d in devices)
    total_samples = sum(d.total for d in devices)
    print(f"Overall Accuracy on test set: {100 * total_correct / total_samples}%")

    # 将模型保存到文件中
    torch.save(devices[0].model.state_dict(), 'mnist_model.pt')

以上代码实现了一个使用CIFAR-10数据集进行去中心化联邦学习的案例。具体步骤如下:

定义了一个Device类,用于表示参与训练的设备,在初始化时需要指定设备id、训练数据和测试数据,并创建了一个包含卷积层、全连接层和激活函数的神经网络模型以及优化器。

实现了一个calculate_gradients函数,用于通过在本地设备上训练模型并计算梯度。

实现了一个elect_leader函数,用于从参与训练的设备中随机选择一个设备作为leader。

实现了一个train_device函数,用于训练指定设备的模型,并根据leader的选择来执行去中心化联邦学习的过程,最终返回训练后的设备对象。

定义了一个CIFAR10Dataset类,用于将CIFAR-10数据集转换为PyTorch Dataset对象。

在main函数中,定义了三个设备对象,并使用CIFAR10Dataset将数据集划分成三份分别分配给这三个设备。然后,针对每个设备,调用train_device函数进行训练,最终得到训练好的模型。

需要注意的是,由于CIFAR-10数据集较大,上述代码仅使用了数据集中的1/3作为样本进行训练和测试。如果需要使用完整的数据集进行训练,需要考虑分布式训练等技术来提高效率。

该部分定义了一个transforms.Compose对象,其中包含两个转换操作:

transforms.ToTensor()将输入的PIL图像或ndarray转换为张量,并进行标准化到范围[0, 1]。

transforms.Normalize(mean, std)用于对张量进行标准化。在这里,我们将每个通道的平均值设置为0.5,标准差设置为0.5。

这些操作的目的是确保模型能够接受相同形式的输入数据并对其进行正确的预测。在这里,我们使用标准化来使得数据的取值范围更加稳定,并且有助于提高模型的训练效果。

  • 定义数据预处理的转换。
  • 加载MNIST数据集。
  • 将MNIST数据集分配给每个设备。
  • 在每个设备上进行本地训练。
  • 测试模型并输出评估结果。
  • 汇总所有设备的评估结果并输出平均准确率。
  • 将模型保存到文件中。

需要注意的是,在这里我们假设所有设备都具有相同的计算能力和存储容量,并且在每个设备上训练的数据集大小相同。在实际情况下,不同设备的硬件配置和网络速度可能不同,因此需要根据实际情况对训练数据进行划分,以保证训练效果和时间开销的平衡。


结论

去中心化联邦学习是一种解决数据隐私保护和通信效率等问题的有效方法。在本文中,我们介绍了去中心化联邦学习的原理和实现方式,并提供了2个Python代码案例,演示了如何使用数据集训练简单的联邦学习系统。


http://www.kler.cn/a/2568.html

相关文章:

  • 计算机网络之---数据传输与比特流
  • 【微服务】1、引入;注册中心;OpenFeign
  • 攻防世界 ics-07
  • Selenium 的四种等待方式及使用场景
  • 反规范化带来的数据不一致问题的解决方案
  • 在 PhpStorm 中配置命令行直接运行 PHP 的步骤
  • Windows Server 2022 中文版、英文版下载 (updated Mar 2023)
  • 【数据结构】排序
  • 0x03数学预备
  • sql语句总结
  • Unity设计模式—服务定位器模式
  • 【Vue全家桶】带你全面了解通过Vue CLI初始化Vue项目
  • 【linux】进程信号——信号的产生
  • C++初级教程(二)
  • Linux内核4.14版本——drm框架分析(1)——drm简介
  • 一个使用 react+vite3+ts+react-router-dom6v Hooks Admin搭建的轻量级后台管理模板。
  • Guitar Pro8.1专业版新功能简谱介绍
  • postgresql查询json类型字段中的数据
  • Mybatis-Mapper代理开发
  • 不愧是2023年就业最难的一年,还好有车企顶着~
  • 十大Python可视化工具,太强了
  • gin框架使用websocket实现进入容器内部执行命令
  • GJB 9001C质量管理体系文件构建(2、质量手册编制要点)第9章、第10章
  • 如何利用学生身份申请使用免费的专业版pycahrm(详细 教程)
  • 模拟登入(验证码识别,Cookie)
  • 安装系统所需软件