0x00 сводка
В предыдущей статье мы представили общую архитектуру PipeDream, этап профиля, этап расчета раздела, этап преобразования модели и механизм выполнения.В этой статье мы представляем коммуникационный модуль PipeDream.Коммуникационный модуль является основой движок, а также то, как используются PyTorch DDP и P2P.Калейдоскоп и прекрасный пример.
Ссылки на другие статьи о конвейерном параллелизме:
[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe(3) -- перерасчет
[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения
0x01 Предисловие
Код коммуникационного модуля находится по адресу: runtime/communication.py. Давайте сначала подумаем, какие функции нужны коммуникационным модулям?
- Связь между этапами (Stages), если этапы обрабатываются на разных машинах? Как с этим бороться на той же машине?
- Поскольку это в основном асинхронная связь, производительность разных узлов может быть разной.Нужен ли вам механизм кэширования для координации разных узлов, аналогичный функции обратного давления?
- Глубокое обучение имеет много параметров, много задействованных тензоров и градиентов, много слоев, и количество параллельных данных в каждом слое также разное, так как же обеспечить, чтобы прямое и обратное распространение выполнялись в определенном порядке?
- Поскольку на узле необходимо выполнять прямое и обратное распространение, необходимо установить несколько потоков для отдельной передачи.
Поэтому мы рассмотрим эти вопросы в следующем анализе.
0x02 определение класса
CommunicationHandler отвечает за связь между этапами.
- Если этапы находятся на разных машинах, используйте PyTorch p2p send/recv.
- Если этапы находятся на одном компьютере, используйте трансляцию PyTorch p2p.
В следующем коде в основном инициализируются различные переменные-члены.Нам больше всего знакомы переменные, связанные с DDP, такие как init_process_group.
class CommunicationHandler(object):
""" Handles communication between stages.
For stages on different machines, use send/recv.
For stages on same machine, use broadcast.
"""
def __init__(self, master_addr, master_port, rank,
local_rank, num_ranks_in_server,
world_size, fp16, backend):
""" Set up process groups.
Note: To turn off broadcasting, set num_ranks_in_server = 1.
"""
self.rank = rank
self.local_rank = local_rank
self.backend = backend
self.num_ranks_in_server = num_ranks_in_server
self.world_size = world_size
self.fp16 = fp16
assert num_ranks_in_server > 0
# Initialize the distributed environment.
# 以下是为了 DDP
os.environ['MASTER_ADDR'] = master_addr
os.environ['MASTER_PORT'] = str(master_port)
dist.init_process_group(backend, rank=rank, world_size=world_size)
assert dist.get_world_size() == self.world_size
# Stores list of ranks of GPUs on the same server.
self.ranks_in_server = []
if num_ranks_in_server == 1:
return
# Stores information about tensors sent directly GPU-to-GPU.
self.connection_list = []
# Stores process groups (for broadcast() connections).
self.process_groups = {}
# Populate ranks_in_server.
rank_of_first_gpu_in_server = rank - rank % num_ranks_in_server
for connected_rank in range(
rank_of_first_gpu_in_server,
rank_of_first_gpu_in_server + num_ranks_in_server):
if connected_rank == rank:
continue
self.ranks_in_server.append(connected_rank)
assert len(self.ranks_in_server) == num_ranks_in_server - 1, \
self.ranks_in_server
сборка 0x03
3.1 Инициализация
Как упоминалось в предыдущей главе, когда создается CommunicationHandler, для инициализации вызывается initialize.
if self.comm_handler is not None:
self.comm_handler.initialize(
self.receive_ranks,
self.send_ranks,
self.tensor_tags,
self.target_tensor_names,
self.training_tensor_dtypes,
self.rank_in_stage,
self.num_ranks_in_stage,
self.ranks_in_previous_stage,
self.ranks_in_next_stage)
В коде инициализации выполните следующие операции, в основном:
- Создайте очередь, необходимую для связи.
- Установите порядок отправки сообщений.
- Создавайте группы процессов.
def initialize(self, receive_ranks, send_ranks,
tensor_tags, target_tensor_names,
training_tensor_dtypes,
rank_in_stage,
num_ranks_in_stage,
ranks_in_previous_stage,
ranks_in_next_stage):
"""
Initialize state needed for CommunicationHandler.
"""
self.receive_ranks = receive_ranks
self.send_ranks = send_ranks
self.tensor_tags = tensor_tags
self.target_tensor_names = target_tensor_names
self.training_tensor_dtypes = training_tensor_dtypes
self.rank_in_stage = rank_in_stage
self.num_ranks_in_stage = num_ranks_in_stage
self.ranks_in_previous_stage = ranks_in_previous_stage
self.num_ranks_in_previous_stage = len(ranks_in_previous_stage)
self.ranks_in_next_stage = ranks_in_next_stage
self.num_ranks_in_next_stage = len(ranks_in_next_stage)
self.setup_queues() # 构建通信需要的queue
self.setup_messaging_schedule() # 构建发送消息的次序
self.create_process_groups() # 构建进程组
Наш конкретный анализ заключается в следующем.
3.2 Создать очередь
Роль очереди состоит в том, чтобы служить основой для отправки и получения.Система находит, какая очередь через индекс, а затем выполняет соответствующие операции.
В функцию инициализации передаются два списка рангов.
- receive_ranks — входной ранг этого узла.
- send_ranks — это выходной ранг этого узла.
Примерный список рангов выглядит следующим образом:
receive_ranks = {dict: 3} # 这里就是每个tensor对应的接收目标rank
'out8' = {list: 1} [2] # out8 是tensor name, {list: 1} [2] 是 out8 对应的 ranks
'out9' = {list: 1} [2] # 就是这几个张量都要从 rank 2 接收
'out10' = {list: 1} [2]
__len__ = {int} 3
setup_queues соответственно создает в общей сложности 4 списка очередей:
- forward_receive_queues : Очереди, которые принимают тензоры во время прямого распространения. Соответствует receive_ranks.
- back_send_queues: очередь для отправки тензоров во время обратного распространения. Соответствует receive_ranks. Потому что объект, полученный при прямом распространении, является целью, отправленной при обратном распространении.
- forward_send_queues : очередь для отправки тензоров во время прямого распространения. Соответствует send_ranks.
- back_receive_queues : Очереди, которые принимают тензоры во время обратного распространения. Соответствует send_ranks. Потому что цель, отправленная в прямом проходе, является объектом, полученным в обратном проходе.
Общая логика следующая:
forward_receive_queues <-----> receive_ranks <-------> backward_send_queues
forward_send_queues <------> send_ranks <-------> backward_receive_queues
Возьмите forward_receive_queues в качестве примера.
- forward_receive_queues Этот список содержит несколько очередей.
- Список Receive_Ranks включает в себя несколько рангов, и каждый ранг соответствует тензору в процессе коммуникации.Можно считать, что Receive_Ranks включает в себя несколько тензоров, соответствующих имени тензора. Имена тензоров, например: target_tensor_names = {"target", "target_length"}.
- В списке forward_receive_queues каждая очередь соответствует тензору в Receive_Ranks.
- Каждый тензор соответствует уникальному тегу.Цель PipeDream — позволить каждому тегу иметь свою собственную группу процессов, потому что любой этап может быть распараллелен.
- Для этого тензора и этого уникального тега зарегистрируйте [tag, rank] в connection_list.
детали следующим образом:
def setup_queues(self):
"""
Setup queues for communication between main compute thread
and helper communication threads. One queue per tensor
in forward / backward direction.
"""
self.forward_receive_queues = {}
self.backward_receive_queues = {}
self.forward_send_queues = {}
self.backward_send_queues = {}
self.num_forward_threads = 0
self.num_backward_threads = 0
self.target_receive_rank_counts = {}
self.target_send_rank_counts = {}
# Setup queues for each tensor to be received and sent.
for input_name in self.receive_ranks: # 遍历张量
# 与 input_name 张量对应的queue,input_name 是张量名字
self.forward_receive_queues[input_name] = []
self.backward_send_queues[input_name] = []
# 遍历该张量对应的每个 ranks
for i in range(len(self.receive_ranks[input_name])):
self.forward_receive_queues[input_name].append(
threadsafe_queue.Queue())
self.backward_send_queues[input_name].append(
threadsafe_queue.Queue())
# 得到 rank
target_receive_rank = self.receive_ranks[input_name][i]
# 针对 rank,注册张量
self.register_tensor(
connected_rank=target_receive_rank,
tag=self.tensor_tags[input_name])
if target_receive_rank not in self.target_receive_rank_counts:
self.target_receive_rank_counts[target_receive_rank] = 0
self.target_receive_rank_counts[target_receive_rank] += 1
self.num_forward_threads += 1
self.num_backward_threads += 1
for output_name in self.send_ranks: # 遍历张量
# 与 output_name 张量对应的queue
self.backward_receive_queues[output_name] = []
self.forward_send_queues[output_name] = []
# 遍历该张量对应的每个 ranks
for i in range(len(self.send_ranks[output_name])):
self.backward_receive_queues[output_name].append(
threadsafe_queue.Queue())
self.forward_send_queues[output_name].append(
threadsafe_queue.Queue())
# 得到 rank
target_send_rank = self.send_ranks[output_name][i]
# 针对 rank,注册张量
self.register_tensor(
connected_rank=target_send_rank,
tag=self.tensor_tags[output_name])
if target_send_rank not in self.target_send_rank_counts:
self.target_send_rank_counts[target_send_rank] = 0
self.target_send_rank_counts[target_send_rank] += 1
self.num_forward_threads += 1
self.num_backward_threads += 1
# 单独处理目标tensor
for target_tensor_name in self.target_tensor_names:
# Queues for target in forward pass.
self.forward_receive_queues[target_tensor_name] = []
self.forward_send_queues[target_tensor_name] = []
if self.num_ranks_in_previous_stage > 0:
self.receive_ranks[target_tensor_name] = self.ranks_in_previous_stage
for i in range(len(self.receive_ranks[target_tensor_name])):
# 针对 rank,注册张量
self.register_tensor(
connected_rank=self.receive_ranks[target_tensor_name][i],
tag=self.tensor_tags[target_tensor_name])
self.forward_receive_queues[target_tensor_name].append(
threadsafe_queue.Queue())
self.num_forward_threads += 1
if self.num_ranks_in_next_stage > 0:
self.send_ranks[target_tensor_name] = self.ranks_in_next_stage
for i in range(len(self.send_ranks[target_tensor_name])):
self.register_tensor(
connected_rank=self.send_ranks[target_tensor_name][i],
tag=self.tensor_tags[target_tensor_name])
self.forward_send_queues[target_tensor_name].append(
threadsafe_queue.Queue())
self.num_forward_threads += 1
print ("Send ranks: ", self.send_ranks)
print ("Receive ranks: ", self.receive_ranks)
# Queues for ack for forward pass-only runs as a clocking mechanism.
# 单独处理 ack 情况
self.num_ack_threads = 0
if "ack" in self.tensor_tags:
self.backward_receive_queues["ack"] = []
self.backward_send_queues["ack"] = []
for i in range(self.num_ranks_in_previous_stage):
# 针对 rank,注册张量
self.register_tensor(
connected_rank=self.ranks_in_previous_stage[i],
tag=self.tensor_tags["ack"])
self.backward_send_queues["ack"].append(
threadsafe_queue.Queue())
self.num_ack_threads += 1
for i in range(self.num_ranks_in_next_stage):
# 针对 rank,注册张量
self.register_tensor(
connected_rank=self.ranks_in_next_stage[i],
tag=self.tensor_tags["ack"])
self.backward_receive_queues["ack"].append(
threadsafe_queue.Queue())
self.num_ack_threads += 1
Обратите внимание, что каждый тензор имеет уникальный тег.Для этого тензора и этого уникального тега зарегистрируйте [tag, rank] в connection_list.
def register_tensor(self, connected_rank, tag):
"""
Builds connections list of tensors that are communicated GPU to GPU.
For tensors that are sent GPU-to-GPU (intra-server for GLOO backend),
make a list of destination/source ranks and the corresponding tag.
This information is then used to crate process groups.
"""
if not self.is_gpu_to_gpu_comm(connected_rank=connected_rank):
return
connection_info = [tag, connected_rank]
self.connection_list.append(connection_info)
Поэтому логика на данный момент такова, в качестве примеров мы берем только некоторые ранги, очереди и т. д. Эти очереди в forward_receive_queues используются как буферы для соответствующих тензоров.
+------------------------+ 'out8' = {list: 1} [2]
| |
| receive_ranks +-----------> 'out9' = {list: 1} [2]
| |
+------------------------+ 'out10' = {list: 1} [2]
+--------------------------+
| | 'out8' : Queue
| forward_receive_queues+-------->
| | 'out9' : Queue
+--------------------------+
'out10' : Queue
+--------------------------+ 'out8' : rank 2
| |
| connection_list +---------> 'out9' : rank 2
| |
+--------------------------+ 'out10' : rank 2
3.3 Последовательность вперед-назад
Затем устанавливается прямой и обратный порядок доставки сообщений, цель которого состоит в том, чтобы позволить каждому рабочему процессу записывать, как обрабатывать ранг, передаваемый прямым уровнем/обратным уровнем.
3.3.1 Последовательность сборки
Метод setup_messaging_schedule используется для создания:
- Порядок принятия во время прямого распространения.
- Порядок отправки при обратном распространении.
Суть здесь в следующем: если номер предыдущего слоя больше номера этого слоя, поставитьi对应的前一层rank
иi + (本层rank数目) * n 对应的前一层rank
добавляются в расписание этого слоя i (self.message_schedule). n равно num_ranks_in_stage.
Наконец, поместите заказ в переменную-член self.messaging_schedule. Если этот этап имеет 3 ранга, self.messaging_schedule представляет собой message_schedule трех рангов соответственно, и каждый message_schedule содержит несколько рангов соответствующего верхнего уровня.
Чтобы уточнить немного больше:
- self.messaging_schedule — это список.
- self.messaging_schedule, где каждый элемент представляет собой список. self.messaging_schedule[ i ] означает, например, расписание (message_schedule), соответствующее i-му рангу этого слоя.
- расписание (message_schedule) — это несколько рангов предыдущего слоя или следующего слоя.
- Ранги, включенные в message_schedule, являются индексом рангов, включенных в этот этап. Поскольку оно используется внутри, оно не обязательно должно быть реальным значением ранга, если его можно сопоставить с другими внутренними структурами данных, такими как внутренние очереди.
код показывает, как показано ниже:
def setup_messaging_schedule(self):
""" Order in which to receive forward and send backwards.
Separate indexes of ranks in previous stage based on their
corresponding offset in this stage. Then each worker will go
in increasing order within a subset, and process subsets in
a decreasing order.
This is done so that messages are processed in the order
that they are sent. Backwards send is done so that that it
matches up with forward receive.
"""
self.messaging_schedule = []
for i in range(self.num_ranks_in_stage): # 本stage的并行数目
idx = i
message_schedule = []
while idx < self.num_ranks_in_previous_stage: # 上一个stage的并行数目
message_schedule.append(idx)
# 如果前一层比本层多,就把 i, i + (本层rank) * n 对应的前一层rank都加入到本层 i 的计划
idx += self.num_ranks_in_stage
if len(message_schedule) > 0:
self.messaging_schedule.append(message_schedule)
self.fwd_messaging_scheduling_row = self.rank_in_stage # 自己的rank index
self.fwd_messaging_scheduling_col = 0 # receive forward
self.bwd_messaging_scheduling_row = self.rank_in_stage # 自己的rank index
self.bwd_messaging_scheduling_col = 0 # send backwards
# For cases where previous stage has less workers than current stage.
while self.fwd_messaging_scheduling_row >= \
len(self.messaging_schedule):
self.fwd_messaging_scheduling_row -= 1
self.bwd_messaging_scheduling_row -= 1
Конкретная логика выглядит следующим образом:
+-------------------+ +--------------------------------------------------+
| Stage 0 | | Stage 1 |
| | | |
| | | |
| | | +----------------------------------------+ |
| | send_ranks | | messaging_schedule | |
| ranks: | | | | |
| +---------------> | | | |
| [0,1,2,3,4,5, | | | message_schedule +---> [0,1,2,9] | |
| 6,7,8,9,10,11,12]| | | | |
| | | | message_schedule +---> [3,4,5,6,10] | |
| | | | | |
| | | | message_schedule +---> [6,7,8,11] | |
| | | | | |
| | | +----------------------------------------+ |
| | | |
+-------------------+ +--------------------------------------------------+
3.3.2 Получить последовательность сообщений
Метод get_messaging_index используется для получения переданного в этот раз объекта, то есть того, с каким рангом следует взаимодействовать.
def get_messaging_index(self, sending):
if sending:
connection_rank = self.messaging_schedule[
self.bwd_messaging_scheduling_row][
self.bwd_messaging_scheduling_col]
else:
connection_rank = self.messaging_schedule[
self.fwd_messaging_scheduling_row][
self.fwd_messaging_scheduling_col]
return connection_rank
Где используется get_messaging_index? Это оказались функции send и recv, которые используются при работе с предыдущим слоем.
Например:
def recv(self, tensor_name, forward_minibatch_id,
backward_minibatch_id, backward=False):
if backward:
index = (backward_minibatch_id + self.rank_in_stage) % \
len(self.backward_receive_queues[tensor_name])
tensor = self.backward_receive_queues[tensor_name][
index].remove()
return tensor
else:
# 这里会使用到,获取与哪一个rank进行交互
index = self.get_messaging_index(sending=False)
# 然后得到使用哪个张量,从queue之中提取对应的最新张量
tensor = self.forward_receive_queues[tensor_name][
index].remove()
if tensor.dtype == torch.float32:
tensor = tensor.requires_grad_()
return tensor
3.3.3 Добавить последовательность сообщений
Метод increment_messaging_index используется для увеличения последовательности сообщений, чтобы определить, какое сообщение следует использовать следующим.
Среди них необходимо пояснить два параметра:
- bwd_messaging_scheduling_col указывает конкретный ранговый индекс восходящего потока.
- bwd_messaging_scheduling_row представляет собственный индекс ранга.
Методы, как показано ниже:
def increment_messaging_index(self, sending):
if sending:
self.bwd_messaging_scheduling_col += 1 # send backwards 对应的下一个 rank
if self.bwd_messaging_scheduling_col == len(
self.messaging_schedule[
self.bwd_messaging_scheduling_row]):
self.bwd_messaging_scheduling_col = 0
self.bwd_messaging_scheduling_row -= 1 # 自己的rank index
if self.bwd_messaging_scheduling_row == -1:
self.bwd_messaging_scheduling_row = \ # 重置回self.messaging_schedule,继续新的一轮本地 rank通讯
len(self.messaging_schedule) - 1
else:
self.fwd_messaging_scheduling_col += 1 # receive forward 对应的下一个 rank
if self.fwd_messaging_scheduling_col == len(
self.messaging_schedule[
self.fwd_messaging_scheduling_row]):
self.fwd_messaging_scheduling_col = 0
self.fwd_messaging_scheduling_row -= 1 # 自己的rank index
if self.fwd_messaging_scheduling_row == -1:
self.fwd_messaging_scheduling_row = \ # 重置回self.messaging_schedule,继续新的一轮本地 rank通讯
len(self.messaging_schedule) - 1
Где он будет использоваться? Он используется в следующих функциях:
def receive_tensors_forward(self):
if self.loader_iter is not None:
# ......
else:
# Receive all required tensors from upstream machines.
# ......
# Used to track where to receive forward from.
self.comm_handler.increment_messaging_index(
sending=False)
def send_tensors_backward(self):
# Send all required gradients upstream.
if self.num_ranks_in_previous_stage > 0:
# Used to track where to send tensors in the
# backward pass.
self.comm_handler.increment_messaging_index(
sending=True)
def run_ack(self):
if self.stage > 0:
self.comm_handler.send(
"ack",
torch.zeros(self.tensor_shapes["ack"],
dtype=torch.int64).cuda(),
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
# Used to track where to receive forward from.
self.comm_handler.increment_messaging_index(sending=True)
3.4 Создание группы процессов
Цель: для каждого тензора настроить две группы процессов, одну для прямого и одну для обратного. У каждого тензора есть свой тег. Каждый тег имеет свои две группы процессов, так как любой этап может быть распараллелен.
3.4.1 Дизайн
Во-первых, давайте посмотрим на аннотации и узнаем, почему он разработан таким образом.
Метод create_process_groups создает группы процессов в одинаковом порядке среди всех рангов. Чтобы установить группы процессов в том же порядке, каждый рабочий процесс собирает connection_list (от GPU к GPU) всех других рабочих процессов. Для этого каждый воркер собирает максимальный размер списка соединений connection_list(L) всех остальных воркеров. Затем каждый рабочий создает тензор размера Lx2, где каждая строка представляет соединение, и заполняет этот тензор в соответствии с «своим размером списка соединений». Рабочий процесс с наибольшим списком соединений заполнит весь тензор.
После построения этого списка выполняется операция all_gather, после чего каждый воркер имеет одинаковый выход NxLx2, где N — количество воркеров (world_size), а каждый индекс вывода представляет связный список воркеров. Для i=self.rank вывод будет таким же, как и локальный список подключений для этого работника.
Каждый рабочий выполняет итерацию по списку соединений в том же порядке, проверяя, было ли создано каждое соединение (каждое соединение будет отображаться дважды в выходных данных), и, если соединение не существует, создайте его как для прямой, так и для обратной новой группы процессов. . Поскольку ранги в группе процессов всегда одни и те же, ранги меньшего размера ранжируются первыми, а ранги большего размера ранжируются последними.
3.4.2 Код
Вернемся к коду, давайте внимательно его проанализируем.
+--------------------------+ 'out8' : rank 2
| |
| connection_list +---------> 'out9' : rank 2
| |
+--------------------------+ 'out10' : rank 2
Поэтому цель состоит в том, чтобы установить одну и ту же группу процессов в каждом рабочем потоке и для каждого тензора настроить две группы процессов, одну прямую и одну обратную.
-
Найдите самый большой connection_list среди воркеров
-
Получить размер connection_list, т.е. connection_list_size
-
Используйте коллективную коммуникацию для агрегирования connection_list_size, а окончательный collect_connection_list_sizes — это connection_list_size, установленный на всех узлах.
-
Получить максимальное значение connection_list
-
Создайте список тензоров connection_list_tensor, используя наибольшее значение
-
Переместите тензоры в GPU
-
Используйте установленную связь для агрегирования connection_list_tensor, чтобы получить агрегированный_connection_list
-
Поверх каждого рабочего используйте dist.new_group для создания той же группы процессов.
-
Пройдите каждое соединение в aggregated_connection_list
- Получить тег, соответствующий тензору
- Для каждого тензора установите две группы процессов, одну вперед и одну назад.
Здесь используется connection_list. Конкретная логика такова:
Конкретный код выглядит следующим образом:
def create_process_groups(self):
""" Create process groups in the same order across all ranks.
To create process groups in the same order, each worker collects
the connection_list of all other workers. To do this, every worker
gathers the largest size of all other worker's connection_lists (L).
Then every worker creates a tensor of size Lx2, where each row
represents a connection, and fills up this tensor depending on how
large its own connection list is. The worker(s) w/ the largest
connection list will fill up the entire tensor.
After constructing this list, an all_gather is performed, after which
each worker has an identical NxLx2 output, where N is the number of
workers (world_size), and each index of output represents a worker's
connection list. For i=self.rank, the output will be identical to the
workers local connection list.
Each worker then iterates in the same order over the connections list,
checking if each connection has been created yet (every connection will
appear twice in the output), and creating a new process group if one
doesn't exist for that connection, for both the forward and backward
direction. Since ranks within process groups must always be identical,
the smaller rank always goes first, followed by the larger rank.
"""
if self.num_ranks_in_server == 1:
return
print("Setting up process groups for broadcasts...")
# Figure out the size of the largest connection list that any worker
# has (L).
# 找到最大的 connection_list
# 获取到 connection_list 的大小,即 connection_list_size
connection_list_size = torch.tensor(
len(self.connection_list), dtype=torch.int)
if self.backend == NCCL:
connection_list_size = connection_list_size.cuda()
gathered_connection_list_sizes = [
torch.ones_like(connection_list_size)
for _ in range(self.world_size)]
# 用集合通信来对 connection_list_size 进行聚合,最后得到的gathered_connection_list_sizes就是所有节点上的 connection_list_size 集合
dist.all_gather(gathered_connection_list_sizes,
connection_list_size)
# 得到最大数值
max_connection_list_size = max(
gathered_connection_list_sizes)
if max_connection_list_size == 0:
return
# 利用最大数值来构建张量列表 connection_list_tensor
# Build tensor to send local connection list to all other workers.
connection_list_tensor = torch.ones([max_connection_list_size, 2],
dtype=torch.int) * -1
# 把张量移动到GPU之上
if self.backend == NCCL:
connection_list_tensor = connection_list_tensor.cuda()
if len(self.connection_list) > 0:
connection_list_tensor[0:len(self.connection_list)] = \
torch.IntTensor(self.connection_list)
# 用集合通信来对 connection_list_tensor进行聚合
# Gather connection lists of all workers.
aggregated_connection_list = [
torch.ones_like(connection_list_tensor)
for _ in range(self.world_size)]
dist.all_gather(aggregated_connection_list,
connection_list_tensor)
# 在每个worker之上,利用 dist.new_group 建立同样的进程组
# Construct identical process groups on each worker.
local_rank_connections = 0
for src_rank in range(len(aggregated_connection_list)):
for connection in aggregated_connection_list[src_rank]:
# 得到张量对应的tag
tag = int(connection[0])
dst_rank = int(connection[1])
if tag == -1:
assert dst_rank == -1
continue
min_rank = min(src_rank, dst_rank)
max_rank = max(src_rank, dst_rank)
assert min_rank != max_rank
if min_rank not in self.process_groups:
self.process_groups[min_rank] = {}
if max_rank not in self.process_groups[min_rank]:
self.process_groups[min_rank][max_rank] = {}
if tag not in self.process_groups[min_rank][max_rank]:
# 用到了pytorch p2p 的api
sub_process_group_fwd = dist.new_group(
ranks=[min_rank, max_rank])
sub_process_group_bwd = dist.new_group(
ranks=[min_rank, max_rank])
# 针对每个张量,设置进程组
self.process_groups[min_rank][max_rank][tag] = {
'forward': sub_process_group_fwd,
'backward': sub_process_group_bwd
}
if min_rank == self.rank or max_rank == self.rank:
local_rank_connections += 1
assert local_rank_connections == len(self.connection_list)
Как использовать группы процессов? Будут использоваться такие функции, как recv_helper_thread_args, например:
def recv_helper_thread_args(self, tensor_name, index, dtype,
backward, num_iterations):
if backward:
src_rank = self.send_ranks[tensor_name][index]
else:
src_rank = self.receive_ranks[tensor_name][index]
sub_process_group = None
# 获取张量 tensor_name 对应的 tag
tag = self.tensor_tags[tensor_name]
if self.is_gpu_to_gpu_comm(connected_rank=src_rank) and tensor_name != "ack":
min_rank = min(self.rank, src_rank)
max_rank = max(self.rank, src_rank)
if src_rank > self.rank:
# 获取 tag 对应的进程组,调用者后续会使用
sub_process_group = \
self.process_groups[min_rank][max_rank][tag]['backward']
else:
# 获取 tag 对应的进程组,调用者后续会使用
sub_process_group = \
self.process_groups[min_rank][max_rank][tag]['forward']
assert sub_process_group
if backward:
queue = self.backward_receive_queues[tensor_name][index]
else:
queue = self.forward_receive_queues[tensor_name][index]
tensor_shape = self.tensor_shapes[tensor_name]
return (queue, self.counter, self.local_rank, tensor_name,
src_rank, tag, tensor_shape, dtype, sub_process_group,
num_iterations)
3.5 Запустите вспомогательный поток
Используйте start_helper_threads для запуска вспомогательных потоков. Эти вспомогательные потоки предназначены для использования P2P.
Во-первых, на примере рангов видно, что ключ — это имя тензора, а значение — список рангов.
receive_ranks = {dict: 3} # 这里就是每个tensor对应的接收目标rank
'out8' = {list: 1} [2]
'out9' = {list: 1} [2]
'out10' = {list: 1} [2]
__len__ = {int} 3
3.5.1 Создание темы
Вспомните 4 очереди, созданные ранее:
- forward_receive_queues : Очереди, которые принимают тензоры во время прямого распространения. Соответствует receive_ranks.
- back_send_queues: очередь для отправки тензоров во время обратного распространения. Соответствует receive_ranks. Потому что объект, полученный при прямом распространении, является целью, отправленной при обратном распространении.
- forward_send_queues : очередь для отправки тензоров во время прямого распространения. Соответствует send_ranks.
- back_receive_queues : Очереди, которые принимают тензоры во время обратного распространения. Соответствует send_ranks. Потому что цель, отправленная в прямом проходе, является объектом, полученным в обратном проходе.
Эти 4 очереди на самом деле соответствуют 4 различным вспомогательным потокам.
Идея такова:
-
Процесс принятия рангов, то есть обход тензоров в receive_ranks
-
Пройдите ранги, соответствующие тензору, для каждого ранга
- Требуется обратная обработка, поэтому устанавливается поток обратной отправки.
- Создайте вспомогательный поток принятия
-
-
Обработка для отправки рангов, то есть обход тензоров в send_ranks
-
Пройдите ранги, соответствующие тензору, для каждого ранга
- Требуется обратная обработка, поэтому устанавливается обратная принимающая нить.
- Создайте вспомогательный поток отправки
-
-
Процесс для цели
-
Если есть только форвард, нужно завершить акк
Конкретный код:
def start_helper_threads(self, num_iterations, forward_only):
"""
Start helper communication threads, one for each queue.
"""
if forward_only:
self.set_counter(self.num_forward_threads +
self.num_ack_threads)
# For validation, receive acks in backward pass from next stage, send
# acks in backward pass to next stage.
self.receive_ranks["ack"] = self.ranks_in_previous_stage
self.send_ranks["ack"] = self.ranks_in_next_stage
else:
self.set_counter(self.num_forward_threads +
self.num_backward_threads)
if "ack" in self.receive_ranks:
del self.receive_ranks["ack"]
if "ack" in self.send_ranks:
del self.send_ranks["ack"]
(num_iterations_for_forward_threads,
num_iterations_for_backward_threads) = \
self.num_iterations_for_helper_threads(
num_iterations=num_iterations)
dtype = torch.float16 if self.fp16 else torch.float32
# Setup queues for each tensor to be received and sent.
# 针对接受rank进行处理
for input_name in self.receive_ranks:
if input_name in self.target_tensor_names or input_name == "ack":
continue
# 遍历张量对应的ranks
for i in range(len(self.receive_ranks[input_name])):
if not forward_only:
# 需要后向处理,所以建立后向发送线程
self.start_helper_thread(
self.send_helper_thread_args,
send_helper_thread,
[input_name, i, True],
num_iterations_for_backward_threads)
# 建立接受助手线程
self.start_helper_thread(
self.recv_helper_thread_args,
recv_helper_thread,
[input_name,
i,
self.training_tensor_dtypes[input_name],
False],
num_iterations_for_backward_threads)
# 针对发送ranks进行处理
for output_name in self.send_ranks:
if output_name in self.target_tensor_names or output_name == "ack":
continue
# 遍历张量对应的ranks
for i in range(len(self.send_ranks[output_name])):
if not forward_only:
# 需要后向处理,所以建立后向接受线程
self.start_helper_thread(
self.recv_helper_thread_args,
recv_helper_thread,
[output_name, i,
self.training_tensor_dtypes[output_name],
True],
num_iterations_for_forward_threads)
# 发送助手线程
self.start_helper_thread(
self.send_helper_thread_args,
send_helper_thread,
[output_name, i, False],
num_iterations_for_forward_threads)
# 针对target进行处理
for target_tensor_name in self.target_tensor_names:
if self.num_ranks_in_previous_stage > 0:
for i in range(len(self.receive_ranks[target_tensor_name])):
self.start_helper_thread(
self.recv_helper_thread_args,
recv_helper_thread,
[target_tensor_name, i, torch.int64,
False],
num_iterations_for_backward_threads)
if self.num_ranks_in_next_stage > 0:
for i in range(len(self.send_ranks[target_tensor_name])):
self.start_helper_thread(
self.send_helper_thread_args,
send_helper_thread,
[target_tensor_name, i, False],
num_iterations_for_forward_threads)
# Start helper threads for ack for forward pass-only run as a clocking
# mechanism.
# 如果只有前向,则需要补齐ack
if forward_only:
# 有前向就补齐 ack
if "ack" in self.receive_ranks:
for i in range(len(self.receive_ranks["ack"])):
self.start_helper_thread(self.send_helper_thread_args,
send_helper_thread,
["ack", i, True],
num_iterations_for_backward_threads)
if "ack" in self.send_ranks:
for i in range(len(self.send_ranks["ack"])):
self.start_helper_thread(self.recv_helper_thread_args,
recv_helper_thread,
["ack", i, torch.int64, True],
num_iterations_for_forward_threads)
Конкретная функция создания потока:
def start_helper_thread(self, args_func, func, args_func_args, num_iterations):
"""
Start passed-in func on a helper thread.
"""
args_func_args += [num_iterations]
args = args_func(*args_func_args) # 需要注意的是使用函数来获取对应的参数
helper_thread = threading.Thread(target=func, # 用线程主函数来执行线程
args=args)
helper_thread.start()
3.5.2 Основная функция потока
recv_helper_thread и send_helper_thread принимают вспомогательный поток и отправляют вспомогательный поток соответственно. Вызовите _recv и _send соответственно, чтобы завершить конкретную работу.
Следует отметить, что функция используется для получения соответствующих параметров. Просто используйте recv_helper_thread_args и send_helper_thread_args для получения параметров.
def recv_helper_thread(queue, counter, local_rank, tensor_name,
src_rank, tag, tensor_shape, dtype,
sub_process_group, num_iterations):
torch.cuda.set_device(local_rank)
# This method is to be executed from a helper daemon thread.
for i in range(num_iterations):
tensor = _recv(
tensor_name, src_rank, tensor_shape=tensor_shape,
dtype=dtype, tag=tag,
sub_process_group=sub_process_group)
queue.add(tensor)
counter.decrement()
def send_helper_thread(queue, counter, local_rank, tensor_name,
src_rank, dst_rank, tag,
sub_process_group, num_iterations):
torch.cuda.set_device(local_rank)
# This method is to be executed from a helper daemon thread.
for i in range(num_iterations):
tensor = queue.remove()
_send(tensor, tensor_name, src_rank, dst_rank,
tag=tag,
sub_process_group=sub_process_group)
counter.decrement()
3.5.3 Параметры сборки
Напомним, что в методе create_process_groups есть следующий код, здесь для каждого тега задается группа процессов, во вспомогательном потоке эти группы процессов используются для завершения логики:
if tag not in self.process_groups[min_rank][max_rank]:
sub_process_group_fwd = dist.new_group(ranks=[min_rank, max_rank])
sub_process_group_bwd = dist.new_group(ranks=[min_rank, max_rank])
self.process_groups[min_rank][max_rank][tag] = {
'forward': sub_process_group_fwd,
'backward': sub_process_group_bwd
}
Используйте следующие функции для завершения сбора параметров основной функции потока. Основная логика такова:
- Используйте имя тензора, чтобы получить соответствующий ранг
- Используйте имя тензора, чтобы получить соответствующий тег
- Используйте тег, чтобы получить соответствующую группу процессов
- Используйте имя тензора и индекс, чтобы получить соответствующую очередь
- возвращаемый параметр
def recv_helper_thread_args(self, tensor_name, index, dtype,
backward, num_iterations):
# 利用张量名字,获取到对应的rank
if backward:
src_rank = self.send_ranks[tensor_name][index]
else:
src_rank = self.receive_ranks[tensor_name][index]
# 利用张量名字,获取到对应的tag
sub_process_group = None
tag = self.tensor_tags[tensor_name]
# 使用tag来获取到对应的进程组
if self.is_gpu_to_gpu_comm(connected_rank=src_rank) and tensor_name != "ack":
min_rank = min(self.rank, src_rank)
max_rank = max(self.rank, src_rank)
if src_rank > self.rank:
sub_process_group = \
self.process_groups[min_rank][max_rank][tag]['backward']
else:
sub_process_group = \
self.process_groups[min_rank][max_rank][tag]['forward']
assert sub_process_group
# 得到对应的queue
if backward:
queue = self.backward_receive_queues[tensor_name][index]
else:
queue = self.forward_receive_queues[tensor_name][index]
tensor_shape = self.tensor_shapes[tensor_name]
# 返回参数
return (queue, self.counter, self.local_rank, tensor_name,
src_rank, tag, tensor_shape, dtype, sub_process_group,
num_iterations)
def send_helper_thread_args(self, tensor_name, index,
backward, num_iterations):
# 利用张量名字得到对应的rank
if backward:
dst_rank = self.receive_ranks[tensor_name][index]
num_ranks_in_connected_stage = self.num_ranks_in_previous_stage
else:
dst_rank = self.send_ranks[tensor_name][index]
num_ranks_in_connected_stage = self.num_ranks_in_next_stage
# 使用tag来获取到对应的进程组
sub_process_group = None
tag = self.tensor_tags[tensor_name]
if self.is_gpu_to_gpu_comm(connected_rank=dst_rank) and tensor_name != "ack":
min_rank = min(self.rank, dst_rank)
max_rank = max(self.rank, dst_rank)
if dst_rank > self.rank:
sub_process_group = \
self.process_groups[min_rank][max_rank][tag]['forward']
else:
sub_process_group = \
self.process_groups[min_rank][max_rank][tag]['backward']
assert sub_process_group
# 得到对应的queue
if backward:
queue = self.backward_send_queues[tensor_name][index]
else:
queue = self.forward_send_queues[tensor_name][index]
# 返回参数
return (queue, self.counter, self.local_rank, tensor_name, self.rank,
dst_rank, tag, sub_process_group, num_iterations)
0x04 Функциональная функция
Следующие функциональные функции — это функции, которые в конечном итоге используются для завершения конвейерной логики RPC.
Вот развязка, выполненная через очередь:
- Recv и send будут работать с очередью, добавляя или извлекая тензоры из очереди.
- Вспомогательный поток вызовет _recv и _send для работы с очередью.
Итак, мы должны сначала посмотреть на реализацию этой Очереди.Вы можете видеть, что threading.Condition используется как для добавления, так и для удаления, что означает, что несколько потоков могут ждать и блокироваться в Очереди через добавление/удаление, то есть производитель и потребитель.
class Queue:
def __init__(self):
self.queue = []
self.cv = threading.Condition()
def add(self, tensor):
self.cv.acquire()
self.queue.append(tensor)
self.cv.notify()
self.cv.release()
def remove(self):
self.cv.acquire()
while len(self.queue) == 0:
self.cv.wait()
tensor = self.queue.pop(0)
self.cv.release()
return tensor
4.1 Логика отправки
Логика отправки следующая:
- Код обучения вызывает StageRuntime.run_backward.
- Метод StageRuntime.run_backward вызывает StageRuntime.send_tensors_backward для отправки тензора tensor_name.
- send_tensors_backward вызовет CommunicationHandler.send, чтобы добавить этот тензор в переменную-член CommunicationHandler back_send_queues[tensor_name][index]. Каждый тензор соответствует нескольким очередям. Вот и развязка.
- Функция отправки вызовет back_send_queues.add, который уведомит send_helper_thread, заблокированный в очереди, для работы.
- В потоке send_helper_thread CommunicationHandler, который ранее был заблокирован в очереди, будет извлечен тензор из reverse_send_queues[tensor_name][index].
- send_helper_thread вызовет _send для отправки тензоров.
- Последний вызов — dist.send, то есть PyTorch P2P.
Подробности следующие:
StageRuntime CommunicationHandler send_helper_thread
+ + +
| | |
| 1 | |
v | |
run_backward | |
| | |
| 2 | |
| | wait on backward_send_queues
v 3 v |
send_tensors_backward +--------> send |
| |
| |
| 4 |
v 5 v
backward_send_queues.add(tensor) +----> tensor = queue.remove()
notify |
|
| 6
v
_send
|
| 7
|
v
dist.send
4.2 Логика принятия
Логика приема следующая:
- run_backward вызывается в обучающем коде StageRuntime.
- run_backward вызывает receive_tensors_backward.
- вызов receive_tensors_backward
self.gradients[output_name] = self.comm_handler.recv
Получите градиент. Функция recv CommunicationHandler будет заблокирована вbackward_receive_queues[tensor_name] [index]
выше. - В то же время поток recv_helper_thread CommunicationHandler вызывает _recv для принятия тензоров с других этапов.
- _recv вызывает dist.recv или dist.broadcast для принятия тензоров.
- _recv добавляет тензор к back_receive_queues[tensor_name][index]. Это информирует блокирующую функцию recv CommunicationHandler о выполнении своей работы.
- Функция recv CommunicationHandler извлечет градиент из reverse_receive_queues[tensor_name] [index] и вернет его в StageRuntime. это возврат 3.
Подробности следующие:
StageRuntime CommunicationHandler recv_helper_thread
+ + +
| | |
| 1 | |
| | | 4
v | v
run_backward | _recv
| | |
| | |
| | | 5
| | |
| 2 | v
| | dist.recv / dist.broadcast
| | |
v 3 v |
receive_tensors_backward +---------> recv |
+ | |
| | |
| | |
| | |
| v |
| backward_receive_queues.remove() |
| | |
| | |
| | |
| | |
| wait on backward_receive_queues |
| | |
| | |
| | |
| | 6 v
| backward_receive_queues <-------+ queue.add(tensor)
| | notify
| | 7
v 3 return |
gradients[output_name] <---------------+
4.3 recv
Это фактически для получения соответствующего тензора из соответствующей очереди по имени тензора.
def recv(self, tensor_name, forward_minibatch_id,
backward_minibatch_id, backward=False):
if backward:
index = (backward_minibatch_id + self.rank_in_stage) % \
len(self.backward_receive_queues[tensor_name])
tensor = self.backward_receive_queues[tensor_name][
index].remove()
return tensor
else:
# 前向时候,需要知道从前一层的哪一个index获取
index = self.get_messaging_index(sending=False)
tensor = self.forward_receive_queues[tensor_name][
index].remove()
if tensor.dtype == torch.float32:
tensor = tensor.requires_grad_()
return tensor
В функциях receive_tensors_forward и receive_tensors_backward во время выполнения будет вызываться функция recv для получения сохраненных тензоров из соответствующей очереди. Например:
def receive_tensors_backward(self):
# Receive all required gradients from downstream
# machines.
for output_name in self.send_ranks:
if output_name in self.target_tensor_names:
continue
self.gradients[output_name] = \
self.comm_handler.recv( # 这里使用了
output_name,
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
self.backward_stats.stats['receive_tensors_size'] += \
(self.gradients[output_name].element_size() *
self.gradients[output_name].nelement())
4.4 send
Здесь нужно поместить тензор в соответствующую очередь.
def send(self, tensor_name, tensor, forward_minibatch_id,
backward_minibatch_id, backward=False):
if backward:
# 后向时候,需要知道发送给前一层的哪一个index
index = self.get_messaging_index(sending=True)
dst_rank = self.receive_ranks[tensor_name][index]
self.backward_send_queues[tensor_name][index].add(tensor)
else:
index = (forward_minibatch_id + self.rank_in_stage) % \
len(self.send_ranks[tensor_name])
self.forward_send_queues[tensor_name][index].add(tensor)
send_tensors_backward, send_tensors_forward будут использоваться, например:
def send_tensors_backward(self):
# Send all required gradients upstream.
for input_name in self.receive_ranks:
if input_name in self.target_tensor_names:
continue
self.comm_handler.send(
input_name,
self.gradients[input_name],
forward_minibatch_id=self.forward_minibatch_id,
backward_minibatch_id=self.backward_minibatch_id,
backward=True)
self.backward_stats.stats['send_tensors_size'] += \
(self.gradients[input_name].element_size() *
self.gradients[input_name].nelement())
if self.num_ranks_in_previous_stage > 0:
# Used to track where to send tensors in the
# backward pass.
self.comm_handler.increment_messaging_index(
sending=True)
4.5 _recv
В параметре _recv sub_process_group создается в приведенном выше коде.
Если на том же узле, используйте dist.broadcast, в противном случае используйте dist.recv.
def _recv(tensor_name, src_rank, tensor_shape=None, dtype=torch.float32,
tensor=None, tag=None, sub_process_group=None):
"""
Receives tensor by calling PyTorch's recv() call.
Tensor will be copied to GPU prior to return.
"""
assert tag is not None
if tensor is None:
assert tensor_shape is not None
assert dtype is not None
assert dtype != torch.float16
if sub_process_group is not None:
# Receive tensor shape.
received_tensor_shape = torch.zeros(len(tensor_shape),
dtype=torch.int)
dist.broadcast(tensor=received_tensor_shape,
src=src_rank,
group=sub_process_group)
received_tensor_shape = list(map(lambda x: int(x),
received_tensor_shape))
# Receive tensor.
tensor = torch.zeros(received_tensor_shape, dtype=dtype).cuda()
dist.broadcast(tensor=tensor,
src=src_rank,
group=sub_process_group)
else:
# Receive tensor shape.
received_tensor_shape = torch.zeros(len(tensor_shape),
dtype=torch.int)
dist.recv(tensor=received_tensor_shape,
src=src_rank,
tag=tag)
received_tensor_shape = list(map(lambda x: int(x),
received_tensor_shape))
# Receive tensor.
tensor = torch.zeros(received_tensor_shape, dtype=dtype)
dist.recv(tensor=tensor,
src=src_rank,
tag=tag)
tensor = tensor.cuda()
assert tensor.is_cuda
return tensor
_recv вызывается в recv_helper_thread.
def recv_helper_thread(queue, counter, local_rank, tensor_name,
src_rank, tag, tensor_shape, dtype,
sub_process_group, num_iterations):
torch.cuda.set_device(local_rank)
# This method is to be executed from a helper daemon thread.
for i in range(num_iterations):
tensor = _recv(
tensor_name, src_rank, tensor_shape=tensor_shape,
dtype=dtype, tag=tag,
sub_process_group=sub_process_group)
queue.add(tensor) # 获取到张量之后,放入queue
counter.decrement()
4.6 _send
Если на том же узле, используйте dist.broadcast, в противном случае используйте dist.send.
def _send(tensor, tensor_name, src_rank, dst_rank, tag, sub_process_group=None):
"""
Sends tensor by calling PyTorch's send() call.
If tensor is being sent not via broadcast(), it will
be first copied to the CPU.
"""
if sub_process_group is not None:
assert tensor.is_cuda
# Send tensor shape.
tensor_shape = torch.tensor(tensor.shape, dtype=torch.int)
dist.broadcast(tensor=tensor_shape, src=src_rank,
group=sub_process_group)
# Send tensor.
contiguous_tensor = tensor.detach().clone()
dist.broadcast(tensor=contiguous_tensor.contiguous(),
src=src_rank,
group=sub_process_group)
else:
assert tensor.is_cuda
tensor = tensor.cpu()
# Send tensor shape.
tensor_shape = torch.tensor(tensor.shape, dtype=torch.int)
dist.send(tensor=tensor_shape, dst=dst_rank, tag=tag)
# Send tensor.
dist.send(tensor=tensor, dst=dst_rank, tag=tag)
recv_helper_thread использует _send для получения тензоров.
def send_helper_thread(queue, counter, local_rank, tensor_name,
src_rank, dst_rank, tag,
sub_process_group, num_iterations):
torch.cuda.set_device(local_rank)
# This method is to be executed from a helper daemon thread.
for i in range(num_iterations):
tensor = queue.remove()
# 从queue提取张量,发送出去。
_send(tensor, tensor_name, src_rank, dst_rank,
tag=tag,
sub_process_group=sub_process_group)
counter.decrement()
На данный момент модуль связи проанализирован, и в следующей статье мы, наконец, представим 1F1B.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси