PyTorch在Linux環境下的并行計算能力
PyTorch作為靈活的深度學習框架,在Linux系統上提供了多線程數據加載、多GPU數據并行(DataParallel
)、多GPU/多節點分布式并行(DistributedDataParallel
,簡稱DDP)等多層次的并行計算支持,能有效提升模型訓練與推理效率。
數據加載是深度學習流程中的常見瓶頸,PyTorch通過torch.utils.data.DataLoader
類實現多線程數據加載。通過設置num_workers
參數(指定用于數據加載的子進程數量),可并行處理數據讀取、預處理(如圖像縮放、歸一化)等I/O密集型任務,減少GPU等待時間。例如:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
# 定義數據預處理
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
# 使用4個子進程加載數據
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
注意:num_workers
需根據CPU核心數調整(通常設置為CPU核心數的1-2倍),避免過多進程導致系統資源競爭。
torch.nn.DataParallel
是PyTorch提供的簡單多GPU并行方案,它會自動將模型復制到所有指定的GPU上,輸入數據分割到各GPU進行前向/反向傳播,最后聚合梯度更新模型參數。使用步驟如下:
CUDA_VISIBLE_DEVICES
環境變量指定可用的GPU(如export CUDA_VISIBLE_DEVICES=0,1
表示僅使用GPU 0和1);DataParallel
包裝(需指定device_ids
參數);DataParallel
會自動處理數據分發與梯度聚合。import torch
import torch.nn as nn
# 定義模型
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc = nn.Linear(10, 10)
def forward(self, x):
return self.fc(x)
# 移動模型到GPU并包裝
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleModel().to(device)
if torch.cuda.device_count() > 1:
print(f"Using {torch.cuda.device_count()} GPUs!")
model = nn.DataParallel(model, device_ids=[0, 1]) # 使用GPU 0和1
注意:DataParallel
適合快速驗證多GPU效果,但在大規模訓練中,DistributedDataParallel
(DDP)的性能更優。
DistributedDataParallel
(DDP)是PyTorch推薦的分布式訓練方案,支持多GPU(單機多卡)和多節點(多機多卡)訓練,通過進程級并行(每個GPU對應一個獨立進程)和通信重疊計算、梯度分桶等技術,實現接近線性的加速比(如256個GPU的加速比可達0.9以上)。
torch.distributed.init_process_group
函數,指定后端(推薦nccl
,適合GPU訓練)、初始化方法(如tcp://master_ip:port
)、world_size
(總進程數=節點數×每個節點的GPU數)、rank
(當前進程的全局排名);model.to(rank)
),用DDP
包裝(device_ids=[rank]
);torch.utils.data.distributed.DistributedSampler
確保每個進程處理不同的數據子集(需設置num_replicas=world_size
、rank=rank
);torch.distributed.launch
工具或accelerate
庫啟動,自動管理進程啟動與同步。import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import torchvision.datasets as datasets
import torchvision.transforms as transforms
def main(rank, world_size):
# 初始化分布式環境(nccl后端適合GPU)
torch.distributed.init_process_group(
backend='nccl',
init_method='tcp://127.0.0.1:12345', # 主節點IP和端口
world_size=world_size,
rank=rank
)
# 創建模型并移動到對應GPU
model = nn.Sequential(
nn.Linear(784, 1024),
nn.ReLU(),
nn.Linear(1024, 10)
).to(rank)
ddp_model = DDP(model, device_ids=[rank])
# 定義損失函數和優化器
criterion = nn.CrossEntropyLoss().to(rank)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
# 加載數據(使用DistributedSampler)
transform = transforms.Compose([transforms.ToTensor()])
dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
loader = DataLoader(dataset, batch_size=64, sampler=sampler)
# 訓練循環
for epoch in range(5):
sampler.set_epoch(epoch) # 確保每個epoch數據打亂順序不同
running_loss = 0.0
for data, target in loader:
data, target = data.to(rank), target.to(rank)
optimizer.zero_grad()
output = ddp_model(data.view(data.size(0), -1))
loss = criterion(output, target)
loss.backward()
optimizer.step()
running_loss += loss.item()
print(f'Rank {rank}, Epoch {epoch}, Loss: {running_loss/len(loader)}')
# 清理分布式環境
torch.distributed.destroy_process_group()
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--world_size', type=int, default=2, help='Number of GPUs')
parser.add_argument('--rank', type=int, default=0, help='Current GPU rank')
args = parser.parse_args()
main(args.rank, args.world_size)
啟動命令(單機2卡):
python -m torch.distributed.launch --nproc_per_node=2 your_script.py
nccl
后端(NVIDIA集體通信庫),其性能優于gloo
;DataLoader
的num_workers
參數,使用pin_memory=True
(將數據固定在內存中,加速GPU傳輸);bucket_cap_mb
參數調整梯度分桶大?。ㄈ?code>bucket_cap_mb=25),減少通信次數。