0x00 сводка
Эта статья является третьей частью распространяемого PyTorch, Продолжая вышеизложенное, она знакомит с параллельной операцией и обратным распространением DataPrallel.
Другие статьи из этой серии:
Автоматическая дифференциация инструментов глубокого обучения (1)
Автоматическая дифференциация инструментов глубокого обучения (2)
Автоматическая дифференциация оружия глубокого обучения (3) --- Пример интерпретации
[Анализ исходного кода] Как PyTorch реализует прямое распространение (1) --- Базовый класс (1)
[Анализ исходного кода] Как PyTorch реализует прямое распространение (2) --- Базовый класс (ниже)
[Анализ исходного кода] Как PyTorch реализует прямое распространение (3) --- конкретная реализация
[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка
[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм
[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор
[Анализ исходного кода] Как PyTorch использует GPU
[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)
0x01 Работа в прямом направлении
Давайте сначала вспомним текущий форвард-граф, реплицируем вызовы Broadcast.forward и сохраним input_device и num_inputs в его контексте.
+----------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| |
| replicate +---------------> parallel_apply gather |
| |
+----------------------------------------------------------------------------------------+
+---------------------------+
| Broadcast |
| |
| |
| |
| forward() +----------->
| |
| |
| +---------------------+ |
| | ctx | |
| | input_device | |
| | | |
| | num_inputs | |
| | | |
| +---------------------+ |
| |
| |
| |
| |
| |
| |
+---------------------------+
1.1 параллелизм
В настоящее время мы использовали функцию Scatter для размещения и копирования данных с устройства [0] на разные карты, а также функцию Replicate для копирования модели с устройства [0] на разные карты, чтобы каждая карта имела одинаковую модель и разные данные, теперь надо звонить вперед для расчета потерь и градиента соответственно. Это часть parallel_apply.
# 分发数据
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# 并行训练
outputs = self.parallel_apply(replicas, inputs, kwargs)
Нашему графику распространения соответствует:
parallel_apply реализуется на основе многопоточности, используя заранее подготовленную реплику и входные данные, затем цикл for запускает многопоточность для прямого распространения и, наконец, выводит результат распространения.
def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
# 确保模型和输入大小一致
assert len(modules) == len(inputs)
# 确保每个 GPU 都有相应的元数据,如没有就空白补全
if kwargs_tup is not None:
# 在前面已经补全
assert len(modules) == len(kwargs_tup)
else:
kwargs_tup = ({},) * len(modules)
# 确保模型数目和CPU数目一致
if devices is not None:
assert len(modules) == len(devices)
else:
devices = [None] * len(modules)
devices = [_get_device_index(x, True) for x in devices]
# 基于threading多线程实现
lock = threading.Lock()
results = {}
grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()
# 定义 worker
def _worker(i, module, input, kwargs, device=None):
torch.set_grad_enabled(grad_enabled)
if device is None:
device = get_a_var(input).get_device()
try:
# 设置当前的设备
with torch.cuda.device(device), autocast(enabled=autocast_enabled):
# this also avoids accidental slicing of `input` if it is a Tensor
if not isinstance(input, (list, tuple)):
input = (input,)
output = module(*input, **kwargs) # 前向操作
with lock:
# 并行计算得到输出
results[i] = output
except Exception:
with lock:
results[i] = ExceptionWrapper(
where="in replica {} on device {}".format(i, device))
if len(modules) > 1:
# 如有一个进程控制多个 GPU ,起多个线程
# 注意,这里就是每个 worker 调用了 modules 数组中的一个模型copy
threads = [threading.Thread(target=_worker,
args=(i, module, input, kwargs, device))
for i, (module, input, kwargs, device) in
enumerate(zip(modules, inputs, kwargs_tup, devices))]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
else:
# 一个GPU对应一个进程
_worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
outputs = []
for i in range(len(inputs)):
output = results[i]
# error handle
if isinstance(output, ExceptionWrapper):
output.reraise()
outputs.append(output)
# 输出 n 个计算结果
return outputs
В это время прямое распространение соответствует следующему рисунку Теперь параллельная операция вызывает прямой метод модуля.
+----------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| 1 2 3 |
| replicate +---------------> parallel_apply gather |
| |
+----------------------------------------------------------------------------------------+
+---------------------------+ +-------------------+
| Broadcast | | module |
| | | |
| | | |
| 1 | | 2 |
| forward() +-----------> | forward() +--------->
| | | |
| | | |
| +---------------------+ | | |
| | ctx | | | |
| | input_device | | | |
| | | | | |
| | num_inputs | | | |
| | | | | |
| +---------------------+ | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
+---------------------------+ +-------------------+
1.2 Gather
В настоящее время мы использовали функцию Scatter для размещения и копирования данных с устройства [0] на разные карты, а также функцию Replicate для копирования модели с устройства [0] на разные карты, чтобы каждая карта имела одинаковую модель и различные данные, а затем вызов вперед для расчета потерь и градиента соответственно. Это часть parallel_apply.
Все, что нам нужно сделать сейчас, — это объединить градиенты распределенных вычислений в устройство [0], которым является self.output_device.
# 分发数据
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# 并行训练
outputs = self.parallel_apply(replicas, inputs, kwargs)
# 收集到 devices[0]
return self.gather(outputs, self.output_device)
Нашему графику распространения соответствует:
Давайте посмотрим, как результаты собираются в устройство [0] и как устройство [0] действует как сервер параметров.
1.2.1 Мир Python
Gather в основном вызывает Gather.apply(target_device, dim, *outputs) для завершения работы по сбору данных.
def gather(outputs, target_device, dim=0): # target_device 就是 device[0]
r"""
Gathers tensors from different GPUs on a specified device
(-1 means the CPU).
"""
def gather_map(outputs):
out = outputs[0]
if isinstance(out, torch.Tensor):
return Gather.apply(target_device, dim, *outputs) # 调用下面的 Gather
if out is None:
return None
if isinstance(out, dict):
return type(out)(((k, gather_map([d[k] for d in outputs]))
for k in out))
return type(out)(map(gather_map, zip(*outputs)))
# Recursive function calls like this create reference cycles.
# Setting the function to None clears the refcycle.
try:
res = gather_map(outputs)
finally:
gather_map = None
return res
Gather вызывает comm.gather для завершения работы, а comm.gather ведет нас в мир C++.
Мы опускаем некоторые коды проверки.
# Gather 源码
class Gather(Function):
@staticmethod
def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0]
# 下面会往 context 内部存放几个变量,后续会用到
target_device = _get_device_index(target_device, True)
ctx.target_device = target_device
ctx.dim = dim
ctx.input_gpus = tuple(i.get_device() for i in inputs)
if all(t.dim() == 0 for t in inputs) and dim == 0:
inputs = tuple(t.view(1) for t in inputs)
ctx.unsqueezed_scalar = True
else:
ctx.unsqueezed_scalar = False
ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
return comm.gather(inputs, ctx.dim, ctx.target_device) # 这里会进入C++世界
@staticmethod
def backward(ctx, grad_output): # 注意,这里后续会用到
scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
if ctx.unsqueezed_scalar:
scattered_grads = tuple(g[0] for g in scattered_grads)
return (None, None) + scattered_grads
Теперь форвардный расчет выглядит следующим образом:
Gather вызывает форвардную функцию Gather, а форвардный метод сохраняет три переменные input_gpus, input_sizes, dim в ctx, которые будут использоваться позже.
+-----------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| 1 2 3 |
| replicate +---------------> parallel_apply +--------------> gather |
| |
+-----------------------------------------------------------------------------------------+
+---------------------------+ +-------------------+ +--------------------+
| Broadcast | | module | |Gather |
| | | | | |
| | | | | |
| 1 | | 2 | | 3 |
| forward() +-----------> | forward() +--------> | forward() |
| | | | | |
| | | | | |
| +---------------------+ | | | | +----------------+ |
| | ctx | | | | | |ctx | |
| | input_device | | | | | | input_gpus | |
| | | | | | | | | |
| | num_inputs | | | | | | input_sizes| |
| | | | | | | | | |
| +---------------------+ | | | | | dim | |
| | | | | +----------------+ |
| | | | | |
| | | | | |
| | | | | |
| | | | | |
| | | | | |
+---------------------------+ +-------------------+ +--------------------+
1.2.2 Мир C++
Функция сбора вызывает _gather_out_impl для завершения операции копирования.
at::Tensor gather(
at::TensorList tensors,
int64_t dim,
c10::optional<int32_t> destination_index) { // destination_index 就是 device[0] 的index
int64_t total_size = 0;
auto& first = tensors.front();
const auto first_size = first.sizes();
dim = at::maybe_wrap_dim(dim, first);
std::vector<int64_t> expected_size(first_size.begin(), first_size.end());
auto memory_format = first.suggest_memory_format();
for (size_t i = 0; i < tensors.size(); i++) {
const auto& tensor = tensors[i];
expected_size[dim] = tensor.size(dim);
total_size += tensor.size(dim);
if (memory_format != MemoryFormat::Contiguous &&
tensor.suggest_memory_format() != memory_format) {
memory_format = MemoryFormat::Contiguous;
}
}
expected_size[dim] = total_size;
at::Device device(DeviceType::CPU);
// 根据 index 得到输出的目标设备
if (!destination_index || *destination_index != -1) {
// device 就是 GPU 0 这个设备
device = at::Device(
DeviceType::CUDA, destination_index ? *destination_index : -1);
}
//首先,构建一个空的目标tensor建立在目标设备之上,命名为result
at::Tensor result =
at::empty(expected_size, first.options().device(device), memory_format);
return _gather_out_impl(tensors, result, dim); // 然后对result进行gather
}
_gather_out_impl выполняет определенную операцию сбора, которая заключается в копировании входных тензоров в целевой тензор, то есть в GPU0.
// ***************** Gather *******************
//
// Gather a list of CUDA tensors on one or more devices to a target tensor or
// device, either CPU or CUDA.
// no checks
static inline at::Tensor& _gather_out_impl(
at::TensorList tensors,
at::Tensor& out_tensor,
int64_t dim) {
std::vector<int64_t> chunk_sizes;
chunk_sizes.reserve(tensors.size());
for (auto& tensor : tensors) {
chunk_sizes.push_back(tensor.size(dim));
}
auto chunks =
out_tensor.split_with_sizes(/*split_sizes=*/chunk_sizes, /*dim=*/dim);
for (size_t i = 0; i < tensors.size(); i++) { // 拷贝到GPU 0 之上
chunks[i].copy_(tensors[i], /*non_blocking=*/out_tensor.is_cuda());
}
return out_tensor;
}
0x02 Рассчитать потери
Теперь, когда мы собрали градиенты на устройстве [0], мы запускаем обратное распространение, общая логика которого показана выше. Во-первых, вычислить потери на устройстве [0] . По сути, потеря вычисления на этом шаге является промежуточным звеном между прямым вычислением и обратным распространением, здесь оно рассматривается как начало обратного распространения, как показано на следующем рисунке.
Давайте узнаем пример кода и посмотрим ключевые моменты:
- Данные были размещены на графическом процессоре по умолчанию, то есть на графическом процессоре 0.
- прогнозирование — это результат прямого вычисления при сборе на GPU 0.
- использовать
loss = criterion(prediction,target_var)
Вычислительные потери поверх графического процессора по умолчанию. - Используйте loss.backward(), чтобы начать обратное распространение.
for batch_idx, (data, label) in pbar:
if args.cuda:
data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上
data_v = Variable(data)
target_var = Variable(label)
prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出
# 到目前为止,我们完成了DataParallel.forward()
#这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里
#前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果聚合到主gpu里
criterion = nn.CrossEntropyLoss()
loss = criterion(prediction,target_var) # 3. 在默认GPU之上计算loss
optimizer.zero_grad()
loss.backward() # 4. 开始反向传播
optimizer.step()
0x03 Обратное распространение
Мы запускаем вышеуказанную часть Forward, вычисляем потери, а затем запускаем приведенный выше код.loss.backward()
часть.
3.1 Распределение градиентов
Сначала мы подошли к части распределения градиентов, то есть к распределению потерь между графическими процессорами, чтобы их можно было распространять обратно независимо на каждом графическом процессоре. Соответствует следующему рисунку:
3.1.1 Gather.backward
Как упоминалось ранее, прогнозирование — это результат прямого вычисления при сборе на GPU 0. И потери рассчитываются на основе предсказания, поэтому изloss.backward()
Запускаем обратное распространение, первым шагом от заднего к переднему является операция распространения «сбора», которая соответствует обратной функции «Сбор», код ядра которой — Scatter.apply.
class Gather(Function):
# 这里前向传播用到了,为了对照,我们依然贴出来
@staticmethod
def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0]
# 下面会往 context 内部存放几个变量,后续会用到
target_device = _get_device_index(target_device, True)
ctx.target_device = target_device
ctx.dim = dim
ctx.input_gpus = tuple(i.get_device() for i in inputs)
if all(t.dim() == 0 for t in inputs) and dim == 0:
inputs = tuple(t.view(1) for t in inputs)
ctx.unsqueezed_scalar = True
else:
ctx.unsqueezed_scalar = False
ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
# 这里会进入C++世界,把输出聚集到 GPU 0。
return comm.gather(inputs, ctx.dim, ctx.target_device)
@staticmethod
def backward(ctx, grad_output): # 这里现在后向传播用到了!
# 把前向传播在 context 之中存放的变量取出,作为 Scatter 的输入
scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
if ctx.unsqueezed_scalar:
scattered_grads = tuple(g[0] for g in scattered_grads)
return (None, None) + scattered_grads
Подробности следующие: вы можете видеть, что обратное использует ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output, сохраненные в предыдущем прямом распространении, для вызова Scatter.apply.
На рисунке вверху показан процесс прямого распространения, внизу — процесс обратного распространения, а в середине — некоторые модули кода, используемые в прямом и обратном распространении.
+--------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| 1 2 3 |
| replicate +---------------> parallel_apply +--------------> gather |
| |
+--------------------------------------------------------------------------------------+
+---------------------------+ +-------------------+ +--------------------+
| Broadcast | | module | |Gather |
| | | | | |
| | | | | |
| 1 | | 2 | | 3 |
| forward() +-----------> | forward() +--------> | forward() |
| | | | | |
| | | | | |
| +---------------------+ | | | | +----------------+ |
| | ctx | | | | | |ctx | |
| | input_device | | | | | | input_gpus | |
| | | | | | | | | |
| | num_inputs | | | | | | input_sizes| |
| | | | | | | | | |
| +---------------------+ | | | | | dim | |
| | | | | +----------------+ |
| | | | | |
| | | | | |
| | | | <---------+ backward() |
| | | | | 3 |
| | | | | |
+---------------------------+ +-------------------+ +--------------------+
+--------------------------------------------------------------------------------------+
| loss.backward() |
| 3 |
| <--------------------+ |
| |
| |
+--------------------------------------------------------------------------------------+
3.1.2 Scatter
Scatter.apply фактически вызывает свой прямой метод.
- Сначала извлеките ранее сохраненные переменные из контекста, здесь в основном устройства ввода input_device (исходное устройство) и target_gpus (целевое устройство).
- Получите поток на целевое устройство.
- Вызовите comm.scatter, чтобы распределить градиент на целевое устройство.
class Scatter(Function):
@staticmethod
def forward(ctx, target_gpus, chunk_sizes, dim, input):
target_gpus = [_get_device_index(x, True) for x in target_gpus]
ctx.dim = dim
ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
streams = None
if torch.cuda.is_available() and ctx.input_device == -1:
# Perform CPU to GPU copies in a background stream
streams = [_get_stream(device) for device in target_gpus]
# 分发到其他GPU
outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
# Synchronize with the copy stream
if streams is not None:
for i, output in enumerate(outputs):
with torch.cuda.device(target_gpus[i]):
main_stream = torch.cuda.current_stream()
main_stream.wait_stream(streams[i])
output.record_stream(main_stream)
return outputs
@staticmethod
def backward(ctx, *grad_output):
return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
3.1.3 C++
выше код питонаoutputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
Прямо в мир C++. Конкретный код находится в torch/csrc/cuda/comm.cpp.
Роль разброса состоит в том, чтобы разделить тензор и распределить его по потоку каждого устройства.
std::vector<at::Tensor> scatter(
const at::Tensor& tensor,
at::IntArrayRef devices,
const c10::optional<std::vector<int64_t>>& chunk_sizes,
int64_t dim,
const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
streams) {
dim = at::maybe_wrap_dim(dim, tensor);
// 把tensor进行split
std::vector<at::Tensor> chunks = chunk_sizes
? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
: tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
at::cuda::OptionalCUDAStreamGuard cuda_guard;
for (size_t i = 0; i < chunks.size(); ++i) {
const auto device_index = static_cast<int16_t>(devices[i]);
if (device_index != tensor.get_device()) {
if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
cuda_guard.reset_stream(*(*streams)[i]);
}
// 发送给各个设备的流
chunks[i] = chunks[i].to(
{DeviceType::CUDA, device_index},
/*non_blocking=*/true,
/*copy=*/false,
/*memory_format=*/at::MemoryFormat::Preserve);
}
}
return chunks;
}
3.2 Параллельное обратное распространение
Теперь, когда градиент распределен по каждому графическому процессору, следующим шагом является официальное введение параллельного обратного распространения, Эта часть функции заключается в параллельном запуске обратного распространения на каждом графическом процессоре для расчета градиента параметра. Соответствует следующему рисунку:
Эта часть вызывает обратную исходную модель, в частности значение 4 на следующем рисунке:
+--------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| 1 2 3 |
| replicate +---------------> parallel_apply +--------------> gather |
| |
+--------------------------------------------------------------------------------------+
+---------------------------+ +-------------------+ +--------------------+
| Broadcast | | module | |Gather |
| | | | | |
| | | | | |
| 1 | | 2 | | 3 |
| forward() +-----------> | forward() +--------> | forward() |
| | | | | |
| | | | | |
| +---------------------+ | | | | +----------------+ |
| | ctx | | | | | |ctx | |
| | input_device | | | | | | input_gpus | |
| | | | | | | | | |
| | num_inputs | | | | | | input_sizes| |
| | | | | | | | | |
| +---------------------+ | | | | | dim | |
| | | | | +----------------+ |
| | | | | |
| | | | | |
| | <---------+ backward() | <---------+ backward() |
| | | 4 | | 3 |
| | | | | |
+---------------------------+ +-------------------+ +--------------------+
+--------------------------------------------------------------------------------------+
| loss.backward() |
| 4 3 |
| <------------------+ <--------------------+ |
| |
| |
+--------------------------------------------------------------------------------------+
3.3 Слияние градиентов
Эта часть функции заключается в слиянии градиентов на GPU 0, а общее расширение процесса соответствует следующему рисунку:
3.3.1 Broadcast.backward
Эта часть соответствует обратному распространению широковещания.
class Broadcast(Function):
@staticmethod
def forward(ctx, target_gpus, *inputs):
target_gpus = [_get_device_index(x, True) for x in target_gpus]
# 前向传播时候,向上下文存入了一些变量
ctx.target_gpus = target_gpus
if len(inputs) == 0:
return tuple()
ctx.num_inputs = len(inputs)
# input 放在 device[0],所以 input_device 就是 GPU 0
ctx.input_device = inputs[0].get_device()
# 和 detach 的情形一样
outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
non_differentiables = []
# 在上下文中设置哪些不需要梯度
for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
if not input_requires_grad:
for output in outputs:
non_differentiables.append(output[idx])
ctx.mark_non_differentiable(*non_differentiables)
return tuple([t for tensors in outputs for t in tensors])
@staticmethod
def backward(ctx, *grad_outputs):
# 反向传播来到这里,取出之前在上下文存放的变量作为输入。ctx.input_device 就是之前存储的 GPU 0。
return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
Следовательно, мы можем расширить блок-схему:
+--------------------------------------------------------------------------------------+
| DataParallel.forward |
| |
| 1 2 3 |
| replicate +---------------> parallel_apply +--------------> gather |
| |
+--------------------------------------------------------------------------------------+
+---------------------------+ +-------------------+ +--------------------+
| Broadcast | | module | |Gather |
| | | | | |
| | | | | |
| 1 | | 2 | | 3 |
| forward() +-----------> | forward() +--------> | forward() |
| | | | | |
| | | | | |
| +---------------------+ | | | | +----------------+ |
| | ctx | | | | | |ctx | |
| | input_device | | | | | | input_gpus | |
| | | | | | | | | |
| | num_inputs | | | | | | input_sizes| |
| | | | | | | | | |
| +---------------------+ | | | | | dim | |
| | | | | +----------------+ |
| | | | | |
| | | | | |
| backward() | <---------+ backward() | <---------+ backward() |
| 5 | | 4 | | 3 |
| | | | | |
+---------------------------+ +-------------------+ +--------------------+
+--------------------------------------------------------------------------------------+
| loss.backward() |
| 5 4 3 |
| <------------------------+ <------------------+ <--------------------+ |
| |
| |
+--------------------------------------------------------------------------------------+
3.3.2 ReduceAddCoalesced
Broadcast.backward вызывает метод ReduceAddCoalesced.apply, который соответствует прямому методу ReduceAddCoalesced, целью которого является слияние градиента с назначением целевого устройства, которым является GPU 0.
class ReduceAddCoalesced(Function):
@staticmethod
# 会调用到这里,destination 是GPU 0
def forward(ctx, destination, num_inputs, *grads):
# 从梯度之中提取所在的设备
ctx.target_gpus = [grads[i].get_device() for i in range(0, len(grads), num_inputs)]
grads_ = [grads[i:i + num_inputs]
for i in range(0, len(grads), num_inputs)]
# 把梯度归并到目标设备 destination,就是GPU 0
return comm.reduce_add_coalesced(grads_, destination)
@staticmethod
def backward(ctx, *grad_outputs):
return (None, None,) + Broadcast.apply(ctx.target_gpus, *grad_outputs)
3.3.3 c++
Посмотрите на комментарии: чтобы добавить градиенты с нескольких графических процессоров, используйте код слияния-добавления.
def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
"""Sums tensors from multiple GPUs.
Small tensors are first coalesced into a buffer to reduce the number
of synchronizations.
Args:
inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
contain tensors from a single device.
destination (int, optional): a device on which the output will be
placed (default: current device).
buffer_size (int): maximum size of the buffer used for coalescing
Returns:
A tuple of tensors containing an elementwise sum of each group of
inputs, placed on the ``destination`` device.
"""
dense_tensors: List[List] = [[] for _ in inputs] # shape (num_gpus, num_tensors)
output = []
ref_order = []
# process sparse ones first since they may have different sizes on different gpus
for tensor_at_gpus in zip(*inputs):
if all(t.is_sparse for t in tensor_at_gpus):
# 进行归并
result = reduce_add(tensor_at_gpus, destination) # this will be sparse too
output.append(result)
ref_order.append(tensor_at_gpus[0])
else:
for coll, t in zip(dense_tensors, tensor_at_gpus):
coll.append(t.to_dense() if t.is_sparse else t)
ref_order.append(dense_tensors[0][-1])
itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
# now the dense ones, which have consistent sizes
for chunks in zip(*itrs):
flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks] # (num_gpus,)
# 进行归并
flat_result = reduce_add(flat_tensors, destination)
for t in _unflatten_dense_tensors(flat_result, chunks[0]):
# The unflattened tensors do not share storage, and we don't expose
# base flat tensor anyways, so give them different version counters.
# See NOTE [ Version Counter in comm.*_coalesced ]
output.append(t.data)
return tuple(_reorder_tensors_as(output, ref_order))
3.4 Обновление параметров модели
Эта часть функции: обновить параметры градиента. Выполните градиентный спуск и обновите параметры модели на основном графическом процессоре.
Кроме того, поскольку параметры модели обновляются только на ведущем графическом процессоре, а другие подчиненные графические процессоры в это время не обновляются синхронно, необходимо скопировать обновленные параметры модели на оставшиеся подчиненные графические процессоры для достижения параллелизма. Это делается в следующем цикле for, и цикл повторяется.
Соответствующий пример кода:
for batch_idx, (data, label) in pbar: # 6. 下一次迭代会继续从分发开始
if args.cuda:
data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上
data_v = Variable(data)
target_var = Variable(label)
prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出
# 到目前为止,我们完成了DataParallel.forward()
#这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在在前向传播里
#前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果和到主gpu里
criterion = nn.CrossEntropyLoss()
loss = criterion(prediction,target_var) # 3. 在默认GPU之上计算loss
optimizer.zero_grad()
loss.backward() # 4. 开始反向传播
optimizer.step() # 5. 更新模型
0x04 Сводка
Подытожим процесс: сначала данные и модель помещаются в GPU по умолчанию, то есть GPU 0, а затем итерация выглядит следующим образом:
Это соответствует номерам на рисунке ниже.
- scatter будет распространять данные на другие графические процессоры.
- replicate распространит модель на другие графические процессоры.
- parallel_apply запускает несколько потоков для прямого вычисления.
- Gather соберет выходные данные вычислений в GPU 0.
- GPU 0 вычислит потери.
- Распределяйте градиенты по другим графическим процессорам.
- Модель вызывает обратное вычисление.
- Слияние градиентов с GPU 0.
- optimizer.step обновляет модель.
+-----+ +-------+
|GPU1 | | GPU1 |
main thread +-----+ +-------+
+-----> Forward----> scatter +--------------> replicate-------> parallel_apply +--------> gather +---------+
+ + + |
1 | 2 | 3 | |
| | | |
| +---------+----------+---+ | |
| | | | | |
+---------+----------+ | +--------------------+ |
| | | | | | | | | |
| | 2 | | 2 | | 2 thread|1 thread 2 thread 3 |
1 | | 1 | | 1 | | | | | |
| v | v | v | | | |
v v v v v v |
+--+---+ +--+---+ +--+---+ +--+---+ +--+---+ +--+---+ +-------+ |
| GPU1 | | GPU2 | | GPU3 | | GPU1 | | GPU2 | | GPU3 | | GPU1 | |
+------+ +------+ +------+ +--+---+ +-+----+ +---+--+ +-+-+--++ |
| | | ^ ^ ^ |
| | | 4 | | | |
| | ----------^ | | |
| | 4 | | |
| +------------------------+ | |
| | |
+------------------------------------+ |
+------------------------------------------------------------------------------------------------------+
| +------+
| | GPU1 |
| +------+ main thread
+-> loss = criterion(...)+-----> scatter +--------------> model.backward() +----------> reduce gradient +-------> optimizer.step
+ + + +------+ 9
| 5 | 6 | 7 | GPU1 |
| | | +--+---+
| v---------------v +--------------------+ ^
| | | | | | | | 8
| | | | thread 1 thread 2 thread 3 |
| | | | + | | +-------------+
| | | | | | | | | |
v v v v v v v | | |
+--+---+ +---+-+ +--+--+ +-+---+ +--+--+ +---+--+ +--+--+ +--+--+ +-+--+ +-+---+
| GPU1 | | GPU1| | GPU2| |GPU3 | | GPU1| | GPU2 | |GPU3 | | GPU1| |GPU2| | GPU3|
+------+ +-----+ +-----+ +-----+ +-----+ +------+ +-----+ +-----+ +----+ +-----+
Телефон такой:
На этом анализ DP завершен, и в следующей статье мы представим некоторые сведения о DDP.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel
Понимать распределенное обучение интерпретации исходного кода PyTorch?
обсуждение.py torch.org/he/data пар AL…
[Оригинал][глубина][PyTorch] Вторая часть серии DDP: принцип реализации и анализ исходного кода
Pytorch-CUDA от входа до отказа (2)
DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения