0x00 сводка
PyTorch Zero Redundancy Optimizer — это класс алгоритмов, предназначенных для поиска компромисса между параллельным обучением данных и параллельным обучением моделей. Идея Zero Redundacy Optimizer исходит от ZeRO от Microsoft, а конкретная реализация основана на OSS от Fairscale.
Fairscale реализует трехэтапный алгоритм ZeRO. Fairscale — это проект с открытым исходным кодом Facebook AI Research (FAIR). Я лично понимаю его как испытательное поле для крупномасштабного распределенного обучения глубокого обучения Facebook. Если один из модулей созреет, он будет объединены в PyTorch.
OSS — это ZeRO-1, реализованный Fairscale, который реализует сегментирование состояния оптимизатора (см. красную рамку на рисунке ниже). PyTorch реализует ZeroRedundancyOptimizer на основе OSS FairScale.
Примечание. Эта статья основана на PyTorch 1.9.0.
0x01 История
1.1 Описание на гитхабе
ZeroRedundancyOptimizer находится вGitHub.com/py torch/Пак Ючон…Введенный, давайте взглянем на его описание.
ZeroRedundancyOptimizer: an implementation of a standalone sharded optimizer wrapper #46750
Implement the first stage of ZeRO, sharding of the optimizer state, as described in this blog post and this paper. This implementation is completely independent from the DeepSpeed framework, and aims at providing ZeRO-compliant building blocks within the PyTorch scheme of things.
This works by:
- acting as a wrapper to a pytorch optimizer. ZeROptimizer does not optimize anything by itself, it only shards optimizers for distributed jobs
- each rank distributes parameters according to a given partitioning scheme (could be updated), and owns the update of a given shard only
- the .step() is called on each rank as expected, the fact that the optimizer actually works on a shard of the model is not visible from the outside
- when the update is completed, each rank broadcasts the updated model shard to all the other ranks
This can be used with DDP, although some communications are wasted in that case (gradients are all-reduced to all ranks). This implementation was initially developed in Fairscale, and can also be used with an optimized DDP which only reduces to the relevant ranks. More context on ZeRO and PyTorch can be found in this RFC
The API with respect to loading and saving the state is a known pain point and should probably be discussed an updated. Other possible follow ups include integrating more closely to a modularized DDP, making the checkpoints partition-agnostic, exposing a gradient clipping option and making sure that mixed precision states are properly handled.
original authors include @msbaines, @min-xu-ai and myself(blefaudeux )
1.2 Анализ
Таким образом, мы можем знать следующую информацию:
- Идея Zero Redundacy Optimizer исходит от Microsoft ZeRO.
- Fairscale реализует трехэтапный алгоритм ZeRO. Fairscale — это проект с открытым исходным кодом Facebook AI Research (FAIR). Я лично понимаю его как тестовое поле для крупномасштабного распределенного обучения глубокого обучения Facebook. Если модуль развивается зрело, он будет объединен в PyTorch.
- OSS — это ZeRO-1, реализованный Fairscale, который реализует сегментирование состояния оптимизатора.
- PyTorch реализует ZeroRedundancyOptimizer на основе OSS FairScale.
Нам нужно посмотреть поближе.
0x02 Базовые знания
2.1 ZeRO
ZeRO (Zero Redundacy Optimizer) является частью Microsoft DeepSpeed с открытым исходным кодом, платформы для оптимизации крупномасштабного обучения. ZeRO — это метод оптимизации памяти для моделей глубокого обучения, который ищет промежуточную точку между параллелизмом моделей и параллелизмом данных, чтобы максимизировать масштабируемость модели.
Оптимизация ZeRO включает несколько аспектов использования памяти модели глубокого обучения, включая память активации, фрагментированную память и память состояния модели.
- Память состояния модели. Состояние модели глубокого обучения можно разделить на три основных процесса: состояние оптимизатора, градиент и параметры.
- Память активации: после оптимизации памяти состояния модели было обнаружено, что функции активации также могут вызывать узкие места. Вычисление функции активации выполняется при прямом проходе для поддержки обратного прохода.
- Фрагментированная память. Неэффективность моделей глубокого обучения иногда вызвана фрагментацией памяти. В модели жизненный цикл каждого тензора разный, что вызовет некоторую фрагментацию памяти из-за изменения срока жизни разных тензоров. Из-за существования этих фрагментов, даже если имеется достаточно свободной памяти, выделение памяти не удастся из-за отсутствия непрерывной памяти. ZeRO активно управляет памятью в соответствии с разным временем жизни тензоров, чтобы предотвратить фрагментацию памяти.
Например, оптимизацию можно увидеть на следующем рисунке:
Источник изображенияWoohoo.Microsoft.com/En-US/Горячий цвет…
2.2 Реализация ZeroRO от Fairscale
Далее давайте рассмотрим рекомендации по использованию Fairscale.
На самом деле это сочетание распределенных/крупномасштабных решений для машинного обучения, из чего видно, что его основаZeRO <https://arxiv.org/pdf/1910.02054.pdf>
Реализованы три разных алгоритма, соответствующие трем этапам ZeRO:
- Разделение состояний оптимизатора (OSS) реализует разделение оптимизатора, оптимизируя использование памяти для разделенных состояний оптимизатора.
- Sharded Data Parallel (SDP) отвечает за Optimizer + Gradient State Sharding.
- Fully Sharded Data Parallel (FSDP) реализует Optimizer + Gradient + Horizontal Model Sharding.
2.3 Optimizer State Sharding (OSS)
Поскольку OSS является источником ZeroRedundancyOptimizer, давайте сначала рассмотрим его идеи. OSS реализует оптимизации, связанные с памятью оптимизатора. Оптимизатору вроде Адама обычно нужно поддерживать импульс, дисперсию. Несмотря на то, что можно тренироваться с параметрами и градиентами точности FP16, параметры и градиенты необходимо сохранить с точностью FP32. Когда полная модель обновляется для каждого ранга, это означает, что значительная часть памяти занята избыточными представлениями состояния оптимизатора. Чтобы преодолеть эту избыточность, сегментация состояния оптимизатора должна разделить этапы оптимизации модели между различными рангами, чтобы каждый ранг отвечал только за обновление соответствующего сегмента модели. Это, в свою очередь, гарантирует, что состояние оптимизатора будет намного меньше на каждом ранге и что оно не будет содержать избыточной информации между рангами.
2.3.1 Процесс обучения
Процесс обучения OSS можно изменить по сравнению с процессом выполнения DDP следующим образом:
-
обернутые осколки оптимизатора определяют состояние оптимизатора жадным алгоритмическим способом на основе размера параметра (а не порядка использования). Это делается для того, чтобы каждый ранг имел почти одинаковый размер памяти оптимизатора.
-
Процесс обучения аналогичен процессу распределенного параллелизма данных (DDP) PyTorch. Передний проход выполняется первым на каждом ряду, за которым следует обратный проход. Во время обратного прохода используйте allreduce для синхронизации градиентов.
-
Каждый ранг обновляет только те параметры состояния оптимизатора, за которые он отвечает, а затем отбрасывает остальные параметры оптимизатора.
-
После обновления выполняется операция широковещательной рассылки или сбора всех данных, чтобы убедиться, что все ранги получают самые последние обновленные значения параметров.
Подробности см. на рисунке ниже.
2.3.2 Передовой опыт
Вот несколько лучших практик:
- OSS предоставляет флаг широковещания_fp16, который вам, вероятно, следует использовать в многоузловых заданиях. Это обычно не требуется в экспериментах с одним узлом.
- Если ваша модель крайне несбалансирована по размеру (например, есть огромный тензор), то этот подход мало поможет и предпочтительнее будут варианты сегментирования тензора, такие как Fairscale.nn.FullyShardedDataParallel.
- OSS остается совместимым с большинством функций DDP.
- OSS должен быть временным решением в среде DDP.
2.3.3 Описание производительности
Вот несколько заметок о производительности.
-
На одном узле OSS всегда должен быть быстрее, чем vanilla PyTorch, экономия памяти будет варьироваться в зависимости от используемого оптимизатора.
-
OSS полезен, когда вы используете оптимизатор с дополнительным состоянием (например, Adam).
-
Если вы используете SGD или любой другой оптимизатор с ограниченным объемом памяти, вы можете столкнуться с замедлением при использовании нескольких узлов из-за дополнительной связи на шаге 4 описанного выше потока. Во время процесса allreduce на шаге 2 также тратится некоторая часть памяти для хранения градиентов, которая затем отбрасывается.
-
OSS также может быть быстрее или медленнее, чем ванильный PyTorch, при использовании нескольких узлов, в зависимости от используемого оптимизатора и дополнительных флагов (таких как Broadcast_fp16, сжатие градиента, накопление градиента, упомянутое выше).
-
Если вы можете использовать больший размер пакета, лучше взять больший размер пакета и уменьшить количество задействованных рангов или использовать градиентное накопление, так как это снижает затраты на связь.
Затем мы официально вводим ZeroRedundancyOptimizer.
0x03 как использовать
Мы сначала используемpy torch.org/tutorials/th…Давайте посмотрим, как использовать ZeroRedundancyOptimizer.
3.1 Идея
Идея ZeroRedundancyOptimizer исходит отDeepSpeed/ZeRO projectиMarian, оба проекта разбивают состояние оптимизатора на распределенные процессы с параллельными данными, чтобы уменьшить объем памяти, занимаемой каждым процессом. Стратегия оптимизации ZeRO в основном заключается в оптимизации использования видеопамяти путем сегментации состояния модели.Состояние модели в основном включает в себя состояние оптимизатора, градиент и параметры модели.
ZeroRedundancyOptimizer реализует сегментацию состояний оптимизатора, которые представляют собой параметры и локальные состояния, необходимые оптимизатору для работы. Например, для SGD требуется тот же импульс, что и для параметров модели, а оптимизатор Адама сохраняетexp_avg
иexp_avg_sq
государство. Следовательно, потребление памяти оптимизатором Adam как минимум в два раза превышает размер модели. Таким образом, когда модель большая, состояние оптимизатора требует больших затрат памяти.
В параллельном вводном руководстве по распределенным данным (Getting Started With Distributed Data Parallel), мы покажем, как использовать DistributedDataParallel (DDP) для обучения модели. В ДДП:
- Каждый рабочий процесс (ранг, узел или устройство) хранит выделенную копию оптимизатора.
- Поскольку DDP уже синхронизирует градиенты с all-reduce при обратном распространении, все реплики оптимизатора будут работать с одними и теми же параметрами и значениями градиента на каждой итерации.
- Эти оптимизаторы используют градиенты all-reduce для обновления параметров модели, поэтому DDP может поддерживать каждую реплику модели (ранг) в одном и том же состоянии параметров.
Основываясь на этом наблюдении, мы можем уменьшить объем памяти оптимизатора, разделив состояние оптимизатора между процессами DDP. В частности, это:
- Разделите оптимизатор на разные рабочие процессы. Экземпляр оптимизатора на каждом рабочем процессе сохраняет только часть состояния оптимизатора, соответствующую его срезу параметров модели (1/world_size), вместо создания соответствующих состояний параметров для всех параметров.
- оптимизатор
step()
Функция отвечает только за обновление параметров в своем сегменте, и когда рабочий процесс завершает обновление параметров, он передает обновленные параметры всем другим одноранговым процессам DDP, чтобы все реплики модели оставались в том же состоянии.
3.2 Как использовать
ZeroRedundancyOptimizer можно использовать в сочетании с torch.nn.parallel.DistributedDataParallel для снижения пикового потребления памяти на один ранг. В приведенном ниже коде показано, как использоватьZeroRedundancyOptimizer, Большая часть кода выглядит какDistributed Data Parallel notesПростой пример DDP, приведенный в . Основное отличие в том, чтоexample
в функцииif else
предложение, которое оборачивает конструкцию оптимизатора и может использоваться в ZeroRedundancyOptimizer иAdam
переключаться между. Мы просто используем ZeroRedundancyOptimizer для деформирования обычного оптимизатора.
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.distributed.optim import ZeroRedundancyOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP
def print_peak_memory(prefix, device):
if device == 0:
print(f"{prefix}: {torch.cuda.max_memory_allocated(device) // 1e6}MB ")
def example(rank, world_size, use_zero):
torch.manual_seed(0)
torch.cuda.manual_seed(0)
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
# create default process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
# create local model
model = nn.Sequential(*[nn.Linear(2000, 2000).to(rank) for _ in range(20)])
print_peak_memory("Max memory allocated after creating local model", rank)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
print_peak_memory("Max memory allocated after creating DDP", rank)
# define loss function and optimizer
loss_fn = nn.MSELoss()
if use_zero:
optimizer = ZeroRedundancyOptimizer( # 这里使用了ZeroRedundancyOptimizer
ddp_model.parameters(),
optimizer_class=torch.optim.Adam, # 包装了Adam
lr=0.01
)
else:
optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.01)
# forward pass
outputs = ddp_model(torch.randn(20, 2000).to(rank))
labels = torch.randn(20, 2000).to(rank)
# backward pass
loss_fn(outputs, labels).backward()
# update parameters
print_peak_memory("Max memory allocated before optimizer step()", rank)
optimizer.step()
print_peak_memory("Max memory allocated after optimizer step()", rank)
print(f"params sum is: {sum(model.parameters()).sum()}")
def main():
world_size = 2
print("=== Using ZeroRedundancyOptimizer ===")
mp.spawn(example,
args=(world_size, True),
nprocs=world_size,
join=True)
print("=== Not Using ZeroRedundancyOptimizer ===")
mp.spawn(example,
args=(world_size, False),
nprocs=world_size,
join=True)
if __name__=="__main__":
main()
Результат показан ниже.
с или безZeroRedundancyOptimizer
, после каждой итерации параметры модели используют одну и ту же память, поэтому вывод на печать будет одинаковым. при включенииZeroRedundancyOptimizer
инкапсулироватьAdam
, оптимизаторstep()
Пиковое потребление памяти составляетAdam
Половина потребления памяти. Это соответствует нашим ожиданиям, потому что мы ставимAdam
Состояние оптимизатора разделено на два процесса.
=== Using ZeroRedundancyOptimizer ===
Max memory allocated after creating local model: 335.0MB
Max memory allocated after creating DDP: 656.0MB
Max memory allocated before optimizer step(): 992.0MB
Max memory allocated after optimizer step(): 1361.0MB
params sum is: -3453.6123046875
params sum is: -3453.6123046875
=== Not Using ZeroRedundancyOptimizer ===
Max memory allocated after creating local model: 335.0MB
Max memory allocated after creating DDP: 656.0MB
Max memory allocated before optimizer step(): 992.0MB
Max memory allocated after optimizer step(): 1697.0MB
params sum is: -3453.6123046875
params sum is: -3453.6123046875
3.3 Резюме
После приведенного выше анализа принципов и описания использования мы знаем, что класс ZeroRedundancyOptimizer может инкапсулировать любой optim.Optimizer и может делить свое состояние между рангами в группе. Экземпляр локального оптимизатора в каждом ранге отвечает только за обновление примерно1 / world_size
параметры, поэтому необходимо только сохранить1 / world_size
Размер состояния оптимизатора.
Таким образом, в центре внимания нашего анализа ниже:
- Как разбить параметры оптимизатора?
- Как каждый ранг узнает свои соответствующие параметры?
0x04 инициализация
Мы начинаем с__init__
Посмотрите, как построить, который в основном состоит из трех шагов:
- Инициализируйте базовый класс.
- Инициализировать различные переменные-члены.
- Используйте _update_trainable для внутренней синхронизации и создания буфера, который внутренне вызывает _optim_constructor для создания внутреннего оптимизатора.
def __init__(
self,
params,
optimizer_class: Type[Optimizer], # 就是被包装的原生优化器类型
group: Optional[Any] = None,
parameters_as_bucket_view: bool = False,
**default: Any,
):
# Hold all the model params in the root .param_groups
# NOTE: the default constructor uses `add_param_group` which is partially overloaded here
# we introduce the `initialized` flag for be able to dissociate the behaviour of
# `add_param_group` in between super() and ZeroRedundancyOptimizer
self.initialized = False
super().__init__(params, default) # 初始化基类
# Partition information. lazy evaluation, computed if requested
self._per_device_params_cache: "OrderedDict[torch.device, List[List[Parameter]]]" = (
OrderedDict()
) # device, rank, params
# Build the wrapped optimizer, responsible for a shard of the params
self._param_rank_cache: Dict[torch.Tensor, int] = {} # 初始化各种成员变量
self._param_to_index_cache: Dict[int, int] = {}
self._partition_parameters_cache: List[List[Dict]] = []
self._index_to_param_cache: Dict[int, torch.Tensor] = {}
self._all_params = params
self._reference_is_trainable_mask = list(map(_is_trainable, self._all_params))
self.group = group if group is not None else dist.group.WORLD
self.world_size = dist.get_world_size(self.group)
self.rank = dist.get_rank(self.group)
# global是用来在进程之间同步
self.global_rank = _get_global_rank(self.group, self.rank)
self.parameters_as_bucket_view = parameters_as_bucket_view
self._optim_defaults = default
self._optim_constructor = optimizer_class # 如何生成原生优化器
# Optional consolidated optimizer state
self._all_states: List[Dict[str, Any]] = []
# Current default device is set by the parameters allocated to this rank
self._device = list(self._per_device_params.keys())[0]
self.buckets: Dict[torch.device, List[torch.Tensor]] = {}
self._update_trainable() # 内部同步&构建buffer,调用 _optim_constructor 来构建内部优化器
self.initialized = True
Из-за особенностей языка Python в нем нет специального места для инициализации переменных-членов, но когда переменная встречается во время работы программы, она немедленно инициализируется. Поэтому анализировать будем не в том порядке, в котором программа фактически инициализируется, а в том порядке, в котором логически инициализируются переменные-члены.
Все эти функции или переменные-члены, проанализированные ниже, находятся в__init__
Косвенно вызывается или инициализируется в методе.
4.1 Параметры разбиения
Метод partition_parameters разбивает параметры, что возвращает _partition_parameters_cache.
Обернутый оптимизатор сегментирует состояние оптимизатора в жадном алгоритме сортировки на основе размера параметра (вместо использования порядка), оборачивая некоторые параметры в каждом ранге, так что каждый параметр принадлежит рангу, а не делится между рангами. Разделы являются произвольными и могут не соответствовать порядку регистрации или использования параметров. Это делается для того, чтобы каждый ранг имел почти одинаковый размер памяти оптимизатора.
def partition_parameters(self) -> List[List[Dict]]:
r"""
Partitions parameters across distributed data parallel ranks.
Returns:
a list of ``param_groups`` (which is a list of dict) where each
element of the list contains the param_groups for a rank. Element 0
corresponds to rank 0, etc. We need all the ranks for the broadcast
inside ``step()``.
"""
if len(self._partition_parameters_cache) == 0:
self._partition_parameters_cache = [list() for _ in range(self.world_size)]
# 生成一个数组,用来记录每个rank的大小,一共有world size个rank
sizes = [0] * self.world_size
for param_group in self.param_groups: # 遍历参数组
param_lists: List[List] = [list() for _ in range(self.world_size)]
for param in param_group["params"]:
# Add this param to rank with smallest size.
rank = sizes.index(min(sizes)) # 找到最小的那个rank
param_lists[rank].append(param) # 把参数放到最小rank之中
sizes[rank] += param.numel() # 增加rank的大小
for rank, params in enumerate(param_lists): # 遍历list
param_group_rank = copy.copy(param_group)
param_group_rank["params"] = params
self._partition_parameters_cache[rank].append(param_group_rank)
return self._partition_parameters_cache
Здесь выполняется разбиение, и, наконец, возвращается список param_groups (это список dict). Каждый элемент списка содержит param_groups определенного ранга. Например, элемент 0 соответствует рангу 0, а параметры каждой ранговой группы имеют одинаковый размер.. В step() нам нужна вся информация о рангах для трансляции. На рисунке ниже показаны param_groups, соответствующие рангу 0 и рангу 5.
_partition_parameters_cache
+
|
|
v +---------------+
+-------+---------+ | param_group |
| 0 +----> | | <-------+ 100 M +------------->
+-----------------+ +---------------+
| 1 | | | +--------+---------+------+--------+
+-----------------+ | "params" +------> |param 1 | param 2 | ... | param 6|
| 2 | | | | | | | |
+-----------------+ +---------------+ +--------+---------+------+--------+
| |
| |
| ...... |
| | +---------------+
+-----------------+ | param_group | <-------+ 105 M +----------------->
| 5 +----> | |
+-----------------+ +---------------+ +--------+---------+-------+---------+
| | | | | | |
| "params" +------> | param 7| param 8 | ... | param 11|
| | | | | | |
+---------------+ +--------+---------+-------+---------+
4.2 Назначение параметров рангам
Теперь параметры были разделены на группы аналогичного размера, а затем эти группы нужно разделить на каждый ранг.
Метод _param_to_rank генерирует таблицу, в которой записывается ранг, соответствующий каждому параметру, то есть какой параметр находится в каком ранге.
@property
def _param_to_rank(self) -> Dict[torch.Tensor, int]:
r"""Look up table to match a given param with a data parallel rank"""
if len(self._param_rank_cache) == 0:
for rank, param_groups in enumerate(self.partition_parameters()):
for param_group in param_groups:
for param in param_group["params"]:
self._param_rank_cache[param] = rank
return self._param_rank_cache
Согласно приведенному выше примеру, мы знаем, что параметр 1, параметр 2, параметр 6 имеют ранг 0, параметр 8, параметр 11 имеют ранг 5... следующим образом:
_param_rank_cache
+
|
|
|
v
+----+--------------+------------+
| | |
| param 1 | 0 |
+--------------------------------+
| | |
| param 2 | 0 |
+--------------------------------+
| | |
| param 6 | 0 |
+--------------------------------+
| | |
| param 8 | 5 |
+--------------------------------+
| | |
| param 11 | 5 |
+--------------------------------+
| | |
| param n | n |
| | |
+-------------------+------------+
4.3 _per_device_params
Теперь, когда параметры назначены каждому рангу, следующим шагом будет их назначение устройству.Каждое устройство может содержать группы параметров нескольких рангов._per_device_params
Метод заключается в распределении param_groups оптимизатора среди различных устройств, что возвращает_per_device_params_cache
.
Обратите внимание, что _per_device_params включает здесь все параметры модели, хотя и сгруппированные по устройствам. То есть она одинакова среди всех оптимизаторов ZeRO. Таким образом, эти параметры могут передаваться и синхронизироваться между оптимизаторами ZeRO.
@property
def _per_device_params(self) -> Dict[torch.device, List[List[Parameter]]]:
r"""
Sorted list of all the params, first per device then per rank.
Within a list params are sorted per number of elements to allow for an easy bucketing.
"""
if len(self._per_device_params_cache) == 0:
# Go through all params, log them per device
# The ordering is important here, needs to be the same on all ranks
# So that ulterior broadcast calls are matching
for param_group in self.param_groups: # 遍历参数
for param in param_group["params"]:
device = param.device # 找到其设备
if self._per_device_params_cache.get(device) is None:
self._per_device_params_cache[device] = [[] for _ in range(self.world_size)]
# 每个设备内部还需要按照rank来分开
self._per_device_params_cache[device][self._param_to_rank[param]] += [param]
# Sort param_lists by size
for k in self._per_device_params_cache.keys():
for r in self._per_device_params_cache[k]:
r.sort(key=lambda x: x.numel())
return self._per_device_params_cache
Например, следующие ЦП, ГП 1 (игнорируется) и ГП 2 имеют собственные списки параметров, и каждый список упорядочен в соответствии с размером параметра.
_per_device_params_cache
+
| +--------+--------+-------+--------+
| | | | | |
| +---------+ | param1 | param3 |param5 | param6 |
v | | | | | | |
+----+--------------+ | rank 0 +----> | 1k | 2k | 3k | 7k |
| | | | +--------+--------+-------+--------+
| "CPU" +----> +---------+
| | | |
+-------------------+ | rank 1 | +--------+--------+-------+--------+
| | | +----> | | | | |
| "GPU 1" | +---------+ | param9 | param2 | param4| param8 |
| | | | | | |
+-------------------+ | 0.5k | 1k | 4k | 8k |
| | +--------+--------+-------+--------+
| "GPU 2" | +---------+
| +----> | | +---------+------------+-----------+
+-------------------+ | | | | | |
| rank 5 +----> | param 11| param 13 | param 15 |
| | | | | |
+---------+ +---------+------------+-----------+
| |
| rank 6 | +---------+------------+-----------+
| +----> | | | |
| | | param 19| param 12 | param 14 |
+---------+ | | | |
+---------+------------+-----------+
4.4 _update_trainable
Поскольку некоторые параметры изменяются, локальный оптимизатор и ZeroRedundancyOptimizer необходимо синхронизировать друг с другом.
- Сначала получите self._default_device как «CPU» или «GPU #».
- Затем вызовите _optim_constructor для создания внутреннего оптимизатора. Обратите внимание, что это должно сообщить локальному оптимизатору, что вы несете ответственность за оптимизацию этих параметров независимо от других осколков. Упомянутый ранее метод partition_parameters разделит параметры и вернет _partition_parameters_cache.
# 只是选取自己rank对应的参数进行优化
self.optim = self._optim_constructor(self.partition_parameters()[self.rank], **self._optim_defaults)
# 运行时变量如下:
#_optim_constructor = {type} <class 'torch.optim.adam.Adam'>
#_optim_defaults = {dict: 1} {'lr': 0.01}
-
Затем вызовите _sync_param_groups для синхронизации параметров.
-
Наконец, создайте плоский буфер.
Конкретный код выглядит следующим образом:
def _update_trainable(self) -> None:
r"""
Updates the partitioning and communication patterns if the trainability
(``requires_grad``) of some parameters changed.
"""
# Create the optim which will work on the param shard
if not hasattr(self, "optim"):
self._clear_cache()
# 获得缺省设备
self._default_device = list(self._per_device_params.keys())[0]
# 构建本地优化器,只是选取本rank对应的参数
self.optim = self._optim_constructor(self.partition_parameters()[self.rank], **self._optim_defaults)
# 调用 _sync_param_groups 同步参数,self.optim 是被包装的优化器
self._sync_param_groups(self.optim.param_groups, self.param_groups)
if self.parameters_as_bucket_view:
self._setup_flat_buffers() # 建立 flat buffer
Возьмем, к примеру, ранг 5.Его локальный оптимизатор просто указывает на часть оптимизируемых параметров, соответствующую _partition_parameters_cache[5], и локальный оптимизатор может оптимизировать только эти параметры.
Это обеспечивает разделение параметров оптимизатора. Такие параметры, как _partition_parameters_cache[5], можно позже разместить на графическом процессоре, чтобы каждый графический процессор включал только часть разделов оптимизатора.
Следует отметить, что параметры модели и градиенты не изменились, но локальный ZeroRedundancyOptimizer указывает на некоторые параметры, которые необходимо оптимизировать, поэтому состояние оптимизатора ZeroRedundancyOptimizer также соответственно снижается.
Что касается рисунка ниже, исходному оптимизатору нужно оптимизировать все параметры, может быть, 100 М + 105 М + .... Теперь ZeroRedundancyOptimizer нужно оптимизировать только 105 М.
_partition_parameters_cache
+
|
|
v +---------------+
+-------+---------+ | param_group |
| 0 +----> | | <-------+ 100 M +------------->
+-----------------+ +---------------+
| 1 | | | +--------+---------+------+--------+
+-----------------+ | "params" +------> |param 1 | param 2 | ... | param 6|
| 2 | | | | | | | |
+-----------------+ +---------------+ +--------+---------+------+--------+
| |
| |
| ...... |
| | +---------------+
+-----------------+ | param_group | <-------+ 105 M +----------------->
| 5 +----> | |
+-----------------+ +---------------+ +--------+---------+-------+---------+
| | | | | | |
+--> | "params" +------> | param 7| param 8 | ... | param 11|
| | | | | | | |
| +---------------+ +--------+---------+-------+---------+
|
|
|
+-----------------------+
| Local Optimizer | |
| | |
| | |
| + |
| |
| |
| |
| |
+-----------------------+
Нам нужно еще немного его доработать и посмотреть на две функции _sync_param_groups и _setup_flat_buffers.
4.4.1 Группа параметров синхронизации
_sync_param_groups используется для синхронизации групп параметров внутреннего оптимизатора с группами параметров нулевого оптимизатора.
@staticmethod
def _sync_param_groups(source: List[Dict[Any, Any]], destination: List[Dict[Any, Any]]) -> None:
r"""Sync learning rate and other optimizer attributes (needed to support schedulers)."""
for source_group, destination_group in zip(source, destination):
# Sync everything but the parameters
for k in filter(lambda x: x != "params", source_group.keys()):
destination_group[k] = source_group[k]
4.4.2 Создание единого буфера
Если задан параметр parameters_as_bucket_view, вызовите _setup_flat_buffers, чтобы создать несколько буферов. Тензор того же ранга на том же устройстве считается буфером. заключается в обработке _per_device_params.
def _setup_flat_buffers(self) -> None:
r"""
Make all params which are on the same device and tied to the same rank
views of a single buffer. This is used at construction time, and anytime
parameter trainability is changed (frozen or unfrozen) and
``_update_trainable`` is called.
"""
for device, per_rank_params in self._per_device_params.items():
# Only wipe the existing buckets if there are none
# (could be that this is called twice, when trainability changes)
if device not in self.buckets.keys():
self.buckets[device] = []
# Make parameters a view of the bucket
for dst_rank, params in enumerate(per_rank_params):
if len(params) > 0:
# Clone the non-trainable params, if in a bucket it will get destroyed
for param in filter(lambda x: not x.requires_grad, params):
param.data = param.data.detach().clone()
# Merge all the trainable params in a single bucket
trainable_params = list(filter(_is_trainable, params))
buffer_size = sum(map(lambda x: x.numel(), trainable_params))
bucket = torch.empty(buffer_size, dtype=params[0].dtype, device=device)
offset = 0
for param in trainable_params:
offset_next = offset + param.numel()
bucket[offset:offset_next].copy_(param.data.flatten())
param.data = bucket[offset:offset_next].view_as(param.data)
offset = offset_next
# Either replace the existing bucket, or create it
if len(self.buckets[device]) == dst_rank:
self.buckets[device].append(bucket)
else:
self.buckets[device][dst_rank] = bucket
else:
self.buckets[device].append(torch.zeros(1, device=device))
Подробнее см. следующую легенду Тензоры одного ранга на одном устройстве считаются буфером.
buckets
+
|
| +---------------------------------------+
v | Tensor |
+----+-------+ | +-----------------------------------+ |
| | | | | |
| "CPU" +-----> | | Param 1, param 2, Param 3...... | |
| | | +-----------------------------------+ |
+------------+ +---------------------------------------+
| |
| "GPU 1" +-----> +---------------------------------------+
| | | Tensor |
+------------+ | +-----------------------------------+ |
| | | | | |
| | | | Param 6, Param 7, Param 8...... | |
| | | +-----------------------------------+ |
| | +---------------------------------------+
| |
+------------+
0x05 Обновить параметры
Посмотрим, как оптимизатор обновит параметры дальше, логика такая:
- Если граф вычислений изменяется, его необходимо переработать.
- Вызовите _sync_param_groups, чтобы синхронизировать локальные параметры оптимизатора с оптимизатором ZeRO, предотвращая их изменение планировщиком.
- Вызовите self.optim.step, чтобы обновить локальный оптимизатор поверх локальных параметров.
- Вызовите dist.broadcast для синхронизации параметров между рангами.
- Снова вызовите _sync_param_groups, чтобы синхронизировать локальные параметры оптимизатора с оптимизатором ZeRO, поскольку они были обновлены.
def step(self, closure: Optional[Callable[[], float]] = None, **kwargs: Any) -> Optional[float]:
r"""
Performs a single optimization step (parameter update).
Arguments:
closure (callable): A closure that reevaluates the model and
returns the loss. Optional for most optimizers.
Returns:
optional loss, depends on the underlying optimizer
.. note: Any extra parameter is passed to the base optimizer as-is
"""
# Check whether the model trainability graph changed
# 如果计算图有变化,则需要重新处理
trainable_mask = list(map(_is_trainable, self._all_params))
if trainable_mask != self._reference_is_trainable_mask:
self._update_trainable()
self._reference_is_trainable_mask = trainable_mask
# Sync oss param_groups attributes in case they've been updated by a scheduler.
self._sync_param_groups(self.param_groups, self.optim.param_groups)
# Run the optimizer step on this shard only:
# 更新本地参数
if closure is not None:
loss = self.optim.step(closure=closure, **kwargs) # type: ignore[call-arg]
else:
loss = self.optim.step(**kwargs)
# Sync all the updated shards in between the ranks
handles = []
if self.parameters_as_bucket_view:
for device in self.buckets.keys():
for src_rank, bucket in enumerate(self.buckets[device]):
global_src_rank = _get_global_rank(self.group, src_rank)
handles.append(dist.broadcast(tensor=bucket, src=global_src_rank, group=self.group, async_op=True))
else:
for device, per_rank_params in self._per_device_params.items(): # 遍历设备+其参数
for dst_rank, params in enumerate(per_rank_params): # 遍历rank
global_dst_rank = _get_global_rank(self.group, dst_rank)
for param in params: # 对于每一个参数,都进行broadcast
handles.append(
dist.broadcast(tensor=param.data, src=global_dst_rank, group=self.group, async_op=True)
)
_ = list(map(lambda x: x.wait(), handles))
# Sync hypothethical new results from the wrapped optimizer to the exposed param_groups
self._sync_param_groups(self.optim.param_groups, self.param_groups)
return loss
5.1 Обновление
Первый заключается в локальном обновлении параметров модели.
# 更新本地参数
if closure is not None:
loss = self.optim.step(closure=closure, **kwargs) # type: ignore[call-arg]
else:
loss = self.optim.step(**kwargs)
Предположим, что в модели всего 8 параметров, которые разбиты на верхний и нижний узлы, и в каждом узле есть оптимизатор. Для лучшего понимания в верхнем и нижнем оптимизаторах параметры и номера рангов размещены вверху.
Подчеркнем еще раз: параметры модели и градиенты не изменились, но локальный ZeroRedundancyOptimizer указывает на некоторые параметры, которые необходимо оптимизировать, поэтому состояние оптимизатора ZeroRedundancyOptimizer также соответственно уменьшено.
Поэтому в верхнем и нижнем оптимизаторах модель (параметры, которые нужно оптимизировать) одного размера, но:
-
В ZeroRedundancyOptimizer 0 оптимизирован ранг 0, а локально оптимизированы параметры от 0 до 3. Для двух узлов эти параметры глобально актуальны.
-
В ZeroRedundancyOptimizer 1 оптимизирован ранг 1, а локально оптимизированы параметры с 4 по 7. Для двух узлов эти параметры глобально актуальны.
+--------------------------------------------------------------------------------+
| ZeroRedundancyOptimizer 0 |
| |
| _per_device_params_cache |
| + |
| | |
| v +--------+ +--------+--------+-------+--------+ |
| +---+-----+ | rank 1 | | | | | | |
| | | | +---------> | param4 | param5 | param6| param7 | |
| | "GPU"1" +--> +--------+ | | | | | |
| | | | | +--------+--------+-------+--------+ |
| +---------+ | rank 0 | |
| | | +--------+--------+-------+--------+ |
| | +---------> | | | | | |
| +--------+ | param0 | param1 |param2 | param3 | NEW |
| +----> | | | | | |
| +----------------+ | +--------+--------+-------+--------+ |
| |Local Optimizer | | |
| | +----------+ |
| | | |
| +----------------+ |
| | Node 0
+--------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------+
| | Node 1
| |
| _per_device_params_cache |
| + |
| | +--------+--------+-------+--------+ |
| v +--------+ +---> | | | | | |
| +---+-----+ | rank 1 | | | param4 | param5 | param6| param7 | NEW |
| | | | +---------> | | | | | |
| | "GPU"1" +--> +--------+ | +--------+--------+-------+--------+ |
| | | | | | |
| +---------+ | rank 0 | | +--------+--------+-------+--------+ |
| | +---------> | | | | | |
| | | | | param0 | param1 |param2 | param3 | |
| +--------+ | | | | | | |
| | +--------+--------+-------+--------+ |
| +----------------+ | |
| |Local Optimizer | | |
| | +------------+ |
| | | |
| +----------------+ ZeroRedundancyOptimizer 1 |
| |
+--------------------------------------------------------------------------------+
5.2 Трансляция
Прежде всего, следует отметить, что _per_device_params включает здесь все параметры модели, хотя они классифицированы по устройствам.
Текущее состояние — параметры оптимизатора этого ранга (данного раздела) обновлены, то есть обновлена часть модели. Чтобы поддерживать модели в актуальном состоянии, они должны транслироваться друг другу.
После локального обновления параметров каждый ранг будет передавать свои параметры всем другим одноранговым узлам, чтобы поддерживать все реплики модели в одном и том же состоянии.
+--------------------------------------------------------------------------------+
| ZeroRedundancyOptimizer 0 |
| |
| _per_device_params_cache |
| + |
| | |
| v +--------+ +--------+--------+-------+--------+ |
| +---+-----+ | rank 1 | | | | | | |
| | | | +---------> | param4 | param5 | param6| param7 | |
| | "GPU"1" +--> +--------+ | | | | | |
| | | | | +--------+--------+-------+--------+ |
| +---------+ | rank 0 | |
| | | +--------+--------+-------+--------+ |
| | +---------> | | | | | |
| +--------+ | param0 | param1 |param2 | param3 | NEW |
| +----> | | | | | |
| +----------------+ | +---+----+---+----+-+-----+--+-----+ |
| |Local Optimizer | | | | | | |
| | +----------+ | | | | |
| | | | ^ | ^ | ^ | ^ |
| +----------------+ | | | | | | | | |
| | | | | | | | | | Node 0
+--------------------------------------------------------------------------------+
| | | | | | | |
| | | | | | | |
| | | | | | | |
+--------------------------------------------------------------------------------+
| | | | | | | | | | Node 1
| v | v | v | v | |
| _per_device_params_cache | | | | |
| + | | | | |
| | +------+-+------+-+----+--+------+-+ |
| v +--------+ +---> | | | | | |
| +---+-----+ | rank 1 | | | param4 | param5 | param6| param7 | NEW |
| | | | +---------> | | | | | |
| | "GPU"1" +--> +--------+ | +--------+--------+-------+--------+ |
| | | | | | |
| +---------+ | rank 0 | | +--------+--------+-------+--------+ |
| | +---------> | | | | | |
| | | | | param0 | param1 |param2 | param3 | |
| +--------+ | | | | | | |
| | +--------+--------+-------+--------+ |
| +----------------+ | |
| |Local Optimizer | | |
| | +------------+ |
| | | |
| +----------------+ ZeroRedundancyOptimizer 1 |
| |
+--------------------------------------------------------------------------------+
5.3 Синхронизация локальных параметров
Наконец, необходимо снова вызвать _sync_param_groups, чтобы синхронизировать локальные параметры оптимизатора с оптимизатором ZeRO, так как он был обновлен.
# Sync hypothethical new results from the wrapped optimizer to the exposed param_groups
self._sync_param_groups(self.optim.param_groups, self.param_groups)
Рассмотрим конкретные функции.
@staticmethod
def _sync_param_groups(source: List[Dict[Any, Any]], destination: List[Dict[Any, Any]]) -> None:
r"""Sync learning rate and other optimizer attributes (needed to support schedulers)."""
for source_group, destination_group in zip(source, destination):
# Sync everything but the parameters
for k in filter(lambda x: x != "params", source_group.keys()):
destination_group[k] = source_group[k]
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
Расскажите о ZeroRedundancyOptimizer и присоединяйтесь к torch1.10