0x00 сводка
Начиная с этой статьи, мы вводим параллелизм данных PyTorch, Эта статья является первой, знакомящей с DataPrallel, потому что слов слишком много (более 12 000 слов, поэтому она разбита на две статьи и опубликована).
Другие статьи из этой серии:
Автоматическая дифференциация инструментов глубокого обучения (1)
Автоматическая дифференциация инструментов глубокого обучения (2)
Автоматическая дифференциация оружия глубокого обучения (3) --- Пример интерпретации
[Анализ исходного кода] Как PyTorch реализует прямое распространение (1) --- Базовый класс (1)
[Анализ исходного кода] Как PyTorch реализует прямое распространение (2) --- Базовый класс (ниже)
[Анализ исходного кода] Как PyTorch реализует прямое распространение (3) --- конкретная реализация
[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка
[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм
[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор
[Анализ исходного кода] Как PyTorch использует GPU
Примечание. Эта статья во многом основана на следующих двух статьях, и я хотел бы выразить им свою глубокую признательность.
Distributed data parallel training using Pytorch on AWS
DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения
0x01 Обзор
Давайте начнем с рассмотрения DataParallel со всех сторон.
1.1 С точки зрения процесса
С точки зрения процесса DataParallel работает, загружая все данные мини-пакета в основной поток, а затем распределяя данные суб-мини-пакетов (суб-мини-пакетов) по всей сети GPU.
-
Перенесите данные мини-пакета из памяти с блокировкой страниц в GPU 0 (главный), главный GPU также содержит модель, а другие GPU имеют устаревшие копии модели.
-
Распределяйте мини-пакеты данных между графическими процессорами. В частности, данные, вводимые в мини-пакет, делятся на несколько частей и отправляются на соответствующий графический процессор для расчета.
-
Скопируйте модель между графическими процессорами. Все данные, относящиеся к Модулю, также будут продублированы.
-
Запустите прямой проход на каждом графическом процессоре, вычислив выходные данные. PyTorch использует многопоточность для параллельного прямого распространения, когда каждый графический процессор будет выполнять прямые вычисления независимо и параллельно со своими собственными входными данными в отдельном потоке.
-
Выходные данные собираются на главном графическом процессоре и вычисляются потери. То есть значение функции потерь вычисляется путем сравнения выходных данных сети с истинной меткой данных каждого элемента в пакете.
-
Распределите потери между графическими процессорами, запустите обратное распространение на каждом графическом процессоре и вычислите градиенты параметров.
-
Слияние градиентов на GPU 0.
-
Обновите параметры градиента.
- Выполните градиентный спуск и обновите параметры модели на основном графическом процессоре.
- Поскольку параметры модели обновляются только на ведущем графическом процессоре, а другие подчиненные графические процессоры в это время не обновляются синхронно, необходимо скопировать обновленные параметры модели на оставшиеся подчиненные графические процессоры для достижения параллелизма.
1.2 С точки зрения шаблона
Во-первых, давайте дадим технический обзор с точки зрения шаблона:
- DP можно рассматривать как приложение, подобное серверу параметров.
- DDP можно рассматривать как приложение коллективного общения.
Сервер параметров можно условно разделить на главный и рабочий, а DP основан на одной машине с несколькими картами, поэтому соответствующие отношения следующие:
- worker : все графические процессоры (включая GPU 0) являются рабочими и отвечают за вычисления и обучение сети.
- master: GPU 0 (не настоящая метка GPU, а первая позиция входного параметра device_ids) также отвечает за интеграцию градиентов и обновление параметров.
Итак, давайте сосредоточимся на GPU 0.
DataParallel по умолчанию поместит сетевую модель на GPU 0, а затем скопирует модель с GPU 0 на другие GPU. Каждый GPU начнет параллельное обучение, а затем GPU 0 будет использоваться в качестве основного для суммирования градиентов и обновления модели. Наконец, задача расчета будет загружена на другие GPU. Это очень похоже на механизм сервера параметров.
Эту же информацию можно увидеть и на официальной карте.
1.3 С точки зрения операционной системы
С точки зрения операционной системы DP и DDP различаются следующим образом (заранее спойлерим):
- DataParallel — это однопроцессный многопоточный метод параллельного обучения, который может работать только на одном компьютере.
- DistributedDataParallel поддерживает несколько процессов и работает как с обучением на одной, так и на нескольких машинах. Кроме того, DistributedDataParallel копирует модель заранее, а не на каждой итерации, и позволяет избежать глобальных блокировок интерпретатора.
1.4 Неэффективность
ДП имеет следующие дефекты:
-
избыточные копии данных
- Данные сначала копируются с хоста на основной графический процессор, а затем субмини-пакеты распределяются (разбрасываются) между другими графическими процессорами.
-
Перед прямым распространением требуется репликация модели между графическими процессорами.
- Поскольку параметры модели обновляются на основном графическом процессоре, модель необходимо повторно синхронизировать в начале каждого прямого прохода.
-
Каждый пакет будет иметь накладные расходы на создание/уничтожение потока.
- Параллельное прямое распространение реализовано в нескольких потоках (это может быть проблемой PyTorch).
-
Есть возможность снизить градиент конвейера, но она не используется.
- В реализации Pytorch 1.0.1 с параллельными данными градиентный спуск происходит в конце обратного распространения, которое может быть конвейерным.
-
Излишне собирать выходные данные модели на основном графическом процессоре.
-
Использование графического процессора неравномерно, и нагрузка неравномерна. Память и использование основного графического процессора будут выше, чем у других видеокарт, потому что:
- Выполните расчет потерь на основном графическом процессоре.
- Уменьшение градиента и параметры обновления происходят на основном графическом процессоре.
0x02 Обзор
2.1 Пример
Давайте используем пример, чтобы увидеть, что конкретная логика такова:
-
Установите видимый графический процессор для этой программы.
- Соответствующий код должен использовать args.gpu_id="2,7" и os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id для настройки серийного номера GPU, на самом деле цель состоит в том, чтобы установить os.environ['CUDA_VISIBLE_DEVICES' ] = "2,7", Таким образом, device_ids[0] соответствует физической карте №2, а device_ids[1] соответствует физической карте №7.
- Его также можно указать временно во время выполнения, например: CUDA_VISIBLE_DEVICES='2,7' Python train.py.
-
Поместите параметры модели и буферы в device_ids[0], блок распараллеливания должен иметь свои параметры и буферы в device_ids[0] перед запуском блока DataParallel.
- Код: model=model.cuda() .
-
Постройте модель DP. Преимущество ДП в том, что им очень удобно пользоваться, нужно только поменять оригинальный однокарточный модуль с ДП на многокарточный.
- Код модели = torch.nn.DaraParallel (модель).
- На самом деле DP — это Pytorch nn.Module, поэтому и модель, и оптимизатор должны использовать .module для получения фактической модели и оптимизатора.
-
Загрузите данные в основной графический процессор.
- data,label= data.cuda(),label.cuda()
-
прямое распространение.
- Модуль модели DP будет копировать на каждое устройство.
- DP разделит входные данные на несколько небольших фрагментов и распределит эти небольшие фрагменты данных по разным графическим процессорам для расчета.Каждой модели нужно только обрабатывать свои собственные выделенные данные.
-
Выполняется обратное распространение.
- DP будет накапливать градиенты, рассчитанные каждым GPU, в GPU 0 для суммирования.
Конкретный код выглядит следующим образом:
args.gpu_id="2,7" ; #指定gpu id
args.cuda = not args.no_cuda and torch.cuda.is_available() #是否使用cpu
# 配置环境 也可以在运行时临时指定,比如:CUDA_VISIBLE_DEVICES='2,7' Python train.py
os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu_id # 赋值必须是字符串
device_ids=range(torch.cuda.device_count()) #torch.cuda.device_count()=2
# device_ids=[0,1] ---- 也可以这么使用。这里的0 就是上述指定 2,是主gpu, 1就是7,模型和数据由主gpu分发
if arg.cuda:
model=model.cuda() #将模型复制到gpu ,默认是cuda('0'),即转到第一个GPU 2
if len(device_id)>1:
model=torch.nn.DataParallel(model);#构建DP,前提是model已经.cuda()了
optimizer = torch.optim.SGD(model.parameters(), args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay)
#前向传播时,数据也要执行cuda(),即把数据复制到主gpu里
for batch_idx, (data, label) in pbar:
if args.cuda:
data,label= data.cuda(),label.cuda(); # 数据放到了默认GPU
data_v = Variable(data)
target_var = Variable(label)
prediction= model(data_v,target_var,args)
#这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里
#前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果归并到主gpu里
#prediction的长度等于batch_size
criterion = nn.CrossEntropyLoss()
loss = criterion(prediction,target_var) # 在默认GPU之上计算loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
2.2 Соответствующие знания
Перед каждым запуском сетевого распространения DP передает параметры и буферы на главном узле другим узлам, чтобы поддерживать единство состояния. Эта часть соответствующих знаний в основном касается того, как скопировать модель в GPU и как вызвать функцию ядра GPU.Подробности см. в предыдущей статье.[Анализ исходного кода] Как PyTorch использует GPU.
0x03 Определение
3.1 Определения
Давайте посмотрим на структуру DataParallel через функцию инициализации DataParallel.
__init__
Три входных параметра определяются следующим образом:
- модуль: модель,
- device_ids : обучающие устройства,
- output_device : Устройство, на котором сохраняются результаты вывода. По умолчанию это device_ids[0], что является первой картой.
код показывает, как показано ниже:
import operator
import torch
import warnings
from itertools import chain
from ..modules import Module
from .scatter_gather import scatter_kwargs, gather
from .replicate import replicate
from .parallel_apply import parallel_apply
from torch._utils import (
_get_all_device_indices,
_get_available_device_type,
_get_device_index,
_get_devices_properties
)
class DataParallel(Module):
# TODO: update notes/cuda.rst when this class handles 8+ GPUs well
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()
# 得到可用的GPU
device_type = _get_available_device_type()
if device_type is None:
self.module = module
self.device_ids = []
return
# 没有输入的情况下,使用所有可见的GPU
if device_ids is None:
device_ids = _get_all_device_indices()
# 把GPU列表上第一个作为输出,也会作为master
if output_device is None:
output_device = device_ids[0]
self.dim = dim
self.module = module
self.device_ids = [_get_device_index(x, True) for x in device_ids]
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])
# 检查负载均衡
_check_balance(self.device_ids)
# 单卡就直接使用
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)
3.2 Балансировка нагрузки
Хотя входные данные делятся поровну и распределяются параллельно, потери на выходе будут каждый раз агрегироваться и рассчитываться на первом графическом процессоре, поэтому загрузка памяти и использование первого графического процессора будут больше, чем у других графических карт.
Функция _check_balance проверит, сбалансирована ли нагрузка, и предупредит, если память или процессор max/min > 0,75.
def _check_balance(device_ids):
imbalance_warn = """
There is an imbalance between your GPUs. You may want to exclude GPU {} which
has less than 75% of the memory or cores of GPU {}. You can do so by setting
the device_ids argument to DataParallel, or by setting the CUDA_VISIBLE_DEVICES
environment variable."""
device_ids = [_get_device_index(x, True) for x in device_ids]
dev_props = _get_devices_properties(device_ids)
def warn_imbalance(get_prop):
values = [get_prop(props) for props in dev_props]
min_pos, min_val = min(enumerate(values), key=operator.itemgetter(1))
max_pos, max_val = max(enumerate(values), key=operator.itemgetter(1))
if min_val / max_val < 0.75:
warnings.warn(imbalance_warn.format(device_ids[min_pos], device_ids[max_pos]))
return True
return False
if warn_imbalance(lambda props: props.total_memory):
return
if warn_imbalance(lambda props: props.multi_processor_count):
return
0x04 Прямое распространение
DataParallel существует только среди параллельных вычислений процесса распространения вперед.
4.1 Общие
В предыдущем примере функция cuda() использовалась для размещения модели на GPU[0], где GPU[0] уже имеет параметры и буферы модели.
model=model.cuda()
Поэтому в прямой функции она не используется для этого шага, а начинается с распространения модели и данных.Следует отметить, что модель будет распространяться каждый раз при прямом распространении. Он разделен на несколько шагов.
- Проверка: просмотрите параметры и буферы модуля, чтобы увидеть, все ли они выше GPU[0], если нет, сообщите об ошибке.
- Распределить (разбросать) входные данные: разделить входные данные на несколько копий в соответствии с их первым измерением (обычно размером пакета) и передать их на несколько графических процессоров;
- Реплицировать модель: скопировать модель на несколько графических процессоров соответственно;
- Параллельное применение (parallel_apply): параллельное прямое распространение по нескольким моделям. Поскольку device_ids[0] графического процессора и базовый параллельный модуль совместно используют хранилище, обновления на месте на устройстве[0] также сохраняются, а другие графические процессоры — нет.
- Сбор: сбор данных, отправленных обратно с нескольких графических процессоров;
Конкретный код выглядит следующим образом:
def forward(self, *inputs, **kwargs):
with torch.autograd.profiler.record_function("DataParallel.forward"):
# 如果机器上没有GPU,则直接用CPU运行
if not self.device_ids:
return self.module(*inputs, **kwargs)
# 遍历module的parameters和buffers,看看是否都在GPU[0]之上,如果不在,报错。
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
# 现在GPU[0]上有了模型,开始训练
# 首先分发输入
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not kwargs:
inputs = ((),)
kwargs = ({},)
# 如果只有单卡,直接使用
if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# 并行训练
outputs = self.parallel_apply(replicas, inputs, kwargs)
# 把前向传播的结果收集到master
return self.gather(outputs, self.output_device)
4.2 Распределение (вход)
В приведенном выше коде следующая инструкция завершает операцию распределения данных.
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
Нашему графику распространения соответствует:
Итак, давайте сначала посмотрим, как его распределить.
Scatter на самом деле представляет собой пакет scatter_kwargs, поэтому давайте посмотрим непосредственно на scatter_kwargs.
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
4.2.1 scatter_kwargs
scatter_kwargs вызывает scatter для распределения ввода и kwargs соответственно.
def scatter_kwargs(inputs, kwargs, target_gpus, dim=0):
r"""Scatter with support for kwargs dictionary"""
# 分发input
inputs = scatter(inputs, target_gpus, dim) if inputs else []
# 分发kwargs
kwargs = scatter(kwargs, target_gpus, dim) if kwargs else []
# 用空项补齐,这样可以让 inputs 和 kwargs 长度相等
if len(inputs) < len(kwargs):
inputs.extend([() for _ in range(len(kwargs) - len(inputs))])
elif len(kwargs) < len(inputs):
kwargs.extend([{} for _ in range(len(inputs) - len(kwargs))])
# 返回 tuple
inputs = tuple(inputs)
kwargs = tuple(kwargs)
return inputs, kwargs
4.2.2 scatter
Как вы можете понять из комментариев, тензор нарезается на примерно равные куски, а затем распределяется между заданными графическими процессорами. Это разделить пакет данных на более мелкие пакеты примерно поровну. Для других типов переменных будут выполняться разные операции в соответствии с разными типами, например вызов scatter_map для выполнения рекурсивной обработки внутри них.
def scatter(inputs, target_gpus, dim=0):
r"""
Slices tensors into approximately equal chunks and
distributes them across given GPUs. Duplicates
references to objects that are not tensors.
"""
def scatter_map(obj):
if isinstance(obj, torch.Tensor):
# 针对张量会调用Scatter.apply处理
return Scatter.apply(target_gpus, None, dim, obj)
if is_namedtuple(obj):
# 调用 scatter_map 对其子模块进行递归处理。
return [type(obj)(*args) for args in zip(*map(scatter_map, obj))]
if isinstance(obj, tuple) and len(obj) > 0:
# 调用 scatter_map 对其子模块进行递归处理。
return list(zip(*map(scatter_map, obj)))
if isinstance(obj, list) and len(obj) > 0:
# 调用 scatter_map 对其子模块进行递归处理。
return [list(i) for i in zip(*map(scatter_map, obj))]
if isinstance(obj, dict) and len(obj) > 0:
# 调用 scatter_map 对其子模块进行递归处理。
return [type(obj)(i) for i in zip(*map(scatter_map, obj.items()))]
return [obj for targets in target_gpus]
# After scatter_map is called, a scatter_map cell will exist. This cell
# has a reference to the actual function scatter_map, which has references
# to a closure that has a reference to the scatter_map cell (because the
# fn is recursive). To avoid this reference cycle, we set the function to
# None, clearing the cell
try:
res = scatter_map(inputs)
finally:
scatter_map = None
return res
4.2.3 Scatter
Как упоминалось ранее, Scatter.apply обрабатывает тензоры, так что давайте посмотрим. Scatter расширяет функцию по следующей логике:
- Если cuda доступна, получите список потоков, чтобы можно было выполнить копирование ЦП на ГП в фоновом потоке.
- Позвоните comm.scatter для распространения.
- Вызовите wait_stream и record_stream, чтобы синхронизировать поток копирования.
class Scatter(Function):
@staticmethod
def forward(ctx, target_gpus, chunk_sizes, dim, input):
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.dim = dim
ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
streams = None
# 对于cuda,进行处理
if torch.cuda.is_available() and ctx.input_device == -1:
# Perform CPU to GPU copies in a background stream
streams = [_get_stream(device) for device in target_gpus]
# 调用C++进行操作
outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
# Synchronize with the copy stream
if streams is not None:
for i, output in enumerate(outputs):
with torch.cuda.device(target_gpus[i]):
main_stream = torch.cuda.current_stream()
main_stream.wait_stream(streams[i]) # 同步
output.record_stream(main_stream) # 同步
return outputs
@staticmethod
def backward(ctx, *grad_output):
return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
4.2.4 comm.scatter
Эта функция в основном называетсяtorch._C._scatter
, тем самым входя в мир C++.
def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
"""Scatters tensor across multiple GPUs. """
tensor = _handle_complex(tensor)
if out is None:
devices = [_get_device_index(d) for d in devices]
return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
else:
return tuple(torch._C._scatter_out(tensor, out, dim, streams))
4.2.5 C++
В преобразованном файле мы видим, что разброс — это цель, которую мы хотим проанализировать.
.def(
"_scatter",
[](at::Tensor& tensor,
std::vector<int64_t>& devices,
c10::optional<std::vector<int64_t>> chunk_sizes,
int64_t dim,
c10::optional<py::object> py_streams) {
c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>> streams;
if (py_streams) {
py::handle handle = *py_streams;
streams = THPUtils_PySequence_to_CUDAStreamList(handle.ptr());
}
// Note: We're holding the GIL up to here.
pybind11::gil_scoped_release no_gil;
// 实际需要看这里
return scatter(tensor, devices, chunk_sizes, dim, streams);
},
py::arg("tensor"),
py::arg("devices"),
py::arg("chunk_sizes"),
py::arg("dim"),
py::arg("streams"))
Как вы можете видеть в scatter, scatter заключается в раздаче данных на каждый GPU, логика следующая:
- Сначала вызовите split_with_sizes или chunk, чтобы разделить тензор на куски.
- Во-вторых, куски распределяются между каждым GPU, что выполняется через распределение.
std::vector<at::Tensor> scatter(
const at::Tensor& tensor,
at::IntArrayRef devices,
const c10::optional<std::vector<int64_t>>& chunk_sizes,
int64_t dim,
const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
streams) {
dim = at::maybe_wrap_dim(dim, tensor);
// 首先把tensor分割成 chunks
std::vector<at::Tensor> chunks = chunk_sizes
? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
: tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
at::cuda::OptionalCUDAStreamGuard cuda_guard;
// 其次把 chunks 分布到各个GPU之上
for (size_t i = 0; i < chunks.size(); ++i) {
const auto device_index = static_cast<int16_t>(devices[i]);
if (device_index != tensor.get_device()) {
if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
cuda_guard.reset_stream(*(*streams)[i]);
}
chunks[i] = chunks[i].to( // 拷贝
{DeviceType::CUDA, device_index},
/*non_blocking=*/true,
/*copy=*/false,
/*memory_format=*/at::MemoryFormat::Preserve);
}
}
return chunks; // 返回结果
}
4.3 Копия (модель)
До сих пор мы использовали функцию Scatter для размещения и копирования данных с устройства [0] на разные карты, и мы будем использовать функцию Replicate для копирования модели с устройства [0] на разные карты.
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
Нашему графику распространения соответствует:
replicate — это всего лишь ретвит, его еще нужно посмотреть.
def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())
4.3.1 replicate
Конкретная логика репликации такова:
-
Используйте _replicatable_module, чтобы проверить, можно ли безопасно реплицировать модель.
-
Посмотрите, сколько графических процессоров и сколько копий нужно сделать.
-
Операция копирования.
-
Копировать параметры.
- Используйте _broadcast_coalesced_reshape для копирования параметров на каждый GPU.
-
Копировать буферы.
- Сначала подсчитайте буферы.
- Запишите индекс буфера, который нужно дифференцировать.
- Запишите буфер буфера, который не нуждается в руководстве.
- Используйте _broadcast_coalesced_reshape для двух буферов для копирования на каждый GPU.
-
Скопируйте модель.
- modules() возвращает итератор, содержащий все модули текущей модели. Преобразование в список можно рассматривать как выравнивание модели.
- Пройдитесь по модулям и добавьте каждый слой модели в каждый module_copies.
- Наконец, module_copies[j] содержит каждый слой модели, а именно
module_copies[j][i]
является i-м слоем модели.
-
-
Настройте действия.
-
Это необходимо для настройки модели сети и настройки ссылки данных в графическом процессоре на каждый модуль в массиве модулей, чтобы эти модули были полными моделями.
-
Потому что раньше сеть вложенной модели была разбита и скопирована на GPU: буферы и параметры также были скопированы на GPU. Теперь их нужно переконфигурировать в модель мелкого копирования, которая завершает логику модели.
-
Пройдитесь по каждому подмодулю модели и настройте только некоторые необходимые параметры.
- справиться со своим сыном
_modules_
. - Обработайте его _parameters.
- Обработайте его _buffers.
- справиться со своим сыном
-
-
В последующих параллельных операциях каждый воркер будет получать каждый модуль в массиве модулей и обучаться на этом модуле.
Конкретный код выглядит следующим образом:
def replicate(network, devices, detach=False):
if not _replicatable_module(network):
raise RuntimeError("Cannot replicate network where python modules are "
"childrens of ScriptModule")
if not devices:
return []
# 看看有多少个GPU,需要复制多少份
devices = [_get_device_index(x, True) for x in devices]
num_replicas = len(devices) # 复制这些份
# 1)复制操作
# 复制参数 parameters
params = list(network.parameters())
param_indices = {param: idx for idx, param in enumerate(params)}
# 拷贝到各个GPU,我们随后会讲解_broadcast_coalesced_reshape
param_copies = _broadcast_coalesced_reshape(params, devices, detach)
# 复制buffers
# 首先统计一下buffers
buffers = list(network.buffers())
buffers_rg = [] # 需要求导的
buffers_not_rg = [] # 不需要求导的
for buf in buffers:
if buf.requires_grad and not detach:
buffers_rg.append(buf)
else:
buffers_not_rg.append(buf)
# 记录需要求导的 buffer 的 index
buffer_indices_rg = {buf: idx for idx, buf in enumerate(buffers_rg)}
# 记录不需要求导的 buffer 的 index
buffer_indices_not_rg = {buf: idx for idx, buf in enumerate(buffers_not_rg)}
# 对于两种buffers分别拷贝到各个GPU
buffer_copies_rg = _broadcast_coalesced_reshape(buffers_rg, devices, detach=detach)
buffer_copies_not_rg = _broadcast_coalesced_reshape(buffers_not_rg, devices, detach=True)
# 准备拷贝模型网络
modules = list(network.modules()) # modules()返回一个包含当前模型所有模块的迭代器。转变成list,可以认为把模型打平了
module_copies = [[] for device in devices] # 为各个GPU准备好空list
module_indices = {}
# 得到模型的浅拷贝列表
for i, module in enumerate(modules): # 遍历模型 list
module_indices[module] = i
for j in range(num_replicas):
replica = module._replicate_for_data_parallel() # 获取浅拷贝
# This is a temporary fix for DDP. DDP needs to access the
# replicated model parameters. It used to do so through
# `mode.parameters()`. The fix added in #33907 for DP stops the
# `parameters()` API from exposing the replicated parameters.
# Hence, we add a `_former_parameters` dict here to support DDP.
replica._former_parameters = OrderedDict()
module_copies[j].append(replica) # 往每个module_copies里面添加模型的每一层
# 最终,module_copies[j] 里面包含了模型的每一层,即module_copies[j][i] 就是模型的第 i 层
# 2)配置操作
# 这一步的目的是:把GPU中数据的reference赋值到浅拷贝之中,变成完备模型。因为之前是把嵌套的模型网络打散了分别拷贝到GPU,buffers和parameters也分别拷贝到了GPU,现在把他们构建到浅拷贝的模型之中,把模型逻辑补齐。
for i, module in enumerate(modules): # 遍历模型每个子模块,只赋值需要的部分参数
# 处理其子_modules
for key, child in module._modules.items():
if child is None:
for j in range(num_replicas):
replica = module_copies[j][i] # module_copies[j]是第j个模型拷贝
replica._modules[key] = None
else:
module_idx = module_indices[child]
for j in range(num_replicas):
replica = module_copies[j][i] # module_copies[j]是第j个模型拷贝
setattr(replica, key, module_copies[j][module_idx]) # 设置第j个模型的对应部分,下同
# 处理_parameters
for key, param in module._parameters.items():
if param is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._parameters[key] = None
else:
param_idx = param_indices[param]
for j in range(num_replicas):
replica = module_copies[j][i]
param = param_copies[j][param_idx]
# parameters in replicas are no longer leaves,
# so setattr them as non-parameter attributes
setattr(replica, key, param)
# expose the parameter for DDP
replica._former_parameters[key] = param
# 处理 _buffers
for key, buf in module._buffers.items():
if buf is None:
for j in range(num_replicas):
replica = module_copies[j][i]
replica._buffers[key] = None
else:
if buf.requires_grad and not detach:
buffer_copies = buffer_copies_rg
buffer_idx = buffer_indices_rg[buf]
else:
buffer_copies = buffer_copies_not_rg
buffer_idx = buffer_indices_not_rg[buf]
for j in range(num_replicas):
replica = module_copies[j][i]
setattr(replica, key, buffer_copies[j][buffer_idx])
return [module_copies[j][0] for j in range(num_replicas)]
4.3.2 Проверка копий
_replicatable_module используется для проверки безопасности копирования модели.
# Check if we can safely replicate the module.
# there are two types of module:
# 1. python modules
# 2. ScriptModule
#
# currently a module cannot be replicated properly if the descendants of
# any ScriptModule contains python module (type 1 above)
def _replicatable_module(module, memo=None):
# module.modules() contains module itself as the first element
def descendant_modules(module):
gen = module.modules()
next(gen)
return gen
if not _is_jit_enabled():
return True
if memo is None:
memo = set()
# memoize visited modules
memo.add(module)
if _is_script_module(module):
memo.update(descendant_modules(module))
return all(_is_script_module(descendant) for
descendant in descendant_modules(module))
for child in module.children():
# since any unreplicatable module will cause the check to return
# False early, visited modules here can be safely ignored.
if child in memo:
continue
if not _replicatable_module(child, memo):
return False
return True
4.3.3 Общие копии
В PyTorch есть мелкие копии и глубокие копии.
Предполагая, что модель представляет собой серию матриц параметров, объект модели фактически указывает на каждую матрицу параметров.
- Поверхностная копия (теневая копия) копирует только самые внешние значения и указатели, а не копирует более глубокие объекты, то есть копирует только родительский объект. model.state_dict() также является мелкой копией.Если param=model.state_dict(), то при изменении param параметры модели также будут изменены соответствующим образом.
- Соответственно, глубокая копия (deepcopy): копировать значения, указатели и пространство глубокой памяти, на которое указывают указатели, то есть копировать родительский объект и его дочерние объекты.
Например:
import torch
import copy
# a引用指向某块内存空间
a = torch.nn.Linear(in_features=5, out_features=1, bias=True)
# 浅拷贝相当于拷贝一个引用,所以他们指向的内存空间是一样的
b = copy.copy(a)
# state_dict is shadow copy
p = a.state_dict()
print(id(a.state_dict()) == id(p)) # False,这两个不相等
# 通过引用p去修改内存空间
print(a.weight)
p['weight'][0][0] = 8.8888
# 可以看到a指向的内存空间也被修改了
print(a.weight)
Результат выглядит следующим образом:
False
Parameter containing:
tensor([[-0.2253, 0.0802, 0.3984, -0.1208, 0.3796]], requires_grad=True)
Parameter containing:
tensor([[ 8.8888, 0.0802, 0.3984, -0.1208, 0.3796]], requires_grad=True)
В частности, возвращаясь к нашему анализу, в классе модуля есть метод _replicate_for_data_parallel, который используется для возврата копии, которая разделяет хранилище с исходной моделью, которая является поверхностной копией.
def _replicate_for_data_parallel(self):
replica = self.__new__(type(self))
replica.__dict__ = self.__dict__.copy()
# replicas do not have parameters themselves, the replicas reference the original
# module.
replica._parameters = OrderedDict()
replica._buffers = replica._buffers.copy() # 浅拷贝
replica._modules = replica._modules.copy() # 浅拷贝模型内部的子模块
replica._is_replica = True
return replica
Можно считать, что до операции set копия выглядит следующим образом:
+---------------------------------------------------------------+
| +----------------------+ |
| CPU | Module | |
| | | |
| | _parameters | |
| | | |
| +--------------> _buffers <-------------+ |
| | | | | |
| | +-------> _modules <----------+ | |
| | | | | | | |
| | | +----------------------+ | | |
| +---------------------+ | +----------------------+ | | |
| | module_copies[0] | | | | module_copies[1] | | | |
| | | | | | | | | |
| | _parameters | | | | _parameters | | | |
| | | | | | | | | |
| | _buffers +----+ | | | _buffers +--------------+ |
| | | | | | | |
| | _modules +-------->+ | _modules +--------->+ |
| | | | | |
| +---------------------+ +----------------------+ |
+---------------------------------------------------------------+
+---------------------+ +----------------------+
| GPU 0 | | GPU 1 |
| | | |
| _parameters | | _parameters |
| | | |
| _buffers | | _buffers |
| | | |
| | | |
| | | |
+---------------------+ +----------------------+
После операции set это выглядит следующим образом:
+-----------------------------------------------------------------+
| CPU +----------------------+ |
| | Module | |
| | | |
| | _parameters | |
| | | |
| | _buffers | |
| | | |
| | _modules | |
| | | |
| +----------------------+ |
| +---------------------+ +----------------------+ |
| | module_copies[0] | | module_copies[1] | |
| | | | | |
+---------+ _parameters | | _parameters +-----------+ |
| | | | | | | |
| | | _buffers +------------+ | _buffers +-----------+ | |
| | | | | | | | | |
| | | _modules | | | _modules | | | |
| | | | | | | | | |
| | +---------------------+ | +----------------------+ | | |
| +-----------------------------------------------------------------+
| | | |
| +---------------------+ | +----------------------+ | |
| | GPU 0 | | | GPU 1 | | |
| | | | | | | |
+---------> _parameters | | | _parameters <----------+
| | | | | |
| _buffers <----------+ | _buffers <--------+
| | | |
| | | |
| | | |
+---------------------+ +----------------------+
4.3.4 Операция копирования
4.3.4.1 _broadcast_coalesced_reshape
Используются параметры копирования _broadcast_coalesced_reshape.
def _broadcast_coalesced_reshape(tensors, devices, detach=False):
from ._functions import Broadcast
if detach:
# 如果是detach,就直接调用
return comm.broadcast_coalesced(tensors, devices)
else:
# Use the autograd function to broadcast if not detach
if len(tensors) > 0:
# 否则先用Broadcast过度一下,最后还是调用broadcast_coalesced
tensor_copies = Broadcast.apply(devices, *tensors)
return [tensor_copies[i:i + len(tensors)]
for i in range(0, len(tensor_copies), len(tensors))]
else:
return []
4.3.4.2 Broadcast
Причин использования Broadcast слишком много: поскольку тензоры не отсоединяются, помимо трансляции нужно еще задать, какие градиенты не нужны в контексте. В некоторых случаях пользовательские функции должны знать об этом.
class Broadcast(Function):
@staticmethod
def forward(ctx, target_gpus, *inputs):
assert all(i.device.type != 'cpu' for i in inputs), (
'Broadcast function not implemented for CPU tensors'
)
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.target_gpus = target_gpus
if len(inputs) == 0:
return tuple()
ctx.num_inputs = len(inputs)
# input 放在 device[0]
ctx.input_device = inputs[0].get_device()
# 和 detach 的情形一样
outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
non_differentiables = []
# 在上下文中设置哪些不需要梯度
for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
if not input_requires_grad:
for output in outputs:
non_differentiables.append(output[idx])
ctx.mark_non_differentiable(*non_differentiables)
return tuple([t for tensors in outputs for t in tensors])
@staticmethod
def backward(ctx, *grad_outputs):
return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
Среди них mark_non_дифференцируемый определяется в torch/csrc/autograd/custom_function.cpp, где недифференцируемые переменные настраиваются в AutogradContext.
void AutogradContext::mark_non_differentiable(const variable_list &outputs) {
non_differentiable_.clear();
non_differentiable_.reserve(outputs.size());
for(auto& var : outputs) {
non_differentiable_.insert(var.unsafeGetTensorImpl());
}
}
4.3.4.3 broadcast_coalesced
Broadcast_coalesced прыгает в мир C++.
def broadcast_coalesced(tensors, devices, buffer_size=10485760):
"""Broadcasts a sequence tensors to the specified GPUs.
Small tensors are first coalesced into a buffer to reduce the number
of synchronizations.
Args:
tensors (sequence): tensors to broadcast. Must be on the same device,
either CPU or GPU.
devices (Iterable[torch.device, str or int]): an iterable of GPU
devices, among which to broadcast.
buffer_size (int): maximum size of the buffer used for coalescing
Returns:
A tuple containing copies of :attr:`tensor`, placed on :attr:`devices`.
"""
devices = [_get_device_index(d) for d in devices]
tensors = [_handle_complex(t) for t in tensors]
return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
4.3.4.4 C++
Как видно из кода инициализации, это делается в файле broadcast_coalesced.
auto m = py::cast<py::module>(module);
m.def(
"_broadcast_coalesced",
[](std::vector<at::Tensor>& tensors,
std::vector<int64_t> devices,
size_t buffer_size) {
return broadcast_coalesced(tensors, devices, buffer_size);
},
py::arg("tensors"),
py::arg("devices"),
py::arg("buffer_size"),
py::call_guard<py::gil_scoped_release>())
- Broadcast_coalesced будет распространять переменную на все графические процессоры. В Broadcast_coalesced несколько переменных могут быть объединены в одну большую переменную, транслированы на другие устройства, а затем разделены в соответствии с исходной формой.
- При разделении операция представления будет транслировать все переменные вместе для совместного использования счетчика версий, поскольку все они являются представлениями больших переменных. Однако эта большая переменная немедленно отбрасывается, и все эти переменные вообще не используют общую память.
- Например, когда два буфера совместно транслируются в режиме «Параллельные данные», и один из них выполняет операции на месте во время «вперед», а другой используется в режиме «назад», движок автоградации будет жаловаться. Поэтому мы переопределим эти переменные после трансляции и дадим им отдельные счетчики версий.
Конкретный код находится в torch/csrc/cuda/comm.cpp. Изучим его аннотации.
// broadcast_coalesced
// ~~~~~~~~~~~~~~~~~~~
//
// In broadcast_coalesced, multiple variables may be coalesced into a single
// large one, broadcast to other devices, and the get split according to the
// original shapes.
//
// When splitting, the view operations will make all Variables broadcast
// together to share a single version counter, because they are all views of the
// large Variable. However, that large Variable is immediately discarded and all
// these Variables do not share storage at all.
//
// For example, when two buffers are broadcast together in `DataParallel` and
// one of them is modified in-place during `forward` but the other is needed in
// backward, autograd engine will complain.
//
// We thus re-wrap these Variables after broadcasting (i.e., effectively doing
// what is equivalent to .data in Python), and give them individual version
// counters.
Конкретные параметры метода broadcast_coalesced поясняются следующим образом:
- тензоры должны быть на одном устройстве, CPU или GPU;
- devices — устройство, на которое нужно скопировать;
- buffer_size — это самый большой буфер. Буфер используется здесь для объединения небольших тензоров в буфер, чтобы уменьшить количество синхронизаций;
tensor_list2d broadcast_coalesced(
TensorList tensors,
IntArrayRef devices,
size_t buffer_size) {
TORCH_CHECK(
std::all_of(
tensors.begin(),
tensors.end(),
[&](const at::Tensor& t) { return t.get_device() == devices[0]; }),
"All tensors must be on devices[0]: ",
devices[0]);
#ifdef USE_NCCL
buffer_size = std::min(torch::cuda::nccl::get_max_count(), buffer_size);
#endif
tensor_list2d outputs(devices.size());
outputs[0] = tensors.vec();
for (auto& o : outputs)
o.reserve(tensors.size());
unique_type_checker type_checker;
at::cuda::CUDAGuard device_guard(devices[0]);
for (auto& chunk : utils::take_tensors(tensors, buffer_size)) {
auto type_id = chunk.type_id();
type_checker.show(type_id);
std::vector<at::Tensor> results;
if (chunk.options().is_sparse()) {
auto flat_tuple = utils::flatten_sparse_tensors(chunk.tensors);
auto broadcast_indices = broadcast(flat_tuple.first, devices); //这里进行广播
auto broadcast_values = broadcast(flat_tuple.second, devices); //这里进行广播
results.reserve(devices.size());
for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
device_guard.set_index(devices[i]);
auto& device_outputs = outputs[i];
auto& inds = broadcast_indices[i];
auto& vals = broadcast_values[i];
for (auto& t :
utils::unflatten_sparse_tensors(inds, vals, chunk.tensors)) {
Variable var = t;
device_outputs.push_back(make_variable(var.tensor_data(), false));
}
}
} else {
auto results = // 这里进行广播
broadcast(utils::flatten_dense_tensors(chunk.tensors), devices);
for (size_t i = 1, num_devices = devices.size(); i < num_devices; ++i) {
device_guard.set_index(devices[i]);
auto& device_outputs = outputs[i];
for (auto& t :
utils::unflatten_dense_tensors(results[i], chunk.tensors)) {
Variable var = t;
device_outputs.push_back(make_variable(var.tensor_data(), false));
}
}
}
}
// If we only saw a single tensor type, then we can skip expensive reordering
if (!type_checker.unique) {
for (auto& o : outputs)
utils::reorder_tensors_like(o, tensors);
}
return outputs;
}
Метод трансляции следующий:
std::vector<Tensor> broadcast(const Tensor& tensor, IntArrayRef devices) {
std::vector<Tensor> diff_device_dst_tensors;
diff_device_dst_tensors.reserve(devices.size());
for (auto device : devices) {
if (device != tensor.get_device()) {
diff_device_dst_tensors.push_back(at::empty(
tensor.sizes(),
tensor.options().device(
at::Device(DeviceType::CUDA, device)))); // preserve memory format
}
}
// 继续调用操作
_broadcast_out_impl(tensor, diff_device_dst_tensors);
std::vector<Tensor> dst_tensors;
dst_tensors.reserve(devices.size());
auto it = diff_device_dst_tensors.begin();
for (auto device : devices) {
if (device != tensor.get_device()) {
dst_tensors.push_back(*it++);
} else {
dst_tensors.push_back(tensor);
}
}
TORCH_INTERNAL_ASSERT(it == diff_device_dst_tensors.end());
return dst_tensors;
}
Наконец, _broadcast_out_impl вызывается для трансляции исходного тензора (CPU или CUDA) в список устройств CUDA, который вызывает nccl::broadcast(nccl_list).
static inline std::vector<Tensor>& _broadcast_out_impl(
const Tensor& tensor,
std::vector<Tensor>& out_tensors) {
#ifdef USE_NCCL
std::vector<Tensor> nccl_list;
nccl_list.reserve(out_tensors.size() + 1);
nccl_list.push_back(tensor);
for (auto& out_tensor : out_tensors) {
nccl_list.push_back(out_tensor);
}
if (nccl::is_available(nccl_list)) {
nccl::broadcast(nccl_list); // 这里调用了 NCCL 操作
} else {
#else
{
#endif
for (auto& out_tensor : out_tensors) {
out_tensor.copy_(tensor, /*non_blocking=*/true);
}
}
return out_tensors;
}
До сих пор мы распространяли и данные, и модель на другие графические процессоры. Сначала мы строим текущий форвард-граф, и вы можете иметь четкое представление.Репликация вызывает Broadcast.forward и сохраняет input_device и num_inputs в своем контексте. Далее может быть выполнено прямое распространение.
+----------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| |
| replicate +---------------> parallel_apply gather |
| |
+----------------------------------------------------------------------------------------+
+---------------------------+
| Broadcast |
| |
| |
| |
| forward() +----------->
| |
| |
| +---------------------+ |
| | ctx | |
| | input_device | |
| | | |
| | num_inputs | |
| | | |
| +---------------------+ |
| |
| |
| |
| |
| |
| |
+---------------------------+
Из-за нехватки места в следующей статье мы продолжим анализ с параллельной операции (прямое распространение).
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel
Понимать распределенное обучение интерпретации исходного кода PyTorch?
обсуждение.py torch.org/he/data пар AL…
[Оригинал][глубина][PyTorch] Вторая часть серии DDP: принцип реализации и анализ исходного кода
Pytorch-CUDA от входа до отказа (2)
DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения