0x00 сводка
В предыдущей статье мы представили общую архитектуру PipeDream, этап профиля, этап расчета раздела и этап преобразования модели, а в этой статье мы представляем механизм выполнения во время выполнения, который представляет собой унифицированный уровень инфраструктуры.
Ссылки на другие статьи о конвейерном параллелизме:
[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения
0x01 Предисловие
1.1 Предыдущий обзор
В предыдущей статье мы прошли три этапа: профиль, расчетный раздел, преобразование модели, и теперь мы получили несколько файлов Python и файлов конфигурации. После того, как PipeDream загрузит эти файлы, можно продолжить обучение.
Итак, начиная с этой статьи, мы представляем различные вспомогательные системы, необходимые для обучения, такие как механизм выполнения во время выполнения. В основном, чтобы увидеть, какие функции должна включать среда выполнения обучения глубокому обучению.
1.2 Система выполнения
В сочетании с предыдущим анализом давайте сначала подумаем, зачем нам нужно реализовать среду выполнения, и какие функции нужно реализовать для глубокого обучения (конвейерного параллелизма).
1.2.1 Особенности PyTorch
Сначала посмотрим на характеристики PyTorch:
- Сам PyTorch реализует функцию автоградации, так что обратное распространение обеспечивает автоматическую дифференциацию.
- В параллельной реализации распределенных данных PyTorch реализует функцию DDP.
- Что касается параллелизма распределенной модели, PyTorch также предоставляет функции RPC в качестве вспомогательной основы. Однако функциональность RPC была представлена в PyTorch версии 1.5 12 июня 2020 г.
- Для DDP и RPC PyTorch также соответственно реализует функцию Distributed.autograd, которая ограждает пользователей от большого количества распределенных деталей и делает пользователей максимально равнодушными к распределенному обучению (позже у нас будет специальная серия для анализа распространения PyTorch) .
1.2.2 PyTorch RPC
Функция RPC была официально представлена в PyTorch версии 1.5 12 июня 2020 г. следующим образом.
Distributed RPC framework APIs [Now Stable]
The
torch.distributed.rpc
package aims at supporting a wide range of distributed training paradigms that do not fit intoDistributedDataParallel
. Examples include parameter server training, distributed model parallelism, and distributed pipeline parallelism. Features in thetorch.distributed.rpc
package can be categorized into four main sets of APIs.
- The RPC API allows running a function on a specified destination worker with given arguments and fetches the return value or creates a distributed reference to the return value.
- The RRef (Remote REFerence) serves as a reference to an object on another worker. A worker holding an RRef can explicitly request copies of the object, and it can also share the light-weight RRef with other workers without worrying about reference counting. This is especially useful when multiple workers need to repeatedly access different versions of the same remote object.
- With Distributed Autograd, applications can automatically compute gradients even if a model is split on multiple workers using RPC. This is achieved by stitching together local autograd graphs at RPC boundaries in the forward pass and reaching out to participants to transparently launch local autograd in the backward pass.
- The Distributed Optimizer uses gradients computed by Distributed Autograd to update model parameters. Its constructor takes a local optimizer (e.g.,
SGD
,Adagrad
, etc.) and a list of parameter RRefs, and itsstep()
function automatically uses the local optimizer to update parameters on all distinct RRef owner workers.
Однако документ PipeDream был выпущен в 2019 году, а это означает, что PipeDream не может точно использовать PyTorch RPC, а может реализовать только саму коммуникационную логику, то есть поддержку графа вычислений.
1.2.3 Особенности PipeDream
Далее рассмотрим особенности PipeDream:
- PipeDream сочетает в себе параллелизм моделей и параллелизм данных для реализации конвейерного параллелизма.
- PipeDream фактически разделяет полную модель глубокого обучения, и каждая подмодель (подграф) размещается на разных узлах.
1.2.4 Объединение
Сочетание двух вышеуказанных пунктов означает, что для PipeDream чистый DDP, модельный параллелизм и функции автоградации не могут удовлетворить наши потребности и должны использоваться в комбинации.
PipeDream необходимо реализовать как минимум:
- Как взаимодействовать между несколькими этапами (узлами), которые могут использовать функцию PyTorch RPC, но поскольку в начале нет стабильной версии, вы можете реализовать только граф распределенных вычислений самостоятельно, поэтому используется функция P2P распределенного PyTorch.
- Из-за необходимости связи вы сами управляете рангами отправки и получения каждого этапа (узла), то есть настраиваете и управляете производителями и потребителями каждого этапа (узла). Это также означает, что необходимо найти вход и выход каждого этапа (узла).
- Поскольку требуется функция связи p2p, необходимо настроить уникальный идентификатор (соответствующий тегу ниже) для каждого тензора.
- Как сделать параллелизм данных на одном этапе (несколько узлов), который должен использовать функцию PyTorch DDP.
- Из-за использования параллелизма данных вам необходимо самостоятельно управлять количеством параллелизмов на каждом этапе.
- Поскольку вам необходимо сочетать параллелизм моделей и параллелизм данных, вам необходимо самостоятельно управлять рабочими группами процессов.
- Поскольку он работает на разных узлах (компьютерах), когда каждый компьютер запускает сценарий обучения независимо, ему необходимо настроить собственное задание обучения независимо.
Итак, давайте объединим эти функциональные точки и проведем конкретный анализ.
0x02 использовать
2.1 Как звонить
Из примера в исходном коде мы видим, что сценарий main_with_runtime.py может быть запущен на нескольких узлах соответственно, и каждый сценарий имеет разные параметры запуска, такие как разные ранги, так что каждый узел запускается на каждом узле, соответствующем разным этапам. Модель.
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 0 --local_rank 0 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 1 --local_rank 1 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 2 --local_rank 2 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
python main_with_runtime.py --module models.vgg16.gpus=4 -b 64 --data_dir <path to ImageNet> --rank 3 --local_rank 3 --master_addr <master IP address> --config_path models/vgg16/gpus=4/hybrid_conf.json --distributed_backend gloo
Общий код выглядит следующим образом:
-
Разобрать входные параметры
-
загрузить, сгенерировать модель
-
Сборка моделей из модулей
-
Настройте в соответствии с такими параметрами, как размер ввода, размер партии и т. д.
-
Перебрать каждый слой модели (пропустить последний слой потерь)
- Перебрать вход каждого слоя и построить входной тензор
- Выход строится путем вызова прямой функции, соответствующей этапу
- Итерируйте вывод каждого слоя, устанавливая его тип и форму.
-
Создайте тип тензора выходного значения
-
загрузить файл конфигурации
-
Создание среды выполнения StageRuntime
-
оптимизатор сборки
-
загрузить набор данных
-
Поезд, сохранить контрольно-пропускной пункт
Общая логика использования среды выполнения взята в качестве примера в следующем файле: runtime/translation/main_with_runtime.py. Основная логика такова:
2.2 Общая логика
Общая логика использования среды выполнения взята в качестве примера в следующем файле: runtime/translation/main_with_runtime.py. Основная логика такова:
-
Разобрать входные параметры
-
загрузить, сгенерировать модель
-
Сборка моделей из модулей
-
Настройте в соответствии с такими параметрами, как размер ввода, размер партии и т. д.
-
Перебрать каждый слой модели (пропустить последний слой потерь)
- Перебрать вход каждого слоя и построить входной тензор
- Выход строится путем вызова прямой функции, соответствующей этапу
- Итерируйте вывод каждого слоя, устанавливая его тип и форму.
-
Создайте тип тензора выходного значения
-
загрузить файл конфигурации
-
Создание среды выполнения StageRuntime
-
оптимизатор сборки
-
загрузить набор данных
-
Поезд, сохранить контрольно-пропускной пункт
Общий код выглядит следующим образом: Общий код выглядит следующим образом:
def main():
# 解析输入参数
global args, best_prec1
args = parser.parse_args()
# Special case handling for GNMT model
l2_promote()
torch.cuda.set_device(args.local_rank)
# build tokenizer
tokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))
# define loss function
criterion = build_gnmt_criterion(
vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)
# create stages of the model
# 加载,生成模型
module = importlib.import_module(args.module)
args.arch = module.arch()
# 依据模块来构建模型
model = module.model(criterion)
# 依据参数进行配置比如输入大小,batch size等
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
"input2": input_size, "target": [args.max_length_train * args.batch_size],
"target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
"target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}
# 遍历模型的每个层(跳过最后loss层)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]): # Skip last layer (loss).
input_tensors = []
# 遍历每层的输入,构建输入张量
for module_input in inputs:
if module_input in inputs_module_destinations:
inputs_module_destinations[module_input] = module_id
input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
dtype=dtypes[module_input])#.cuda()
input_tensors.append(input_tensor)
#stage.cuda()
# PyTorch should not maintain metadata for a backward pass on
# synthetic inputs. Without the following line, the runtime is
# as much as 1.5x slower in a full DP configuration.
with torch.no_grad():
# 通过调用stage对应的forward函数,构建出输出
output_tensors = stage(*tuple(input_tensors))
if not type(output_tensors) is tuple:
output_tensors = [output_tensors]
# 遍历每层的输出,设置其类型和形状
for output, output_tensor in zip(outputs,
list(output_tensors)):
# output 是 ['out2', 'out1']
training_tensor_shapes[output] = list(output_tensor.size())
dtypes[output] = output_tensor.dtype
# 构建输出值张量类型
eval_tensor_shapes = {}
for key in training_tensor_shapes:
eval_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
training_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
# 加载配置文件
configuration_maps = {
'module_to_stage_map': None,
'stage_to_rank_map': None,
'stage_to_depth_map': None
}
if args.config_path is not None:
json_config_file = json.load(open(args.config_path, 'r'))
configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
configuration_maps['stage_to_rank_map'] = {
int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
# 构建一个 StageRuntime
r = runtime.StageRuntime(
model=model, distributed_backend=args.distributed_backend,
fp16=args.fp16, loss_scale=args.loss_scale,
training_tensor_shapes=training_tensor_shapes,
eval_tensor_shapes=eval_tensor_shapes,
training_tensor_dtypes=dtypes,
inputs_module_destinations=inputs_module_destinations,
target_tensor_names=target_tensor_names,
configuration_maps=configuration_maps,
master_addr=args.master_addr,
rank=args.rank, local_rank=args.local_rank,
num_ranks_in_server=args.num_ranks_in_server,
verbose_freq=args.verbose_frequency,
model_type=runtime.TRANSLATION,
enable_recompute=args.recompute)
# stage needed to determine if current stage is the first stage
# num_stages needed to determine if current stage is the last stage
# num_ranks needed to determine number of warmup_minibatches in case of pipelining
args.stage = r.stage
args.num_stages = r.num_stages
args.num_ranks = r.num_ranks
if not is_first_stage():
args.synthetic_data = True
# define optimizer
if args.no_input_pipelining:
num_versions = 1
else:
# number of versions is the total number of machines following the current
# stage, shared amongst all replicas in this stage
num_versions = r.num_warmup_minibatches + 1
# if specified, resume from checkpoint
if args.resume:
checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)
assert os.path.isfile(checkpoint_file_path)
print("=> loading checkpoint '{}'".format(checkpoint_file_path))
checkpoint = torch.load(checkpoint_file_path)
args.start_epoch = checkpoint['epoch']
best_prec1 = checkpoint['best_prec1']
r.load_state_dict(checkpoint['state_dict'])
print("=> loaded checkpoint '{}' (epoch {})"
.format(checkpoint_file_path, checkpoint['epoch']))
# TODO: make this configurable by args
# 建立 optimizer
use_adam_optimizer = True
if use_adam_optimizer:
optimizer = adam.AdamWithWeightStashing(
modules=r.modules(), master_parameters=r.master_parameters,
model_parameters=r.model_parameters, loss_scale=args.loss_scale,
num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),
weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,
macrobatch=args.macrobatch)
else:
optimizer = sgd.SGDWithWeightStashing(
modules=r.modules(), master_parameters=r.master_parameters,
model_parameters=r.model_parameters, loss_scale=args.loss_scale,
num_versions=num_versions, lr=args.lr, momentum=args.momentum,
weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)
if args.resume:
optimizer.load_state_dict(checkpoint['optimizer'])
cudnn.benchmark = True
# 加载 dataset
train_dataset = LazyParallelDataset(
src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),
tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),
tokenizer=tokenizer,
min_len=args.min_length_train,
max_len=args.max_length_train,
sort=False,
max_size=None)
val_dataset = ParallelDataset(
src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),
tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),
tokenizer=tokenizer,
min_len=args.min_length_train,
max_len=args.max_length_train,
sort=True)
distributed_sampler = False
if configuration_maps['stage_to_rank_map'] is not None:
num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])
if num_ranks_in_first_stage > 1:
distributed_sampler = True
# TODO: fix random seeds
train_loader = train_dataset.get_loader(
batch_size=args.batch_size, seeds=range(args.epochs),
batch_first=False, shuffle=True,
bucketing=not args.no_bucketing, num_workers=args.workers,
world_size=r.num_ranks_in_first_stage,
rank=r.rank_in_stage if r.stage == 0 else 0
)
val_loader = val_dataset.get_loader(
batch_size=args.batch_size, batch_first=False,
shuffle=True, num_workers=args.workers,
world_size=r.num_ranks_in_first_stage,
seeds=range(args.epochs),
rank=r.rank_in_stage if r.stage == 0 else 0
)
# if checkpoint is loaded, start by running validation
if args.resume:
assert args.start_epoch > 0
validate(val_loader, r, args.start_epoch-1)
# 进行训练,保存checkpoint
for epoch in range(args.start_epoch, args.epochs):
if distributed_sampler:
train_loader.sampler.set_epoch(epoch)
adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)
# train or run forward pass only for one epoch
if args.forward_only:
validate(val_loader, r, epoch)
else:
train(train_loader, r, optimizer, epoch)
# evaluate on validation set
prec1 = validate(val_loader, r, epoch)
if r.stage != r.num_stages: prec1 = 0
# remember best prec@1 and save checkpoint
best_prec1 = max(prec1, best_prec1)
should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0
if args.checkpoint_dir and should_save_checkpoint:
save_checkpoint({
'epoch': epoch + 1,
'arch': args.arch,
'state_dict': r.state_dict(),
'best_prec1': best_prec1,
'optimizer' : optimizer.state_dict(),
'tokenizer': tokenizer.get_state()
}, args.checkpoint_dir, r.stage, epoch)
3 Загрузите модель
Давайте сначала посмотрим, как загрузить модель.
3.1 Файл модели
Файл модели сгенерирован выше, поэтому загрузите файл модели здесь.В качестве примера возьмем файл модели в ../translation/models/gnmt/gpus=4/.
здесь__init__
Файлы следующие:
from .gnmt import GNMTSplit
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
def arch():
return "gnmt"
def model(criterion):
return [
(Stage0(), ["input0", "input1"], ["out2", "out1"]),
(Stage1(), ["out2", "input1", "input2", "out1"], ["out3", "out7"]),
(Stage2(), ["out3", "out7"], ["out8", "out9", "out10"]),
(Stage3(), ["out8", "out9", "out10"], ["out12"]),
(criterion, ["out12"], ["loss"])
]
def full_model():
return GNMTSplit()
Конкретный формат каждого элемента выглядит следующим образом:
(stage, inputs, outputs)
Поэтому его нужно загрузить в этом формате.
3.2 Загрузка
Конкретный метод загрузки выглядит следующим образом:
# create stages of the model
module = importlib.import_module(args.module)
args.arch = module.arch()
module = {module} <module 'translation.models.gnmt.gpus=4' from '../translation/models/gnmt/gpus=4/__init__.py'>
GNMTSplit = {type} <class 'translation.models.gnmt.gpus=4.gnmt.GNMTSplit'>
Stage0 = {type} <class 'translation.models.gnmt.gpus=4.stage0.Stage0'>
Stage1 = {type} <class 'translation.models.gnmt.gpus=4.stage1.Stage1'>
Stage2 = {type} <class 'translation.models.gnmt.gpus=4.stage2.Stage2'>
Stage3 = {type} <class 'translation.models.gnmt.gpus=4.stage3.Stage3'>
gnmt = {module} <module 'translation.models.gnmt.gpus=4.gnmt' from '../translation/models/gnmt/gpus=4/gnmt.py'>
stage0 = {module} <module 'translation.models.gnmt.gpus=4.stage0' from '../translation/models/gnmt/gpus=4/stage0.py'>
stage1 = {module} <module 'translation.models.gnmt.gpus=4.stage1' from '../translation/models/gnmt/gpus=4/stage1.py'>
stage2 = {module} <module 'translation.models.gnmt.gpus=4.stage2' from '../translation/models/gnmt/gpus=4/stage2.py'>
stage3 = {module} <module 'translation.models.gnmt.gpus=4.stage3' from '../translation/models/gnmt/gpus=4/stage3.py'>
Модуль получается следующим образом:
3.3 Построение модели
Далее модель будет строиться по модулю.
model = module.model(criterion)
Здесь критерий LabelSmoothing() .
В вызове модели (критерия) Stage0() ~ Stage3() вызываются один за другим для построения каждого слоя.
Например, Stage3 вызовет__init__
функция.
class Stage3(torch.nn.Module):
def __init__(self):
super(Stage3, self).__init__()
self.layer5 = torch.nn.LSTM(2048, 1024)
self.layer8 = Classifier(1024, 32320)
Модель получается следующим образом.
model = {list: 5}
0 = {tuple: 3}
0 = {Stage0} Stage0(\n (layer4): Embedding(32320, 1024, padding_idx=0)\n (layer5): EmuBidirLSTM(\n (bidir): LSTM(1024, 1024, bidirectional=True)\n (layer1): LSTM(1024, 1024)\n (layer2): LSTM(1024, 1024)\n )\n (layer6): Dropout(p=0.2, inplace=False)\n (layer7): LSTM(2048, 1024)\n (layer9): Dropout(p=0.2, inplace=False)\n)
1 = {list: 2} ['input0', 'input1']
2 = {list: 2} ['out2', 'out1']
__len__ = {int} 3
1 = {tuple: 3}
0 = {Stage1} Stage1(\n (layer6): LSTM(1024, 1024)\n (layer9): Embedding(32320, 1024, padding_idx=0)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer12): LSTM(1024, 1024)\n (layer15): RecurrentAttention(\n (rnn): LSTM(1024, 1024)\n (attn): BahdanauAttention(\n (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n (dropout): Dropout(p=0, inplace=False)\n )\n (dropout): Dropout(p=0, inplace=False)\n )\n)
1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
2 = {list: 2} ['out3', 'out7']
__len__ = {int} 3
2 = {tuple: 3}
0 = {Stage2} Stage2(\n (layer7): Dropout(p=0.2, inplace=False)\n (layer9): LSTM(2048, 1024)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer13): LSTM(2048, 1024)\n (layer16): Dropout(p=0.2, inplace=False)\n)
1 = {list: 2} ['out3', 'out7']
2 = {list: 3} ['out8', 'out9', 'out10']
__len__ = {int} 3
3 = {tuple: 3}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {list: 3} ['out8', 'out9', 'out10']
2 = {list: 1} ['out12']
__len__ = {int} 3
4 = {tuple: 3} (LabelSmoothing(), ['out12'], ['loss'])
0 = {LabelSmoothing} LabelSmoothing()
1 = {list: 1} ['out12']
2 = {list: 1} ['loss']
__len__ = {int} 3
__len__ = {int} 5
3.4 Ввод и вывод
После того, как модель загружена, начните устанавливать вход и выход, конкретная логика:
-
Настроить по параметрам
-
Пройдите каждый слой модели (пропустите последний слой потерь) и сделайте следующее:
- Итерируйте вход каждого слоя, строя входной тензор.
- Выходные данные создаются путем вызова прямой функции, соответствующей этапу.
- Просмотрите выходные данные каждого слоя и установите тип.
- Построить тензорную форму.
Следует отметить, что формат каждого подмодуля следующий:
(
Stage0(),
["input0", "input1"], # 输入
["out2", "out1"] # 输出
)
Комментарии к коду следующие:
# 依据参数进行配置比如输入大小,batch size等
input_size = [args.max_length_train, args.batch_size]
training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],
"input2": input_size, "target": [args.max_length_train * args.batch_size],
"target_length": [args.batch_size]}
dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,
"target": torch.int64, "target_length": torch.int32}
inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}
target_tensor_names = {"target", "target_length"}
# 遍历模型的每个层(跳过最后loss层)
for module_id, (stage, inputs, outputs) in enumerate(model[:-1]): # Skip last layer (loss).
input_tensors = []
# 遍历每层的输入,构建输入张量
for module_input in inputs:
if module_input in inputs_module_destinations:
inputs_module_destinations[module_input] = module_id
input_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),
dtype=dtypes[module_input]).cuda()
input_tensors.append(input_tensor)
stage.cuda()
# PyTorch should not maintain metadata for a backward pass on
# synthetic inputs. Without the following line, the runtime is
# as much as 1.5x slower in a full DP configuration.
with torch.no_grad():
# 通过调用stage对应的forward函数,构建出输出
output_tensors = stage(*tuple(input_tensors))
if not type(output_tensors) is tuple:
output_tensors = [output_tensors]
# 遍历每层的输出,设置其类型和形状
for output, output_tensor in zip(outputs,
list(output_tensors)):
# output 是 ['out2', 'out1']
training_tensor_shapes[output] = list(output_tensor.size())
dtypes[output] = output_tensor.dtype
# 构建输出值张量类型
eval_tensor_shapes = {}
for key in training_tensor_shapes:
eval_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
training_tensor_shapes[key] = tuple(
training_tensor_shapes[key])
Получил форму и тип вывода.
dtypes = {dict: 13}
'input0' = {dtype} torch.int64
'input1' = {dtype} torch.int64
'input2' = {dtype} torch.int64
'target' = {dtype} torch.int64
'target_length' = {dtype} torch.int32
'out2' = {dtype} torch.float32
'out1' = {dtype} torch.float32
'out3' = {dtype} torch.float32
'out7' = {dtype} torch.float32
'out8' = {dtype} torch.float32
'out9' = {dtype} torch.float32
'out10' = {dtype} torch.float32
'out12' = {dtype} torch.float32
__len__ = {int} 13
training_tensor_shapes = {dict: 13}
'input0' = {tuple: 2} (50, 128)
'input1' = {tuple: 1} 128
'input2' = {tuple: 2} (50, 128)
'target' = {tuple: 1} 6400
'target_length' = {tuple: 1} 128
'out2' = {tuple: 3} (50, 128, 1024)
'out1' = {tuple: 3} (50, 128, 1024)
'out3' = {tuple: 3} (50, 128, 1024)
'out7' = {tuple: 3} (50, 128, 1024)
'out8' = {tuple: 3} (50, 128, 1024)
'out9' = {tuple: 3} (50, 128, 1024)
'out10' = {tuple: 3} (50, 128, 1024)
'out12' = {tuple: 3} (50, 128, 32320)
__len__ = {int} 13
eval_tensor_shapes = {dict: 13} {
'input0' = {tuple: 2} (50, 128)
'input1' = {tuple: 1} 128
'input2' = {tuple: 2} (50, 128)
'target' = {tuple: 1} 6400
'target_length' = {tuple: 1} 128
'out2' = {tuple: 3} (50, 128, 1024)
'out1' = {tuple: 3} (50, 128, 1024)
'out3' = {tuple: 3} (50, 128, 1024)
'out7' = {tuple: 3} (50, 128, 1024)
'out8' = {tuple: 3} (50, 128, 1024)
'out9' = {tuple: 3} (50, 128, 1024)
'out10' = {tuple: 3} (50, 128, 1024)
'out12' = {tuple: 3} (50, 128, 32320)
__len__ = {int} 13
3.5 Конфигурация
Загрузите файл конфигурации, созданный выше.
configuration_maps = {
'module_to_stage_map': None,
'stage_to_rank_map': None,
'stage_to_depth_map': None
}
if args.config_path is not None:
json_config_file = json.load(open(args.config_path, 'r'))
configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)
configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)
configuration_maps['stage_to_rank_map'] = {
int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}
configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)
Соответствующие файлы:
{
"module_to_stage_map": [0, 1, 2, 3, 3],
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
получить:
configuration_maps = {dict: 3}
'module_to_stage_map' = {list: 5} [0, 1, 2, 3, 3]
'stage_to_rank_map' = {dict: 4} {0: [0], 1: [1], 2: [2], 3: [3]}
'stage_to_depth_map' = {NoneType} None
__len__ = {int} 3
0x04 runtime
Для демонстрации мы запускаем main_with_runtime.py со следующими параметрами.
--module translation.models.gnmt.gpus=4 --data_dir=wmt16_ende_data_bpe_clean --config_path pipedream-pipedream/runtime/translation/models/gnmt/gpus=4/mp_conf.json --local_rank 3 --rank 3 --master_addr 127.0.0.1
В основной функции среда выполнения построена следующим образом. Среда выполнения — это механизм выполнения, обеспечивающий унифицированный масштабируемый уровень инфраструктуры.
r = runtime.StageRuntime(
model=model, distributed_backend=args.distributed_backend,
fp16=args.fp16, loss_scale=args.loss_scale,
training_tensor_shapes=training_tensor_shapes,
eval_tensor_shapes=eval_tensor_shapes,
training_tensor_dtypes=dtypes,
inputs_module_destinations=inputs_module_destinations,
target_tensor_names=target_tensor_names,
configuration_maps=configuration_maps,
master_addr=args.master_addr,
rank=args.rank, local_rank=args.local_rank,
num_ranks_in_server=args.num_ranks_in_server,
verbose_freq=args.verbose_frequency,
model_type=runtime.TRANSLATION,
enable_recompute=args.recompute)
4.1 StageRuntime
StageRuntime определяется следующим образом. Вы можете видеть, что его основные переменные-члены — это метаданные, необходимые для прямых и обратных операций внутри этой стадии, например:
Тензоры, градиенты, распределенные серверные части, шкала потерь, тензорный тип обучающих данных, форма тензора выходного значения и т. д.
class StageRuntime:
def __init__(self, model, distributed_backend, fp16, loss_scale,
training_tensor_shapes, eval_tensor_shapes,
training_tensor_dtypes, inputs_module_destinations,
target_tensor_names, configuration_maps, master_addr,
rank, local_rank, num_ranks_in_server, verbose_freq,
model_type, enable_recompute=False):
# Metadata needed for forward and backward pass within this stage.
self.tensors = []
self.gradients = {}
self.distributed_backend = distributed_backend
self.fp16 = fp16
self.loss_scale = loss_scale
self.training_tensor_shapes = training_tensor_shapes
self.eval_tensor_shapes = eval_tensor_shapes
self.training_tensor_dtypes = training_tensor_dtypes
self.model_type = model_type
self.target_tensor_names = target_tensor_names
self.initialize(model, inputs_module_destinations, configuration_maps,
master_addr, rank, local_rank, num_ranks_in_server)
self.verbose_freq = verbose_freq
self.forward_only = False
self.forward_stats = runtime_utilities.RuntimeStats(forward=True)
self.backward_stats = runtime_utilities.RuntimeStats(forward=False)
# Enable recomputation to prevent the need to save activations
# computed from the forward pass for the backward pass.
self.enable_recompute = enable_recompute
# Disable recomputation for the last stage.
if rank == num_ranks_in_server - 1:
self.enable_recompute = False
Код функции инициализации очень длинный, и мы разбираем его сегмент за сегментом.
4.2 Инициализация
4.2.1 Установка тегов
В начале функции пройдите через вход и выход каждого слоя модели и установите tensor_tag, который должен дать каждому тензору независимый и уникальный тег, Тег передается через слои и, наконец, функция recv в Distributed_c10d .py будет использоваться. tensor_tag будет использоваться в процессе связи и использоваться p2p для определения идентификации.
def recv(tensor,
src=None,
group=None,
tag=0):
"""
Receives a tensor synchronously.
Args:
tensor (Tensor): Tensor to fill with received data.
src (int, optional): Source rank. Will receive from any
process if unspecified.
group (ProcessGroup, optional): The process group to work on. If None,
the default process group will be used.
tag (int, optional): Tag to match recv with remote send
Returns:
Sender rank
-1, if not part of the group
"""
_check_single_tensor(tensor, "tensor")
if _rank_not_in_group(group):
return -1
if group is None:
pg = _get_default_group()
else:
pg = group
if src is None:
work = pg.recv_anysource([tensor], tag)
work.wait()
src_rank = work._source_rank()
if group is None or group is GroupMember.WORLD:
return src_rank
else:
return _get_global_rank(pg, src_rank)
else:
if group is None or group is GroupMember.WORLD:
pg.recv([tensor], src, tag).wait()
else:
group_src_rank = _get_group_rank(pg, src)
pg.recv([tensor], group_src_rank, tag).wait()
return src
Конкретный код для установки тега выглядит следующим образом:
def initialize(self, model, inputs_module_destinations,
configuration_maps, master_addr, rank,
local_rank, num_ranks_in_server):
self.send_ranks = {}
self.receive_ranks = {}
self.rank = rank
self.local_rank = local_rank
self.stage = None
self.tensor_tags = {}
self.forward_minibatch_id = 0
self.backward_minibatch_id = 0
self.criterion_input_name = str(model[-1][1][0])
tensor_tag = 1
# 遍历模型中每一层,每一层的格式是 (_, input_tensors, output_tensors)
for (_, input_tensors, output_tensors) in model:
# 遍历输入
for input_tensor in input_tensors:
if input_tensor not in self.tensor_tags:
self.tensor_tags[input_tensor] = tensor_tag
tensor_tag += 1 # 设置 tag
# 遍历输出
for output_tensor in output_tensors:
if output_tensor not in self.tensor_tags:
self.tensor_tags[output_tensor] = tensor_tag
tensor_tag += 1 # 设置 tag
for target_tensor_name in sorted(self.target_tensor_names):
self.tensor_tags[target_tensor_name] = tensor_tag
tensor_tag += 1 # 设置 tag
self.tensor_tags["ack"] = tensor_tag
tensor_tag += 1 # 设置 tag
Вход:
target_tensor_names = {set: 2} {'target_length', 'target'}
{str} 'target_length'
{str} 'target'
__len__ = {int} 2
model = {list: 5}
0 = {Stage0} Stage0(\n (layer4): Embedding(32320, 1024, padding_idx=0)\n (layer5): EmuBidirLSTM(\n (bidir): LSTM(1024, 1024, bidirectional=True)\n (layer1): LSTM(1024, 1024)\n (layer2): LSTM(1024, 1024)\n )\n (layer6): Dropout(p=0.2, inplace=False)\n (layer7): LS
1 = {list: 2} ['input0', 'input1']
2 = {list: 2} ['out2', 'out1']
__len__ = {int} 3
1 = {tuple: 3}
0 = {Stage1} Stage1(\n (layer6): LSTM(1024, 1024)\n (layer9): Embedding(32320, 1024, padding_idx=0)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer12): LSTM(1024, 1024)\n (layer15): RecurrentAttention(\n (rnn): LSTM(1024, 1024)\n (attn): BahdanauAttention(\n (linear_q): Linear(in_features=1024, out_features=1024, bias=False)\n (linear_k): Linear(in_features=1024, out_features=1024, bias=False)\n (dropout): Dropout(p=0, inplace=False)\n )\n (dropout): Dropout(p=0, inplace=False)\n )\n)
1 = {list: 4} ['out2', 'input1', 'input2', 'out1']
2 = {list: 2} ['out3', 'out7']
__len__ = {int} 3
2 = {tuple: 3}
0 = {Stage2} Stage2(\n (layer7): Dropout(p=0.2, inplace=False)\n (layer9): LSTM(2048, 1024)\n (layer11): Dropout(p=0.2, inplace=False)\n (layer13): LSTM(2048, 1024)\n (layer16): Dropout(p=0.2, inplace=False)\n)
1 = {list: 2} ['out3', 'out7']
2 = {list: 3} ['out8', 'out9', 'out10']
__len__ = {int} 3
3 = {tuple: 3}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {list: 3} ['out8', 'out9', 'out10']
2 = {list: 1} ['out12']
__len__ = {int} 3
4 = {tuple: 3}
0 = {LabelSmoothing} LabelSmoothing()
1 = {list: 1} ['out12']
2 = {list: 1} ['loss']
__len__ = {int} 3
__len__ = {int} 5
получить:
tensor_tags = {dict: 15}
'input0' = {int} 1
'input1' = {int} 2
'out2' = {int} 3
'out1' = {int} 4
'input2' = {int} 5
'out3' = {int} 6
'out7' = {int} 7
'out8' = {int} 8
'out9' = {int} 9
'out10' = {int} 10
'out12' = {int} 11
'loss' = {int} 12
'target' = {int} 13
'target_length' = {int} 14
'ack' = {int} 15
__len__ = {int} 15
4.2.2 Настройка карты
Напомним некоторые определения в файле конфигурации:
- module_to_stage_map: на какие этапы разделена эта модель.
- stage_to_rank_map: какой ранг соответствует каждому этапу, а ранг представляет конкретный рабочий процесс, например, этот этап распараллелен несколькими рангами.
Приведем пример, соответствующее содержимое файла выглядит следующим образом:
{
"module_to_stage_map": [0, 1, 2, 2],
"stage_to_rank_map": {"0": [0, 1, 4, 5, 8, 9, 12, 13], "1": [2, 6, 10, 14], "2": [3, 7, 11, 15]}
}
Для нашей модели в этой статье файл конфигурации выглядит следующим образом:
{
"module_to_stage_map": [0, 1, 2, 3, 3],
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
Загружается в память как:
module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
rank_to_stage_map = {dict: 4} {0: 0, 1: 1, 2: 2, 3: 3}
Так как иногда необходимо искать в обратном порядке, тогда программа настраивается в обратном порядке, и результат выглядит следующим образом.
stage_to_module_map = {defaultdict: 4}
default_factory = {type} <class 'list'>
0 = {list: 1} [0]
1 = {list: 1} [1]
2 = {list: 1} [2]
3 = {list: 2} [3, 4]
__len__ = {int} 4
stage_to_rank_map = {dict: 4}
0 = {list: 1} [0]
1 = {list: 1} [1]
2 = {list: 1} [2]
3 = {list: 1} [3]
__len__ = {int} 4
4.2.3 Найдите собственную конфигурацию
Поскольку локальные local_rank и rank задаются в командной строке, то среда выполнения находит свои вещи из конфигурационного файла в соответствии с рангом и далее настраивает себя.
stage_to_module_map = collections.defaultdict(list)
for module in range(len(module_to_stage_map)):
# 这里配置了哪个stage拥有哪些module
stage_to_module_map[module_to_stage_map[module]].append(module)
rank_to_stage_map = {}
for stage in stage_to_rank_map:
for rank in stage_to_rank_map[stage]:
# 配置了哪个 rank 拥有哪些 stage
rank_to_stage_map[rank] = stage
# Now, use this mapping to determine the modules contained in
# each stage.
assert 0 <= self.rank < len(rank_to_stage_map)
self.num_ranks = len(rank_to_stage_map) # 就是得到了world_size,因为有多少个rank,就是有多少个训练进程,就是world size
self.num_stages = len(stage_to_module_map) # 多少个阶段
self.stage = rank_to_stage_map[self.rank] # 通过自己的rank得到自己的stage
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank) # 本rank在stage之中排第几个
self.num_ranks_in_stage = len(stage_to_rank_map[self.stage])#得到自己stage的rank数目,就是数据并行数目,可以得到本层的数据并行次数
self.num_ranks_in_first_stage = len(stage_to_rank_map[0])
self.num_ranks_in_previous_stage = 0
self.ranks_in_previous_stage = []
if self.stage > 0:
self.num_ranks_in_previous_stage = len(
stage_to_rank_map[self.stage - 1])
self.ranks_in_previous_stage = stage_to_rank_map[self.stage - 1]
self.num_ranks_in_next_stage = 0
self.ranks_in_next_stage = []
if self.stage < self.num_stages - 1:
self.num_ranks_in_next_stage = len(
stage_to_rank_map[self.stage + 1])
self.ranks_in_next_stage = stage_to_rank_map[self.stage + 1]
modules = stage_to_module_map[self.stage] # 这里得到 [3,4],后续会用到。
self.modules_with_dependencies = ModulesWithDependencies(
[model[module] for module in modules])
self.is_criterion = self.stage == (self.num_stages - 1)
if stage_to_depth_map is not None:
self.num_warmup_minibatches = stage_to_depth_map[
str(self.stage)]
else:
self.num_warmup_minibatches = self.num_ranks - 1
for i in range(self.stage):
self.num_warmup_minibatches -= len(
stage_to_rank_map[i])
self.num_warmup_minibatches = self.num_warmup_minibatches // \
self.num_ranks_in_stage
Переменные:
self = {StageRuntime}
backward_minibatch_id = {int} 0
criterion_input_name = {str} 'out12'
distributed_backend = {NoneType} None
eval_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
forward_minibatch_id = {int} 0
fp16 = {bool} False
gradients = {dict: 0} {}
is_criterion = {bool} True
local_rank = {int} 3
loss_scale = {int} 1
model_type = {str} 'translation'
modules_with_dependencies = {ModulesWithDependencies}
_all_input_names = {list: 2} [['out8', 'out9', 'out10'], ['out12']]
_all_output_names = {list: 2} [['out12'], ['loss']]
_modules = {list: 2}
0 = {Stage3} Stage3(\n (layer5): LSTM(2048, 1024)\n (layer8): Classifier(\n (classifier): Linear(in_features=1024, out_features=32320, bias=True)\n )\n)
1 = {LabelSmoothing} LabelSmoothing()
__len__ = {int} 2
num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 0} {}
send_ranks = {dict: 0} {}
stage = {int} 3
target = {str} 'python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 178, in _getPyDictionary\n attr = getattr(var, n)\n File "../runtime.py", line 295, in target\n r
target_tensor_names = {set: 2} {'target', 'target_length'}
tensor_tags = {dict: 15} {'input0': 1, 'input1': 2, 'out2': 3, 'out1': 4, 'input2': 5, 'out3': 6, 'out7': 7, 'out8': 8, 'out9': 9, 'out10': 10, 'out12': 11, 'loss': 12, 'target': 13, 'target_length': 14, 'ack': 15}
tensors = {list: 0} []
training_tensor_dtypes = {dict: 13} {'input0': torch.int64, 'input1': torch.int64, 'input2': torch.int64, 'target': torch.int64, 'target_length': torch.int32, 'out2': torch.float32, 'out1': torch.float32, 'out3': torch.float32, 'out7': torch.float32, 'out8': torch.float32, 'out9': torch.floa
training_tensor_shapes = {dict: 13} {'input0': (50, 128), 'input1': (128,), 'input2': (50, 128), 'target': (6400,), 'target_length': (128,), 'out2': (50, 128, 1024), 'out1': (50, 128, 1024), 'out3': (50, 128, 1024), 'out7': (50, 128, 1024), 'out8': (50, 128, 1024), 'out9': (50, 128, 1024), '
Давайте посмотрим, как используются несколько переменных.
4.2.3.1 num_ranks
Во-первых, посмотрите, как используется num_ranks. Используется в последующем коде, например:
world_size=self.num_ranks # 依据 num_ranks 得到 world_size
self.num_warmup_minibatches = self.num_ranks - 1 # 依据 num_ranks 得到热身batch数目
4.2.3.2 rank_in_stage
Во-вторых, давайте посмотрим, как используется rank_in_stage?
спереди
self.rank_in_stage = stage_to_rank_map[self.stage].index(self.rank) # 本rank在stage之中排第几个
rank_in_stage передается модулю связи.
self.comm_handler.initialize(
self.receive_ranks,
self.send_ranks,
self.tensor_tags,
self.target_tensor_names,
self.training_tensor_dtypes,
self.rank_in_stage, # 在这里作为参数传入,在函数里面代表本节点,后续会详细介绍
self.num_ranks_in_stage,
self.ranks_in_previous_stage,
self.ranks_in_next_stage)
4.2.4 Настройка коммуникационного модуля
Далее настройте модуль связи.
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
master_addr=master_addr,
master_port=master_port,
rank=self.rank,
local_rank=self.local_rank,
num_ranks_in_server=num_ranks_in_server,
world_size=self.num_ranks,
fp16=self.fp16,
backend=self.distributed_backend)
Конфигурационный код выглядит следующим образом, а CommunicationHandler конструируется Этот модуль для последующей "настройки производителей и потребителей", поэтому мы временно размещаем последующий код здесь.
else:
......
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
master_port = 12345
self.comm_handler = communication.CommunicationHandler(
master_addr=master_addr,
master_port=master_port,
rank=self.rank,
local_rank=self.local_rank,
num_ranks_in_server=num_ranks_in_server,
world_size=self.num_ranks,
fp16=self.fp16,
backend=self.distributed_backend)
# 设置生产者和消费者部分,我们下面会详细分析
# 设置接受ranks
for i in range(len(model)): # 遍历层
for j in range(i+1, len(model)): # 遍历 i 层之后的若干层
for tensor_name in model[i][2]: # 找出前面层 output 的tensor
if tensor_name in model[j][1]: # 看看 output 在不在input之中
if module_to_stage_map[i] == \
module_to_stage_map[j]:
continue
# For now, assume that each stage is served by only
# a single machine.
if module_to_stage_map[j] == self.stage:
self.receive_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[i]]
if module_to_stage_map[i] == self.stage:
self.send_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[j]]
# 设置发送ranks
for model_inputs in inputs_module_destinations.keys():
destination_stage = module_to_stage_map[
inputs_module_destinations[model_inputs]]
if destination_stage > self.stage:
self.send_ranks[model_inputs] = \
self.ranks_in_next_stage
if 0 < self.stage <= destination_stage:
self.receive_ranks[model_inputs] = \
self.ranks_in_previous_stage
if destination_stage > 0:
if model_inputs not in self.tensor_tags:
self.tensor_tags[model_inputs] = tensor_tag
tensor_tag += 1
4.2.5 Настройка производителей и потребителей
Затем настройте отправленные и полученные ранги.receive_ranks и send_ranks — это соответствующие целевые ранги отправки и получения каждого тензора на этом этапе.
Как упоминалось ранее, PyTorch не выпускал стабильный RPC, когда разрабатывался PipeDream, поэтому PipeDream (опубликованный в 2019 году) может реализовать только набор взаимосвязей логики или граф распределенных вычислений. Производители и потребители являются важной частью графа распределенных вычислений.
Логическая абстракция выглядит следующим образом:
-
Пройдите модель модели, предполагая модель [i], обратите внимание, что модель [i] здесь является определенным слоем. Этап может включать несколько уровней, например [уровень 1, уровень 2, уровень 3], и этот этап может выполнять параллелизм данных на нескольких рангах. Например, ранг 1 и ранг 2 будут выполняться [уровень 1, уровень 2, уровень 3].
-
Для каждой модели [i] выполните итерацию по моделям после модели [i], предполагая модель [j].
-
Перебрать выходные данные модели [i], предполагая tensor_name.
-
Если tensor_name есть и на входе model[j], то есть tensor_name есть и на выходе model[i] и на входе module[j], значит, они могут установить соединение. Потому что, если тензор имеет только вход или только выход, нет необходимости устанавливать какой-либо механизм связи для этого тензора.
-
Если model[i] и model[j] находятся на одном этапе, это один и тот же узел или несколько узлов, но они контролируются DDP, поэтому механизм связи не используется.
-
Если tensor_name является входом модуля[j], а модуль[j] расположен на этом узле, это означает, что Receive_Ranks этого узла включают вход модуля[j] (конечно, он может также включать вход других модели).
- Таким образом, входной ранг tensor_name включает ранг, соответствующий модели [j].
-
tensor_name — это выходные данные модуля [i], а модуль [i] расположен на этом узле, что указывает на то, что send_ranks этого узла включают выходные данные модуля [i] (конечно, он также может включать выходные данные других моделей) .
- Таким образом, выходной ранг tensor_name включает ранг, соответствующий модели [i].
-
-
Конкретный код выглядит следующим образом:
# To determine where tensors should be sent and received, first
# determine the "producing" and "consuming" module IDs of each
# tensor. We then use the corresponding machine ranks to send
# and receive tensors.
for i in range(len(model)): # 遍历层
for j in range(i+1, len(model)): # 遍历 i 层之后的若干层
for tensor_name in model[i][2]: # 找出前面层 output 的tensor
if tensor_name in model[j][1]: # 看看 tensor_name 在不在input之中,即tensor_name 是不是 modle[j]的输入
# tensor_name即在 model[i] 的输出,也在 module[j]的输入,就说明他们之间可以建立联系
if module_to_stage_map[i] == \
module_to_stage_map[j]: # 两个module在一个node上,不用通信机制
continue
# For now, assume that each stage is served by only
# a single machine.
# tensor_name 是 modle[j]的输入,且module[j]位于本节点上,说明可以和本节点的 receive_ranks 建立联系
if module_to_stage_map[j] == self.stage:
# 所以tensor_name的输入rank包括rank i
self.receive_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[i]]
# tensor_name 是module[i]的输出,且module[i]位于本节点上,说明可以和本节点的 send_ranks 建立联系
if module_to_stage_map[i] == self.stage:
# 所以tensor_name的输出rank包括rank j
self.send_ranks[tensor_name] = \
stage_to_rank_map[module_to_stage_map[j]]
for model_inputs in inputs_module_destinations.keys():
destination_stage = module_to_stage_map[
inputs_module_destinations[model_inputs]]
if destination_stage > self.stage:
self.send_ranks[model_inputs] = \
self.ranks_in_next_stage
if 0 < self.stage <= destination_stage:
self.receive_ranks[model_inputs] = \
self.ranks_in_previous_stage
if destination_stage > 0:
if model_inputs not in self.tensor_tags:
self.tensor_tags[model_inputs] = tensor_tag
tensor_tag += 1
Получены следующие переменные:
num_ranks = {int} 4
num_ranks_in_first_stage = {int} 1
num_ranks_in_next_stage = {int} 0
num_ranks_in_previous_stage = {int} 1
num_ranks_in_stage = {int} 1
num_stages = {int} 4
num_warmup_minibatches = {int} 0
rank = {int} 3
rank_in_stage = {int} 0
ranks_in_next_stage = {list: 0} []
ranks_in_previous_stage = {list: 1} [2]
receive_ranks = {dict: 3} # 这里就是每个tensor对应的接收目标rank
'out8' = {list: 1} [2]
'out9' = {list: 1} [2]
'out10' = {list: 1} [2]
__len__ = {int} 3
send_ranks = {dict: 0} {} # 这里就是每个tensor对应的发送目标rank
__len__ = {int} 0
stage = {int} 3
4.2.6 Настройка модулей
Далее мы будем иметь дело с операциями, связанными с модулем.В частности, здесь мы будем:
- Во-первых, используйте ModulesWithDependencies, чтобы продолжить обработку модели и настроить ввод и вывод.
- Затем вызовите cuda, чтобы переместить модель и параметры в GPU.
- Конвертируйте для fp16, если требуется обработка.
Что касается раздела ModulesWithDependencies, мы сосредоточимся на объяснении.
Перед нашим кодом у нас следующее, то есть индекс модулей, соответствующий этому этапу, должен быть инвертирован.
modules = stage_to_module_map[self.stage] # 这里得到 [3,4],后续会用到。
stage_to_module_map — установить отношения между этапами и модулями, чтобы получить модули, соответствующие этому этапу.
Напомним файл конфигурации, этот этап (значение 3) соответствует двум модулям с индексом 3 и 4, которые являются следующими 3 и 3
module_to_stage_map = {list: 5} [0, 1, 2, 3, 3]
Затем вам нужно получить конкретные модули этого этапа с помощью следующего кода, включая ввод и вывод каждого модуля.
modules = self.modules_with_dependencies.modules()
for i in range(len(modules)):
modules[i] = modules[i].cuda()
if self.fp16:
import apex.fp16_utils as fp16_utils
modules[i] = fp16_utils.BN_convert_float(modules[i].half())
Конкретные ModulesWithDependencies следующие:
class ModulesWithDependencies:
def __init__(self, modules_with_dependencies):
self._modules = []
self._all_input_names = []
self._all_output_names = []
for (module, input_names, output_names) in modules_with_dependencies:
self._modules.append(module)
self._all_input_names.append(input_names)
self._all_output_names.append(output_names)
def modules(self):
return self._modules
def all_input_names(self):
return self._all_input_names
def all_output_names(self):
return self._all_output_names
def is_input_tensor(self, tensor_name):
for module_input_names in self._all_input_names:
if tensor_name in module_input_names:
return True
return False
4.2.7 Установить группу
Далее создается группа по параллельному номеру каждого этапа.
ранги — это параллельные ранги каждого этапа, например этап 0 соответствует [0, 1, 2].
{
"module_to_stage_map": [0, 1, 1],
"stage_to_rank_map": {"0": [0, 1, 2], "1": [3]} # 每个stage的rank,这里目的是得到并行的机器
}
Пройдите этапы, для каждого этапа звонитеnew_group()
Создайте группу процессов.new_group()
Функция для создания новой группы процессов с использованием произвольного подмножества всех процессов, метод возвращает дескриптор группы, который можно использовать какcollectives
(для обмена информацией в конкретном режиме программирования) связанные распределенные функцииgroup
параметр .
Это то, что упоминалось в исходном вопросе: для параллелизма данных на каждом этапе необходимо создавать и управлять своей собственной группой процессов.
# Initialize all groups in the same order on every worker.
if stage_to_rank_map is not None:
groups = []
for stage in range(self.num_stages): # 遍历stage
ranks = stage_to_rank_map[stage] # 与stage的数据并行对应,比如得到 [0, 1, 2]
if len(ranks) > 1: # 与后面的 ddp 相对应
groups.append(dist.new_group(ranks=ranks))
else:
groups.append(None)
group = groups[self.stage]
else:
group = None
4.2.8 Настройка параллелизма данных
Наконец, вызовите DistributedDataParallel для обработки. этот параметрprocess_group=group
Это то, что вернула предыдущая «группа набора».
Это установить набор DistributedDataParallel для каждой группы.
# self.modules_with_dependencies contains a list of PyTorch
# modules, along with a list of user-defined input and output
# tensor names. We use our module_executor.ModuleExecutor
# class to wrap these dependencies, and use run_forward and
# run_backward methods downstream.
num_parameters = 0
for i in range(len(modules)):
if group is not None:
if ((i < (len(modules)-1) and self.is_criterion)
or not self.is_criterion):
num_parameters += \
sum(x.size()[0] * x.size()[1]
if len(x.size()) > 1 else x.size()[0]
for x in modules[i].parameters() if x.size())
# 建立分布式数据并行
modules[i] = torch.nn.parallel.DistributedDataParallel(
modules[i],
process_group=group,
device_ids=[local_rank],
output_device=local_rank)
if self.num_ranks_in_stage > 1:
module_size = 4. * num_parameters
print("Replicating stage: ranks=%d, module_size=%.3f" % (
self.num_ranks_in_stage, module_size))
Что касается DistributedDataParallel, у нас будет специальная серия для анализа в будущем.
4.2.9 Инициализация функции связи
Наконец, инициализируйте этот коммуникационный модуль.
if self.comm_handler is not None:
self.comm_handler.initialize(
self.receive_ranks,
self.send_ranks,
self.tensor_tags,
self.target_tensor_names,
self.training_tensor_dtypes,
self.rank_in_stage,
self.num_ranks_in_stage,
self.ranks_in_previous_stage,
self.ranks_in_next_stage)
Давайте по-прежнему используем картинку в статье в качестве примера:
Если его уточнить для этой статьи, то это:
+----------------------------------------+
| Stage 2 StageRuntime |
| |
| CommunicationHandler |
| |
| +----------------------------+ |
| | +------------------------+ | |
| | |Rank 2 | | |
| | | | | |
| | | | | |
+-----------------------------+ | | | Layer 3 +---> Layer 4 | | |
| Stage 1 StageRuntime | | | | | | | +---------------------------+
| | | | | | | | | Stage 3 StageRuntime |
| | | | +------------------------+ | | | |
| CommunicationHandler | | | +------------------------+ | | | CommunicationHandler |
| | | | |Rank 3 | | | | |
| +-----------------------+ | | DDP | | | | | | +-----------------------+ |
| |Rank 1 | +---------------->+ | | +----------> | | Rank 4 | |
| | | | | | | Layer 3 +---> Layer 4 | | | | | | |
| | Layer 1 +---> Layer 2 | | | | | | | | | | Layer 5 +---> Layer 6 | |
| | | | | | | | | | | | | |
| | | | | | +------------------------+ | | | | | |
| +-----------------------+ | | | +------------------------+ | | | +-----------------------+ |
| | | | |Rank 4 | | | | |
| | | | | | | | | |
+-----------------------------+ | | | | | | +---------------------------+
| | | Layer 3 +---> Layer 4 | | |
| | | | | |
| | | | | |
| | +------------------------+ | |
| +----------------------------+ |
+----------------------------------------+
Телефон такой:
4.3 Функциональная функция
Здесь мы вводим только основные функции. Есть также несколько бизнес-функций, таких как run_forward, которые будут представлены вместе в статье 1F1B.
Все следующие функции вызывают коммуникационный модуль для завершения функции.
4.3.1 receive_tensors_forward
receive_tensors_forward должен получить тензоры из предыдущего слоя при прямом распространении.
При прямом проходе тензоры записываются в self.tensors этого экземпляра.
\
def receive_tensors_forward(self):
if self.forward_only and len(self.tensors) > 0:
self.tensors.pop(0) # 弹出以前
self.tensors.append({})
if self.loader_iter is not None: # 前向传播第一层,需要加载数据
input = next(self.loader_iter) # 加载新的
if self.model_type == TRANSLATION:
(input, target) = input
src, src_length = input
tgt, tgt_length = target
self.tensors[-1]["input0"] = src.cuda(non_blocking=True)
self.tensors[-1]["input1"] = torch.LongTensor(src_length).cuda(
non_blocking=True)
self.tensors[-1]["input2"] = tgt[:-1].cuda(non_blocking=True)
self.tensors[-1]["target"] = tgt[1:].cuda().contiguous().view(-1)
self.tensors[-1]["target_length"] = \
torch.tensor([int(sum(torch.LongTensor(tgt_length) - 1))],
dtype=torch.int).cuda()
elif self.model_type == IMAGE_CLASSIFICATION:
(input, target) = input
if self.fp16:
input = input.half()
self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
self.tensors[-1]["target"] = target.cuda(non_blocking=True)
elif self.model_type == SPEECH_TO_TEXT:
input, target, input_percentages, target_sizes = input
input_sizes = input_percentages.mul_(int(input.size(3))).int()
self.tensors[-1]["input0"] = input.cuda(non_blocking=True)
self.tensors[-1]["input1"] = input_sizes.cuda(non_blocking=True)
self.tensors[-1]["target"] = target.cuda(non_blocking=True)
self.tensors[-1]["target_length"] = target_sizes.cuda(
non_blocking=True)
else:
# Receive all required tensors from upstream machines.
for input_name in self.receive_ranks: # 遍历本stage对应的接受rank,从前面层获取
if input_name == "ack":
continue
self.tensors[-1][input_name] = \
self.comm_handler.recv(
input_name,
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=False)
self.forward_stats.stats['receive_tensors_size'] += \
(self.tensors[-1][input_name].element_size() *
self.tensors[-1][input_name].nelement())
# Used to track where to receive forward from.
self.comm_handler.increment_messaging_index(
sending=False)
4.3.2 send_tensors_forward
send_tensors_forward предназначен для отправки тензоров последующим слоям при прямом распространении.
def send_tensors_forward(self):
# Send all required tensors downstream.
for output_name in self.send_ranks: # 遍历本stage对应的发送rank,进行发送
if output_name == "ack":
continue
self.comm_handler.send(
output_name,
self.tensors[-1][output_name],
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=False)
self.forward_stats.stats['send_tensors_size'] += \
(self.tensors[-1][output_name].element_size() *
self.tensors[-1][output_name].nelement())
4.3.3 receive_tensors_backward
При обратном распространении градиенты сохраняются в файле self.gradients.
receive_tensors_backward — получить тензоры из предыдущего слоя при обратном распространении.
Обратите внимание, что это соответствует self.send_ranks, который является рангом отправки в прямом процессе, и они принимают ранг в обратном процессе.
def receive_tensors_backward(self):
# Receive all required gradients from downstream
# machines.
for output_name in self.send_ranks: # 遍历本stage对应的发送rank(前向),进行接受
if output_name in self.target_tensor_names:
continue
# 获取梯度
self.gradients[output_name] = \
self.comm_handler.recv(
output_name,
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
self.backward_stats.stats['receive_tensors_size'] += \
(self.gradients[output_name].element_size() *
self.gradients[output_name].nelement())
4.3.4 send_tensors_backward
При обратном распространении градиенты сохраняются в файле self.gradients.
send_tensors_forward предназначен для отправки тензоров градиента последующим слоям в обратном направлении.
Обратите внимание, что это соответствует self.receive_ranks, который является рангом получения в прямом процессе, и рангом отправки в обратном процессе.
def send_tensors_backward(self):
# Send all required gradients upstream.
for input_name in self.receive_ranks: # 遍历本stage对应的接受rank,进行发送
if input_name in self.target_tensor_names:
continue
self.comm_handler.send(
input_name,
self.gradients[input_name],
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
self.backward_stats.stats['send_tensors_size'] += \
(self.gradients[input_name].element_size() *
self.gradients[input_name].nelement())
if self.num_ranks_in_previous_stage > 0:
# Used to track where to send tensors in the
# backward pass.
self.comm_handler.increment_messaging_index(
sending=True)
4.3.5 run_ack
run_ack предназначен для отправки подтверждения на передний и задний уровни во время распространения.
def run_ack(self):
# No need for ack if running on a single worker.
if self.rank is None:
return
# Receive ack from next stage. Send ack to previous stage.
if self.stage < (self.num_stages-1):
self.comm_handler.recv(
"ack",
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
if self.stage > 0:
self.comm_handler.send(
"ack",
torch.zeros(self.tensor_shapes["ack"],
dtype=torch.int64).cuda(),
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
# Used to track where to receive forward from.
self.comm_handler.increment_messaging_index(sending=True)
self.backward_minibatch_id += 1
До сих пор мы представили статическую информацию и инициализацию механизма выполнения, в следующей статье мы представим модуль связи.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
\