[Анализ исходного кода] Распределенный PyTorch (3) ----- DataParallel (ниже)

машинное обучение искусственный интеллект PyTorch

0x00 сводка

Эта статья является третьей частью распространяемого PyTorch, Продолжая вышеизложенное, она знакомит с параллельной операцией и обратным распространением DataPrallel.

Другие статьи из этой серии:

Автоматическая дифференциация инструментов глубокого обучения (1)

Автоматическая дифференциация инструментов глубокого обучения (2)

Автоматическая дифференциация оружия глубокого обучения (3) --- Пример интерпретации

[Анализ исходного кода] Как PyTorch реализует прямое распространение (1) --- Базовый класс (1)

[Анализ исходного кода] Как PyTorch реализует прямое распространение (2) --- Базовый класс (ниже)

[Анализ исходного кода] Как PyTorch реализует прямое распространение (3) --- конкретная реализация

[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (2) ---- Статическая структура движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (3) ---- Динамическая логика движка

[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм

[Анализ исходного кода] Распространение PyTorch (1) ------ история и обзор

[Анализ исходного кода] Как PyTorch использует GPU

[Анализ исходного кода] Распределенный PyTorch (2) ----- DataParallel (включен)

0x01 Работа в прямом направлении

Давайте сначала вспомним текущий форвард-граф, реплицируем вызовы Broadcast.forward и сохраним input_device и num_inputs в его контексте.

+----------------------------------------------------------------------------------------+
| DataParallel.forward                                                                   |
|                                                                                        |
|                                                                                        |
|              replicate +--------------->   parallel_apply             gather           |
|                                                                                        |
+----------------------------------------------------------------------------------------+
​
     +---------------------------+
     | Broadcast                 |
     |                           |
     |                           |
     |                           |
     |          forward()  +----------->
     |                           |
     |                           |
     |  +---------------------+  |
     |  | ctx                 |  |
     |  |       input_device  |  |
     |  |                     |  |
     |  |       num_inputs    |  |
     |  |                     |  |
     |  +---------------------+  |
     |                           |
     |                           |
     |                           |
     |                           |
     |                           |
     |                           |
     +---------------------------+
​

1.1 параллелизм

В настоящее время мы использовали функцию Scatter для размещения и копирования данных с устройства [0] на разные карты, а также функцию Replicate для копирования модели с устройства [0] на разные карты, чтобы каждая карта имела одинаковую модель и разные данные, теперь надо звонить вперед для расчета потерь и градиента соответственно. Это часть parallel_apply.

# 分发数据
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)      
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
​
# 并行训练
outputs = self.parallel_apply(replicas, inputs, kwargs)

Нашему графику распространения соответствует:

parallel_apply реализуется на основе многопоточности, используя заранее подготовленную реплику и входные данные, затем цикл for запускает многопоточность для прямого распространения и, наконец, выводит результат распространения.

def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):

    # 确保模型和输入大小一致
    assert len(modules) == len(inputs)

    # 确保每个 GPU 都有相应的元数据,如没有就空白补全
    if kwargs_tup is not None:
        # 在前面已经补全
        assert len(modules) == len(kwargs_tup)
    else:
        kwargs_tup = ({},) * len(modules)

    # 确保模型数目和CPU数目一致    
    if devices is not None:
        assert len(modules) == len(devices)
    else:
        devices = [None] * len(modules)

    devices = [_get_device_index(x, True) for x in devices]

    # 基于threading多线程实现
    lock = threading.Lock()
    results = {}
    grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()

    # 定义 worker
    def _worker(i, module, input, kwargs, device=None):
        torch.set_grad_enabled(grad_enabled)
        if device is None:
            device = get_a_var(input).get_device()
        try:
            # 设置当前的设备
            with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                # this also avoids accidental slicing of `input` if it is a Tensor
                if not isinstance(input, (list, tuple)):
                    input = (input,)
                output = module(*input, **kwargs) # 前向操作
            with lock:
                # 并行计算得到输出
                results[i] = output
        except Exception:
            with lock:
                results[i] = ExceptionWrapper(
                    where="in replica {} on device {}".format(i, device))

    if len(modules) > 1:
        # 如有一个进程控制多个 GPU ,起多个线程
        # 注意,这里就是每个 worker 调用了 modules 数组中的一个模型copy
        threads = [threading.Thread(target=_worker,
                                    args=(i, module, input, kwargs, device))
                   for i, (module, input, kwargs, device) in
                   enumerate(zip(modules, inputs, kwargs_tup, devices))]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    else:
        # 一个GPU对应一个进程
        _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])

    outputs = []
    for i in range(len(inputs)):
        output = results[i]

        # error handle
        if isinstance(output, ExceptionWrapper):
            output.reraise()
        outputs.append(output)
        
    # 输出 n 个计算结果
    return outputs

В это время прямое распространение соответствует следующему рисунку Теперь параллельная операция вызывает прямой метод модуля.

+----------------------------------------------------------------------------------------+
| DataParallel.forward                                                                   |
|                                                                                        |
|                  1                               2                      3              |
|              replicate +--------------->   parallel_apply             gather           |
|                                                                                        |
+----------------------------------------------------------------------------------------+
​
     +---------------------------+       +-------------------+
     | Broadcast                 |       | module            |
     |                           |       |                   |
     |                           |       |                   |
     |              1            |       |         2         |
     |          forward()  +-----------> |      forward() +--------->
     |                           |       |                   |
     |                           |       |                   |
     |  +---------------------+  |       |                   |
     |  | ctx                 |  |       |                   |
     |  |       input_device  |  |       |                   |
     |  |                     |  |       |                   |
     |  |       num_inputs    |  |       |                   |
     |  |                     |  |       |                   |
     |  +---------------------+  |       |                   |
     |                           |       |                   |
     |                           |       |                   |
     |                           |       |                   |
     |                           |       |                   |
     |                           |       |                   |
     |                           |       |                   |
     +---------------------------+       +-------------------+
​

1.2 Gather

В настоящее время мы использовали функцию Scatter для размещения и копирования данных с устройства [0] на разные карты, а также функцию Replicate для копирования модели с устройства [0] на разные карты, чтобы каждая карта имела одинаковую модель и различные данные, а затем вызов вперед для расчета потерь и градиента соответственно. Это часть parallel_apply.

Все, что нам нужно сделать сейчас, — это объединить градиенты распределенных вычислений в устройство [0], которым является self.output_device.

# 分发数据
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)      
# 分发模型
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
# 并行训练
outputs = self.parallel_apply(replicas, inputs, kwargs)
# 收集到 devices[0]
return self.gather(outputs, self.output_device)

Нашему графику распространения соответствует:

Давайте посмотрим, как результаты собираются в устройство [0] и как устройство [0] действует как сервер параметров.

1.2.1 Мир Python

Gather в основном вызывает Gather.apply(target_device, dim, *outputs) для завершения работы по сбору данных.

def gather(outputs, target_device, dim=0): # target_device 就是 device[0]
    r"""
    Gathers tensors from different GPUs on a specified device
      (-1 means the CPU).
    """
    def gather_map(outputs):
        out = outputs[0]
        if isinstance(out, torch.Tensor):
            return Gather.apply(target_device, dim, *outputs) # 调用下面的 Gather
        if out is None:
            return None
        if isinstance(out, dict):
            return type(out)(((k, gather_map([d[k] for d in outputs]))
                              for k in out))
        return type(out)(map(gather_map, zip(*outputs)))
​
    # Recursive function calls like this create reference cycles.
    # Setting the function to None clears the refcycle.
    try:
        res = gather_map(outputs)
    finally:
        gather_map = None
    return res

Gather вызывает comm.gather для завершения работы, а comm.gather ведет нас в мир C++.

Мы опускаем некоторые коды проверки.

# Gather 源码
class Gather(Function):
​
    @staticmethod
    def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0]
​
        # 下面会往 context 内部存放几个变量,后续会用到
        target_device = _get_device_index(target_device, True)
        ctx.target_device = target_device
        ctx.dim = dim
        ctx.input_gpus = tuple(i.get_device() for i in inputs)
​
        if all(t.dim() == 0 for t in inputs) and dim == 0:
            inputs = tuple(t.view(1) for t in inputs)
            ctx.unsqueezed_scalar = True
        else:
            ctx.unsqueezed_scalar = False
            
        ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
        return comm.gather(inputs, ctx.dim, ctx.target_device) # 这里会进入C++世界
​
    @staticmethod
    def backward(ctx, grad_output): # 注意,这里后续会用到
        scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
        if ctx.unsqueezed_scalar:
            scattered_grads = tuple(g[0] for g in scattered_grads)
        return (None, None) + scattered_grads

Теперь форвардный расчет выглядит следующим образом:

Gather вызывает форвардную функцию Gather, а форвардный метод сохраняет три переменные input_gpus, input_sizes, dim в ctx, которые будут использоваться позже.

+-----------------------------------------------------------------------------------------+
| DataParallel.forward                                                                    |
|                                                                                         |
|                  1                               2                           3          |
|              replicate +--------------->   parallel_apply +--------------> gather       |
|                                                                                         |
+-----------------------------------------------------------------------------------------+
​
     +---------------------------+       +-------------------+       +--------------------+
     | Broadcast                 |       | module            |       |Gather              |
     |                           |       |                   |       |                    |
     |                           |       |                   |       |                    |
     |              1            |       |         2         |       |         3          |
     |          forward()  +-----------> |      forward() +--------> |      forward()     |
     |                           |       |                   |       |                    |
     |                           |       |                   |       |                    |
     |  +---------------------+  |       |                   |       | +----------------+ |
     |  | ctx                 |  |       |                   |       | |ctx             | |
     |  |       input_device  |  |       |                   |       | |     input_gpus | |
     |  |                     |  |       |                   |       | |                | |
     |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
     |  |                     |  |       |                   |       | |                | |
     |  +---------------------+  |       |                   |       | |     dim        | |
     |                           |       |                   |       | +----------------+ |
     |                           |       |                   |       |                    |
     |                           |       |                   |       |                    |
     |                           |       |                   |       |                    |
     |                           |       |                   |       |                    |
     |                           |       |                   |       |                    |
     +---------------------------+       +-------------------+       +--------------------+
​

1.2.2 Мир C++

Функция сбора вызывает _gather_out_impl для завершения операции копирования.

at::Tensor gather(
    at::TensorList tensors,
    int64_t dim,
    c10::optional<int32_t> destination_index) { // destination_index 就是 device[0] 的index
​
  int64_t total_size = 0;
  auto& first = tensors.front();
  const auto first_size = first.sizes();
  dim = at::maybe_wrap_dim(dim, first);
  std::vector<int64_t> expected_size(first_size.begin(), first_size.end());
  auto memory_format = first.suggest_memory_format();
  for (size_t i = 0; i < tensors.size(); i++) {
    const auto& tensor = tensors[i];
    expected_size[dim] = tensor.size(dim);
    total_size += tensor.size(dim);
    if (memory_format != MemoryFormat::Contiguous &&
        tensor.suggest_memory_format() != memory_format) {
      memory_format = MemoryFormat::Contiguous;
    }
  }
  expected_size[dim] = total_size;
  at::Device device(DeviceType::CPU);
  // 根据 index 得到输出的目标设备
  if (!destination_index || *destination_index != -1) {
    // device 就是 GPU 0 这个设备
    device = at::Device(
        DeviceType::CUDA, destination_index ? *destination_index : -1);
  }
​
  //首先,构建一个空的目标tensor建立在目标设备之上,命名为result
  at::Tensor result =
      at::empty(expected_size, first.options().device(device), memory_format);
  
  return _gather_out_impl(tensors, result, dim); // 然后对result进行gather
}

_gather_out_impl выполняет определенную операцию сбора, которая заключается в копировании входных тензоров в целевой тензор, то есть в GPU0.

// ***************** Gather *******************
//
// Gather a list of CUDA tensors on one or more devices to a target tensor or
// device, either CPU or CUDA.
​
// no checks
static inline at::Tensor& _gather_out_impl(
    at::TensorList tensors,
    at::Tensor& out_tensor,
    int64_t dim) {
  std::vector<int64_t> chunk_sizes;
  chunk_sizes.reserve(tensors.size());
  for (auto& tensor : tensors) {
    chunk_sizes.push_back(tensor.size(dim));
  }
  auto chunks =
      out_tensor.split_with_sizes(/*split_sizes=*/chunk_sizes, /*dim=*/dim);
  for (size_t i = 0; i < tensors.size(); i++) { // 拷贝到GPU 0 之上
    chunks[i].copy_(tensors[i], /*non_blocking=*/out_tensor.is_cuda());
  }
  return out_tensor;
}

0x02 Рассчитать потери

Теперь, когда мы собрали градиенты на устройстве [0], мы запускаем обратное распространение, общая логика которого показана выше. Во-первых, вычислить потери на устройстве [0] . По сути, потеря вычисления на этом шаге является промежуточным звеном между прямым вычислением и обратным распространением, здесь оно рассматривается как начало обратного распространения, как показано на следующем рисунке.

Давайте узнаем пример кода и посмотрим ключевые моменты:

  1. Данные были размещены на графическом процессоре по умолчанию, то есть на графическом процессоре 0.
  2. прогнозирование — это результат прямого вычисления при сборе на GPU 0.
  3. использоватьloss = criterion(prediction,target_var)Вычислительные потери поверх графического процессора по умолчанию.
  4. Используйте loss.backward(), чтобы начать обратное распространение.
for batch_idx, (data, label) in pbar:   
    if args.cuda:
        data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上
    data_v = Variable(data)
    target_var = Variable(label)
    prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出
    
    # 到目前为止,我们完成了DataParallel.forward()
    #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里
    #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果聚合到主gpu里

    criterion = nn.CrossEntropyLoss()
    loss = criterion(prediction,target_var)  # 3. 在默认GPU之上计算loss
    optimizer.zero_grad()
    loss.backward()   # 4. 开始反向传播
    optimizer.step()

0x03 Обратное распространение

Мы запускаем вышеуказанную часть Forward, вычисляем потери, а затем запускаем приведенный выше код.loss.backward()часть.

3.1 Распределение градиентов

Сначала мы подошли к части распределения градиентов, то есть к распределению потерь между графическими процессорами, чтобы их можно было распространять обратно независимо на каждом графическом процессоре. Соответствует следующему рисунку:

3.1.1 Gather.backward

Как упоминалось ранее, прогнозирование — это результат прямого вычисления при сборе на GPU 0. И потери рассчитываются на основе предсказания, поэтому изloss.backward()Запускаем обратное распространение, первым шагом от заднего к переднему является операция распространения «сбора», которая соответствует обратной функции «Сбор», код ядра которой — Scatter.apply.

class Gather(Function):
​
    # 这里前向传播用到了,为了对照,我们依然贴出来
    @staticmethod
    def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0]
​
        # 下面会往 context 内部存放几个变量,后续会用到
        target_device = _get_device_index(target_device, True)
        ctx.target_device = target_device
        ctx.dim = dim
        ctx.input_gpus = tuple(i.get_device() for i in inputs)
​
        if all(t.dim() == 0 for t in inputs) and dim == 0:
            inputs = tuple(t.view(1) for t in inputs)
            ctx.unsqueezed_scalar = True
        else:
            ctx.unsqueezed_scalar = False
            
        ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
        # 这里会进入C++世界,把输出聚集到 GPU 0。
        return comm.gather(inputs, ctx.dim, ctx.target_device) 
​
    @staticmethod
    def backward(ctx, grad_output): # 这里现在后向传播用到了!
        # 把前向传播在 context 之中存放的变量取出,作为 Scatter 的输入 
        scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
        if ctx.unsqueezed_scalar:
            scattered_grads = tuple(g[0] for g in scattered_grads)
        return (None, None) + scattered_grads

Подробности следующие: вы можете видеть, что обратное использует ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output, сохраненные в предыдущем прямом распространении, для вызова Scatter.apply.

На рисунке вверху показан процесс прямого распространения, внизу — процесс обратного распространения, а в середине — некоторые модули кода, используемые в прямом и обратном распространении.

+--------------------------------------------------------------------------------------+
| DataParallel.forward                                                                 |
|                                                                                      |
|               1                               2                           3          |
|           replicate +--------------->   parallel_apply +--------------> gather       |
|                                                                                      |
+--------------------------------------------------------------------------------------+
​
  +---------------------------+       +-------------------+       +--------------------+
  | Broadcast                 |       | module            |       |Gather              |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |              1            |       |         2         |       |         3          |
  |          forward()  +-----------> |      forward() +--------> |      forward()     |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |  +---------------------+  |       |                   |       | +----------------+ |
  |  | ctx                 |  |       |                   |       | |ctx             | |
  |  |       input_device  |  |       |                   |       | |     input_gpus | |
  |  |                     |  |       |                   |       | |                | |
  |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
  |  |                     |  |       |                   |       | |                | |
  |  +---------------------+  |       |                   |       | |     dim        | |
  |                           |       |                   |       | +----------------+ |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |                           |       |                   | <---------+ backward()     |
  |                           |       |                   |       |         3          |
  |                           |       |                   |       |                    |
  +---------------------------+       +-------------------+       +--------------------+
​
+--------------------------------------------------------------------------------------+
| loss.backward()                                                                      |
|                                                                           3          |
|                                                           <--------------------+     |
|                                                                                      |
|                                                                                      |
+--------------------------------------------------------------------------------------+
​

3.1.2 Scatter

Scatter.apply фактически вызывает свой прямой метод.

  • Сначала извлеките ранее сохраненные переменные из контекста, здесь в основном устройства ввода input_device (исходное устройство) и target_gpus (целевое устройство).
  • Получите поток на целевое устройство.
  • Вызовите comm.scatter, чтобы распределить градиент на целевое устройство.
class Scatter(Function):
​
    @staticmethod
    def forward(ctx, target_gpus, chunk_sizes, dim, input):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        ctx.dim = dim
        ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
        streams = None
        if torch.cuda.is_available() and ctx.input_device == -1:
            # Perform CPU to GPU copies in a background stream
            streams = [_get_stream(device) for device in target_gpus]
         
        # 分发到其他GPU
        outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
        
        # Synchronize with the copy stream
        if streams is not None:
            for i, output in enumerate(outputs):
                with torch.cuda.device(target_gpus[i]):
                    main_stream = torch.cuda.current_stream()
                    main_stream.wait_stream(streams[i])
                    output.record_stream(main_stream)
        return outputs
​
    @staticmethod
    def backward(ctx, *grad_output):
        return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)

3.1.3 C++

выше код питонаoutputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)Прямо в мир C++. Конкретный код находится в torch/csrc/cuda/comm.cpp.

Роль разброса состоит в том, чтобы разделить тензор и распределить его по потоку каждого устройства.

std::vector<at::Tensor> scatter(
    const at::Tensor& tensor,
    at::IntArrayRef devices,
    const c10::optional<std::vector<int64_t>>& chunk_sizes,
    int64_t dim,
    const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
        streams) {
  dim = at::maybe_wrap_dim(dim, tensor);
  
  // 把tensor进行split
  std::vector<at::Tensor> chunks = chunk_sizes
      ? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
      : tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
  
  at::cuda::OptionalCUDAStreamGuard cuda_guard;
  for (size_t i = 0; i < chunks.size(); ++i) {
    const auto device_index = static_cast<int16_t>(devices[i]);
    if (device_index != tensor.get_device()) {
      if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
        cuda_guard.reset_stream(*(*streams)[i]);
      }
      // 发送给各个设备的流
      chunks[i] = chunks[i].to( 
          {DeviceType::CUDA, device_index},
          /*non_blocking=*/true,
          /*copy=*/false,
          /*memory_format=*/at::MemoryFormat::Preserve);
    }
  }
  return chunks;
}

3.2 Параллельное обратное распространение

Теперь, когда градиент распределен по каждому графическому процессору, следующим шагом является официальное введение параллельного обратного распространения, Эта часть функции заключается в параллельном запуске обратного распространения на каждом графическом процессоре для расчета градиента параметра. Соответствует следующему рисунку:

Эта часть вызывает обратную исходную модель, в частности значение 4 на следующем рисунке:

+--------------------------------------------------------------------------------------+
| DataParallel.forward                                                                 |
|                                                                                      |
|               1                               2                           3          |
|           replicate +--------------->   parallel_apply +--------------> gather       |
|                                                                                      |
+--------------------------------------------------------------------------------------+
​
  +---------------------------+       +-------------------+       +--------------------+
  | Broadcast                 |       | module            |       |Gather              |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |              1            |       |         2         |       |         3          |
  |          forward()  +-----------> |      forward() +--------> |      forward()     |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |  +---------------------+  |       |                   |       | +----------------+ |
  |  | ctx                 |  |       |                   |       | |ctx             | |
  |  |       input_device  |  |       |                   |       | |     input_gpus | |
  |  |                     |  |       |                   |       | |                | |
  |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
  |  |                     |  |       |                   |       | |                | |
  |  +---------------------+  |       |                   |       | |     dim        | |
  |                           |       |                   |       | +----------------+ |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |                           | <---------+  backward()   | <---------+ backward()     |
  |                           |       |          4        |       |         3          |
  |                           |       |                   |       |                    |
  +---------------------------+       +-------------------+       +--------------------+
​
+--------------------------------------------------------------------------------------+
| loss.backward()                                                                      |
|                                                4                          3          |
|                                     <------------------+  <--------------------+     |
|                                                                                      |
|                                                                                      |
+--------------------------------------------------------------------------------------+
​

3.3 Слияние градиентов

Эта часть функции заключается в слиянии градиентов на GPU 0, а общее расширение процесса соответствует следующему рисунку:

3.3.1 Broadcast.backward

Эта часть соответствует обратному распространению широковещания.

class Broadcast(Function):
​
    @staticmethod
    def forward(ctx, target_gpus, *inputs):
        target_gpus = [_get_device_index(x, True) for x in target_gpus]
        
        # 前向传播时候,向上下文存入了一些变量
        ctx.target_gpus = target_gpus
        if len(inputs) == 0:
            return tuple()
        ctx.num_inputs = len(inputs)
        # input 放在 device[0],所以 input_device 就是 GPU 0
        ctx.input_device = inputs[0].get_device()
        # 和 detach 的情形一样
        outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
        non_differentiables = []
        
        # 在上下文中设置哪些不需要梯度
        for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
            if not input_requires_grad:
                for output in outputs:
                    non_differentiables.append(output[idx])
        ctx.mark_non_differentiable(*non_differentiables)
        return tuple([t for tensors in outputs for t in tensors])
​
    @staticmethod
    def backward(ctx, *grad_outputs):
        # 反向传播来到这里,取出之前在上下文存放的变量作为输入。ctx.input_device 就是之前存储的 GPU 0。
        return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)

Следовательно, мы можем расширить блок-схему:

+--------------------------------------------------------------------------------------+
| DataParallel.forward                                                                 |
|                                                                                      |
|               1                               2                           3          |
|           replicate +--------------->   parallel_apply +--------------> gather       |
|                                                                                      |
+--------------------------------------------------------------------------------------+
​
  +---------------------------+       +-------------------+       +--------------------+
  | Broadcast                 |       | module            |       |Gather              |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |              1            |       |         2         |       |         3          |
  |          forward()  +-----------> |      forward() +--------> |      forward()     |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |  +---------------------+  |       |                   |       | +----------------+ |
  |  | ctx                 |  |       |                   |       | |ctx             | |
  |  |       input_device  |  |       |                   |       | |     input_gpus | |
  |  |                     |  |       |                   |       | |                | |
  |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
  |  |                     |  |       |                   |       | |                | |
  |  +---------------------+  |       |                   |       | |     dim        | |
  |                           |       |                   |       | +----------------+ |
  |                           |       |                   |       |                    |
  |                           |       |                   |       |                    |
  |          backward()       | <---------+  backward()   | <---------+ backward()     |
  |              5            |       |          4        |       |         3          |
  |                           |       |                   |       |                    |
  +---------------------------+       +-------------------+       +--------------------+
​
+--------------------------------------------------------------------------------------+
| loss.backward()                                                                      |
|                5                               4                          3          |
|         <------------------------+  <------------------+  <--------------------+     |
|                                                                                      |
|                                                                                      |
+--------------------------------------------------------------------------------------+
​

3.3.2 ReduceAddCoalesced

Broadcast.backward вызывает метод ReduceAddCoalesced.apply, который соответствует прямому методу ReduceAddCoalesced, целью которого является слияние градиента с назначением целевого устройства, которым является GPU 0.

class ReduceAddCoalesced(Function):
​
    @staticmethod
    # 会调用到这里,destination 是GPU 0
    def forward(ctx, destination, num_inputs, *grads): 
        # 从梯度之中提取所在的设备
        ctx.target_gpus = [grads[i].get_device() for i in range(0, len(grads), num_inputs)]
​
        grads_ = [grads[i:i + num_inputs]
                  for i in range(0, len(grads), num_inputs)]
        # 把梯度归并到目标设备 destination,就是GPU 0
        return comm.reduce_add_coalesced(grads_, destination)
​
    @staticmethod
    def backward(ctx, *grad_outputs):
        return (None, None,) + Broadcast.apply(ctx.target_gpus, *grad_outputs)

3.3.3 c++

Посмотрите на комментарии: чтобы добавить градиенты с нескольких графических процессоров, используйте код слияния-добавления.

def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sums tensors from multiple GPUs.
​
    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.
​
    Args:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing
​
    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
    output = []
    ref_order = []
    # process sparse ones first since they may have different sizes on different gpus
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            # 进行归并
            result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # now the dense ones, which have consistent sizes
    for chunks in zip(*itrs):
        flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]  # (num_gpus,)
        # 进行归并
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # The unflattened tensors do not share storage, and we don't expose
            # base flat tensor anyways, so give them different version counters.
            # See NOTE [ Version Counter in comm.*_coalesced ]
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))

3.4 Обновление параметров модели

Эта часть функции: обновить параметры градиента. Выполните градиентный спуск и обновите параметры модели на основном графическом процессоре.

Кроме того, поскольку параметры модели обновляются только на ведущем графическом процессоре, а другие подчиненные графические процессоры в это время не обновляются синхронно, необходимо скопировать обновленные параметры модели на оставшиеся подчиненные графические процессоры для достижения параллелизма. Это делается в следующем цикле for, и цикл повторяется.

Соответствующий пример кода:

for batch_idx, (data, label) in pbar:   # 6. 下一次迭代会继续从分发开始
    if args.cuda:
        data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上
    data_v = Variable(data)
    target_var = Variable(label)
    prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出
    
    # 到目前为止,我们完成了DataParallel.forward()
    #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在在前向传播里
    #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果和到主gpu里
​
    criterion = nn.CrossEntropyLoss()
    loss = criterion(prediction,target_var)  # 3. 在默认GPU之上计算loss
    optimizer.zero_grad()
    loss.backward()   # 4. 开始反向传播
    optimizer.step() # 5. 更新模型

0x04 Сводка

Подытожим процесс: сначала данные и модель помещаются в GPU по умолчанию, то есть GPU 0, а затем итерация выглядит следующим образом:

Это соответствует номерам на рисунке ниже.

  1. scatter будет распространять данные на другие графические процессоры.
  2. replicate распространит модель на другие графические процессоры.
  3. parallel_apply запускает несколько потоков для прямого вычисления.
  4. Gather соберет выходные данные вычислений в GPU 0.
  5. GPU 0 вычислит потери.
  6. Распределяйте градиенты по другим графическим процессорам.
  7. Модель вызывает обратное вычисление.
  8. Слияние градиентов с GPU 0.
  9. optimizer.step обновляет модель.
                     +-----+                   +-------+
                     |GPU1 |                   | GPU1  |
main thread          +-----+                   +-------+
 +-----> Forward----> scatter +--------------> replicate------->  parallel_apply  +-------->  gather +---------+
                        +                           +                     +                                    |
                      1 |                         2 |                   3 |                                    |
                        |                           |                     |                                    |
                        |  +---------+----------+---+                     |                                    |
                        |  |         |          |                         |                                    |
                        +---------+----------+  |               +--------------------+                         |
                        |  |      |  |       |  |               |         |          |                         |
                        |  | 2    |  | 2     |  | 2       thread|1     thread 2    thread 3                    |
                      1 |  |    1 |  |     1 |  |               |         |          |                         |
                        |  v      |  v       |  v               |         |          |                         |
                        v         v          v                  v         v          v                         |
                     +--+---+  +--+---+   +--+---+           +--+---+  +--+---+   +--+---+    +-------+        |
                     | GPU1 |  | GPU2 |   | GPU3 |           | GPU1 |  | GPU2 |   | GPU3 |    | GPU1  |        |
                     +------+  +------+   +------+           +--+---+  +-+----+   +---+--+    +-+-+--++        |
                                                                |        |            |         ^ ^  ^         |
                                                                |        |            |   4     | |  |         |
                                                                |        |            ----------^ |  |         |
                                                                |        |                4       |  |         |
                                                                |        +------------------------+  |         |
                                                                |                                    |         |
                                                                +------------------------------------+         |
        +------------------------------------------------------------------------------------------------------+
        |                               +------+
        |                               | GPU1 |
        |                               +------+                                                                     main thread
        +-> loss = criterion(...)+-----> scatter   +-------------->  model.backward() +---------->  reduce gradient +-------> optimizer.step
                     +                      +                               +                          +------+         9
                     | 5                    | 6                             | 7                        | GPU1 |
                     |                      |                               |                          +--+---+
                     |              v---------------v             +--------------------+                  ^
                     |              |       |       |             |         |          |                  | 8
                     |              |       |       |         thread 1    thread 2   thread 3             |
                     |              |       |       |             +         |          |           +-------------+
                     |              |       |       |             |         |          |           |      |      |
                     v              v       v       v             v         v          v           |      |      |
                  +--+---+      +---+-+  +--+--+  +-+---+      +--+--+  +---+--+    +--+--+     +--+--+ +-+--+ +-+---+
                  | GPU1 |      | GPU1|  | GPU2|  |GPU3 |      | GPU1|  | GPU2 |    |GPU3 |     | GPU1| |GPU2| | GPU3|
                  +------+      +-----+  +-----+  +-----+      +-----+  +------+    +-----+     +-----+ +----+ +-----+


Телефон такой:

На этом анализ DP завершен, и в следующей статье мы представим некоторые сведения о DDP.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

torch.optim интерпретации исходного кода PyTorch: подробное объяснение интерфейса алгоритма оптимизации

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel

nn.DataParallel от Pytorch

Понимать распределенное обучение интерпретации исходного кода PyTorch?

обсуждение.py torch.org/he/data пар AL…

[Оригинал][глубина][PyTorch] Вторая часть серии DDP: принцип реализации и анализ исходного кода

Pytorch-CUDA от входа до отказа (2)

Pytorch наступает на яму: разница между присваиванием, поверхностным копированием и глубоким копированием, а также ямы model.state_dict() и model.load_state_dict()

DP и DDP интерпретации исходного кода PyTorch: параллелизм моделей и анализ распределенного обучения