[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (5) --- коммуникационный модуль

машинное обучение глубокое обучение

0x00 сводка

В предыдущей статье мы представили общую архитектуру PipeDream, этап профиля, этап расчета раздела, этап преобразования модели и механизм выполнения.В этой статье мы представляем коммуникационный модуль PipeDream.Коммуникационный модуль является основой движок, а также то, как используются PyTorch DDP и P2P.Калейдоскоп и прекрасный пример.

Ссылки на другие статьи о конвейерном параллелизме:

[Анализ исходного кода] Конвейер глубокого обучения, параллельный Gpipe(1) --- Базовая реализация конвейера

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe (2) ----- накопление градиента

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe(3) -- перерасчет

[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(2) --- Вычислительный раздел

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream (3) --- модель преобразования

[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(4) --- механизм выполнения

0x01 Предисловие

Код коммуникационного модуля находится по адресу: runtime/communication.py. Давайте сначала подумаем, какие функции нужны коммуникационным модулям?

  • Связь между этапами (Stages), если этапы обрабатываются на разных машинах? Как с этим бороться на той же машине?
  • Поскольку это в основном асинхронная связь, производительность разных узлов может быть разной.Нужен ли вам механизм кэширования для координации разных узлов, аналогичный функции обратного давления?
  • Глубокое обучение имеет много параметров, много задействованных тензоров и градиентов, много слоев, и количество параллельных данных в каждом слое также разное, так как же обеспечить, чтобы прямое и обратное распространение выполнялись в определенном порядке?
  • Поскольку на узле необходимо выполнять прямое и обратное распространение, необходимо установить несколько потоков для отдельной передачи.

Поэтому мы рассмотрим эти вопросы в следующем анализе.

0x02 определение класса

CommunicationHandler отвечает за связь между этапами.

  • Если этапы находятся на разных машинах, используйте PyTorch p2p send/recv.
  • Если этапы находятся на одном компьютере, используйте трансляцию PyTorch p2p.

В следующем коде в основном инициализируются различные переменные-члены.Нам больше всего знакомы переменные, связанные с DDP, такие как init_process_group.

class CommunicationHandler(object):
    """ Handles communication between stages.
​
    For stages on different machines, use send/recv.
    For stages on same machine, use broadcast.
    """
    def __init__(self, master_addr, master_port, rank,
                 local_rank, num_ranks_in_server,
                 world_size, fp16, backend):
        """ Set up process groups.
​
        Note: To turn off broadcasting, set num_ranks_in_server = 1.
        """
        self.rank = rank
        self.local_rank = local_rank
        self.backend = backend
        self.num_ranks_in_server = num_ranks_in_server
        self.world_size = world_size
        self.fp16 = fp16
        assert num_ranks_in_server > 0
​
        # Initialize the distributed environment.
        # 以下是为了 DDP
        os.environ['MASTER_ADDR'] = master_addr
        os.environ['MASTER_PORT'] = str(master_port)
        dist.init_process_group(backend, rank=rank, world_size=world_size)
        assert dist.get_world_size() == self.world_size
​
        # Stores list of ranks of GPUs on the same server.
        self.ranks_in_server = []
​
        if num_ranks_in_server == 1:
            return
​
        # Stores information about tensors sent directly GPU-to-GPU.
        self.connection_list = []
​
        # Stores process groups (for broadcast() connections).
        self.process_groups = {}
​
        # Populate ranks_in_server.
        rank_of_first_gpu_in_server = rank - rank % num_ranks_in_server
        for connected_rank in range(
            rank_of_first_gpu_in_server,
            rank_of_first_gpu_in_server + num_ranks_in_server):
            if connected_rank == rank:
                continue
            self.ranks_in_server.append(connected_rank)
        assert len(self.ranks_in_server) == num_ranks_in_server - 1, \
            self.ranks_in_server
​

сборка 0x03

3.1 Инициализация

Как упоминалось в предыдущей главе, когда создается CommunicationHandler, для инициализации вызывается initialize.

        if self.comm_handler is not None:
            self.comm_handler.initialize(
                self.receive_ranks,
                self.send_ranks,
                self.tensor_tags,
                self.target_tensor_names,
                self.training_tensor_dtypes,
                self.rank_in_stage,
                self.num_ranks_in_stage,
                self.ranks_in_previous_stage,
                self.ranks_in_next_stage)
​

В коде инициализации выполните следующие операции, в основном:

  • Создайте очередь, необходимую для связи.
  • Установите порядок отправки сообщений.
  • Создавайте группы процессов.
    def initialize(self, receive_ranks, send_ranks,
                   tensor_tags, target_tensor_names,
                   training_tensor_dtypes,
                   rank_in_stage,
                   num_ranks_in_stage,
                   ranks_in_previous_stage,
                   ranks_in_next_stage):
        """
        Initialize state needed for CommunicationHandler.
        """
        self.receive_ranks = receive_ranks
        self.send_ranks = send_ranks
        self.tensor_tags = tensor_tags
        self.target_tensor_names = target_tensor_names
        self.training_tensor_dtypes = training_tensor_dtypes
        self.rank_in_stage = rank_in_stage
        self.num_ranks_in_stage = num_ranks_in_stage
        self.ranks_in_previous_stage = ranks_in_previous_stage
        self.num_ranks_in_previous_stage = len(ranks_in_previous_stage)
        self.ranks_in_next_stage = ranks_in_next_stage
        self.num_ranks_in_next_stage = len(ranks_in_next_stage)
​
        self.setup_queues() # 构建通信需要的queue
        self.setup_messaging_schedule() # 构建发送消息的次序
        self.create_process_groups() # 构建进程组

Наш конкретный анализ заключается в следующем.

3.2 Создать очередь

Роль очереди состоит в том, чтобы служить основой для отправки и получения.Система находит, какая очередь через индекс, а затем выполняет соответствующие операции.

В функцию инициализации передаются два списка рангов.

  • receive_ranks — входной ранг этого узла.
  • send_ranks — это выходной ранг этого узла.

Примерный список рангов выглядит следующим образом:

receive_ranks = {dict: 3}  # 这里就是每个tensor对应的接收目标rank
 'out8' = {list: 1} [2] # out8 是tensor name, {list: 1} [2] 是 out8 对应的 ranks
 'out9' = {list: 1} [2] # 就是这几个张量都要从 rank 2 接收
 'out10' = {list: 1} [2]
 __len__ = {int} 3

setup_queues соответственно создает в общей сложности 4 списка очередей:

  • forward_receive_queues : Очереди, которые принимают тензоры во время прямого распространения. Соответствует receive_ranks.
  • back_send_queues: очередь для отправки тензоров во время обратного распространения. Соответствует receive_ranks. Потому что объект, полученный при прямом распространении, является целью, отправленной при обратном распространении.
  • forward_send_queues : очередь для отправки тензоров во время прямого распространения. Соответствует send_ranks.
  • back_receive_queues : Очереди, которые принимают тензоры во время обратного распространения. Соответствует send_ranks. Потому что цель, отправленная в прямом проходе, является объектом, полученным в обратном проходе.

Общая логика следующая:

forward_receive_queues <-----> receive_ranks <------->  backward_send_queues
forward_send_queues  <------>  send_ranks    <------->  backward_receive_queues

Возьмите forward_receive_queues в качестве примера.

  • forward_receive_queues Этот список содержит несколько очередей.
  • Список Receive_Ranks включает в себя несколько рангов, и каждый ранг соответствует тензору в процессе коммуникации.Можно считать, что Receive_Ranks включает в себя несколько тензоров, соответствующих имени тензора. Имена тензоров, например: target_tensor_names = {"target", "target_length"}.
  • В списке forward_receive_queues каждая очередь соответствует тензору в Receive_Ranks.
  • Каждый тензор соответствует уникальному тегу.Цель PipeDream — позволить каждому тегу иметь свою собственную группу процессов, потому что любой этап может быть распараллелен.
  • Для этого тензора и этого уникального тега зарегистрируйте [tag, rank] в connection_list.

детали следующим образом:

    def setup_queues(self):
        """
        Setup queues for communication between main compute thread
        and helper communication threads. One queue per tensor
        in forward / backward direction.
        """
        self.forward_receive_queues = {}
        self.backward_receive_queues = {}
        self.forward_send_queues = {}
        self.backward_send_queues = {}
        self.num_forward_threads = 0
        self.num_backward_threads = 0
​
        self.target_receive_rank_counts = {}
        self.target_send_rank_counts = {}
        # Setup queues for each tensor to be received and sent.
        for input_name in self.receive_ranks: # 遍历张量
            # 与 input_name 张量对应的queue,input_name 是张量名字
            self.forward_receive_queues[input_name] = []
            self.backward_send_queues[input_name] = []
            # 遍历该张量对应的每个 ranks
            for i in range(len(self.receive_ranks[input_name])):
                self.forward_receive_queues[input_name].append(
                    threadsafe_queue.Queue())
                self.backward_send_queues[input_name].append(
                    threadsafe_queue.Queue())
                # 得到 rank
                target_receive_rank = self.receive_ranks[input_name][i]
                # 针对 rank,注册张量
                self.register_tensor(
                    connected_rank=target_receive_rank,
                    tag=self.tensor_tags[input_name])
                if target_receive_rank not in self.target_receive_rank_counts:
                    self.target_receive_rank_counts[target_receive_rank] = 0
                self.target_receive_rank_counts[target_receive_rank] += 1
                self.num_forward_threads += 1
                self.num_backward_threads += 1
                
        for output_name in self.send_ranks: # 遍历张量
            # 与 output_name 张量对应的queue
            self.backward_receive_queues[output_name] = []
            self.forward_send_queues[output_name] = []
            # 遍历该张量对应的每个 ranks
            for i in range(len(self.send_ranks[output_name])):
                self.backward_receive_queues[output_name].append(
                    threadsafe_queue.Queue())
                self.forward_send_queues[output_name].append(
                    threadsafe_queue.Queue())
                # 得到 rank
                target_send_rank = self.send_ranks[output_name][i]
                # 针对 rank,注册张量
                self.register_tensor(
                    connected_rank=target_send_rank,
                    tag=self.tensor_tags[output_name])
                if target_send_rank not in self.target_send_rank_counts:
                    self.target_send_rank_counts[target_send_rank] = 0
                self.target_send_rank_counts[target_send_rank] += 1
                self.num_forward_threads += 1
                self.num_backward_threads += 1
​
        # 单独处理目标tensor
        for target_tensor_name in self.target_tensor_names:
            # Queues for target in forward pass.
            self.forward_receive_queues[target_tensor_name] = []
            self.forward_send_queues[target_tensor_name] = []
​
            if self.num_ranks_in_previous_stage > 0:
                self.receive_ranks[target_tensor_name] = self.ranks_in_previous_stage
                for i in range(len(self.receive_ranks[target_tensor_name])):
                    # 针对 rank,注册张量
                    self.register_tensor(
                        connected_rank=self.receive_ranks[target_tensor_name][i],
                        tag=self.tensor_tags[target_tensor_name])
                    self.forward_receive_queues[target_tensor_name].append(
                        threadsafe_queue.Queue())
                    self.num_forward_threads += 1
​
            if self.num_ranks_in_next_stage > 0:
                self.send_ranks[target_tensor_name] = self.ranks_in_next_stage
                for i in range(len(self.send_ranks[target_tensor_name])):
                    self.register_tensor(
                        connected_rank=self.send_ranks[target_tensor_name][i],
                        tag=self.tensor_tags[target_tensor_name])
                    self.forward_send_queues[target_tensor_name].append(
                        threadsafe_queue.Queue())
                    self.num_forward_threads += 1
​
        print ("Send ranks: ", self.send_ranks)
        print ("Receive ranks: ", self.receive_ranks)
​
        # Queues for ack for forward pass-only runs as a clocking mechanism.
        # 单独处理 ack 情况
        self.num_ack_threads = 0
        if "ack" in self.tensor_tags:
            self.backward_receive_queues["ack"] = []
            self.backward_send_queues["ack"] = []
            for i in range(self.num_ranks_in_previous_stage):
                # 针对 rank,注册张量
                self.register_tensor(
                    connected_rank=self.ranks_in_previous_stage[i],
                    tag=self.tensor_tags["ack"])
                self.backward_send_queues["ack"].append(
                    threadsafe_queue.Queue())
                self.num_ack_threads += 1
            for i in range(self.num_ranks_in_next_stage):
                # 针对 rank,注册张量
                self.register_tensor(
                    connected_rank=self.ranks_in_next_stage[i],
                    tag=self.tensor_tags["ack"])
                self.backward_receive_queues["ack"].append(
                    threadsafe_queue.Queue())
                self.num_ack_threads += 1

Обратите внимание, что каждый тензор имеет уникальный тег.Для этого тензора и этого уникального тега зарегистрируйте [tag, rank] в connection_list.

    def register_tensor(self, connected_rank, tag):
        """
        Builds connections list of tensors that are communicated GPU to GPU.
​
        For tensors that are sent GPU-to-GPU (intra-server for GLOO backend),
        make a list of destination/source ranks and the corresponding tag.
        This information is then used to crate process groups.
        """
        if not self.is_gpu_to_gpu_comm(connected_rank=connected_rank):
            return
        connection_info = [tag, connected_rank]
        self.connection_list.append(connection_info)
​

Поэтому логика на данный момент такова, в качестве примеров мы берем только некоторые ранги, очереди и т. д. Эти очереди в forward_receive_queues используются как буферы для соответствующих тензоров.

+------------------------+         'out8' = {list: 1} [2]
|                        |
|     receive_ranks +----------->  'out9' = {list: 1} [2]
|                        |
+------------------------+         'out10' = {list: 1} [2]
​
​
​
+--------------------------+
|                          |         'out8' : Queue
| forward_receive_queues+-------->
|                          |         'out9' : Queue
+--------------------------+
                                     'out10' : Queue
​
​
​
​
+--------------------------+       'out8' : rank 2
|                          |
|    connection_list  +--------->  'out9' : rank 2
|                          |
+--------------------------+       'out10' : rank 2
​

3.3 Последовательность вперед-назад

Затем устанавливается прямой и обратный порядок доставки сообщений, цель которого состоит в том, чтобы позволить каждому рабочему процессу записывать, как обрабатывать ранг, передаваемый прямым уровнем/обратным уровнем.

3.3.1 Последовательность сборки

Метод setup_messaging_schedule используется для создания:

  • Порядок принятия во время прямого распространения.
  • Порядок отправки при обратном распространении.

Суть здесь в следующем: если номер предыдущего слоя больше номера этого слоя, поставитьi对应的前一层rankиi + (本层rank数目) * n 对应的前一层rankдобавляются в расписание этого слоя i (self.message_schedule). n равно num_ranks_in_stage.

Наконец, поместите заказ в переменную-член self.messaging_schedule. Если этот этап имеет 3 ранга, self.messaging_schedule представляет собой message_schedule трех рангов соответственно, и каждый message_schedule содержит несколько рангов соответствующего верхнего уровня.

Чтобы уточнить немного больше:

  • self.messaging_schedule — это список.
  • self.messaging_schedule, где каждый элемент представляет собой список. self.messaging_schedule[ i ] означает, например, расписание (message_schedule), соответствующее i-му рангу этого слоя.
  • расписание (message_schedule) — это несколько рангов предыдущего слоя или следующего слоя.
  • Ранги, включенные в message_schedule, являются индексом рангов, включенных в этот этап. Поскольку оно используется внутри, оно не обязательно должно быть реальным значением ранга, если его можно сопоставить с другими внутренними структурами данных, такими как внутренние очереди.

код показывает, как показано ниже:

    def setup_messaging_schedule(self):
        """ Order in which to receive forward and send backwards.
​
        Separate indexes of ranks in previous stage based on their
        corresponding offset in this stage. Then each worker will go
        in increasing order within a subset, and process subsets in
        a decreasing order.
​
        This is done so that messages are processed in the order
        that they are sent. Backwards send is done so that that it
        matches up with forward receive.
        """
        self.messaging_schedule = []
        for i in range(self.num_ranks_in_stage): # 本stage的并行数目
            idx = i
            message_schedule = []
            while idx < self.num_ranks_in_previous_stage: # 上一个stage的并行数目
                message_schedule.append(idx)
                # 如果前一层比本层多,就把 i, i + (本层rank) * n 对应的前一层rank都加入到本层 i 的计划
                idx += self.num_ranks_in_stage
            if len(message_schedule) > 0:
                self.messaging_schedule.append(message_schedule)
​
        self.fwd_messaging_scheduling_row = self.rank_in_stage # 自己的rank index
        self.fwd_messaging_scheduling_col = 0 # receive forward
        self.bwd_messaging_scheduling_row = self.rank_in_stage # 自己的rank index
        self.bwd_messaging_scheduling_col = 0 # send backwards
​
        # For cases where previous stage has less workers than current stage.
        while self.fwd_messaging_scheduling_row >= \
            len(self.messaging_schedule):
            self.fwd_messaging_scheduling_row -= 1
            self.bwd_messaging_scheduling_row -= 1

Конкретная логика выглядит следующим образом:

+-------------------+                 +--------------------------------------------------+
| Stage 0           |                 | Stage 1                                          |
|                   |                 |                                                  |
|                   |                 |                                                  |
|                   |                 |     +----------------------------------------+   |
|                   |   send_ranks    |     | messaging_schedule                     |   |
|  ranks:           |                 |     |                                        |   |
|                   +---------------> |     |                                        |   |
|  [0,1,2,3,4,5,    |                 |     |   message_schedule +---> [0,1,2,9]     |   |
|  6,7,8,9,10,11,12]|                 |     |                                        |   |
|                   |                 |     |   message_schedule +---> [3,4,5,6,10]  |   |
|                   |                 |     |                                        |   |
|                   |                 |     |   message_schedule +---> [6,7,8,11]    |   |
|                   |                 |     |                                        |   |
|                   |                 |     +----------------------------------------+   |
|                   |                 |                                                  |
+-------------------+                 +--------------------------------------------------+
​

3.3.2 Получить последовательность сообщений

Метод get_messaging_index используется для получения переданного в этот раз объекта, то есть того, с каким рангом следует взаимодействовать.

    def get_messaging_index(self, sending):
        if sending:
            connection_rank = self.messaging_schedule[
                self.bwd_messaging_scheduling_row][
                    self.bwd_messaging_scheduling_col]
        else:
            connection_rank = self.messaging_schedule[
                self.fwd_messaging_scheduling_row][
                    self.fwd_messaging_scheduling_col]
​
        return connection_rank
​

Где используется get_messaging_index? Это оказались функции send и recv, которые используются при работе с предыдущим слоем.

Например:

    def recv(self, tensor_name, forward_minibatch_id,
             backward_minibatch_id, backward=False):
        if backward:
            index = (backward_minibatch_id + self.rank_in_stage) % \
                len(self.backward_receive_queues[tensor_name])
            tensor = self.backward_receive_queues[tensor_name][
                index].remove()
            return tensor
        else:
            # 这里会使用到,获取与哪一个rank进行交互
            index = self.get_messaging_index(sending=False)
            # 然后得到使用哪个张量,从queue之中提取对应的最新张量
            tensor = self.forward_receive_queues[tensor_name][
                index].remove()
            if tensor.dtype == torch.float32:
                tensor = tensor.requires_grad_()
            return tensor

3.3.3 Добавить последовательность сообщений

Метод increment_messaging_index используется для увеличения последовательности сообщений, чтобы определить, какое сообщение следует использовать следующим.

Среди них необходимо пояснить два параметра:

  • bwd_messaging_scheduling_col указывает конкретный ранговый индекс восходящего потока.
  • bwd_messaging_scheduling_row представляет собственный индекс ранга.

Методы, как показано ниже:

    def increment_messaging_index(self, sending):
        if sending:
            self.bwd_messaging_scheduling_col += 1 # send backwards 对应的下一个 rank
            if self.bwd_messaging_scheduling_col == len(
                    self.messaging_schedule[
                        self.bwd_messaging_scheduling_row]):
                self.bwd_messaging_scheduling_col = 0
                self.bwd_messaging_scheduling_row -= 1 # 自己的rank index
                if self.bwd_messaging_scheduling_row == -1:
                    self.bwd_messaging_scheduling_row = \ # 重置回self.messaging_schedule,继续新的一轮本地 rank通讯
                        len(self.messaging_schedule) - 1
        else:
            self.fwd_messaging_scheduling_col += 1 # receive forward 对应的下一个 rank
            if self.fwd_messaging_scheduling_col == len(
                    self.messaging_schedule[
                        self.fwd_messaging_scheduling_row]): 
                self.fwd_messaging_scheduling_col = 0
                self.fwd_messaging_scheduling_row -= 1 # 自己的rank index
                if self.fwd_messaging_scheduling_row == -1:
                    self.fwd_messaging_scheduling_row = \ # 重置回self.messaging_schedule,继续新的一轮本地 rank通讯
                        len(self.messaging_schedule) - 1
​

Где он будет использоваться? Он используется в следующих функциях:

    def receive_tensors_forward(self):
        if self.loader_iter is not None:
      # ......
        else:
            # Receive all required tensors from upstream machines.
      # ......
            # Used to track where to receive forward from.
            self.comm_handler.increment_messaging_index(
                sending=False)
​
    def send_tensors_backward(self):
        # Send all required gradients upstream.
​
        if self.num_ranks_in_previous_stage > 0:
            # Used to track where to send tensors in the
            # backward pass.
            self.comm_handler.increment_messaging_index(
                sending=True)    
            
    def run_ack(self):
        if self.stage > 0:
            self.comm_handler.send(
                "ack",
                torch.zeros(self.tensor_shapes["ack"],
                            dtype=torch.int64).cuda(),
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)
​
            # Used to track where to receive forward from.
            self.comm_handler.increment_messaging_index(sending=True)        

3.4 Создание группы процессов

Цель: для каждого тензора настроить две группы процессов, одну для прямого и одну для обратного. У каждого тензора есть свой тег. Каждый тег имеет свои две группы процессов, так как любой этап может быть распараллелен.

3.4.1 Дизайн

Во-первых, давайте посмотрим на аннотации и узнаем, почему он разработан таким образом.

Метод create_process_groups создает группы процессов в одинаковом порядке среди всех рангов. Чтобы установить группы процессов в том же порядке, каждый рабочий процесс собирает connection_list (от GPU к GPU) всех других рабочих процессов. Для этого каждый воркер собирает максимальный размер списка соединений connection_list(L) всех остальных воркеров. Затем каждый рабочий создает тензор размера Lx2, где каждая строка представляет соединение, и заполняет этот тензор в соответствии с «своим размером списка соединений». Рабочий процесс с наибольшим списком соединений заполнит весь тензор.

После построения этого списка выполняется операция all_gather, после чего каждый воркер имеет одинаковый выход NxLx2, где N — количество воркеров (world_size), а каждый индекс вывода представляет связный список воркеров. Для i=self.rank вывод будет таким же, как и локальный список подключений для этого работника.

Каждый рабочий выполняет итерацию по списку соединений в том же порядке, проверяя, было ли создано каждое соединение (каждое соединение будет отображаться дважды в выходных данных), и, если соединение не существует, создайте его как для прямой, так и для обратной новой группы процессов. . Поскольку ранги в группе процессов всегда одни и те же, ранги меньшего размера ранжируются первыми, а ранги большего размера ранжируются последними.

3.4.2 Код

Вернемся к коду, давайте внимательно его проанализируем.

+--------------------------+       'out8' : rank 2
|                          |
|    connection_list  +--------->  'out9' : rank 2
|                          |
+--------------------------+       'out10' : rank 2

Поэтому цель состоит в том, чтобы установить одну и ту же группу процессов в каждом рабочем потоке и для каждого тензора настроить две группы процессов, одну прямую и одну обратную.

  • Найдите самый большой connection_list среди воркеров

  • Получить размер connection_list, т.е. connection_list_size

  • Используйте коллективную коммуникацию для агрегирования connection_list_size, а окончательный collect_connection_list_sizes — это connection_list_size, установленный на всех узлах.

  • Получить максимальное значение connection_list

  • Создайте список тензоров connection_list_tensor, используя наибольшее значение

  • Переместите тензоры в GPU

  • Используйте установленную связь для агрегирования connection_list_tensor, чтобы получить агрегированный_connection_list

  • Поверх каждого рабочего используйте dist.new_group для создания той же группы процессов.

  • Пройдите каждое соединение в aggregated_connection_list

    • Получить тег, соответствующий тензору
    • Для каждого тензора установите две группы процессов, одну вперед и одну назад.

Здесь используется connection_list. Конкретная логика такова:

Конкретный код выглядит следующим образом:

    def create_process_groups(self):
        """ Create process groups in the same order across all ranks.
​
        To create process groups in the same order, each worker collects
        the connection_list of all other workers. To do this, every worker
        gathers the largest size of all other worker's connection_lists (L).
        Then every worker creates a tensor of size Lx2, where each row
        represents a connection, and fills up this tensor depending on how
        large its own connection list is. The worker(s) w/ the largest
        connection list will fill up the entire tensor.
​
        After constructing this list, an all_gather is performed, after which
        each worker has an identical NxLx2 output, where N is the number of
        workers (world_size), and each index of output represents a worker's
        connection list. For i=self.rank, the output will be identical to the
        workers local connection list.
​
        Each worker then iterates in the same order over the connections list,
        checking if each connection has been created yet (every connection will
        appear twice in the output), and creating a new process group if one
        doesn't exist for that connection, for both the forward and backward
        direction. Since ranks within process groups must always be identical,
        the smaller rank always goes first, followed by the larger rank.
        """
        if self.num_ranks_in_server == 1:
            return
​
        print("Setting up process groups for broadcasts...")
​
        # Figure out the size of the largest connection list that any worker
        # has (L).
        # 找到最大的 connection_list
        # 获取到 connection_list 的大小,即 connection_list_size
        connection_list_size = torch.tensor(
            len(self.connection_list), dtype=torch.int)
        if self.backend == NCCL:
            connection_list_size = connection_list_size.cuda()
        gathered_connection_list_sizes = [
            torch.ones_like(connection_list_size)
            for _ in range(self.world_size)]
        
        # 用集合通信来对 connection_list_size 进行聚合,最后得到的gathered_connection_list_sizes就是所有节点上的 connection_list_size 集合
        dist.all_gather(gathered_connection_list_sizes,
                        connection_list_size)
        # 得到最大数值
        max_connection_list_size = max(
            gathered_connection_list_sizes)
​
        if max_connection_list_size == 0:
            return 
​
        # 利用最大数值来构建张量列表 connection_list_tensor
        # Build tensor to send local connection list to all other workers.
        connection_list_tensor = torch.ones([max_connection_list_size, 2],
                                            dtype=torch.int) * -1
        # 把张量移动到GPU之上
        if self.backend == NCCL:
            connection_list_tensor = connection_list_tensor.cuda()
        if len(self.connection_list) > 0:
            connection_list_tensor[0:len(self.connection_list)] = \
                torch.IntTensor(self.connection_list)
​
        # 用集合通信来对 connection_list_tensor进行聚合       
        # Gather connection lists of all workers.
        aggregated_connection_list = [
            torch.ones_like(connection_list_tensor)
            for _ in range(self.world_size)]
        dist.all_gather(aggregated_connection_list,
                        connection_list_tensor)
​
        # 在每个worker之上,利用 dist.new_group 建立同样的进程组
        # Construct identical process groups on each worker.
        local_rank_connections = 0
        for src_rank in range(len(aggregated_connection_list)):
            for connection in aggregated_connection_list[src_rank]:
                # 得到张量对应的tag
                tag = int(connection[0])
                dst_rank = int(connection[1])
​
                if tag == -1:
                    assert dst_rank == -1
                    continue
​
                min_rank = min(src_rank, dst_rank)
                max_rank = max(src_rank, dst_rank)
                assert min_rank != max_rank
​
                if min_rank not in self.process_groups:
                    self.process_groups[min_rank] = {}
​
                if max_rank not in self.process_groups[min_rank]:
                    self.process_groups[min_rank][max_rank] = {}
​
                if tag not in self.process_groups[min_rank][max_rank]:
                    # 用到了pytorch p2p 的api
                    sub_process_group_fwd = dist.new_group(
                        ranks=[min_rank, max_rank])
                    sub_process_group_bwd = dist.new_group(
                        ranks=[min_rank, max_rank])
​
                    # 针对每个张量,设置进程组
                    self.process_groups[min_rank][max_rank][tag] = {
                        'forward': sub_process_group_fwd,
                        'backward': sub_process_group_bwd
                    }
​
                    if min_rank == self.rank or max_rank == self.rank:
                        local_rank_connections += 1
        assert local_rank_connections == len(self.connection_list)

Как использовать группы процессов? Будут использоваться такие функции, как recv_helper_thread_args, например:

    def recv_helper_thread_args(self, tensor_name, index, dtype,
                                backward, num_iterations):
        if backward:
            src_rank = self.send_ranks[tensor_name][index]
        else:
            src_rank = self.receive_ranks[tensor_name][index]
​
        sub_process_group = None
        # 获取张量 tensor_name 对应的 tag
        tag = self.tensor_tags[tensor_name]
        if self.is_gpu_to_gpu_comm(connected_rank=src_rank) and tensor_name != "ack":
            min_rank = min(self.rank, src_rank)
            max_rank = max(self.rank, src_rank)
            
            if src_rank > self.rank:
                # 获取 tag 对应的进程组,调用者后续会使用
                sub_process_group = \
                    self.process_groups[min_rank][max_rank][tag]['backward']
            else:
                # 获取 tag 对应的进程组,调用者后续会使用
                sub_process_group = \
                    self.process_groups[min_rank][max_rank][tag]['forward']
            assert sub_process_group
​
        if backward:
            queue = self.backward_receive_queues[tensor_name][index]
        else:
            queue = self.forward_receive_queues[tensor_name][index]
        tensor_shape = self.tensor_shapes[tensor_name]
​
        return (queue, self.counter, self.local_rank, tensor_name,
                src_rank, tag, tensor_shape, dtype, sub_process_group,
                num_iterations)

3.5 Запустите вспомогательный поток

Используйте start_helper_threads для запуска вспомогательных потоков. Эти вспомогательные потоки предназначены для использования P2P.

Во-первых, на примере рангов видно, что ключ — это имя тензора, а значение — список рангов.

receive_ranks = {dict: 3}  # 这里就是每个tensor对应的接收目标rank
 'out8' = {list: 1} [2]
 'out9' = {list: 1} [2]
 'out10' = {list: 1} [2]
 __len__ = {int} 3

3.5.1 Создание темы

Вспомните 4 очереди, созданные ранее:

  • forward_receive_queues : Очереди, которые принимают тензоры во время прямого распространения. Соответствует receive_ranks.
  • back_send_queues: очередь для отправки тензоров во время обратного распространения. Соответствует receive_ranks. Потому что объект, полученный при прямом распространении, является целью, отправленной при обратном распространении.
  • forward_send_queues : очередь для отправки тензоров во время прямого распространения. Соответствует send_ranks.
  • back_receive_queues : Очереди, которые принимают тензоры во время обратного распространения. Соответствует send_ranks. Потому что цель, отправленная в прямом проходе, является объектом, полученным в обратном проходе.

Эти 4 очереди на самом деле соответствуют 4 различным вспомогательным потокам.

Идея такова:

  • Процесс принятия рангов, то есть обход тензоров в receive_ranks

    • Пройдите ранги, соответствующие тензору, для каждого ранга

      • Требуется обратная обработка, поэтому устанавливается поток обратной отправки.
      • Создайте вспомогательный поток принятия
  • Обработка для отправки рангов, то есть обход тензоров в send_ranks

    • Пройдите ранги, соответствующие тензору, для каждого ранга

      • Требуется обратная обработка, поэтому устанавливается обратная принимающая нить.
      • Создайте вспомогательный поток отправки
  • Процесс для цели

  • Если есть только форвард, нужно завершить акк

Конкретный код:

    def start_helper_threads(self, num_iterations, forward_only):
        """
        Start helper communication threads, one for each queue.
        """
        if forward_only:
            self.set_counter(self.num_forward_threads +
                             self.num_ack_threads)
            # For validation, receive acks in backward pass from next stage, send
            # acks in backward pass to next stage.
            self.receive_ranks["ack"] = self.ranks_in_previous_stage
            self.send_ranks["ack"] = self.ranks_in_next_stage
        else:
            self.set_counter(self.num_forward_threads +
                             self.num_backward_threads)
            if "ack" in self.receive_ranks:
                del self.receive_ranks["ack"]
            if "ack" in self.send_ranks:
                del self.send_ranks["ack"]
​
        (num_iterations_for_forward_threads,
         num_iterations_for_backward_threads) = \
            self.num_iterations_for_helper_threads(
                num_iterations=num_iterations)
        dtype = torch.float16 if self.fp16 else torch.float32
​
        # Setup queues for each tensor to be received and sent.
        # 针对接受rank进行处理
        for input_name in self.receive_ranks:
            if input_name in self.target_tensor_names or input_name == "ack":
                continue
​
            # 遍历张量对应的ranks
            for i in range(len(self.receive_ranks[input_name])):
                if not forward_only:
                    # 需要后向处理,所以建立后向发送线程
                    self.start_helper_thread(
                        self.send_helper_thread_args,
                        send_helper_thread,
                        [input_name, i, True],
                        num_iterations_for_backward_threads)
                # 建立接受助手线程    
                self.start_helper_thread(
                    self.recv_helper_thread_args,
                    recv_helper_thread,
                    [input_name,
                     i,
                     self.training_tensor_dtypes[input_name],
                     False],
                    num_iterations_for_backward_threads)
             
        # 针对发送ranks进行处理
        for output_name in self.send_ranks:
            if output_name in self.target_tensor_names or output_name == "ack":
                continue
​
            # 遍历张量对应的ranks
            for i in range(len(self.send_ranks[output_name])):
                if not forward_only:
                    # 需要后向处理,所以建立后向接受线程
                    self.start_helper_thread(
                        self.recv_helper_thread_args,
                        recv_helper_thread,
                        [output_name, i,
                         self.training_tensor_dtypes[output_name],
                         True],
                        num_iterations_for_forward_threads)
                # 发送助手线程
                self.start_helper_thread(
                    self.send_helper_thread_args,
                    send_helper_thread,
                    [output_name, i, False],
                    num_iterations_for_forward_threads)
​
        # 针对target进行处理
        for target_tensor_name in self.target_tensor_names:
            if self.num_ranks_in_previous_stage > 0:
                for i in range(len(self.receive_ranks[target_tensor_name])):
                    self.start_helper_thread(
                        self.recv_helper_thread_args,
                        recv_helper_thread,
                        [target_tensor_name, i, torch.int64,
                         False],
                        num_iterations_for_backward_threads)
​
            if self.num_ranks_in_next_stage > 0:
                for i in range(len(self.send_ranks[target_tensor_name])):
                    self.start_helper_thread(
                        self.send_helper_thread_args,
                        send_helper_thread,
                        [target_tensor_name, i, False],
                        num_iterations_for_forward_threads)
​
        # Start helper threads for ack for forward pass-only run as a clocking
        # mechanism.
        # 如果只有前向,则需要补齐ack
        if forward_only:
            # 有前向就补齐 ack
            if "ack" in self.receive_ranks:
                for i in range(len(self.receive_ranks["ack"])):
                    self.start_helper_thread(self.send_helper_thread_args,
                                             send_helper_thread,
                                             ["ack", i, True],
                                             num_iterations_for_backward_threads)
            if "ack" in self.send_ranks:
                for i in range(len(self.send_ranks["ack"])):
                    self.start_helper_thread(self.recv_helper_thread_args,
                                             recv_helper_thread,
                                             ["ack", i, torch.int64, True],
                                             num_iterations_for_forward_threads)
​
​

Конкретная функция создания потока:

    def start_helper_thread(self, args_func, func, args_func_args, num_iterations):
        """
        Start passed-in func on a helper thread.
        """
        args_func_args += [num_iterations]
        args = args_func(*args_func_args) # 需要注意的是使用函数来获取对应的参数
        helper_thread = threading.Thread(target=func, # 用线程主函数来执行线程
                                         args=args)
        helper_thread.start()

3.5.2 Основная функция потока

recv_helper_thread и send_helper_thread принимают вспомогательный поток и отправляют вспомогательный поток соответственно. Вызовите _recv и _send соответственно, чтобы завершить конкретную работу.

Следует отметить, что функция используется для получения соответствующих параметров. Просто используйте recv_helper_thread_args и send_helper_thread_args для получения параметров.

def recv_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, tag, tensor_shape, dtype,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = _recv(
            tensor_name, src_rank, tensor_shape=tensor_shape,
            dtype=dtype, tag=tag,
            sub_process_group=sub_process_group)
        queue.add(tensor)
    counter.decrement()
​
def send_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, dst_rank, tag,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = queue.remove()
        _send(tensor, tensor_name, src_rank, dst_rank,
              tag=tag,
              sub_process_group=sub_process_group)
    counter.decrement()

3.5.3 Параметры сборки

Напомним, что в методе create_process_groups есть следующий код, здесь для каждого тега задается группа процессов, во вспомогательном потоке эти группы процессов используются для завершения логики:

if tag not in self.process_groups[min_rank][max_rank]:
  sub_process_group_fwd = dist.new_group(ranks=[min_rank, max_rank])
    sub_process_group_bwd = dist.new_group(ranks=[min_rank, max_rank])
​
  self.process_groups[min_rank][max_rank][tag] = {
      'forward': sub_process_group_fwd,
        'backward': sub_process_group_bwd
  }

Используйте следующие функции для завершения сбора параметров основной функции потока. Основная логика такова:

  • Используйте имя тензора, чтобы получить соответствующий ранг
  • Используйте имя тензора, чтобы получить соответствующий тег
  • Используйте тег, чтобы получить соответствующую группу процессов
  • Используйте имя тензора и индекс, чтобы получить соответствующую очередь
  • возвращаемый параметр
    def recv_helper_thread_args(self, tensor_name, index, dtype,
                                backward, num_iterations):
        # 利用张量名字,获取到对应的rank
        if backward:
            src_rank = self.send_ranks[tensor_name][index]
        else:
            src_rank = self.receive_ranks[tensor_name][index]

        # 利用张量名字,获取到对应的tag
        sub_process_group = None
        tag = self.tensor_tags[tensor_name]
        
        # 使用tag来获取到对应的进程组
        if self.is_gpu_to_gpu_comm(connected_rank=src_rank) and tensor_name != "ack":
            min_rank = min(self.rank, src_rank)
            max_rank = max(self.rank, src_rank)
            if src_rank > self.rank:
                sub_process_group = \
                    self.process_groups[min_rank][max_rank][tag]['backward']
            else:
                sub_process_group = \
                    self.process_groups[min_rank][max_rank][tag]['forward']
            assert sub_process_group

        # 得到对应的queue
        if backward:
            queue = self.backward_receive_queues[tensor_name][index]
        else:
            queue = self.forward_receive_queues[tensor_name][index]
        tensor_shape = self.tensor_shapes[tensor_name]

        # 返回参数
        return (queue, self.counter, self.local_rank, tensor_name,
                src_rank, tag, tensor_shape, dtype, sub_process_group,
                num_iterations)

    def send_helper_thread_args(self, tensor_name, index,
                                backward, num_iterations):
        # 利用张量名字得到对应的rank
        if backward:
            dst_rank = self.receive_ranks[tensor_name][index]
            num_ranks_in_connected_stage = self.num_ranks_in_previous_stage
        else:
            dst_rank = self.send_ranks[tensor_name][index]
            num_ranks_in_connected_stage = self.num_ranks_in_next_stage

        # 使用tag来获取到对应的进程组
        sub_process_group = None
        tag = self.tensor_tags[tensor_name]
        if self.is_gpu_to_gpu_comm(connected_rank=dst_rank) and tensor_name != "ack":
            min_rank = min(self.rank, dst_rank)
            max_rank = max(self.rank, dst_rank)
            if dst_rank > self.rank:
                sub_process_group = \
                     self.process_groups[min_rank][max_rank][tag]['forward']
            else:
                sub_process_group = \
                    self.process_groups[min_rank][max_rank][tag]['backward']
            assert sub_process_group

        # 得到对应的queue
        if backward:
            queue = self.backward_send_queues[tensor_name][index]
        else:
            queue = self.forward_send_queues[tensor_name][index]

        # 返回参数
        return (queue, self.counter, self.local_rank, tensor_name, self.rank,
                dst_rank, tag, sub_process_group, num_iterations)

0x04 Функциональная функция

Следующие функциональные функции — это функции, которые в конечном итоге используются для завершения конвейерной логики RPC.

Вот развязка, выполненная через очередь:

  • Recv и send будут работать с очередью, добавляя или извлекая тензоры из очереди.
  • Вспомогательный поток вызовет _recv и _send для работы с очередью.

Итак, мы должны сначала посмотреть на реализацию этой Очереди.Вы можете видеть, что threading.Condition используется как для добавления, так и для удаления, что означает, что несколько потоков могут ждать и блокироваться в Очереди через добавление/удаление, то есть производитель и потребитель.

class Queue:
    def __init__(self):
        self.queue = []
        self.cv = threading.Condition()
​
    def add(self, tensor):
        self.cv.acquire()
        self.queue.append(tensor)
        self.cv.notify()
        self.cv.release()
​
    def remove(self):
        self.cv.acquire()
        while len(self.queue) == 0:
            self.cv.wait()
        tensor = self.queue.pop(0)
        self.cv.release()
        return tensor

4.1 Логика отправки

Логика отправки следующая:

  1. Код обучения вызывает StageRuntime.run_backward.
  2. Метод StageRuntime.run_backward вызывает StageRuntime.send_tensors_backward для отправки тензора tensor_name.
  3. send_tensors_backward вызовет CommunicationHandler.send, чтобы добавить этот тензор в переменную-член CommunicationHandler back_send_queues[tensor_name][index]. Каждый тензор соответствует нескольким очередям. Вот и развязка.
  4. Функция отправки вызовет back_send_queues.add, который уведомит send_helper_thread, заблокированный в очереди, для работы.
  5. В потоке send_helper_thread CommunicationHandler, который ранее был заблокирован в очереди, будет извлечен тензор из reverse_send_queues[tensor_name][index].
  6. send_helper_thread вызовет _send для отправки тензоров.
  7. Последний вызов — dist.send, то есть PyTorch P2P.

Подробности следующие:

 StageRuntime            CommunicationHandler              send_helper_thread
​
      +                           +                                 +
      |                           |                                 |
      | 1                         |                                 |
      v                           |                                 |
 run_backward                     |                                 |
      |                           |                                 |
      | 2                         |                                 |
      |                           |                    wait on backward_send_queues
      v                  3        v                                 |
send_tensors_backward +--------> send                               |
                                  |                                 |
                                  |                                 |
                                  |  4                              |
                                  v               5                 v
               backward_send_queues.add(tensor) +----> tensor = queue.remove()
                                                notify              |
                                                                    |
                                                                    | 6
                                                                    v
                                                                  _send
                                                                    |
                                                                    | 7
                                                                    |
                                                                    v
                                                                 dist.send
​

4.2 Логика принятия

Логика приема следующая:

  1. run_backward вызывается в обучающем коде StageRuntime.
  2. run_backward вызывает receive_tensors_backward.
  3. вызов receive_tensors_backwardself.gradients[output_name] = self.comm_handler.recvПолучите градиент. Функция recv CommunicationHandler будет заблокирована вbackward_receive_queues[tensor_name] [index]выше.
  4. В то же время поток recv_helper_thread CommunicationHandler вызывает _recv для принятия тензоров с других этапов.
  5. _recv вызывает dist.recv или dist.broadcast для принятия тензоров.
  6. _recv добавляет тензор к back_receive_queues[tensor_name][index]. Это информирует блокирующую функцию recv CommunicationHandler о выполнении своей работы.
  7. Функция recv CommunicationHandler извлечет градиент из reverse_receive_queues[tensor_name] [index] и вернет его в StageRuntime. это возврат 3.

Подробности следующие:

    StageRuntime             CommunicationHandler           recv_helper_thread
          +                            +                            +
          |                            |                            |
          | 1                          |                            |
          |                            |                            | 4
          v                            |                            v
    run_backward                       |                         _recv
          |                            |                            |
          |                            |                            |
          |                            |                            | 5
          |                            |                            |
          | 2                          |                            v
          |                            |                  dist.recv / dist.broadcast
          |                            |                            |
          v                  3         v                            |
receive_tensors_backward +--------->  recv                          |
          +                            |                            |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |                            v                            |
          |                 backward_receive_queues.remove()        |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |               wait on backward_receive_queues           |
          |                            |                            |
          |                            |                            |
          |                            |                            |
          |                            |                 6          v
          |                  backward_receive_queues <-------+ queue.add(tensor)
          |                            |               notify
          |                            |  7
          v                  3 return  |
gradients[output_name] <---------------+
​

4.3 recv

Это фактически для получения соответствующего тензора из соответствующей очереди по имени тензора.

    def recv(self, tensor_name, forward_minibatch_id,
             backward_minibatch_id, backward=False):
        if backward:
            index = (backward_minibatch_id + self.rank_in_stage) % \
                len(self.backward_receive_queues[tensor_name])
            tensor = self.backward_receive_queues[tensor_name][
                index].remove()
            return tensor
        else:
            # 前向时候,需要知道从前一层的哪一个index获取
            index = self.get_messaging_index(sending=False)
            tensor = self.forward_receive_queues[tensor_name][
                index].remove()
            if tensor.dtype == torch.float32:
                tensor = tensor.requires_grad_()
            return tensor

В функциях receive_tensors_forward и receive_tensors_backward во время выполнения будет вызываться функция recv для получения сохраненных тензоров из соответствующей очереди. Например:

    def receive_tensors_backward(self):
        # Receive all required gradients from downstream
        # machines.
        for output_name in self.send_ranks:
             if output_name in self.target_tensor_names:
                continue
​
             self.gradients[output_name] = \
                self.comm_handler.recv( # 这里使用了
                    output_name,
                    forward_minibatch_id=self.forward_minibatch_id,
                    backward_minibatch_id=self.backward_minibatch_id,
                    backward=True)
​
             self.backward_stats.stats['receive_tensors_size'] += \
                 (self.gradients[output_name].element_size() *
                  self.gradients[output_name].nelement())

4.4 send

Здесь нужно поместить тензор в соответствующую очередь.

    def send(self, tensor_name, tensor, forward_minibatch_id,
             backward_minibatch_id, backward=False):
        if backward:
            # 后向时候,需要知道发送给前一层的哪一个index
            index = self.get_messaging_index(sending=True)
            dst_rank = self.receive_ranks[tensor_name][index]
            self.backward_send_queues[tensor_name][index].add(tensor)
        else:
            index = (forward_minibatch_id + self.rank_in_stage) % \
                len(self.send_ranks[tensor_name])
            self.forward_send_queues[tensor_name][index].add(tensor)
​
​

send_tensors_backward, send_tensors_forward будут использоваться, например:

    def send_tensors_backward(self):
        # Send all required gradients upstream.
        for input_name in self.receive_ranks:
            if input_name in self.target_tensor_names:
                continue
​
            self.comm_handler.send(
                input_name,
                self.gradients[input_name],
                forward_minibatch_id=self.forward_minibatch_id,
                backward_minibatch_id=self.backward_minibatch_id,
                backward=True)
​
            self.backward_stats.stats['send_tensors_size'] += \
                (self.gradients[input_name].element_size() *
                 self.gradients[input_name].nelement())
​
        if self.num_ranks_in_previous_stage > 0:
            # Used to track where to send tensors in the
            # backward pass.
            self.comm_handler.increment_messaging_index(
                sending=True)

4.5 _recv

В параметре _recv sub_process_group создается в приведенном выше коде.

Если на том же узле, используйте dist.broadcast, в противном случае используйте dist.recv.

def _recv(tensor_name, src_rank, tensor_shape=None, dtype=torch.float32,
          tensor=None, tag=None, sub_process_group=None):
    """
    Receives tensor by calling PyTorch's recv() call.
​
    Tensor will be copied to GPU prior to return.
    """
    assert tag is not None
    if tensor is None:
        assert tensor_shape is not None
        assert dtype is not None
        assert dtype != torch.float16
​
    if sub_process_group is not None:
        # Receive tensor shape.
        received_tensor_shape = torch.zeros(len(tensor_shape),
                                            dtype=torch.int)
        dist.broadcast(tensor=received_tensor_shape,
                       src=src_rank,
                       group=sub_process_group)
        received_tensor_shape = list(map(lambda x: int(x),
                                         received_tensor_shape))
​
        # Receive tensor.
        tensor = torch.zeros(received_tensor_shape, dtype=dtype).cuda()
        dist.broadcast(tensor=tensor,
                       src=src_rank,
                       group=sub_process_group)
    else:
        # Receive tensor shape.
        received_tensor_shape = torch.zeros(len(tensor_shape),
                                            dtype=torch.int)
        dist.recv(tensor=received_tensor_shape,
                  src=src_rank,
                  tag=tag)
        received_tensor_shape = list(map(lambda x: int(x),
                                         received_tensor_shape))
​
        # Receive tensor.
        tensor = torch.zeros(received_tensor_shape, dtype=dtype)
        dist.recv(tensor=tensor,
                  src=src_rank,
                  tag=tag)
        tensor = tensor.cuda()
​
    assert tensor.is_cuda
    return tensor

_recv вызывается в recv_helper_thread.

def recv_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, tag, tensor_shape, dtype,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = _recv(
            tensor_name, src_rank, tensor_shape=tensor_shape,
            dtype=dtype, tag=tag,
            sub_process_group=sub_process_group)
        queue.add(tensor) # 获取到张量之后,放入queue
    counter.decrement()

4.6 _send

Если на том же узле, используйте dist.broadcast, в противном случае используйте dist.send.

def _send(tensor, tensor_name, src_rank, dst_rank, tag, sub_process_group=None):
    """
    Sends tensor by calling PyTorch's send() call.
​
    If tensor is being sent not via broadcast(), it will
    be first copied to the CPU.
    """
    if sub_process_group is not None:
        assert tensor.is_cuda
​
        # Send tensor shape.
        tensor_shape = torch.tensor(tensor.shape, dtype=torch.int)
        dist.broadcast(tensor=tensor_shape, src=src_rank,
                      group=sub_process_group)
​
        # Send tensor.
        contiguous_tensor = tensor.detach().clone()
        dist.broadcast(tensor=contiguous_tensor.contiguous(),
                       src=src_rank,
                       group=sub_process_group)
    else:
        assert tensor.is_cuda
        tensor = tensor.cpu()
​
        # Send tensor shape.
        tensor_shape = torch.tensor(tensor.shape, dtype=torch.int)
        dist.send(tensor=tensor_shape, dst=dst_rank, tag=tag)
​
        # Send tensor.
        dist.send(tensor=tensor, dst=dst_rank, tag=tag)

recv_helper_thread использует _send для получения тензоров.

def send_helper_thread(queue, counter, local_rank, tensor_name,
                       src_rank, dst_rank, tag,
                       sub_process_group, num_iterations):
    torch.cuda.set_device(local_rank)
    # This method is to be executed from a helper daemon thread.
    for i in range(num_iterations):
        tensor = queue.remove()
        # 从queue提取张量,发送出去。
        _send(tensor, tensor_name, src_rank, dst_rank,
              tag=tag,
              sub_process_group=sub_process_group)
    counter.decrement()

На данный момент модуль связи проанализирован, и в следующей статье мы, наконец, представим 1F1B.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF