使用 NVIDIA FLARE 建立穩健且可廣泛應用的人工智慧模型

作者 NVIDIA

聯合學習(Federated Learning)已實際應用於許多真實世界中。聯合學習實現了全球規模的跨國協作,建立起更穩健、更可廣泛應用的機器學習和人工智慧(AI)模型。如需更多資訊,請參閱用於預測 Covid-19 患者臨床結果的聯合學習。

NVIDIA FLARE v2.0 是一種聯合學習開放原始碼 SDK,讓資料科學家僅需要分享模型權重,而非私密資料,即能輕鬆地進行協作,開發出更廣泛應用且穩健的 AI 模型。

在醫療應用方面,對於患者資料受到保護、某些患者類型和疾病的資料可能較稀少,或資料缺乏儀器類型、性別和地理多樣性的情況特別有益。

NVIDIA FLARE

NVIDIA FLARE 代表聯合學習應用程式執行階段環境(Federated Learning Application Runtime Environment)。它是 NVIDIA Clara Train 聯合學習軟體的基礎引擎,已經應用於醫學成像、基因分析、腫瘤學及 COVID-19 研究的 AI 應用上。此 SDK 讓研究人員和資料科學家可以將現有的機器學習和深度學習工作流程調整為分散式典範,並使平台開發人員可以針對分散式多方協作,打造安全、保護隱私的產品。

NVIDIA FLARE 是以不限基礎訓練函式庫的 Python 建置而成,一種輕量、靈活與可擴充的分散式學習框架。您可以將使用 PyTorch、TensorFlow,甚至是 NumPy 建置的資料科學工作流程,應用至聯合環境中。

您可能想要建置常見的聯合平均(FedAvg)演算法。各個聯合學習用戶端都是從初始全域模型開始,在區域資料上訓練模型一段時間,並將模型更新傳送至伺服器進行聚合。之後伺服器會使用聚合後的更新,更新全域模型,以進行下一輪訓練。多次反覆進行此過程,直至模型收斂。

NVIDIA FLARE 提供的可自訂控制器工作流程,可以以協助您建置 FedAvg 及其他聯合學習演算法,例如循環權重轉移。它可以調度在參與之聯合學習用戶端上執行的不同任務,例如深度學習訓練。您可以使用此工作流程,從各個用戶端收集結果(例如模型更新)與進行聚合,以更新全域模型,然後傳回更新後的全域模型,以持續進行訓練。圖 1 為其原理。

各個聯合學習用戶端均做為工作器(woker),要求下一個執行的任務,例如模型訓練。在控制器提供任務之後,由工作器執行以及將結果回傳至控制器(controller)。在每一次通訊時,皆可透過選用的篩選器處理任務資料或結果,例如同態加密和解密或差分隱私。

This diagram describes the NVIDIA FLARE workflow.
圖 1:NVIDIA FLARE 工作流程

建置 FedAvg 的任務可以是一個簡單的 PyTorch 程式,用於訓練 CIFAR-10 的分類模型。區域訓練器可能與以下程式碼範例類似。為了簡易之目的,本文省略了完整的訓練迴圈。

import torch
import torch.nn as nn
import torch.nn.functional as F

from nvflare.apis.dxo import DXO, DataKind, MetaKey, from_shareable
from nvflare.apis.executor import Executor
from nvflare.apis.fl_constant import ReturnCode
from nvflare.apis.fl_context import FLContext
from nvflare.apis.shareable import Shareable, make_reply
from nvflare.apis.signal import Signal
from nvflare.app_common.app_constant import AppConstants


class SimpleNetwork(nn.Module):
    def __init__(self):
        super(SimpleNetwork, self).__init__()

        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class SimpleTrainer(Executor):
    def __init__(self, train_task_name: str = AppConstants.TASK_TRAIN):
        super().__init__()
        self._train_task_name = train_task_name
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model = SimpleNetwork()
        self.model.to(self.device)
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        self.criterion = nn.CrossEntropyLoss()

    def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) -> Shareable:
        """
        This function is an extended function from the superclass.
        As a supervised learning-based trainer, the train function will run
        training based on model weights from `shareable`.
        After finishing training, a new `Shareable` object will be submitted
        to server for aggregation."""

        if task_name == self._train_task_name:
            epoch_len = 1

            # Get current global model weights
            dxo = from_shareable(shareable)

            # Ensure data kind is weights.
            if not dxo.data_kind == DataKind.WEIGHTS:
                self.log_exception(fl_ctx, f"data_kind expected WEIGHTS but got {dxo.data_kind} instead.")
                return make_reply(ReturnCode.EXECUTION_EXCEPTION)  # creates an empty Shareable with the return code

            # Convert weights to tensor and run training
            torch_weights = {k: torch.as_tensor(v) for k, v in dxo.data.items()}
            self.local_train(fl_ctx, torch_weights, epoch_len, abort_signal)

            # compute the differences between torch_weights and the now locally trained model
            model_diff = ...

            # build the shareable using a Data Exchange Object (DXO)
            dxo = DXO(data_kind=DataKind.WEIGHT_DIFF, data=model_diff)
            dxo.set_meta_prop(MetaKey.NUM_STEPS_CURRENT_ROUND, epoch_len)

            self.log_info(fl_ctx, "Local training finished. Returning shareable")
            return dxo.to_shareable()
        else:
            return make_reply(ReturnCode.TASK_UNKNOWN)

    def local_train(self, fl_ctx, weights, epoch_len, abort_signal):
        # Your training routine should respect the abort_signal.
        ...
        # Your local training loop ...
        for e in range(epoch_len):
        ...
            if abort_signal.triggered:
                self._abort_execution()
        ...

    def _abort_execution(self, return_code=ReturnCode.ERROR) -> Shareable:
        return make_reply(return_code)

如您所見,任務建置可能會執行許多不同的任務。您可以計算各個用戶端的摘要統計資料,並分享給伺服器(注意隱私限制)、執行區域資料預處理或評估已訓練的模型。

聯合學習訓練期間,您可以在每一輪訓練開始時繪製全域模型的效能。以此範例而言,我們在 CIFAR-10 的異質資料分割上,透過 8 個用戶端執行。下圖(圖 2)是顯示 NVIDIA FLARE 2.0 中預設提供的不同配置:

This diagram shows the different federated learning models and their accuracies.
圖 2:訓練期間之不同聯合學習演算法的全域模型驗證準確度

雖然 FediVG、Fedivg He 和 FedProx 在此任務的表現相當,您可以使用將 SGD 與動量搭配的 FedOpt 設定,觀察改善後的收斂,以更新伺服器上的全域模型。

整個聯合學習系統都可以使用管理員 API 進行控制,以自動啟動和操作配置不同的任務與工作流程。NVIDIA 同時提供了一個全面的布建系統,讓您可以在現實世界中輕鬆、安全地部署聯合學習應用程式,並從事概念驗證研究,以執行區域聯合學習模擬。

This diagram shows the components of NVIDIA FLARE and their relationship.
圖 3:NVIDIA FLARE 布建、啟動、操作(PSO)元件及其 API

立即開始

NVIDIA FLARE 使聯合學習可以運用至更多應用中。可能的使用案例,包括協助能源公司分析地震和井眼資料、協助製造商最佳化工廠運作,以及協助金融公司改善詐騙偵測模型。

在需要更多資訊和逐步範例時,請參閱 GitHub 上的 NVIDIA/NVFlare