0x00 сводка
В предыдущих статьях мы представили базовые знания о параллелизме конвейера PyTorch, механизме автоматической балансировки, сегментации данных и т. д. В этой статье мы объединяем содержание статьи, чтобы увидеть, как реализовать конвейерные зависимости. -зависимости устройств между этими отношениями небольших партий.
Ссылки на другие статьи о конвейерном параллелизме:
[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe(3) -- перерасчет
[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения
[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (6) --- Стратегия 1F1B
[Анализ исходного кода] Параллельная реализация конвейера PyTorch (1) -- базовые знания
[Анализ исходного кода] Параллельная реализация конвейера PyTorch (2) — как разделить модель
[Анализ исходного кода] Параллельная реализация конвейера PyTorch (4) - прямой расчет
Изображения в этой статье взяты из бумаги и исходного кода github.
0x01 предыдущий обзор
Чтобы лучше понять эту статью, давайте сначала рассмотрим ключевые части предыдущей статьи.
-
исходный трубопроводСостояние следующее:
- Стратегия конвейерного параллелизма заключается в распределении задач в соответствии с индексом раздела j так, чтобы раздел j целиком находился на устройстве j.
- Устройство, содержащее позднюю часть модели, должно дождаться окончания расчета устройства, содержащего раннюю часть модели.
- целевой конвейерСостояние следующее:
-
текущая проблема:
- Если они разбиты на несколько микропакетов, обязательно, что они должны быть завершены до и должны быть завершены до выполнения.
- Вычислительный граф обратного распространения динамически строится во время прямого распространения. PyTorch не записывает графы прямых вычислений и не поддерживает ленту градиентов, а механизм автоградации PyTorch только выполняет графы вычислений в обратном направлении. Это означает, что механизм автозагрузки не может работать точно в порядке, обратном выполнению прямого процесса, если это не предусмотрено структурой графа.
-
текущая сложность:
- Как я могу опубликовать эти связанные с устройством задачи в правильном порядке на каждом устройстве, чтобы избежать задержки выполнения задач на устройстве (асинхронно с ЦП) из-за того, что интерпретатор Python не может запросить их заранее.[Эта статья уже была представлена]
- Как установить кросс-девайсные зависимости между этими небольшими партиями.
-
План реализации:
-
Как обеспечить правильный порядок выполнения?torchgpipe вводит детерминированный тактовый цикл, который дает общий порядок задач.[Эта статья уже была представлена]
-
Как гарантировать динамические явные зависимости в вычислительном графе?Для каждого плана запуска, сгенерированного clock_cycles:
- Используйте функцию забора для вызова «разветвления» и «объединения», чтобы динамически создавать явные зависимости обратного распространения в графе обратных вычислений.
- Вычислить с помощью calculate(schedule, skip_trackers, in_queues, out_queues) .
-
Поскольку схема порядка выполнения была представлена ранее, в этой статье описывается, как вычисляются зависимости.
0x02 Расчетная зависимость
+-----------------------------------------------------------------------------------------+
| |
| Layer 1 +---> Layer 2 +-----> Layer 3 +-----> Layer 4 +-----> Layer 5 +---> Layer 6 |
| |
+--------------------------+---------------------------+----------------------------------+
+
|
|
v
+------------------------------------------------------------------------------------+
| +--------------------+ +---------------------+ +--------------------+ |
| |Partition 1 | |Partition 2 | |Partition 3 | |
| | | | | | | |
| | Layer 1 | +---------> Layer 4 | | | |
| | + | | | + | +-------> Layer 6 | |
| | | | | | | | | | | |
| | v | | | | | | | | |
| | Layer 2 | | | | | | | | |
| | + | | | v | | | | |
| | | | | | Layer 5 +---------+ | | |
| | v | | | | | | |
| | Layer 3 +---------+ | | | | |
| | | | | | | |
| +---------+----------+ +---------+-----------+ +-----------+--------+ |
| |
+------------------------------------------------------------------------------------+
Зачем нужно вычислять зависимости?
- Поскольку модель была многоуровневой, разные ее части разделены на разных устройствах, а данные также разделены на микропакеты, поэтому модель изначально имела линейную зависимость, но теперь она должна стать зависимостью конвейера. Поэтому исходный вычислительный граф не может удовлетворить потребности, поэтому требуются целевые дополнения. Как и на картинке выше, 6 слоев разделены на три раздела.Как построить зависимости между этими тремя разделами?
- Предыдущие линейные зависимости в основном определялись при определении модели, но теперь динамическую зависимость необходимо устанавливать при каждом запуске.
Следовательно, для конвейерного параллелизма torchgpipe необходимо дополнить нативную псевдораспределенную зависимость между устройствами. torchgpipe достигает своей цели, внося различные коррективы в прямые и обратные графы вычислений. Вычислительный граф означает все виды логики зависимостей, а дополнение логики зависимостей дополняется двумя функциями Fork и Join, представленными в этом разделе.
Первоначальный вопрос здесь заключается в том, как Torchgpipe строит граф удаленных обратных вычислений без использования PyTorch RPC и p2p. Позже я обнаружил, что слишком много думал, потому что Torchgpipe не учел эту ситуацию, он нацелен на GPU, которые все находятся на одном хосте, и не предполагает многомашинных вычислений в разных местах.
Torchgpipe — это, по сути, процесс, который запускает несколько потоков для вычислений и является альтернативой DP. Например, в исходном коде есть такое сравнение:
### ResNet-101 Accuracy Benchmark
Batch size | torchgpipe | nn.DataParallel | Goyal et al.
---------- | ---------: | --------------: | -----------:
256 | 21.99±0.13 | 22.02±0.11 | 22.08±0.06
1K | 22.24±0.19 | 22.04±0.24 | N/A
4K | 22.13±0.09 | N/A | N/A
Другой пример явно упоминается в коде:
If you decide not to use checkpointing at all, :class:`nn.DataParallel
<torch.nn.DataParallel>` might be more efficient than GPipe.
0x03 Зависимость обратного распространения
Давайте сначала рассмотрим зависимость обратного распространения, которой посвящена статья.
3.1 Анализ
Вспомним два предыдущих примера.
Рисунок 1
фигура 2
Здесь необходимо выполнить две зависимости:
Поскольку Nuggets недавно допустили ошибку при анализе формулы, они могут отображать их только на картах, а для лучшего отображения они должны их отображать.
3.2 Основные функции
3.2.1 Function
Во-первых, давайте посмотрим, что делает torch.autograd.Function.
Класс torch.autograd.Function фактически является базовым родительским классом функции операции.Такая функция операции должна иметь два основных процесса, а именно процесс прямой операции и процесс обратного деривации.
Если некоторые операции невозможно реализовать с помощью существующих слоев или методов PyTorch, необходимо реализовать новый метод для расширения PyTorch. Когда механизм автоматического вывода не используется и необходимо настроить правила вывода, следует расширить класс torch.autograd.Function. Поскольку pytorch больше не предоставляет механизм автоматического вывода, пользователю необходимо определить процесс расчета для прямого и обратного распространения, то есть «Расширение torch.autograd».
Далее мы представляем ключевой алгоритм обратной зависимости: Fork and Join.
3.2.2 Fork
Вилка — это функция автоградации, которая отображает тензор x в пару (x, ), где — пустой тензор. Расширен метод forktorch.autograd.Function
.
def fork(input: Tensor) -> Tuple[Tensor, Tensor]:
"""Branches out from an autograd lane of the given tensor."""
if torch.is_grad_enabled() and input.requires_grad:
input, phony = Fork.apply(input)
else:
phony = get_phony(input.device, requires_grad=False)
return input, phony
class Fork(torch.autograd.Function):
@staticmethod
def forward(ctx: 'Fork', input: Tensor) -> Tuple[Tensor, Tensor]: # type: ignore
phony = get_phony(input.device, requires_grad=False)
return input.detach(), phony.detach()
@staticmethod
def backward(ctx: 'Fork', grad_input: Tensor, grad_grad: Tensor) -> Tensor: # type: ignore
return grad_input
3.2.3 Join
Join — это функция автоградации, которая отображает пару (x, ) в тензор x , где — пустой тензор. Метод Join также расширенtorch.autograd.Function
.
def join(input: Tensor, phony: Tensor) -> Tensor:
"""Merges two autograd lanes."""
if torch.is_grad_enabled() and (input.requires_grad or phony.requires_grad):
input = Join.apply(input, phony)
return input
class Join(torch.autograd.Function):
@staticmethod
def forward(ctx: 'Join', input: Tensor, phony: Tensor) -> Tensor: # type: ignore
return input.detach()
@staticmethod
def backward(ctx: 'Join', grad_input: Tensor) -> Tuple[Tensor, None]: # type: ignore
return grad_input, None
3.2.4 Phony
Phony — это тензор без пространства, и, поскольку он не требует накопления градиента, в графе autograd можно строить произвольные зависимости.
def get_phony(device: torch.device, *, requires_grad: bool) -> Tensor:
"""Gets a phony. Phony is tensor without space. It is useful to make
arbitrary dependency in a autograd graph because it doesn't require any
gradient accumulation.
.. note::
Phonies for each device are cached. If an autograd function gets a phony
internally, the phony must be detached to be returned. Otherwise, the
autograd engine will mutate the cached phony in-place::
class Phonify(torch.autograd.Function):
@staticmethod
def forward(ctx, input):
phony = get_phony(input.device, requires_grad=False)
return phony.detach() # detach() is necessary.
"""
key = (device, requires_grad)
try:
phony = _phonies[key]
except KeyError:
with use_stream(default_stream(device)):
phony = torch.empty(0, device=device, requires_grad=requires_grad)
_phonies[key] = phony
return phony
3.2.5 detach
В коде часто можно увидеть использование detach, это видно из комментариев к решению ошибки в PyTorch.
# A Python autograd function might fail with this error:
#
# RuntimeError: Returning Variables sharing storage with other Variables
# that require grad is not supported in Python functions. Please submit a
# feature request if you hit this error.
#
# It doesn't look like an essential restriction. But it happens on the
# current PyTorch version. To avoid it, we should detach the tensor before
# returning by identity autograd functions, such as Wait, Fork, and Join.
#
3.3 Использование
В конвейере мы можем увидеть конкретный метод использования.Метод забора (опуская часть кода) использует зависимости для построения зависимостей обратного распространения, чтобы гарантировать, что пакеты [i-1] будут завершены после пакетов [i].
def fence(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
) -> None:
"""Copies micro-batches after computation for the previous
micro-batches.
"""
batches = self.batches
copy_streams = self.copy_streams
skip_layout = self.skip_layout
for i, j in schedule:
# Ensure that batches[i-1] is executed after batches[i] in
# backpropagation by an explicit dependency.
if i != 0:
depend(batches[i-1], batches[i]) # 在这里建立了后向传播依赖关系
next_stream = copy_streams[j][i]
for prev_j, ns, name in skip_layout.copy_policy(j):
prev_stream = copy_streams[prev_j][i]
skip_trackers[i].copy(batches[i], prev_stream, next_stream, ns, name)
if j != 0:
prev_stream = copy_streams[j-1][i]
copy(batches[i], prev_stream, next_stream)
Конкретный код зависимости выглядит следующим образом:
def depend(fork_from: Batch, join_to: Batch) -> None:
fork_from[0], phony = fork(fork_from[0])
join_to[0] = join(join_to[0], phony)
Давайте объединим пример кода для назначения входящих параметров и переинтерпретируем метод следующим образом, чтобы каждый мог лучше его понять.
def depend(batches[i-1]: Batch, batches[i]: Batch) -> None:
batches[i-1][0], phony = fork(batches[i-1][0])
batches[i][0] = join(batches[i][0], phony)
Конкретная логика такова, мост завершается через фальшивку, то есть при прямом распространении пакетов[i] зависит от результата выполнения пакетов[i-1].
+----------------+ +--------------+
| | | |
| batches[i-1] | | batches[i] |
| | | |
+----------+-----+ +-----+--------+
| |
| |
| |
v v
+--------------------------------------------------------+
| depend | | |
| | | |
| | | |
| v | |
| +-----------------------+ | |
| | fork | | | |
| | | get_phony | | |
| | | + | | |
| | | | | | |
| | | | | | |
| +-----------------------+ | |
| | | | |
| | | | |
| | | | |
| v v | |
| +-----------+--+ +--+-----+ | |
| | | | | | |
| | batches[i-1] | | phony | | |
| | | | | | |
| +--------------+ +--+-----+ | |
| | | |
| | | |
| v v |
| +--+------------------+ |
| |Join | | |
| | | | |
| | | | |
| | v | |
| +---------------------+ |
| | |
| | |
| | |
| v |
| +-----+------+ |
| | | |
| | batches[i] | |
| | | |
| +------------+ |
| |
+--------------------------------------------------------+
Давайте объединим несколько пакетов, чтобы увидеть цепочку зависимостей.
+----------------------------------------------------------+
| depend |
| |
| +------------+ |
+------------- | |fork | +-----------+ |
| | | | | | | |
|batches[i] +----------------------> | batches[i]| |
| | | | | | | |
+------------- | | | +-----------+ |
| | | +-------+ |
| | +-----------> | Join | |
| | | | | |
| +------------+ | | |
+------------- | | | +--------------+ |
| | | | | | | |
|batches[i+1]+-------------------------------------------->+ batches[i+1] | |
| | | | | | | |
+---------+--- | | | +--------------+ |
| | +-------+ |
| | |
| +----------------------------------------------------------+
| +----------------------------------------------------------+
| | depend |
| | |
| | +-------------+ |
| | |fork | +------------+ |
| | | | | | |
+--------------------------> |batches[i+1]| |
| | | | | |
| | | +------------+ |
| | | +-------+ |
| | +---------> |Join | |
| +-------------+ | | |
+------------+ | | | +-------------+ |
| | | | | | | |
|batches[i+2]+--------------------------------------------> | batches[i+2]| |
| | | | | | | |
+----------+-+ | | | +-------------+ |
| | +-------+ |
| | |
| +----------------------------------------------------------+
|
| +-----------------------------------------------------------+
| | depend |
| | |
+-----------------------------> ...... |
| |
| |
+-----------------------------------------------------------+
Таким образом, приведенное выше изображение представляет собой график прямого расчета, поэтому при обратном распространении пакеты [i] должны быть завершены до пакетов [i-1].
depend(batches[i-1], batches[i])
Чтобы соответствовать рисунку в статье, мы изменим его на:
depend(batches[i], batches[i+1])
Код зависимости также меняется на:
def depend(batches[i]: Batch, batches[i+1]: Batch) -> None:
batches[i][0], phony = fork(batches[i][0])
batches[i+1][0] = join(batches[i+1][0], phony)
0x04 Зависимость от прямого распространения
Вернемся назад и посмотрим на положительные зависимости. Поскольку частью цели прямого распространения является завершение зависимостей обратного распространения, а в настоящее время обратное распространение только завершает зависимости между строками, а зависимости между столбцами не завершаются, мы теперь завершаем их.
Зависимость между столбцами — это зависимость между устройствами, то есть выход предыдущего устройства — это вход последнего устройства.
4.1 Модель сегментации
Прежде всего, нам все еще нужно рассмотреть, как разделить модель, как вы можете видеть из split_module,
Переменная-член partitions в GPipe имеет тип nn.ModuleList. nn.ModuleList — это контейнер, который хранит разные модули и автоматически добавляет параметры каждого модуля в сеть. Однако nn.ModuleList не определяет сеть, а просто хранит разные модули вместе, последовательности между этими модулями нет, порядок выполнения сети определяется в соответствии с прямой функцией.
def split_module(module: nn.Sequential,
balance: Iterable[int],
devices: List[torch.device],
) -> Tuple[List[nn.Sequential], List[int], List[torch.device]]:
balance = list(balance)
j = 0
partitions = []
layers: NamedModules = OrderedDict()
for name, layer in module.named_children(): # 遍历模型包含的层
layers[name] = layer # 把新的层加入到数组中
if len(layers) == balance[j]: # 如果数组大小等于balance[j],就是达到了device j应该包含的层数
# Group buffered layers as a partition.
partition = nn.Sequential(layers) # 把层数组组合成一个sequential module
device = devices[j]
partition.to(device) # 把层放置到相关设备之上
partitions.append(partition) # 这个新module加入到分区数组中
# Prepare for the next partition.
layers.clear()
j += 1 # 去下一个device看看
partitions = cast(List[nn.Sequential], nn.ModuleList(partitions))
del devices[j:]
return partitions, balance, devices
Возникает следующий вопрос: Sequential можно использовать внутри раздела для выполнения ряда операций вперед, но как настроить порядок выполнения между разделами?
+-----------------------------------------------------------------------------------------+
| |
| Layer 1 +---> Layer 2 +-----> Layer 3 +-----> Layer 4 +-----> Layer 5 +---> Layer 6 |
| |
+-----------------------------------------+-----------------------------------------------+
|
|
|
v
+-----------------------------------------------------------------------------------------+
| +--------------------+ +---------------------+ +--------------------+ |
| |Partition 1 | |Partition 2 | |Partition 3 | |
| | | ??? | | | | |
| | Layer 1 | +----------> Layer 4 | ??? | | |
| | + | | | + | +-------> Layer 6 | |
| | | | | | | | | | | |
| | v | | | | | | | | |
| | Layer 2 | | | | | | | | |
| | + | | | v | | | | |
| | | | | | Layer 5 +------------+ | | |
| | v | | | | | | |
| | Layer 3 +----------+ | | | | |
| | | | | | | |
| +--------------------+ +---------------------+ +--------------------+ |
| |
+-----------------------------------------------------------------------------------------+
4.2 Сборка зависимостей
Начнем с бумаги. Предположим, у нас есть нейронная сеть, состоящая из ряда подсетей. Предположим, что эти подсети равны , а их параметры равны соответственно, тогда вся сеть равна:
Параметр равен , и для ясности мы называем j-е разбиение f и предполагаем, что параметры разбиений взаимно не пересекаются.
При обучении сети методы на основе градиента (такие как стохастический градиентный спуск) должны вычислять выходные данные f (x) сети с учетом мини-пакета обучающих данных x и соответствующих потерь. и градиент g потерь по отношению к параметрам сети. Эти два этапа называются прямым распространением и обратным распространением соответственно.
Поскольку f состоит из своих L слоев подмодулей () последовательно, прямой проход может быть вычислен путем ввода (то есть ввода x), а затем последовательного применения каждого раздела, т. Е. Здесь . Это может быть выражено как:
Итак, мы знаем,Порядок прямого распространения определяется .
Мы можем дополнительно проанализировать код, чтобы увидеть, как реализовать зависимости порядка между разделами.
def run(self) -> None:
"""Runs pipeline parallelism.
It modifies the given batches in place.
"""
batches = self.batches
partitions = self.partitions
devices = self.devices
skip_layout = self.skip_layout
m = len(batches)
n = len(partitions)
skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches]
with spawn_workers(devices) as (in_queues, out_queues):
for schedule in clock_cycles(m, n): # 这里使用,给出了执行序列计划,后续按照这个来执行
self.fence(schedule, skip_trackers)
self.compute(schedule, skip_trackers, in_queues, out_queues)
Цель разбораfor schedule in clock_cycles(m, n)
Это цикл for, который:
Наш вопрос: как мы передаем этот цикл for, чтобы он должен был выполняться до? , то есть как организовать последовательное выполнение обратного распространения? Как завершить зависимость в строке?
Это необходимо проанализировать с помощью исходного кода вычислений. Основные моменты:
- Пакеты[i] здесь изменятся, например, пакеты[0] станут после расчета разделов[j]
batches[0][j]
. - Для метода вычисленияКлюч - это код внизу
batches[i] = batch
. Он состоит в том, чтобы присвоить результат расчета j-го устройства для i-й партии партиям[i].После присвоения партии[i]batches[i][j]
, так что в следующем вычислении строится F[i, j+1], а операция зависимости в следующем заборе выполняется дляbatches[i, j+1]
. - Таким образом, на графике прямого расчета посредством этой операции присваивания пакеты [i, j+1] зависят от пакетов [i, j], поэтому при обратном расчете пакеты [i, j + 1] должны находиться в пакетах [i, j + 1]. j + 1] i, j] завершено раньше.
def compute(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
in_queues: List[InQueue],
out_queues: List[OutQueue],
) -> None:
"""Runs tasks with synchronization to copy streams."""
batches = self.batches
partitions = self.partitions
devices = self.devices
n = len(partitions)
streams = [current_stream(d) for d in devices]
for i, j in schedule: # 针对 schedule 之中的每一对 i,j
batch = batches[i]
partition = partitions[j]
# Synchronize with the copied input. ([1] in the diagram)
# Determine whether checkpointing or not.
if checkpoint:
# 忽略
else:
def compute(batch: Batch = batch,
partition: nn.Sequential = partition,
skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
) -> Batch:
with use_skip_tracker(skip_tracker):
return batch.call(partition) # 前向计算,计算以 partition为单位计算,partition内部的层是顺序计算,由 Sequential保证。
task = Task(streams[j], compute=compute, finalize=None)
del compute
# Compute tasks in parallel. ([2] in the diagram)
in_queues[j].put(task) # 让 worker计算
for i, j in schedule:
ok, payload = out_queues[j].get() # 获取 worker 的前向计算结果,就是 第 j 个device 对 第 i 个 batch 的计算结果
task, batch = cast(Tuple[Task, Batch], payload)
# The copy stream synchronizes to copy the output. ([3] in the
# diagram)
# Finalize tasks. If checkpointing is enabled, here the
# recomputation is scheduled at backpropagation. ([4] in the
# diagram)
# 第 j 个device 对 第 i 个 batch 的计算 就是 F[i,j]
batches[i] = batch # 这里是关键,就是把 第 j 个device 对 第 i 个 batch 的计算结果 赋值到 batches[i],batches[i]就是 batches[i][j],在下次计算时候,构建的就是 F[i,j+1], 下一次 fence 之中的 depend 操作,就是针对 batches[i,j+1]
Что касается этой операции присваивания, соответствующий grad_fn — PermuteBackward, например:
a = torch.tensor([2., 3.], requires_grad=True)
c = a
c.backward(gradient=external_grad)
print(c)
конкретно это:
c = {Tensor: 2} tensor([2., 3.], requires_grad=True)
T = {Tensor: 2} tensor([2., 3.], grad_fn=<PermuteBackward>)
Теперь давайте обновим изображение ниже.
+-------------------------------------------------------------------+
| depend |
| |
| +---------------+ |
| |fork | |
+------------- | | | +-----------+ |
| | | | | | | |
|batches[i] +-------------------------> | batches[i]| |
| | | | | | | |
+------------- | | | +-----------+ |
| | | |
| | | |
| | | +--------+ +-------+ |
| | get_phony +------> | +--->+ Join | |
| | | | phony | | | |
| +---------------+ | | | | |
| +--------+ | | |
| | | |
+------------- | | | +--------------+ |
| | | | | | | |
|batches[i+1]+----------------------------------------------------->+ batches[i+1] | |
| | | | | | | |
+------------- | | | +--------------+ |
| +-------+ |
| |
+-------------------------------------------------------------------+
Разворачиваем по горизонтали и получаем следующее, то есть батч разбивается на два небольших батча: батчи[i], батчи[i+1], которые конвейерно распределяются по двум устройствам partition[j], partitions[j+1], чтобы и строки, и столбцы имели зависимости обратного распространения.
F[i,j] F[i,j+1]
+------------------------------------------------+ +-----------------------------------------------+
| partitions[j] | | partitions[j+1] |
| | | |
| +--------------------+ +------------------+ | | +-------------------+ +------------------+ |
| |fence | | compute | | | | fence | | compute | |
| | | | | | | | | | | |
+--------------+ | | +--------------+ | | +------------+ | | +-----------------+ | | +-------------+ | | +------------+ | | +-----------------+
| | | | | depend | | | |forward | | | | | | | | depend | | | |forward | | | | |
| batches[i] +---------------------------------------------------------> | batches[i][j] +----------------------------------------------------------> | batches[i][j+1] |
| | | | | | | | | | | | | | | | | | | | | | | | | |
+--------------+ | | | | | | | | | | +-----------------+ | | | | | | | | | | +-----------------+
| | | | | | +------------+ | | | | | | | | +------------+ | |
| | | | | | | | | | | | | | | |
+--------------+ | | | | | +------------------+ | +-----------------+ | | | | | +------------------+ | +-------------------+
| | | | | | | | | | | | | | | | | |
| batches[i+1]+---------------------------------------------------------> | batches[i+1][j] +----------------------------------------------------------> | batches[i+1][j+1] |
| | | | | | | | | | | | | | | | | |
+--------------+ | | +--------------+ | | +-----------------+ | | +-------------+ | | +-------------------+
| | | | | | | |
| +--------------------+ | | +-------------------+ |
+------------------------------------------------+ +-----------------------------------------------+
Телефон такой:
0x05 Сводка
Следующий рисунок. То есть модель разбивается на 3 подсети, а мини-пакет разбивается на 4 микропакета. Нижние индексы F и B равны (m, n).
Как показано выше, здесь необходимо выполнить две зависимости:
- межстрочная зависимость, которая является зависимостью между пакетами, которая является зависимостью внутри устройства. На картинке это пунктирная линия, то есть должно быть завершено до , должно быть завершено до .
- межколоночные зависимости, который является зависимостью между разделами (устройствами). Из рисунка это сплошная линия, то есть должно быть выполнено до , то есть первое устройство должно быть выполнено до второго устройства, а выход первого устройства является входом второго устройства.
Как показано на рисунке выше, нам нужно заполнить зависимости строк и столбцов.
- межстрочная зависимостьЭто гарантируется Join & Fork, а установка зависимостей завершается использованием пустых тензоров, чтобы гарантировать, что пакеты [i-1] будут завершены после пакетов [i].
-
межколоночные зависимостичерез
batches[i] = batch
Завершите, используйте PermuteBackward для завершения зависимостей между устройствами.
Пока мы завершили настройку порядка выполнения и зависимостей, в следующей статье мы расскажем, как выполнять параллельную обработку.
0x06 Исправления и дополнения
6.1 Происхождение
друг @Меч Уиллоу ИньфэнБыли подняты вопросы о:
межколоночные зависимостичерез
batches[i] = batch
Завершите, используйте PermuteBackward для завершения зависимостей между устройствами.
Он считает, что, читая исходный код, я лично чувствую, что две автоградации. Функции пользовательского копирования и ожидания определяют зависимости между устройствами.
Я снова внимательно прочитал документ и исходный код и обнаружил, что он верен Мое предыдущее понимание было неверным, и настоящим исправлено следующим образом.
6.2 Содержание статьи
Давайте сначала посмотрим на содержание бумаги.
6.2.1 Последовательность выполнения на уровне устройства
Содержание бумаги следующее:
2.2. Device-wise Execution Order
Подводя итог, можно сказать, что при конвейерном параллелизме (с контрольными точками) каждому устройству назначается набор задач в заданном порядке, каждое устройство будет выполнять заданные задачи одна за другой, как только будут выполнены кросс-девайсные зависимости. недостающий компонент в этой передаче данных изображения между устройствами.Для иллюстрации, полный порядок выполнения, которому должно следовать устройство j, показан на рисунке 3. Здесь операции передачи данных явно обозначены как «прием» и «отправка» для акцента.
Перевод выглядит следующим образом:
Таким образом, при конвейерном параллелизме (с контрольными точками) каждому устройству назначается набор задач в определенном порядке. Как только зависимости между устройствами будут удовлетворены, каждое устройство будет выполнять заданную задачу одно за другим. Однако на предыдущей диаграмме отсутствует компонент передачи данных между устройствами. Для наглядности полная последовательность выполнения, которой должно следовать устройство j, показана на рис. 3. Для лучшей иллюстрации здесь операции передачи данных явно обозначены как «получение» и «отправка».
Конкретная схема выглядит следующим образом:
6.2.2 Параллельные вычисления и копирование
В другой части статьи обсуждается использование потоков.
Concurrent Copy and Computation: Streams
PyTorch issues every device-bound kernels to the default stream, unless it is specified otherwise. Stream is a device- bound sequence of kernels that is executed in order. Kernels in the same stream are guaranteed to be executed in the pre- scribed order, but kernels in different streams can be inter- leaved, and even can overlap when possible. In particular, nearly all CUDA devices with compute capability 1.1 and higher support concurrent copy and execution: data transfer between devices can always overlap with kernel execution.
Из-за проблемы копирования формул этот и следующий абзацы исходного текста переводятся следующим образом:
PyTorch публикует каждое ядро, привязанное к устройству, в поток по умолчанию, если не указано иное. Поток — это последовательность ядер, которые по порядку выполняют эти привязанные к устройству ядра. Ядра в одном потоке гарантированно будут выполняться в заранее заданном порядке, но ядра в разных потоках могут чередоваться друг с другом и даже могут перекрываться. В частности, почти все устройства CUDA с вычислительной мощностью 1.1 и выше поддерживают одновременное копирование и выполнение: передача данных между устройствами всегда перекрывает выполнение ядра.
torchgpipe регистрирует каждое ядро копии в потоке не по умолчанию, сохраняя при этом вычислительные ядра в потоке по умолчанию. Это позволяет обрабатывать устройство j параллельно, т.е.Может использоваться с «Отправить на устройствоиз" и/или "РабприниматьЭти две операции выполняются параллельно. Кроме того, каждое устройство использует отдельный поток для каждого микропакета. Поскольку между разными микропакетами нет реальной зависимости, такое использование потоков является безопасным, что обеспечивает максимально быстрое копирование.
6.2.3 Вывод
Видно, что передача данных осуществляется через Stream, который представляет собой фактическую зависимость между устройствами и может достичь цели параллельных данных и копирования.
6.3 Реализация
Далее давайте рассмотрим конкретную реализацию и по очереди проверим наши выводы.
6.3.1 _copy_streams
_copy_streams определяется следующим образом:
self._copy_streams: List[List[AbstractStream]] = []
Код инициализации выглядит следующим образом, чанки — это количество микропакетов. _ensure_copy_streams предназначен для создания выделенного потока для каждого пакета макросов каждого устройства.
def _ensure_copy_streams(self) -> List[List[AbstractStream]]:
"""Ensures that :class:`GPipe` caches CUDA streams for copy.
It's worth to cache CUDA streams although PyTorch already manages a
pool of pre-allocated CUDA streams, because it may reduce GPU memory
fragementation when the number of micro-batches is small.
"""
if not self._copy_streams:
for device in self.devices:
self._copy_streams.append([new_stream(device) for _ in range(self.chunks)])
return self._copy_streams
Допустим есть 3 устройства, модель разбита на 3 подсети, а мини-партия разбита на 4 микро-партии. Особенности таковы: т._copy_streams[i][j]
Среди них i представляет последовательность устройств, а j представляет последовательность пакетов. (В последующих статьях есть подробности как пользоваться)
+----------------------------------+
| _copy_streams |
| |
| +----------------------+ |
| | | |
| | [1,1] [1,2] [1,3]+--------------------------------+
| | | | |
| | [2,1] [2,2] [2,3]+------------------------------------------+
| | | | | |
+-------------------------+[3,1] [3,2] [3,3] | | | |
| | | | | | |
| | +----------------------+ | | |
| | | | |
| +----------------------------------+ | |
| | |
| v |
| +------------------------------------------------------------------------+------+ |
| | Stream of device 1, Stream of device 1, Stream of device 1, Stream of device 1| |
| +-------------------------------------------------------------------------------+ |
| |
| +-------------------------------------------------------------------------------+ |
| | Stream of device 2, Stream of device 2, Stream of device 2, Stream of device 2+<-+
| +-------------------------------------------------------------------------------+
|
| +-------------------------------------------------------------------------------+
+-->+ Stream of device 3, Stream of device 3, Stream of device 3, Stream of device 3|
+-------------------------------------------------------------------------------+
6.3.2 Оператор копирования
class Copy(torch.autograd.Function):
"""Copies tensors on specific streams."""
@staticmethod
def forward(ctx: Context, # type: ignore
prev_stream: AbstractStream,
next_stream: AbstractStream,
*input: Tensor,
) -> Tensors:
# 这里会把拷贝操作的source,dst 都保存在上下文之中,反向操作时候会取出来
ctx.prev_stream = prev_stream
ctx.next_stream = next_stream
output = []
output_stream = current_stream(get_device(next_stream))
with use_stream(prev_stream), use_stream(next_stream):
for x in input:
y = x.to(get_device(next_stream)) # 进行拷贝操作
output.append(y)
# 'prev_stream' is not where 'x' has been allocated.
record_stream(x, prev_stream)
# 'y' has been allocated on 'next_stream'.
# It might be used on the current stream captured as 'output_stream'.
record_stream(y, output_stream)
return tuple(output)
@staticmethod
def backward(ctx: Context,
*grad_output: Tensor,
) -> Tuple[Optional[Tensor], ...]:
# 取出来上下文保存的拷贝操作的src,dst。
prev_stream = ctx.prev_stream
next_stream = ctx.next_stream
grad_input: Deque[Tensor] = deque(maxlen=len(grad_output))
input_stream = current_stream(get_device(prev_stream))
with use_stream(prev_stream), use_stream(next_stream):
for x in reversed(grad_output):
y = x.to(get_device(prev_stream)) # 进行拷贝操作
grad_input.appendleft(y)
# 'next_stream' is not where 'x' has been allocated.
record_stream(x, next_stream)
# 'y' has been allocated on 'prev_stream'.
# It might be used on the current stream captured as 'input_stream'.
record_stream(y, input_stream)
grad_streams: Tuple[Optional[Tensor], ...] = (None, None)
return grad_streams + tuple(grad_input)
6.3.3 Ожидание оператора
Код оператора Wait следующий, в основном для синхронизации, ожидания завершения операции копирования.
class Wait(torch.autograd.Function):
"""Synchronizes a stream to another stream.
Place it just before you want to start an operation on the next stream,
provided that all operations on the previous stream are done.
"""
@staticmethod
def forward(ctx: Context, # type: ignore
prev_stream: AbstractStream,
next_stream: AbstractStream,
*input: Tensor,
) -> Tensors:
ctx.prev_stream = prev_stream
ctx.next_stream = next_stream
wait_stream(next_stream, prev_stream)
return tuple(x.detach() for x in input)
@staticmethod
def backward(ctx: Context,
*grad_input: Tensor,
) -> Tuple[Optional[Tensor], ...]:
prev_stream = ctx.prev_stream
next_stream = ctx.next_stream
wait_stream(prev_stream, next_stream)
grad_streams: Tuple[Optional[Tensor], ...] = (None, None)
return grad_streams + grad_input
6.3.4 Упаковка
Следующие функции инкапсулируют оператор.
def copy(batch: Batch, prev_stream: AbstractStream, next_stream: AbstractStream) -> None:
batch[:] = Copy.apply(prev_stream, next_stream, *batch)
def wait(batch: Batch, prev_stream: AbstractStream, next_stream: AbstractStream) -> None:
batch[:] = Wait.apply(prev_stream, next_stream, *batch)
6.3.5 Установление зависимостей
Ниже приведен упрощенный код ограждения, который устанавливает две зависимости строк и столбцов в легенде.
def fence(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
) -> None:
"""Copies micro-batches after computation for the previous
micro-batches.
"""
batches = self.batches
copy_streams = self.copy_streams
skip_layout = self.skip_layout
for i, j in schedule:
# Ensure that batches[i-1] is executed after batches[i] in
# backpropagation by an explicit dependency.
if i != 0:
depend(batches[i-1], batches[i]) # 在这里建立了后向传播依赖关系
# 拿到dst设备的拷贝流
next_stream = copy_streams[j][i]
# 残差连接相关设置
for prev_j, ns, name in skip_layout.copy_policy(j):
prev_stream = copy_streams[prev_j][i]
skip_trackers[i].copy(batches[i], prev_stream, next_stream, ns, name)
# 建立跨设备依赖关系,指定了 device[j-1] 的输出是 device[i] 的输入
if j != 0:
prev_stream = copy_streams[j-1][i] # 拿到src设备的拷贝流
copy(batches[i], prev_stream, next_stream) # 建立跨设备依赖关系
Конкретная операция ожидания вызывается в вычислении, и мы даем только часть кода.
def compute(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
in_queues: List[InQueue],
out_queues: List[OutQueue],
) -> None:
"""Runs tasks with synchronization to copy streams."""
batches = self.batches
partitions = self.partitions
devices = self.devices
copy_streams = self.copy_streams
# With checkpointing, the autograd graph looks like this diagram:
# ┌─────┸──────┐
# │ Copy │
# └─────┰──────┘ (fence)
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┃ (compute)
# ┌─────┸──────┐
# │ Wait │ [1] Synchronize the current stream with the copy stream.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Checkpoint │ [2] Compute a partition within checkpointing.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Wait │ [3] Synchronize the copy stream with the current stream.
# └─────┰──────┘
# ┠ ─ ─ ─ ┐
# ┃ ┌─────┴─────┐
# ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
# ┃ └─────┬─────┘
# ┠ ─ ─ ─ ┘
# ┃
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┌─────┸──────┐ (fence)
# │ Copy │
# └─────┰──────┘
for i, j in schedule:
batch = batches[i]
partition = partitions[j]
# Synchronize with the copied input. ([1] in the diagram)
if j != 0:
wait(batch, copy_streams[j][i], streams[j]) # 这里保证了同步完成
6.4 Сравнительная таблица
В документе также есть набор сравнений, которые переведены и выдержки:
Мы также визуализировали временную шкалу каждого графического процессора, чтобы помочь понять роль каждого компонента, как показано на рисунке. Планирование каждого изображения описано ниже.
-
(a) При детерминированных тактовых циклах все ядра излучают в правильном порядке во время прямого распространения. Это объясняется в левой части временной шкалы. Однако из-за отсутствия явного кодирования зависимостей в графе вычислений движок autograd будет обрабатывать микропакеты в неконтролируемом порядке, поэтому временная шкала будет загромождена.
-
(b) Для обратных зависимостей ядра в настоящее время выпускаются в правильном детерминированном порядке при обратном распространении.
-
(c) При использовании потока копирования не по умолчанию копирование и вычисление теперь выполняются параллельно, как показано перекрывающимися синими и красными полосами.
-
(d) Портал удалил ненужные копии, вызванные передачей пропусков тензоров на все промежуточные устройства. По сравнению с (с) длина красных полосок уменьшена.
6.5 Резюме
GPipe необходимо выполнить две зависимости:
- межстрочная зависимость, которая является зависимостью между пакетами, которая является зависимостью внутри устройства. На рисунке это выглядит как пунктирная линия.Должен бытьсделано раньше,Должен бытьсделано раньше.
- межколоночные зависимости, который является зависимостью между разделами (устройствами). На рисунке это сплошная линия, котораяДолжен бытьЗавершается раньше, то есть первое устройство должно завершиться раньше второго устройства, а выход первого устройства является входом второго устройства.
Подстрочная зависимость соответствует статье в:
Стратегия конвейерного параллелизма заключается в назначении задач относительно индекса раздела j так, чтобы j-й раздел полностью находился в j-м устройстве.В дополнение к этому принудительно выполняется выполнение Fi,j перед выполнением Fi+1,j и Bi,j должен быть завершен до выполнения Bi−1,j .
Как показано на рисунке выше, нам нужно заполнить зависимости строк и столбцов.
- межстрочная зависимостьЭто гарантируется Join & Fork, а установка зависимостей завершается использованием пустых тензоров, чтобы гарантировать, что пакеты [i-1] будут завершены после пакетов [i]. PermuteBackward помогает с этой операцией зависимости.
- межколоночные зависимостиЗависимость между устройствами достигается с помощью двух производных операторов копирования и ожидания.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
Учебник по редактированию формул в уценке
docs.NVIDIA.com/Bulky/Bulky-Day…
Обучение CUDA: краткое изложение основ
Использование Stream в очерках CUDA
Accelerating Wide & Deep Recommender Inference on GPUs
HugeCTR: High-Performance Click-Through Rate Estimation Training
обсудить.py torch.org/he/how-to-cheat…
Пишите на torch.org/docs/notes/…