[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (6) --- Стратегия 1F1B

машинное обучение глубокое обучение

0x00 сводка

В предыдущей статье мы представили общую архитектуру PipeDream, этап профиля, этап расчета раздела, этап преобразования модели, механизм выполнения и коммуникационный модуль.Эта статья является последней частью серии PipeDream, знакомящей со стратегией 1F1B. , что является самым большим вкладом PipeDream.

Ссылки на другие статьи о конвейерном параллелизме:

[Анализ исходного кода] Конвейер глубокого обучения, параллельный Gpipe(1) --- Базовая реализация конвейера

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe (2) ----- накопление градиента

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe(3) -- перерасчет

[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(2) --- Вычислительный раздел

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(3) --- модель преобразования

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(4) --- механизм выполнения

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (5) --- коммуникационный модуль

0x01 Сравнение конвейеров

Во-первых, давайте сравним различные конвейеры, проанализированные до сих пор.

1.1 Обычная сборочная линия

Базовой единицей модели DNN является слой. PipeDream делит эти слои DNN на этапы — каждый этап состоит из набора последовательных слоев в модели. PipeDream развертывает разные этапы модели на разных машинах, и у каждого этапа могут быть разные репликации. На этом этапе выполняются прямые и обратные проходы ко всем слоям на этом этапе. PipeDream относится к стадии, содержащей входной слой, как к стадии ввода, а к стадии, содержащей выходной слой, как к стадии вывода.

В простейшем случае, как и при традиционном параллельном обучении моделей, в системе активен только один минипакет. На рисунке выше показана временная шкала вычислений.В этом примере есть четыре машины и конвейер, который можно считать наиболее распространенным конвейером.

  • На прямом этапе каждый этап выполняет прямой проход мини-пакета слоев на этом этапе и отправляет результат на следующий этап. Выходной каскад вычисляет потери минипакета после завершения прямого прохода.
  • На обратном этапе каждый этап формирует обратный канал для передачи потерь на предыдущий этап один за другим.

1.2 Конвейер Gpipe

Поскольку PipeDream улучшен на основе Gpipe, мы также должны рассмотреть его проблемы на основе Gpipe.

Схема параллельного обучения конвейера Gpipe выглядит следующим образом:

  • Обучаемые слои делятся на этапы, каждый этап содержит набор последовательных слоев в модели.
  • Минипакет входных данных разбивается на m микропакетов, как и allreduce, после чего некоторые вычисления передаются следующему узлу, и, наконец, параметры синхронно обновляются.
  • GPipe использует существующие методы, такие как накопление градиента, для оптимизации эффективности использования памяти, подкачка памяти путем сбрасывания хранилища активации между прямым и обратным проходами и их пересчет, когда требуется активация для обратного прохода.

Есть несколько проблем с конвейером Gpipe:

  • Чрезмерные промывки конвейера приводят к увеличению времени простоя.
  • Если m мало, Gpipe может снизить эффективность оборудования из-за накладных расходов на пересчет и частых обновлений конвейера, поэтому m обычно задается большим.
  • Следовательно, необходимость кэширования m активаций приводит к увеличению памяти. Причина в том, что активация промежуточных результатов каждого прямого вычисления микропакета должна использоваться его обратным вычислением, поэтому его необходимо кэшировать в памяти.

1.3 Трубопровод 1F1B

Стратегия PipeDream 1F1B (один прямой проход, за которым следует один обратный проход) может решить проблему количества активаций кеша, так что количество кешей активации связано только с количеством этапов, тем самым дополнительно экономя видеопамять.

Параллельный метод Pipeline заключается в размещении разных слоев модели на разных машинах (узлах) и последовательном выполнении прямого и обратного расчета.

Цель PipeDream — объединить конвейерный параллелизм, параллелизм моделей и параллелизм данных таким образом, чтобы минимизировать общее время обучения. Однако, чтобы этот подход был эффективным для больших моделей DNN и чтобы воспользоваться потенциальными преимуществами конвейерного распараллеливания обучения, PipeDream должен преодолеть три основные проблемы:

  1. Как автоматически разделить работу (уровни модели) по доступным вычислительным ресурсам.
  2. Как запланировать вычисления, чтобы максимизировать пропускную способность, обеспечивая при этом продвижение задач обучения.
  3. Перед лицом асинхронности, вызванной конвейером, как обеспечить эффективность обучения.

Среди них 1F1B соответствует двум последним вызовам.

1.3.1 Идеи

Разбираем идеи стратегии 1F1B.

конечная цельДа: уменьшите количество кэшей активации и уменьшите использование памяти, чтобы можно было обучать более крупные модели.

текущее затруднительное положениеДа: даже если используется метод контрольных точек, активация прямого расчета не будет завершена до завершения соответствующего обратного расчета.

РешенияДа: постарайтесь сократить время хранения каждой активации, то есть для этого требуется, чтобы каждый микропакет данных выполнял обратный расчет как можно раньше, чтобы каждая активация могла быть выпущена как можно раньше.

Уведомление: В PipeDream слово мини-пакет используется в конце, поэтому мы можем думать, что мини-пакет PipeDream — это микро-пакет Gpipe.С этого момента используется мини-пакет.

решениеДа:

  • Пусть последний этап (машина 4 на рисунке ниже) выполняет обратное распространение этого мини-пакета сразу после завершения прямого распространения мини-пакета, чтобы другие этапы могли начать вычисление обратного распространения как можно раньше.Это стратегия 1F1B. Это немного похоже на превращение общей синхронизации в асинхронную для многих небольших блоков данных, и многие небольшие блоки данных обновляются независимо каждым.
  • В стабильном состоянии 1F1B расчеты вперед/назад будут строго чередоваться на каждой машине, так что на каждом GPU будет обрабатываться мини-пакет данных, что обеспечивает высокую загрузку ресурсов (сравните выравнивание всего конвейера, незначительные паузы конвейера, отсутствие конвейерная промывка, гарантирует, что обновления параметров на каждом этапе выполняются в фиксированном цикле)
  • Столкнувшись с асинхронностью, вызванной конвейером, 1F1B использует разные версии весов, чтобы обеспечить эффективность обучения.
  • PipeDream снова расширяет 1F1B.Для этапов, использующих параллелизм данных, режим циклического планирования используется для назначения задач каждому устройству на одном этапе, гарантируя, что расчеты прямого и обратного распространения пакета данных происходят в одном и том же машина, это 1Ф1Б-РР (один-вперед-ное-назад-круговой).

На самом деле стратегия 1F1B состоит в том, чтобы изменить синхронизацию пакета на асинхронность на множестве небольших данных (мини-пакет), и немедленно отменить расчет мини-пакета, и обновить градиент соответствующего воркера после завершения обратного хода мини-пакета. Все рабочие бегут вместе. Можно понять, что реализация BSP стала реализацией ASP.

1.3.2 Иллюстрация

На рисунке ниже показан конвейер, реализующий 1F1B.

  • Разделите партию на несколько мини-партий, например, разделите партию на 4 мини-партии 1, 2, 3 и 4.
  • Вставьте несколько мини-пакетов в конвейер один за другим.
  • Машина 1 сначала вычисляет прямое распространение синего 1, а затем отправляет синий 1 на машину 2 для продолжения расчета.
  • Затем машина 2 вычисляет прямое распространение синего 2, а затем отправляет синий 1 на машину 2 для дальнейших вычислений.
  • Когда синий 1 пересекает Машину 1 ~ 4 сверху вниз, все прямое распространение завершается, поэтому начинается обратное распространение, соответствующее первому зеленому 1, а затем обратно к Машине 3 ~ 1.
  • Когда данные 1 завершают все обратное распространение, то есть зеленый цвет 1 поступает на Машину 1.
  • После того, как каждая машина завершит обратное распространение своего мини-пакета, она локально обновит градиент.
  • Только подмножество модели передается от машины к машине, так что вычисления и связь могут быть распараллелены.

Следует отметить, что на следующем рисунке показаны начальная стадия и стабильная стадия, которые будут упомянуты в наших последующих пояснениях.

0x02 Реализация PipeDream

Сначала приведен пример графика с 4 GPU, а также на графике приведен пример течения времени одного из GPU (Mach. 3). Здесь происходит частичное перекрытие вычислений и коммуникации градиента/активации.

2.1 Общая логика

Давайте возьмем тренировку в качестве примера в сочетании со следующим рисунком для иллюстрации.

Необходимо ввести существительное NOAM, обозначающее количество небольших партий деятельности.

NUM_OPT_ACTIVE_MINIBATCHES (NOAM) = ⌈ (# машин) / (# машин на этапе ввода) ⌉

Следствием является: минимальное количество пакетов, разрешенных для реплики входного этапа, чтобы поддерживать полную загрузку конвейера в устойчивом состоянии на основе разделов, сгенерированных нашим алгоритмом.

На изображении выше показана соответствующая временная шкала вычислений для конвейера, каждый конвейер имеет 4 этапа, работающих на разных машинах, поэтому NOAM для этой конфигурации равен 4.

Подробно разберем ходовые шаги.

  • На этапе запуска (состояние запуска на рисунке) в начале обучения этап ввода сначала считывает достаточное количество минипакетных данных (то есть NOAM), чтобы гарантировать, что, когда конвейер находится в стабильной фазе, каждое устройство имеет соответствующую работу для обрабатываться. . На приведенном выше рисунке входной каскад отправляет четыре мини-пакета для распространения на выходной каскад.
  • Как только выходной каскад завершает прямой проход первой мини-партии (то есть первой синей 1 машины 4), он выполняет обратную передачу той же мини-партии (то есть первой зеленой 1 машины 4). .
  • Затем начните поочередно выполнять прямое и обратное распространение последующих мини-пакетов (то есть 2 до, 2 после, 3 до, 3 после... для машины 4).
  • Когда процесс обратного распространения начинает распространяться на более ранние этапы в конвейере (то есть работа 3 ~ работа 1), каждый этап начинает чередоваться между прямыми и обратными процессами в разных мини-пакетах.
  • В устойчивом состоянии каждая машина занята прямым или обратным распространением небольшой партии.

2.2 Проблема веса

Режим обучения Pipeline внесет два несоответствия параметров, потому что это фактически расчет ASP, без согласования он будет становиться все более и более хаотичным:

  • В нативном конвейере PipeDream прямое распространение каждого этапа выполняется с использованием определенной версии параметров, а обратное распространение выполняется с использованием разных версий параметров, то есть прямое распространение одного и того же мини-пакета. Несовместимо с используемыми параметрами. в обратном распространении. Например, как показано на рисунке выше:

    • Когда мини-пакет 5 входит в рабочий процесс 1, его логика прямого распространения выполняется после расчета обратного распространения мини-пакета 1, то есть параметры, используемые в расчете прямого распространения, являются параметрами, обновленными после расчета обратного распространения мини-пакета 1.
    • Однако логика обратного распространения мини-пакета 5 начинает выполняться после выполнения «мини-пакета 2, мини-пакета 3, мини-пакета 4», то есть в это время используются следующие параметры: «мини-пакет 1, мини-пакет 2, мини-пакет 3, мини-пакет 4». расчет обратного распространения Параметры будут обновлены позже.
    • Это приводит к несогласованности параметров, используемых в прямом и обратном расчетах мини-пакета 5. То есть первая строка Машины 1, Синяя № 5 и Зеленая № 5 должны использовать при расчетах параметры, обновленные после Зеленой № 1.
  • Один и тот же мини-пакет выполняет одну и ту же операцию на разных этапах (одна и та же прямая операция или одно и то же обратное распространение), используя несогласованные версии параметров. Также как показано выше:

    • Для прямой вычислительной части мини-пакета 5 на рабочем потоке 1 (синий 5) его прямая логика выполняется после обратного вычисления мини-пакета 1.
    • Но часть прямого вычисления (синяя 5) мини-пакета 5 на рабочем потоке 2 выполняется после завершения обратного вычисления «мини-пакета 1, мини-пакета 2».
    • Это приводит к несогласованным версиям параметров, используемых в прямом вычислении мини-пакета 5 на двух этапах.

Чтобы решить эти две проблемы, PipeDream использует две технологии сохранения веса и вертикальной синхронизации соответственно.

  • Weight stashing :Для весов поддерживается несколько версий, по одной для каждой активной мини-партии. Каждый этап вычисляется вперед с последней версией весов, обрабатывая входной мини-пакет. После расчета прямого прохода этот параметр будет сохранен для обратного расчета того же минипакета. Сохранение весов гарантирует, что в рамках этапа одна и та же версия параметров модели используется для прямого и обратного распространения данного мини-пакета, но не гарантирует согласованности в использовании параметров модели для данного мини-пакета на всех этапах.
  • Vertical Sync :При поступлении каждого минипакета в пайплайн используются параметры последней версии входного этапа, причем номер версии параметров будет сопровождать весь жизненный цикл данных минипакета, а на каждом этапе используются параметры одной и той же версии (вместо последней версии, как при сохранении веса). Таким образом достигается согласованность параметров между этапами.

2.3 Weight Stashing

В качестве примера возьмем следующий рисунок:

Рабочий 1, работа 2... каждая имеет свой вес, обозначаемый ​,​ .... То есть на графике ​ нижний индекс i представляет i-го рабочего, а верхний индекс (j) представляет j-ю минипартию.

В один этап (каждый рабочий):

  • Каждый обратный проход приводит к обновлению веса, а следующий прямой проход использует последнюю версию доступных весов. То есть вес каждого работника обновляется после того, как происходит новое обратное распространение зеленого цвета. Следующие новые операции должны основываться на этом новом весе.
  • После расчета прямого прохода веса, использованные при прямом проходе, будут сохранены для обратного расчета того же мини-пакета.
  • Сохранение весов гарантирует, что в рамках этапа одни и те же версии параметров модели используются для прямого и обратного распространения данного мини-пакета.

Возьмем приведенную выше картинку в качестве примера:

Синий 5 в первом ряду рабочего 1 зависит от зеленого 1 в том же ряду, предшествующем ему. Когда первый зеленый 1 в строке Worker 1 заканчивается, это означает, что мини-пакет 1 завершил 4 прямых распространения и 4 обратных распространения этого конвейера. Итак, это новая версия веса 1, т. Следовательно, обе мини-партии 5 (синяя вперед и зеленая назад) Работы 1 должны быть рассчитаны на основе новой версии. Поэтому необходимо записать новую версию.

Синяя цифра 5 во втором ряду рабочего 2 зависит от зеленой цифры 2 в той же строке, предшествующей ей. Точно так же, когда заканчивается первая зеленая цифра 2 рабочего потока 1, это означает, что минипакет 2 завершил 4 прямых распространения и 4 обратных распространения этого конвейера. Так же и новая версия веса 2. В это время прямое направление мини-пакета 6 и зеленое обратное направление, не отмеченные на рисунке, должны рассчитываться на основе веса 2 новой версии, поэтому необходимо записать новую версию.

Для рабочего 3, с его точки зрения, его собственные веса должны выполнить два движения вперед и два назад (один раз для рабочего 4, затем второй для рабочего 3). Когда выполняется прямое распространение мини-пакета 5, он был обновлен (обновлен зеленым цветом мини-пакета 3), поэтому его необходимо записать для использования в обратном обновлении мини-пакета 5 в будущем.

И так далее, worker 1 должен записывать каждую новую версию ​, ​,​,​,... . Это вес рабочего 1, соответствующий мини-партиям 1, 2, 3 и 4.

Если это влияет на просмотр из-за проблемы с отображением формулы уценки, переместитеwoo woo woo.cn blog on.com/Ross i XYZ/afraid/…

2.4 Vertical Sync

Текущая проблема: прямой расчет миниванны 5 на воркере 1 использует параметры после обратного распространения 1, но расчет миниванны 5 на воркере 2 использует параметры после обратного распространения 2, и итоговая сводка не будет опять накосячил.?

Vertical SyncМеханизм таков: каждый минипакет(​), поступающий в пайплайн, связан с последней версией весов​ на момент поступления на входной этап пайплайна. Эта информация о версии передается вместе с активациями и градиентами по мере прохождения мини-пакета через этап прямого распространения конвейера. На всех этапах прямой проход вычисляется с использованием сохраненных, а не последней версии параметров, как при сохранении веса. После вычисления обратного распространения с использованием сохраненного , каждый этап применяет обновление веса независимо, создавая последний вес, а затем удаляя его.

Для иллюстрации используйте следующую схему:

Заставить всех рабочих использовать этот рабочий процесс в качестве параметров после обратного распространения мини-пакета 1 при расчете мини-пакета 5, а именно:

Для рабочего 2 используйте текущий этап, зеленый 1 (после обратного распространения 1, обновленные веса текущего этапа), чтобы выполнить прямое распространение 5.

Точно так же для рабочего процесса 3 используйте текущую стадию зеленого 1 (обновленный вес этой стадии после 1 обратного распространения), чтобы выполнить прямое распространение 5. Для работника 4 используйте текущий этап, зеленый 1 (после обратного распространения 1, обновленные веса текущего этапа), чтобы выполнить прямое распространение 5.

Однако эта синхронизация приведет к большому количеству напрасных и бесполезных вычислений. Например, вес 1 используется при обновлении 5, но все веса обратного распространения 2/3/4 вычисляются напрасно, поэтому по умолчанию вертикальная синхронизация не используется. Таким образом, хотя каждый слой не полностью согласован, все параметры действительны из-за наличия накопления веса.

2.5 Буфер

Вот еще одно объяснение обработки буфера.

статус параметра.Для каждого этапа PipeDream в основном поддерживает все параметры, связанные со слоями в памяти графического процессора, которые непосредственно назначены этому этапу. Параметры каждого слоя хранятся отдельно, и каждому слою присваивается уникальный идентификатор. Если этап не реплицирован, PipeDream применит обновление к последней версии данных параметров, хранящихся в памяти графического процессора, когда будут доступны обновления веса в предоставленном буфере графического процессора. Если этап копируется, обновления веса копируются в память хоста и отправляются на сервер параметров. Когда доступны новые версии параметров, как часть схемы хранения веса,Не будуНемедленно удалите предыдущую версию. Данные параметров отбрасываются только после того, как они были отформатированы для обратного прохода с использованием более новых параметров.

Промежуточное состояние.Промежуточным данным для каждого слоя также назначается уникальный идентификатор большого двоичного объекта. При получении промежуточных данных с предыдущего этапа (или с диска в случае этапа ввода) PipeDream копирует промежуточные данные в память GPU и помещает указатель на соответствующий буфер в рабочую очередь. Промежуточные данные, переданные вперед, не отбрасываются до тех пор, пока соответствующий мини-пакет не завершит обратный проход для этого этапа. Промежуточные данные из обратного прохода высвобождаются, когда рабочий процесс машинного обучения завершает их использование, и, при необходимости, после отправки их на следующий этап. Из-за различных требований к промежуточным данным в прямом и обратном проходах этапы в PipeDream обычно управляют промежуточными данными из нескольких версий прямого прохода, в то время как управляют промежуточными данными только из одной версии обратного прохода в текущем выполнении.

код 0x03

3.1 Общий код

Мы используем runtime/translation/main_with_runtime.py для анализа.

Некоторые второстепенные коды опущены ниже.

Общую логику использования среды выполнения можно взять в качестве примера в следующем файле: runtime/translation/main_with_runtime.py. Основная логика такова:

  • Разобрать входные параметры.

  • Загрузите, сгенерируйте модель.

  • Сборка моделей из модулей.

  • Настройте в соответствии с такими параметрами, как размер ввода, размер партии и т. д.

  • Повторите каждый слой модели (пропустите последний слой потерь).

    • Итерируйте вход каждого слоя, строя входной тензор.
    • Выход строится путем вызова прямой функции, соответствующей этапу.
    • Итерируйте вывод каждого слоя, устанавливая его тип и форму.
  • Создайте тип тензора выходного значения.

  • Загрузить файл конфигурации.

  • Создайте среду StageRuntime.

  • Создайте оптимизатор.Здесь оптимизатор использует AdamWithWeightStashing или SGDWithWeightStashing, поэтому используется накопление веса.

  • Загрузить набор данных.

  • Тренируйтесь и сохраните контрольно-пропускной пункт.

Общий код выглядит следующим образом:

def main():
    # 解析输入参数
    global args, best_prec1
    args = parser.parse_args()
​
    # Special case handling for GNMT model
    l2_promote()
​
    torch.cuda.set_device(args.local_rank)
​
    # build tokenizer
    tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))
​
    # define loss function
    criterion = build_gnmt_criterion(
        vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)
​
    # create stages of the model
    # 加载,生成模型
    module = importlib.import_module(args.module)
    args.arch = module.arch()
    # 依据模块来构建模型
    model = module.model(criterion)
​
    # 依据参数进行配置比如输入大小,batch size等
    input_size = [args.max_length_train, args.batch_size]
    training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
                              "input2": input_size, "target": [args.max_length_train * args.batch_size],
                              "target_length": [args.batch_size]}
    dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
              "target": torch.int64, "target_length": torch.int32}
    inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
    target_tensor_names = {"target", "target_length"}
    
    # 遍历模型的每个层(跳过最后loss层)
    for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).
        input_tensors = []
        # 遍历每层的输入,构建输入张量
        for module_input in inputs:
            if module_input in inputs_module_destinations:
                inputs_module_destinations[module_input] = module_id
​
            input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
                                      dtype=dtypes[module_input])#.cuda()
            input_tensors.append(input_tensor)
        #stage.cuda()
        # PyTorch should not maintain metadata for a backward pass on
        # synthetic inputs. Without the following line, the runtime is
        # as much as 1.5x slower in a full DP configuration.
        with torch.no_grad():
            # 通过调用stage对应的forward函数,构建出输出
            output_tensors = stage(*tuple(input_tensors))
        if not type(output_tensors) is tuple:
            output_tensors = [output_tensors]
        # 遍历每层的输出,设置其类型和形状    
        for output, output_tensor in zip(outputs,
                                         list(output_tensors)):
            # output 是 ['out2', 'out1']
            training_tensor_shapes[output] = list(output_tensor.size())
            dtypes[output] = output_tensor.dtype
​
    # 构建输出值张量类型           
    eval_tensor_shapes = {}
    for key in training_tensor_shapes:
        eval_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
        training_tensor_shapes[key] = tuple(
            training_tensor_shapes[key])
​
    # 加载配置文件
    configuration_maps = {
        'module_to_stage_map': None,
        'stage_to_rank_map': None,
        'stage_to_depth_map': None
    }
    if args.config_path is not None:
        json_config_file = json.load(open(args.config_path, 'r'))
        configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
        configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
        configuration_maps['stage_to_rank_map'] = {
            int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
        configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
​
    # 构建一个 StageRuntime
    r = runtime.StageRuntime(
        model=model, distributed_backend=args.distributed_backend,
        fp16=args.fp16, loss_scale=args.loss_scale,
        training_tensor_shapes=training_tensor_shapes,
        eval_tensor_shapes=eval_tensor_shapes,
        training_tensor_dtypes=dtypes,
        inputs_module_destinations=inputs_module_destinations,
        target_tensor_names=target_tensor_names,
        configuration_maps=configuration_maps,
        master_addr=args.master_addr,
        rank=args.rank, local_rank=args.local_rank,
        num_ranks_in_server=args.num_ranks_in_server,
        verbose_freq=args.verbose_frequency,
        model_type=runtime.TRANSLATION,
        enable_recompute=args.recompute)
​
    # stage needed to determine if current stage is the first stage
    # num_stages needed to determine if current stage is the last stage
    # num_ranks needed to determine number of warmup_minibatches in case of pipelining
    args.stage = r.stage
    args.num_stages = r.num_stages
    args.num_ranks = r.num_ranks
    if not is_first_stage():
        args.synthetic_data = True
​
    # define optimizer
    if args.no_input_pipelining:
        num_versions = 1
    else:
        # number of versions is the total number of machines following the current
        # stage, shared amongst all replicas in this stage
        num_versions = r.num_warmup_minibatches + 1
​
    # if specified, resume from checkpoint
    if args.resume:
        checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
        assert os.path.isfile(checkpoint_file_path)
        print("=> loading checkpoint '{}'".format(checkpoint_file_path))
        checkpoint = torch.load(checkpoint_file_path)
        args.start_epoch = checkpoint['epoch']
        best_prec1 = checkpoint['best_prec1']
        r.load_state_dict(checkpoint['state_dict'])
        print("=> loaded checkpoint '{}' (epoch {})"
                .format(checkpoint_file_path, checkpoint['epoch']))
​
    # TODO: make this configurable by args
    # 建立 optimizer,使用了AdamWithWeightStashing 或者 SGDWithWeightStashing
    use_adam_optimizer = True
    if use_adam_optimizer:
        optimizer = adam.AdamWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
            macrobatch=args.macrobatch)
    else:
        optimizer = sgd.SGDWithWeightStashing(
            modules=r.modules(), master_parameters=r.master_parameters,
            model_parameters=r.model_parameters, loss_scale=args.loss_scale,
            num_versions=num_versions, lr=args.lr, momentum=args.momentum,
            weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)
​
    if args.resume:
        optimizer.load_state_dict(checkpoint['optimizer'])
​
    cudnn.benchmark = True
​
    # 加载 dataset
    train_dataset = LazyParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=False,
        max_size=None)
​
    val_dataset = ParallelDataset(
        src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
        tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
        tokenizer=tokenizer,
        min_len=args.min_length_train,
        max_len=args.max_length_train,
        sort=True)
​
    distributed_sampler = False
    if configuration_maps['stage_to_rank_map'] is not None:
        num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
        if num_ranks_in_first_stage > 1:
            distributed_sampler = True
​
    # TODO: fix random seeds
    train_loader = train_dataset.get_loader(
        batch_size=args.batch_size, seeds=range(args.epochs),
        batch_first=False, shuffle=True,
        bucketing=not args.no_bucketing, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        rank=r.rank_in_stage if r.stage == 0 else 0
    )
​
    val_loader = val_dataset.get_loader(
        batch_size=args.batch_size, batch_first=False,
        shuffle=True, num_workers=args.workers,
        world_size=r.num_ranks_in_first_stage,
        seeds=range(args.epochs),
        rank=r.rank_in_stage if r.stage == 0 else 0
    )
​
    # if checkpoint is loaded, start by running validation
    if args.resume:
        assert args.start_epoch > 0
        validate(val_loader, r, args.start_epoch-1)
​
    # 进行训练,保存checkpoint
    for epoch in range(args.start_epoch, args.epochs):
        if distributed_sampler:
            train_loader.sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)
​
        # train or run forward pass only for one epoch
        if args.forward_only:
            validate(val_loader, r, epoch)
        else:
            train(train_loader, r, optimizer, epoch)
​
            # evaluate on validation set
            prec1 = validate(val_loader, r, epoch)
            if r.stage != r.num_stages: prec1 = 0
​
            # remember best prec@1 and save checkpoint
            best_prec1 = max(prec1, best_prec1)
​
            should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
            if args.checkpoint_dir and should_save_checkpoint:
                save_checkpoint({
                    'epoch': epoch + 1,
                    'arch': args.arch,
                    'state_dict': r.state_dict(),
                    'best_prec1': best_prec1,
                    'optimizer' : optimizer.state_dict(),
                    'tokenizer': tokenizer.get_state()
                }, args.checkpoint_dir, r.stage, epoch)

3.2 Функция обучения

Давайте посмотрим на код поезда функции обучения ниже.

  • Сначала войдите в фазу прогрева при запуске, которую необходимо выполнять до тех пор, пока выходные данные не завершат прямое распространение первого небольшого пакета, соответствующего состоянию запуска на рисунке выше.

  • Затем начните поочередно выполнять прямое и обратное распространение последующих небольших пакетов.С этого момента введите устойчивое состояние на рисунке выше.На каждом этапе для каждого небольшого пакета:

    • Реализуйте прямое распространение с целью передачи мини-пакетов нижестоящим рабочим процессам. Это 1ф.
    • Если это последний этап, обновите потерю.
    • Градиент очищен.
    • Загрузите сохраненные веса.
    • обратное распространение. Это 1Б.
    • Восстановите последние веса. В настоящее время на этом этапе 1F1B завершен.
    • Перейти к следующему шагу.
  • Наконец, остается оставшийся проход назад, который соответствует проходу вперед в фазе разминки.

def train(train_loader, r, optimizer, epoch):
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to train mode
    n = r.num_iterations(loader_size=len(train_loader))
    if args.num_minibatches is not None:
        n = min(n, args.num_minibatches)
    r.train(n)
    if not is_first_stage(): train_loader = None
    r.set_loader(train_loader)

    end = time.time()
    epoch_start_time = time.time()

    if args.no_input_pipelining:
        num_warmup_minibatches = 0
    else:
        num_warmup_minibatches = r.num_warmup_minibatches

    # start num_warmup_minibatches forward passes
    # 启动热身阶段,需要一直执行到 输出完成第一个小批次的前向传播,对应上图的Start State。
    for i in range(num_warmup_minibatches):
        r.run_forward() # 前向传播,就是1F

    # 开始交替执行后续小批次的前向传播和后向传播,从此时开始,进入到上图的 Steady State。
    for i in range(n - num_warmup_minibatches):
        # perform forward pass
        r.run_forward() #前向传播,就是1F

        if is_last_stage(): # 最后阶段
            # measure accuracy and record loss
            output, target, loss, num_tokens = r.output, r.target, r.loss.item(), r.num_tokens()
            losses.update(loss, num_tokens) # 更新损失

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()
            epoch_time = (end - epoch_start_time) / 3600.0
            full_epoch_time = (epoch_time / float(i+1)) * float(n)
        else:
            # print log,省略

        # perform backward pass
        if args.fp16:
            r.zero_grad() # 梯度清零
        else:
            optimizer.zero_grad() # 梯度清零
            
        optimizer.load_old_params() # 加载 stash weight

        r.run_backward() # 后向传播,就是1B
        
        optimizer.load_new_params() # 恢复新的weight
        
        optimizer.step() # 下一次训练,同时更新参数

    # finish remaining backward passes
    # 最后剩余的后向传播,对应着热身阶段的前向传播
    for i in range(num_warmup_minibatches):
        optimizer.zero_grad()
        optimizer.load_old_params() # 加载 stash weight
        r.run_backward() # 后向传播,就是1B
        optimizer.load_new_params() # 恢复新的weight
        optimizer.step() # 下一次训练

    # wait for all helper threads to complete
    r.wait()

r вышеуказанного параметра имеет тип StageRuntime, поэтому давайте посмотрим в нем на run_forward и run_backward.

3.3 Прямое распространение

Ниже приведены методы run_forward и _run_forward класса StageRuntime, которые завершают прямое распространение.

   def run_forward(self, recompute_step=False):
        """Run forward pass.
        """
        # Receive tensors from previous worker.
        self.receive_tensors_forward() # 接受上一阶段的张量
        tensors = self.tensors[-1]
​
        # Run forward pass.
        self._run_forward(tensors) # 进行本阶段前向传播计算
​
        # Send tensors forward.
        self.send_tensors_forward() # 发送给下一阶段
        self.forward_stats.reset_stats()
        self.forward_minibatch_id += 1
​
    def _run_forward(self, tensors):
        # Perform forward pass through model (self.modules_with_dependencies already
        # has modules in topological order).
        
        # 得到module和对应的输入,输出
        modules = self.modules_with_dependencies.modules()
        all_input_names = self.modules_with_dependencies.all_input_names()
        all_output_names = self.modules_with_dependencies.all_output_names()
        
        # 遍历模块
        for i, (module, input_names, output_names) in \
                enumerate(zip(modules, all_input_names, all_output_names)):
            if i == (len(modules) - 1) and self.is_criterion: 
                # 如果是计算损失
                # If layer is criterion (loss).
                if self.model_type == SPEECH_TO_TEXT:
                    output = tensors["output"].transpose(0, 1).float()
                    output_sizes = tensors["output_sizes"].cpu()
                    target = tensors["target"].cpu()
                    target_sizes = tensors["target_length"].cpu()
                    input0_size = tensors["input0_size"].cpu()
                    module_outputs = [module(output, target, output_sizes, target_sizes) / input0_size[0]]
                else:
                    module_outputs = [module(tensors[input_name],
                                             tensors["target"])
                                      for input_name in input_names]
                    module_outputs = [sum(module_outputs)]
            else:
                # 中间层
                # If layer is non-criterion.
                module_outputs = module(*[tensors[input_name]
                                          for input_name in input_names])
                if not isinstance(module_outputs, tuple):
                    module_outputs = (module_outputs,)
                module_outputs = list(module_outputs)
​
            # 把计算结果放入tensors之中,这样后续就知道如何发送    
            for (output_name, module_output) in zip(output_names, module_outputs):
                tensors[output_name] = module_output
​
        self.output = tensors[input_names[0]]
        # 如果是最后阶段,则做处理
        if self.is_criterion and self.model_type == TRANSLATION:
            loss_per_batch = tensors[output_names[0]] * tensors[self.criterion_input_name].size(1)
            loss_per_token = loss_per_batch / tensors["target_length"][0].item()
            self.loss = loss_per_token
        elif self.is_criterion:
            self.loss = tensors[output_names[0]]
        else:
            self.loss = 1

3.4 Обратное распространение

Run_backward работающего двигателя завершает обратный расчет.

    def run_backward(self):
        # Receive input gradients needed for backward pass.
        self.receive_tensors_backward() # 从反向计算图上一层接受梯度
        
        # Backward pass through modules in reverse order.
        inputs = {}
        outputs = {}
        input_gradients = {}
        output_gradients = {}
​
        # Get input and output names spanning all modules in this stage.
        all_input_names_set = set()
        all_output_names_set = set()
​
        # 得到module和对应的输入,输出
        modules = self.modules_with_dependencies.modules()
        all_input_names = self.modules_with_dependencies.all_input_names()
        all_output_names = self.modules_with_dependencies.all_output_names()
​
        for (input_names, output_names) in zip(all_input_names, all_output_names):
            for input_name in input_names:
                all_input_names_set.add(input_name)
            for output_name in output_names:
                all_output_names_set.add(output_name)
​
        tensors = self.tensors.pop(0)
        # Set inputs, outputs, and output_gradients.
        # Only set outputs/output_gradients for tensors that are not inputs of
        # other modules in this stage.
        # Similarly, only set inputs for tensors that are not outputs of other
        # modules in this stage.
        for (module, input_names, output_names) in \
            zip(reversed(modules), reversed(all_input_names), reversed(all_output_names)):
            for output_name in output_names:
                if output_name not in all_input_names_set:
                    if output_name not in self.gradients:
                        output_gradients[output_name] = None
                    else: 
                        # 计算梯度记录在这里
                        output_gradients[output_name] = self.gradients[output_name]
                    if tensors[output_name].requires_grad:
                        outputs[output_name] = tensors[output_name]
            for input_name in input_names:
                if input_name not in all_output_names_set:
                    inputs[input_name] = tensors[input_name]
​
        # Hook to record input gradients.
        def hook_wrapper(input_name):
            def hook(input_gradient):
                input_gradients[input_name] = input_gradient
            return hook
​
        for input_name in inputs:
            if input_name != "input0" and input_name != "input1" and input_name != "input2" \
                    and inputs[input_name].requires_grad:
                inputs[input_name].register_hook(hook_wrapper(input_name))
​
        if "loss" in outputs:
            outputs["loss"] *= self.loss_scale
​
        # Perform backward pass.
        # 进行反向传播,output_gradients 
        # outputs 就是要计算梯度的张量,output_gradients就是计算出来的梯度
        torch.autograd.backward(tuple([outputs[output_name] for output_name in outputs]),
                                grad_tensors=tuple([output_gradients[output_name]
                                                    for output_name in outputs]))
​
        # Input tensors don't need gradients.
        for input_name in inputs:
            if not inputs[input_name].requires_grad:
                self.gradients[input_name] = inputs[input_name]
                continue
​
            if input_name != "input0" and input_name != "input1" and input_name != "input2" and input_name != "input":
                self.gradients[input_name] = input_gradients[input_name]
​
        # Send output gradients.
        self.send_tensors_backward() # 发送梯度(self.gradients)给反向图的下一层
        
        if self.verbose_freq > 0 and self.backward_minibatch_id % self.verbose_freq == 0:
            self.backward_stats.print_stats()
        self.backward_stats.reset_stats()
        self.backward_minibatch_id += 1

Давайте углубим наше впечатление с помощью предыдущей картинки.

Логика отправки:

 StageRuntime            CommunicationHandler              send_helper_thread
​
      +                           +                                 +
      |                           |                                 |
      | 1                         |                                 |
      v                           |                                 |
 run_backward                     |                                 |
      |                           |                                 |
      | 2                         |                                 |
      |                           |                    wait on backward_send_queues
      v                  3        v                                 |
send_tensors_backward +--------> send                               |
                                  |                                 |
                                  |                                 |
                                  |  4                              |
                                  v               5                 v
               backward_send_queues.add(tensor) +----> tensor = queue.remove()
                                                notify              |
                                                                    |
                                                                    | 6
                                                                    v
                                                                  _send
                                                                    |
                                                                    | 7
                                                                    |
                                                                    v
                                                                 dist.send

Принять логику:

    StageRuntime             CommunicationHandler           recv_helper_thread
          +                            +                            +
          |                            |                            |
          | 1                          |                            |
          |                            |                            | 4
          v                            |                            v
    run_backward                       |                         _recv
          |                            |                            |
          |                            |                            |
          |                            |                            | 5
          |                            |                            |
          | 2                          |                            v
          |                            |                  dist.recv / dist.broadcast
          |                            |                            |
          v                  3         v                            |
receive_tensors_backward +--------->  recv                          |
          +                            |                            |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |                            v                            |
          |                 backward_receive_queues.remove()        |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |               wait on backward_receive_queues           |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |                            |                 6          v
          |                  backward_receive_queues <-------+ queue.add(tensor)
          |                            |               notify
          |                            |  7
          v                  3 return  |
gradients[output_name] <---------------+
​

3.5 Weight Stashing

Сташирование веса реализуется с помощью OptimizerWithWeightStashing.

Ниже опущено много второстепенного кода, а функции load_old_params и load_new_params вызываются во время обучения.

class OptimizerWithWeightStashing(torch.optim.Optimizer):
    """Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.
​
    Arguments:
        - optim_name: the name of optimizer, required to create the corresponding
                      base_optimizer (torch.optim.{optim_name}).
        - optimizer_args: the keyword arguments passed to base_optimizer.
    """
​
    def __init__(self, optim_name, modules, master_parameters, model_parameters,
                 loss_scale, num_versions, verbose_freq=0, macrobatch=False,
                 **optimizer_args):
        self.modules = modules
        self.master_parameters = master_parameters
        self.model_parameters = model_parameters  # model_parameters is None if not fp16.
        self.loss_scale = loss_scale
​
        # Only need at most 2 versions if using macrobatching.
        if macrobatch:
            num_versions = min(2, num_versions) 
        self.num_versions = num_versions
        self.base_optimizer = getattr(torch.optim, optim_name)(
            master_parameters, **optimizer_args)
        self.latest_version = Version()
        self.current_version = Version()
        self.initialize_queue()
        self.verbose_freq = verbose_freq
        self.batch_counter = 0
​
        # If macrobatching, push and pop versions at the right rate.
        if macrobatch:
            self.update_interval = self.num_versions
        else:
            self.update_interval = 1
​
    def initialize_queue(self):
        self.queue = deque(maxlen=self.num_versions)
        for i in range(self.num_versions):
            self.queue.append(self.get_params(clone=True))
        self.buffered_state_dicts = self.queue[0][0] # stash weght变量
​
    def load_old_params(self):
        if self.num_versions > 1:
            self.set_params(*self.queue[0]) #找到最初的旧weight
​
    def load_new_params(self):
        if self.num_versions > 1:
            self.set_params(*self.queue[-1]) # 加载最新的weight
​
    def zero_grad(self): # 用来reset
        if self.batch_counter % self.update_interval == 0:
            self.base_optimizer.zero_grad()
​
    def step(self, closure=None):
        """Performs a single optimization step.
​
        Arguments:
            closure (callable, optional): A closure that reevaluates the model
                                          and returns the loss.
        """
        # 每 update_interval个 steps更新一次梯度
        if self.batch_counter % self.update_interval != self.update_interval - 1:
            self.batch_counter += 1
            return None
        
        # 省略代码
        
        self.latest_version = self.latest_version.incr() # 因为多训练了一步,所以增加版本号
        if self.num_versions > 1:
            self.buffered_state_dicts = self.queue[0][0] 
            self.queue.append(self.get_params(clone=False)) # 把新的变量存进去
​
        self.batch_counter += 1
        return loss

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

lingvo framework день чтение заметок

Tensorflow понимает, что градиенты нескольких мини-пакетных вычислений сначала накапливаются, а затем распространяются обратно.

Накопление градиента с помощью tensorflow2

В десять раз время расчета модели увеличилось всего на 20%: плагин для замены градиента с открытым исходным кодом OpenAI

PipeDream: Fast and Efficient Pipeline Parallel DNN Training

Paper Interpretation Series 5: Microsoft Stanford и другие PipeDream быстро обучают крупномасштабные нейронные сети

На данный момент 231 you.GitHub.IO/neural-net…

Блог Woohoo.cn на.com/geek found/afraid/14…

Технология оптимизации видеопамяти во время обучения - слияние ОП и контрольная точка градиента

Pytorch Notes 04 - Пользовательский torch.autograd.Function

Учебное пособие по Autograd для PyTorch

Пользовательское расширение pytorch (3) - простое определение и случай torch.autograd.Function

Пользовательское расширение pytorch (2) - torch.autograd.Function завершает пользовательский слой

torch.autograd интерпретации исходного кода PyTorch: подробное объяснение расчета градиента

Обратное распространение

CS231n Перевод примечаний к курсу: примечания по обратному распространению

Распределенное обучение Pytorch

torch.distributed

Почему модель GPT-3 трудно воспроизвести? Это может быть оптимальным дизайном распределенной инфраструктуры ИИ.

Су Цидун - несбыточная мечта