[Анализ исходного кода] Параллельный конвейер глубокого обучения PipeDream(2) --- Вычислительный раздел

машинное обучение глубокое обучение

0x00 сводка

В предыдущей статье мы представили общую архитектуру и этап профиля PipeDream, а в этой статье мы продолжаем знакомить с этапом расчета раздела. Его функция: определить время работы всех слоев в соответствии с результатами профиля, затем использовать динамическое программирование для разделения модели, разделить модель на разные этапы и получить номер репликации каждого этапа. Результаты расчета представлены на следующем рисунке:

Ссылки на другие статьи о конвейерном параллелизме:

[Анализ исходного кода] Конвейер глубокого обучения, параллельный Gpipe(1) --- Базовая реализация конвейера

[Анализ исходного кода] Конвейер глубокого обучения, параллельный GPipe (2) ----- накопление градиента

[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения

0x01 Предисловие

1.1 Файл профиля

Давайте сначала посмотрим на содержимое файла профиля profiler/translation/profiles/gnmt/graph.txt, здесь только выдержка.

node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
node2 -- Input1 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000
node7 -- LSTM(2048, 1024) -- forward_compute_time=3.190, backward_compute_time=5.348, activation_size=[6291456.0; 131072.0; 131072.0], parameter_size=50364416.000
node8 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
node9 -- __getitem__(1) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=131072.0, parameter_size=0.000
node10 -- Dropout(p=0.2) -- forward_compute_time=0.064, backward_compute_time=0.128, activation_size=6291456.0, parameter_size=0.000
node11 -- LSTM(1024, 1024) -- forward_compute_time=2.491, backward_compute_time=4.203, activation_size=[6291456.0; 131072.0; 131072.0], parameter_size=33587200.000
node12 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
node13 -- __getitem__(1) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=131072.0, parameter_size=0.000
node14 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
node15 -- Dropout(p=0.2) -- forward_compute_time=0.059, backward_compute_time=0.121, activation_size=6291456.0, parameter_size=0.000
node16 -- LSTM(1024, 1024) -- forward_compute_time=2.492, backward_compute_time=4.201, activation_size=[6291456.0; 131072.0; 131072.0], parameter_size=33587200.000
node17 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000
......
    node1 -- node4
    node4 -- node5
    node2 -- node5
    node5 -- node6
    node6 -- node7
    node7 -- node8
    node7 -- node9
    node8 -- node10
    node10 -- node11
    node11 -- node12
    node11 -- node13
    node12 -- node14
    node8 -- node14
    node14 -- node15
    node15 -- node16
    node16 -- node17
    node16 -- node18
    node17 -- node19
    node14 -- node19
......

1.2 Общая идея

Ранее мы также упомянули несколько проблем, среди которых:

  • Как эффективно разделить трубопровод.

    • Особенности модели и топология оборудования снижают эффективность. Алгоритмы распределения также должны учитывать особенности модели и топологию оборудования.
    • Чрезмерная связь между машинами снижает эффективность оборудования.
  • Как предотвратить узкие места трубопровода.

    • Из принципа бочки мы можем знать, что пропускная способность конвейера определяется пропускной способностью самого медленного звена конвейера. Поэтому вам нужно убедиться, что все этапы конвейера тратят примерно одинаковое вычислительное время, иначе самый медленный этап станет узким местом всего конвейера.

Таким образом, при разделении слоев на разные этапы на разных машинах алгоритм автоматического разделения PipeDream должен гарантировать, что каждый этап выполняет примерно одинаковый общий объем работы. Также необходимо обеспечить, чтобы объем данных, передаваемых между этапами, был как можно меньше, чтобы избежать прерывания связи.

Общая цель алгоритма автоматического разбиения PipeDream — вывести сбалансированный конвейер.Алгоритм выглядит следующим образом:

  • Разделите слой DNN на этапы так, чтобы каждый этап выполнялся примерно с одинаковой скоростью, т. е. занимал примерно одинаковое количество времени вычислений.
  • Постарайтесь свести к минимуму взаимодействие между рабочими процессами с учетом топологии (например, по возможности отправляйте большие выходные данные на каналы с более высокой пропускной способностью).
  • Поскольку DNN не всегда могут быть равномерно распределены между доступными рабочими процессами, для дальнейшего улучшения балансировки нагрузки PipeDream позволяет дублировать этап, т. е. использовать несколько рабочих процессов на этом этапе для параллелизма данных.

Эта задача разделения эквивалентна минимизации времени, затрачиваемого самым медленным этапом конвейера, и обладает свойством оптимальной подзадачи: конвейер, максимизирующий пропускную способность, состоит из ряда подконвейеров, где каждый подконвейер A максимизирует свою производительность для меньших рабочие нагрузки. Поэтому PipeDream использует динамическое программирование для поиска оптимального решения.

Вот соответствующая схема архитектуры:

Давайте взглянем на подготовительную работу перед вычислением разделов: работу, связанную с графиком, и построение обратных ссылок.

0x02 Относится к изображению

Определение графа находится в файле graph/graph.py.Существуют две основные структуры данных: Graph и Node.

2.1 Graph

Граф — это структура данных графа, и его основными членами являются:

  • узлы : узлы в графе;
  • ребра: выходные ребра каждого узла в графе;
  • in_edges: входные ребра каждого узла графа;
  • _predecessors: предшественники каждого узла;
  • _successors : последующие узлы каждого узла;
  • _antichain_dag : античейн DAG;
class Graph(object):
    def __init__(self, node=None):
        self.nodes = {} # 节点
        if node is not None:
            self.nodes[node.node_id] = node
        self.edges = {} # 出边
        self.in_edges = {} # 入边
​
        self._predecessors = {} #每个节点的前序节点 
        self._successors = {} # 每个节点的后序节点
        self._augmented_antichains = {}
        self._deaugmented_augmented_antichains = {}
        self._next_antichains = {}
        self._antichain_dag = None # 反链DAG
​
        if node is not None:
            self.in_edges[node.node_id] = list()

Ниже приведено определение узла, которое содержит структуру, полученную из профиля, например:

  • forward_compute_time : время распространения вперед;
  • back_compute_time : время обратного распространения;
  • activation_size : размер значения активации;
  • параметр_размер : размер параметра;
class Node(object):
    def __init__(self, node_id, node_desc="", forward_compute_time=0.0,
                 backward_compute_time=0.0, activation_size=0.0, parameter_size=0.0,
                 stage_id=None):
        self.node_id = node_id
        self.node_desc = node_desc
        self.forward_compute_time = forward_compute_time
        self.backward_compute_time = backward_compute_time
        self.activation_size = activation_size
        self.parameter_size = parameter_size
        self.stage_id = stage_id
        self.depth = None
        self.height = None

Мы распечатываем время выполнения, чтобы увидеть конкретную ситуацию с Graph.

gr = {Graph} 
 # 边
 edges = {dict: 39}
  'node1' = {list: 1} 
   0 = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  'node4' = {list: 1} 
   0 = {Node} node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
   ......
​
 # 输入边 
 in_edges = {dict: 44} 
  'node4' = {list: 1} 
   0 = {Node} node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
  'node5' = {list: 2} 
   0 = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
   1 = {Node} node2 -- Input1 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
   ......
  
 # 节点 
 nodes = {dict: 48}
  'node1' = {Node} node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
  'node4' = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  'node5' = {Node} node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
 ......
​
# 前置节点
_predecessors = {dict: 36} 
 'node4' = {set: 0} set()
  __len__ = {int} 0
 'node5' = {set: 1} {<graph.graph.Node object at 0x7fb055e4bf28>}
  {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  __len__ = {int} 1
 'node6' = {set: 2} {<graph.graph.Node object at 0x7fb055e4bf98>, <graph.graph.Node object at 0x7fb055e4bf28>}
  {Node} node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
  {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  __len__ = {int} 2
 'node7' = {set: 3} {<graph.graph.Node object at 0x7fb055e4bf98>, <graph.graph.Node object at 0x7fb055e4bf28>, <graph.graph.Node object at 0x7fb055e670f0>}
  {Node} node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
  {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
  {Node} node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000
  __len__ = {int} 3
​
 # 其他变量
  _antichain_dag = {NoneType} None
  _augmented_antichains = {dict: 0} {}
  _deaugmented_augmented_antichains = {dict: 0} {}
  _next_antichains = {dict: 0} {}
  _successors = {dict: 0} {}

2.2 Построение диаграммы

График строится из строк в файле профиля. Мы можем узнать содержимое файла профиля, выяснив, в частности, различную обработку для каждой строки.

node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000
node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000
node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000
    node1 -- node4
    node4 -- node5
    node2 -- node5

Построить графикКонкретный код выглядит следующим образом:

@staticmethod
def from_str(graph_str):
    gr = Graph()
    graph_str_lines = graph_str.strip().split('\n') 
    for graph_str_line in graph_str_lines: # 逐行处理
        if not graph_str_line.startswith('\t'):
            node = Node.from_str(graph_str_line.strip()) # 构建节点
            gr.nodes[node.node_id] = node
        else:
            # 构建边
            [in_node_id, node_id] = graph_str_line.strip().split(" -- ")
            if node_id not in gr.in_edges: # 每个节点的输入边
                gr.in_edges[node_id] = [gr.nodes[in_node_id]]
            else:
                gr.in_edges[node_id].append(gr.nodes[in_node_id])
            if in_node_id not in gr.edges: # 每个节点的输出边
                gr.edges[in_node_id] = [gr.nodes[node_id]]
            else:
                gr.edges[in_node_id].append(gr.nodes[node_id])
    return gr

построить узелКонкретный код выглядит следующим образом:

    @staticmethod
    def from_str(node_str):
        node_str_tokens = node_str.strip().split(" -- ")
        node_id = node_str_tokens[0] # 节点名字
        node_desc = node_str_tokens[1] # 节点描述
        node_metadata = node_str_tokens[2] # 元数据
        stage_id = None
        if len(node_str_tokens) > 3:
            stage_id = int(node_str_tokens[3].split("=")[1]) # 阶段信息
        [forward_compute_time, backward_compute_time, activation_size, parameter_size] = node_metadata.split(", ")
        forward_compute_time = float(forward_compute_time.split("=")[1]) # 前向传播计算时间
        backward_compute_time = float(backward_compute_time.split("=")[1]) # 后向传播计算时间
        if "[" in activation_size:
            activation_size = activation_size.split("=")[1] # 激活值大小
            activation_size = sum([float(x) for x in activation_size.lstrip("[").rstrip("]").split("; ")])
        else:
            activation_size = float(activation_size.split("=")[1])
        parameter_size = float(parameter_size.split("=")[1]) # 参数大小
        # 构建节点
        return Node(node_id, node_desc, forward_compute_time=forward_compute_time,
                    backward_compute_time=backward_compute_time, activation_size=activation_size,
                    parameter_size=parameter_size, stage_id=stage_id)

2.3 Обратная ссылка

В ориентированном ациклическом графе есть следующие понятия:

  • Цепочка: Цепочка — это набор точек, любые две точки x, y в цепочке удовлетворяют следующим условиям: либо x может достичь y, либо y может достичь x. Его также можно рассматривать как полностью упорядоченное подмножество частично упорядоченного множества S (так называемый полный порядок означает, что можно сравнивать любые два элемента).
  • Антицепь: Антицепь также представляет собой набор точек Любые две точки x и y в этой цепи удовлетворяют следующим условиям: x не может достичь y, а y не может достичь x. Его также можно рассматривать как подмножество частично упорядоченного множества S, в котором любые два элемента несравнимы.

В структуре данных графа PipeDream также есть концепция антицепи. Узел обратной ссылки определяется следующим образом:

class AntichainNode(Node):
    def __init__(self, node_id, antichain, node_desc=""):
        self.antichain = antichain
        self.output_activation_size = 0.0
        super(AntichainNode, self).__init__(node_id, node_desc)

Поскольку здесь это слишком сложно, ниже мы посвятим раздел анализу.

0x03 Построить антицепочку

Поскольку концепция этого раздела довольно запутанная, мы заранее ее испортим.

Цель поиска последующей обратной ссылки узла состоит в том, чтобы найти следующую точку разделения графа A (которая может быть комбинацией нескольких узлов).Чтобы определить время работы (или другую информацию) узла A, нам нужно найти расширенная обратная ссылка A.

Конкретный код здесь находится вoptimizer_graph_hierarchical.pyдокумент.

Для демонстрации используем следующую логику:

+-------+       +-------+
| node1 |       | node2 |
+---+---+       +---+---+
    |               |
    |               |
    |               |
    v               v
+---+---+       +---+---+        +-------+        +-------+
| node4 +-----> | node5 +------> | node6 +------->+ node7 |
+-------+       +-------+        +-------+        +-+-+---+
                                                    | |
                                                    | |
                                      +-------------+ |
                                      |               |
                                      v               v
                                 +----+--+        +---+---+
                                 | node9 |        | node8 +-----+
                                 +-------+        +---+---+     |
                                                      |         |
                    +---------------------------------+         |
                    |                                           |
                    v                                           |
               +----+---+       +--------+        +--------+    |
               | node10 +-----> | node11 +------> | node12 |    |
               +--------+       +---+----+        +----+---+    |
                                    |                  |        |
                                    |                  |        |
                                    v                  v        |
                                +---+----+        +----+---+    |
                                | node13 |        | node14 +<---+
                                +--------+        +-+----+-+
                                                    |    |
                                             +------+    +---+
                                             |               |
                                             v               v
                                        +----+---+        +--+-----+
                                        | node15 |        | node19 |
                                        +--------+        +--------+

3.1 Вход в основную функцию

Начнем с рассмотрения основной функции. Первая часть основной функции заключается в построении антицепочечной и топологической сортировки следующим образом:

  • Удалите исходный узел из графа. Цель состоит в том, чтобы устранить помехи, потому что ввод должен быть в первом слое, оптимизатору не нужно выбирать, куда поместить ввод, поэтому сначала удалите его, а добавьте позже, когда модель будет преобразована.
  • Обработайте выходные данные графа, удалив неиспользуемые выходные данные.
  • Получают антицепной DAG.
  • Топологически отсортируйте DAG антицепи, чтобы получить отсортированный список узлов.

Конкретный код выглядит следующим образом:

def main(all_num_machines, profile_filename, network_bandwidths, memory_size,
         straight_pipeline, use_memory_constraint, use_fewer_machines,
         activation_compression_ratio, output_directory,
         print_configuration=True, verbose=False):
    gr = graph.Graph.from_str(open(profile_filename, 'r').read())
​
    # Zero out all metadata associated with inputs in graph, since the optimizer
    # shouldn't really get a choice with where to place the input (should always
    # be in the first stage).
    # 排除干扰,因为input必然在第一层,没必要让优化器再来选择把输入放在哪里,所以先去除,后续会再加上。
    sources = gr.sources() # 对图的输入进行处理
    nodes_to_remove = OrderedDict()
    for source in sources:
        if source.node_desc.startswith("Input"): # 只处理input
            source.forward_compute_time = 0.0
            source.backward_compute_time = 0.0
            source.activation_size = 0.0
            source.parameter_size = 0.0
            nodes_to_remove[source] = []
            for out_node in gr.edges[source.node_id]:
                nodes_to_remove[source].append(out_node) # 记录这些删除source对应了哪些out节点,因为后续还要处理
            gr.remove_node(source) # 在图中移除这些input source
​
    # Remove all unneeded sinks that are not used, makes code generation and
    # optimization easier.
    sinks = gr.sinks() # 对图的输出进行处理,移除没有用到的输出
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            gr.remove_node(sink)
​
    antichain_gr = gr.antichain_dag() # 得到反链DAG
    states = antichain_gr.topological_sort() # 拓扑排序,得到一个排序好的节点列表
​
    # 后续代码暂时省略

Здесь определение узла антицепи вынесено следующим образом, и видно, что оно соответствует коду.

class AntichainNode(Node):
    def __init__(self, node_id, antichain, node_desc=""):
        self.antichain = antichain
        self.output_activation_size = 0.0
        super(AntichainNode, self).__init__(node_id, node_desc)

3.2 Расширенные обратные ссылки

Прежде всего, нам необходимо ввести понятие первой усиливающей антицепи. Расширенная обратная цепочка каждого узла включает в себя: собственный узел + несколько узлов предварительного заказа.

Алгоритм выбора для этого узла предварительного заказа:

  1. Получить список всех узлов предзаказа данного узла;
  2. Если «исходящий узел назначения» узла предварительного заказа отсутствует в списке всех узлов предварительного заказа, а «исходящий узел назначения» не является самим собой, узел предварительного заказа выбирается как часть расширенной антицепочки.

Как видно из следующей иллюстрации, если узел A имеет разветвленный узел Z в своем узле-предшественнике, и один из разветвлений обходит узел A, то для узла A его расширенными обратными ссылками являются [A, Z].

Для концепции расширенной антицепи это можно понимать так: для узла A, только рассматривая узел Z вместе, он может однозначно определить время работы своего собственного узла. Потому что, если я подумаю о времени работы узла А, я пойму, что общая идея такова:

  • Поскольку различные этапы могут выполняться параллельно, время выполнения A должно быть максимальным из следующих трех значений: время вычисления A, время ввода A и время вывода A.
  • Время ввода A является максимальным из следующих двух времен: X --> Время вывода узла A, Z --> Время вывода узла A.
  • Однако, поскольку внутренний механизм работы Z не ясен, невозможно определить, существует ли зависимость между двумя выходами Z, например, «Z -> D должно быть завершено, прежде чем Z -> A может быть выведено». , поэтому также необходимо учитывать время передачи Z --> D.

Следовательно, необходимо рассматривать [ A, Z ] вместе как состояние, собственно, этим и занимается PipeDream, используя состояние [ A, Z ] для унифицированного расчета.

Поскольку это рассматривается как состояние, выходное значение активации вычисляется для узла А. В частности, оно вычисляется путем обхода его антицепи (расширенной антицепи), то есть выход узла предварительного заказа его расширенного узла. накладывается антицепь.

    +-----+            +-----+
    |  X  |            |  Z  |
    +--+--+            +--+-++
       |                  | |
       |                  | |
       +------+   +-------+ |
              |   |         |
              v   v         |
             ++---++        |
             |  A  |        |
             ++-+--+        |
              | |           |
    +---------+ |           |
    |           |           |
    v           v           v
+---+-+      +--+--+      +-+---+
|  B  |      |  C  |      |  D  |
+-----+      +-----+      +-----+

В коде_augmented_antichainsЭто расширенная антицепочка, а также класс словаря.Ключом является имя узла, а значением является расширенная антицепочка ключевого узла, например:

Роль функции augment_antichain состоит в том, чтобы найти свою расширенную антицепочку для каждого узла.

def augment_antichain(self, antichain):
    # 参数 antichain 是一个节点列表
    antichain_key = tuple(sorted(antichain))
    # 如果key已经在扩大反链之中,就直接返回对应key的增强反链
    if antichain_key in self._augmented_antichains:
        return self._augmented_antichains[antichain_key]
    extra_nodes = set()
    all_predecessors = set()
    # 遍历参数list之中的反链节点,获取每个节点的前置节点,归并在all_predecessors之中。
    for antichain_node in antichain:
        predecessors = self.predecessors(antichain_node)
        all_predecessors = all_predecessors.union(predecessors)
    # 遍历参数list之中的反链节点
    for antichain_node in antichain:
        # 获取每个反链节点的前置节点列表
        predecessors = self.predecessors(antichain_node)
        # 遍历每个前置节点
        for predecessor in predecessors:
            # 看每个前置节点的出边,如果出边不在前置节点列表之中,且 出边节点不等于本反链节点
            for out_node in self.edges[predecessor.node_id]:
                if out_node not in predecessors and out_node.node_id != antichain_node:
                    # 把这个前置节点插入到附加节点列表中
                    extra_nodes.add(predecessor.node_id)
    # 最终把个附加节点列表插入到增强节点之中
    self._augmented_antichains[antichain_key] = list(extra_nodes) + antichain
    return self._augmented_antichains[antichain_key]

Например, согласно логике на рисунке ниже, после инициализации _augmented_antichains

_augmented_antichains = {dict: 1} 
 ('node4',) = {list: 1} ['node4']

После следующей итерации узла 5 _augmented_antichains

_augmented_antichains = {dict: 2} 
 ('node4',) = {list: 1} ['node4']
 ('node5',) = {list: 1} ['node5']
 __len__ = {int} 2

Продолжайте итерацию и улучшайте антицепь следующим образом:

_augmented_antichains = {dict: 7} 
('node4',) = {list: 1} ['node4'] # node4的增强反链只有自己
('node5',) = {list: 1} ['node5'] # node5的增强反链只有自己
('node6',) = {list: 1} ['node6']
('node7',) = {list: 1} ['node7']
('node8',) = {list: 1} ['node8']
('node10',) = {list: 2} ['node8', 'node10'] # node10的增强反链是'node8', 'node10'
('node14',) = {list: 1} ['node14']
('node11',) = {list: 2} ['node8', 'node11'] # node11的增强反链是'node8', 'node11'
('node15',) = {list: 2} ['node14', 'node15']
('node19',) = {list: 1} ['node19']
('node12',) = {list: 2} ['node8', 'node12']
('node16',) = {list: 2} ['node14', 'node16']
('node23',) = {list: 2} ['node20', 'node23']
('node17',) = {list: 2} ['node14', 'node17']  

Как видно из легенды, поскольку существует исходящая грань [узел 8, узел 14] узла 8, для узла 10, узла 11, узла 12 они должны добавить узел 8 в свою расширенную обратную цепочку.

Для узла 10 мы можем подумать, что он должен быть объединен с узлом 8, прежде чем узел 10 сможет определить время выполнения узла 10. Расширенная обратная ссылка узла 10 (собственный узел + несколько узлов предварительного заказа) отмечена на рисунке ниже.

+-------+       +-------+
| node1 |       | node2 |
+---+---+       +---+---+
    |               |
    |               |
    |               |
    v               v
+---+---+       +---+---+        +-------+        +-------+
| node4 +-----> | node5 +------> | node6 +------->+ node7 |
+-------+       +-------+        +-------+        +-+-+---+
                                                    | |
                                                    | |
                                      +-------------+ |
                                      |               |
                                      v               v  augmented
                                 +----+--+        +---+---+
                                 | node9 |        | node8 +-----+
                                 +-------+        +---+---+     |
                                                      |         |
                    +---------------------------------+         |
                    |                                           |
                    v                                           |
               +----+---+       +--------+        +--------+    |
     antichain | node10 +-----> | node11 +------> | node12 |    |
               +--------+       +---+----+        +----+---+    |
             augmented              |                  |        |
                                    |                  |        |
                                    v                  v        |
                                +---+----+        +----+---+    |
                                | node13 |        | node14 +<---+
                                +--------+        +-+----+-+
                                                    |    |
                                             +------+    +---+
                                             |               |
                                             v               v
                                        +----+---+        +--+-----+
                                        | node15 |        | node19 |
                                        +--------+        +--------+

3.3 Последующие обратные ссылки

В коде _next_antichains — это класс словаря, ключ — это имя узла, а значение — последующая антицепочка ключевого узла.

Например, для узла A следующая обратная ссылка — это [узел B, узел C], где узел B и узел C не могут быть упорядочены относительно друг друга. Цель поиска обратных ссылок — найти следующую точку разделения графика.

    +-----+            +-----+
    |  X  |            |  Z  |
    +--+--+            +--+-++
       |                  | |
       |                  | |
       +------+   +-------+ |
              |   |         |
              v   v         |
             ++---++        |
             |  A  |        |
             ++-+--+        |
              | |           |
    +---------+ |           |
    |           |           |
    v           v           v
+---+-+      +--+--+      +-+---+
|  B  |      |  C  |      |  D  |
+-----+      +-----+      +-----+

Для каждой антицепи узла функция next_antichains получает последующие антицепи.

    def next_antichains(self, antichain):
        # 构建antichain的反链key,其实就是 antichain 自己作为key
        antichain_key = tuple(sorted(antichain))
        # 如果key已经在后续反链之中,则返回这个后续反链
        if antichain_key in self._next_antichains:
            return self._next_antichains[antichain_key]
​
        next_antichains = []
        antichain_set = set(antichain)
        # 获取 antichain 的增强反链
        augmented_antichain = self.augment_antichain(antichain)
        # 遍历增强反链
        for augmented_antichain_node in augmented_antichain:
            # 遍历增强反链某节点的出边
            next_nodes = self.edges[augmented_antichain_node] if augmented_antichain_node in self.edges else []
            # 遍历增强反链某节点的出边
            for next_node in next_nodes:
                # 如果出边节点已经在反链集合之中,跳过,进入下一循环
                if next_node.node_id in antichain_set:
                    continue
                # 如果出边节点是后续反链,则假如到反链列表   
                if self.is_next_antichain(augmented_antichain, next_node.node_id):
                    next_antichain = self.construct_antichain(augmented_antichain,
                                                              augmented_antichain_node,
                                                              next_node.node_id)
                    next_antichains.append(next_antichain)
        # 最终把反链列表设置为key对应的反链            
        self._next_antichains[antichain_key] = next_antichains
        return self._next_antichains[antichain_key]

Метод is_next_antichain используется для определения того, является ли новый узел последующей антицепочкой.

def is_next_antichain(self, augmented_antichain, new_node):
    successors = self.successors(new_node)
    augmented_antichain_set = set(augmented_antichain)
    # 遍历新节点的后续节点
    for successor in successors:
        # 如果后续节点有一个在增强节点之中,就返回false,说明不是后续反链
        if successor.node_id in augmented_antichain_set:
            return False
    # 否则就是后续反链      
    return True

Пример _next_antichains следующий, вы можете сравнить его с предыдущими усиленными антицепями.

  • Взяв в качестве примера узел 10, его расширенные узлы: [узел 8, узел 10],

  • Пройдитесь по этим дополненным узлам и посмотрите на исходящие ребра каждого дополненного узла. Исходящее ребро 8 — [узел 10, узел 14], а исходящее ребро 10 — [узел 11].

  • Итак, есть три точки: узел 10, узел 11, узел 14, которые нужно продолжать видеть. где узел 10 уже находится в [узле 8, узле 10], поэтому он не рассматривается.

  • Вызов is_next_antichain с 14.

    • Среди is_next_antichain augmented_antichain — это [узел 8, узел 10], а new_node — это узел 14.
    • Набор преемников получается как [node31, node16, node23, node44, node48 ....] и другие 22 узла, эти узлы не находятся в [узле 8, узле 10], поэтому is_next_antichain имеет значение true, 14 является одним из последующих антицепные узлы один.
  • Вызов is_next_antichain с 11.

    • Среди is_next_antichain augmented_antichain — это [узел 8, узел 10], а new_node — это узел 11.
    • Набор преемников получается как [node16, node40, node23, ....] и другие узлы, эти узлы не находятся в [узле 8, узле 10], поэтому is_next_antichain имеет значение true, 11 является одним из последующих узлов антицепочки .

Таким образом, следующая обратная ссылка для узла 10 — [['node14'],['node11']].

В сравненииПосмотрите, расширенная обратная ссылка для узла 10 ['node8', 'node10'],

_next_antichains = {dict: 99} 
 ('node4',) = {list: 1} [['node5']]
 ('node5',) = {list: 1} [['node6']]
 ('node6',) = {list: 1} [['node7']]
 ('node7',) = {list: 1} [['node8']]
 ('node8',) = {list: 2} [['node10'], ['node14']]
 ('node10',) = {list: 2} [['node14'], ['node11']] # 这里
 ('node14',) = {list: 2} [['node15'], ['node19']]
 ('node11',) = {list: 2} [['node14'], ['node12']]
 ('node15',) = {list: 2} [['node19'], ['node16']]
 ('node19',) = {list: 1} [['node23']]
 ('node12',) = {list: 2} [['node14'], ['node14']]
 ('node16',) = {list: 2} [['node19'], ['node17']]

Как показано на рисунке ниже, видно, что узел 11 и узел 14 действительно являются последующими антицепями узла 10, то есть граф можно разделить на эти два узла.

Это можно понять так: для узла 10 следующая обратная ссылка [узел 11, узел 14], где узел 11 и узел 14 не могут быть упорядочены относительно друг друга. Цель поиска последующих обратных ссылок — найти следующую точку разделения графика.

+-------+       +-------+
| node1 |       | node2 |
+---+---+       +---+---+
    |               |
    |               |
    |               |
    v               v
+---+---+       +---+---+        +-------+        +-------+
| node4 +-----> | node5 +------> | node6 +------->+ node7 |
+-------+       +-------+        +-------+        +-+-+---+
                                                    | |
                                                    | |
                                      +-------------+ |
                                      |               |
                                      v               v  augmented
                                 +----+--+        +---+---+
                                 | node9 |        | node8 +-----+
                                 +-------+        +---+---+     |
                                                      |         |
                    +---------------------------------+         |
                    |                                           |
                    v              next                         |
               +----+---+       +--------+        +--------+    |
     antichain | node10 +-----> | node11 +------> | node12 |    |
               +--------+       +---+----+        +----+---+    |
             augmented              |                  |        |
                                    |                  |        |
                                    v             next v        |
                                +---+----+        +----+---+    |
                                | node13 |        | node14 +<---+
                                +--------+        +-+----+-+
                                                    |    |
                                             +------+    +---+
                                             |               |
                                             v               v
                                        +----+---+        +--+-----+
                                        | node15 |        | node19 |
                                        +--------+        +--------+
​

3.4 Общая конструкция

Назначение antichain_dag основано наРасширенный список обратных ссылокиСписок последующих обратных ссылокпостроить антицепной DAG.

Давайте возьмем приведенный выше пример и возьмем узел 8 в качестве примера.

def antichain_dag(self):
    if self._antichain_dag is not None:
        return self._antichain_dag
​
    antichain_dag = Graph()
    antichain_id = 0
    antichain = [self.sources()[0].node_id] # 获取source第一个节点。
    # 构建首节点,同时利用 augment_antichain 来往_augmented_antichains 之中添加首节点。
    source_node = AntichainNode("antichain_%d" % antichain_id, self.augment_antichain(antichain))
    antichain_dag.source = source_node
    antichain_queue = [antichain] # 把第一个节点插入queue
    antichain_mapping = {tuple(sorted(antichain)): source_node}
​
    # 如果queue之中还有节点
    while len(antichain_queue) > 0:
        antichain = antichain_queue.pop(0) # 弹出第一个节点,赋值为 antichain,这里为 node 8
        # key就是由 antichain 节点名字构建,比如 antichain_key = {tuple: 1} node8
        antichain_key = tuple(sorted(antichain)) 
        # 如果 antichain_key 已经位于self._next_antichains之中,即 antichain_key 的后续反链已经被记录,就跳过去
        if antichain_key in self._next_antichains:  
            continue
        # 获取 antichain 的后续反链,对于8,这里是[[10],[14]]
        next_antichains = self.next_antichains(antichain)
        # 遍历后续反链[10,14]
        for next_antichain in next_antichains:
            # 下一个反链节点的key 10
            next_antichain_key = tuple(sorted(next_antichain))
            if next_antichain_key not in antichain_mapping: # 如果存在,就跳过
                antichain_id += 1
                # 下一反链节点 10 被设置为其增强节点 [ 8, 10 ]
                next_antichain_node = AntichainNode("antichain_%d" % antichain_id, self.augment_antichain(next_antichain))
                # 设置 antichain_mapping
                antichain_mapping[next_antichain_key] = next_antichain_node
            # 向 反链DAG 插入边:    
            antichain_dag.add_edge(antichain_mapping[antichain_key],
                                   antichain_mapping[next_antichain_key])
            # 把最新反链节点插入queue,下次迭代使用
            antichain_queue.append(next_antichain)
​
    self._antichain_dag = antichain_dag
    return antichain_dag

Цель здесь — установить antichain_mapping.

Процесс:

  • Вытащите первый узел из antichain_queue и назначьте его антицепочке, вот узел 8.
  • Получите последующую антицепь антицепи, для 8, здесь [[10],[14]].
  • Пройдите по последующим обратным ссылкам [10, 14].
  • Возьмите 10 в качестве примера, установите ключ следующего узла антицепи на 10.
  • Обратная цепочка следующего узла 10 устанавливается в его расширенный узел [8, 10], т. е. ('node10',) = {AntichainNode} antichain_5 - ['node8', 'node10'].

можно увидеть,Цель нахождения последующей антицепи узла состоит в том, чтобы найти следующую точку разделения графа A, а затем, чтобы определить время работы (или другую информацию) узла A, необходимо найти расширенную антицепь узла A (некоторые усиленные антицепи - это некоторые состояния), antichain_mapping A усиливает антицепь.

Пример antichain_mapping выглядит следующим образом:

antichain_mapping = {dict: 99} 
 ('node4',) = {AntichainNode} antichain_0 -- ['node4']
 ('node5',) = {AntichainNode} antichain_1 -- ['node5']
 ('node6',) = {AntichainNode} antichain_2 -- ['node6']
 ('node7',) = {AntichainNode} antichain_3 -- ['node7']
 ('node8',) = {AntichainNode} antichain_4 -- ['node8']
 ('node10',) = {AntichainNode} antichain_5 -- ['node8', 'node10'] # 最新设置
 ('node14',) = {AntichainNode} antichain_6 -- ['node14']
 ('node11',) = {AntichainNode} antichain_7 -- ['node8', 'node11']
 ('node15',) = {AntichainNode} antichain_8 -- ['node14', 'node15']
 ('node19',) = {AntichainNode} antichain_9 -- ['node19']
 ('node12',) = {AntichainNode} antichain_10 -- ['node8', 'node12']
 ('node16',) = {AntichainNode} antichain_11 -- ['node14', 'node16']
 ('node23',) = {AntichainNode} antichain_12 -- ['node20', 'node23']
 ('node17',) = {AntichainNode} antichain_13 -- ['node14', 'node17']

Пример antichain_dag выглядит следующим образом, который можно рассматривать какУсовершенствованный антицепной DAG:

antichain_dag = {Graph}
    nodes = {dict: 99} 
   'antichain_0' = {AntichainNode} antichain_0 -- ['node4']
   'antichain_1' = {AntichainNode} antichain_1 -- ['node5']
   'antichain_2' = {AntichainNode} antichain_2 -- ['node6']
   'antichain_3' = {AntichainNode} antichain_3 -- ['node7']
   'antichain_4' = {AntichainNode} antichain_4 -- ['node8']
   'antichain_5' = {AntichainNode} antichain_5 -- ['node8', 'node10']
   'antichain_6' = {AntichainNode} antichain_6 -- ['node14']
   'antichain_7' = {AntichainNode} antichain_7 -- ['node8', 'node11']
   'antichain_8' = {AntichainNode} antichain_8 -- ['node14', 'node15']
   'antichain_9' = {AntichainNode} antichain_9 -- ['node19']
   'antichain_10' = {AntichainNode} antichain_10 -- ['node8', 'node12']
   'antichain_11' = {AntichainNode} antichain_11 -- ['node14', 'node16']
   'antichain_12' = {AntichainNode} antichain_12 -- ['node20', 'node23']
   'antichain_13' = {AntichainNode} antichain_13 -- ['node14', 'node17']
   'antichain_14' = {AntichainNode} antichain_14 -- ['node20', 'node30', 'node23']
   'antichain_15' = {AntichainNode} antichain_15 -- ['node20', 'node36', 'node23']
   'antichain_16' = {AntichainNode} antichain_16 -- ['node20', 'node43', 'node23']
   'antichain_17' = {AntichainNode} antichain_17 -- ['node20', 'node23', 'node24']

3.5 Топологическая сортировка

После получения усиленной антицепи ее необходимо топологически отсортировать, прежде чем ее можно будет использовать.

antichain_gr = gr.antichain_dag()
states = antichain_gr.topological_sort()

Цель получения топологического упорядочения: если в соответствии с порядком вершин топологической последовательности, до достижения определенного узла, можно гарантировать, что все его действия предварительного порядка были завершены, так что весь проект выполняется в порядке без конфликта.

В теории графовТопологическая сортировкаЯвляетсяНаправленный ациклический граф (DAG, направленный ациклический граф)Линейная последовательность всех вершин . И последовательность должна удовлетворять следующим двум условиям:

  1. Каждая вершина появляется и появляется только один раз.
  2. Если существует путь из вершины A в вершину B, то вершина A стоит перед вершиной B в последовательности.

Направленные ациклические графы (DAG) имеют топологическую сортировку, а не-DAG-графы не имеют топологической сортировки. Ориентированный ациклический граф может иметьодин или большеТопологически отсортированная последовательность.

Например, следующее изображение:

+--------+                  +--------+
|        +----------------> |        |
|   1    |                  |   4    +------------+
|        |    +-----------> |        |            |
+-----+--+    |             +---+----+            |
      |       |                 |                 v
      |       |                 |              +--+--+
      |       |                 |        +---> |  5  |
      |       |                 |        |     +-----+
      v       |                 |        |
              |                 v        |
+--------+    |             +---+-----+  |
|        +----+             |         |  |
|    2   +----------------->+    3    +--+
|        |                  |         |
+--------+                  +---------+
​

Результат после топологической сортировки {1, 2, 4, 3, 5}.

Алгоритм топологической сортировки здесь использует сортировку в глубину.

    def topological_sort(self):
        # Algorithm from https://en.wikipedia.org/wiki/Topological_sorting
        self.sorted_nodes = []
        self.marked_nodes = set()
        self.temporarily_marked_nodes = set()
        nodes = list(self.nodes.values())
        nodes.sort(key=lambda x: x.node_desc)
        for node in nodes:
            if node.node_id in self.marked_nodes:
                continue
            self.topological_sort_helper(node.node_id)
        return [self.nodes[node_id] for node_id in self.sorted_nodes]
​
    def topological_sort_helper(self, node_id):
        if node_id in self.marked_nodes:
            return
        if node_id in self.temporarily_marked_nodes:
            raise Exception("Graph has a cycle")
        self.temporarily_marked_nodes.add(node_id)
        if node_id in self.edges:
            out_nodes = list(self.edges[node_id])
            out_nodes.sort(key=lambda x: (x.node_desc, x.height))
            for out_node in out_nodes:
                self.topological_sort_helper(out_node.node_id)
        self.marked_nodes.add(node_id)
        self.temporarily_marked_nodes.remove(node_id)
        self.sorted_nodes.insert(0, node_id)

Ниже приведен пример конечного результата, который можно сравнить с антицепным DAG antichain_dag выше, чтобы увидеть сходства и различия:

states = {list: 99} 
 00 = {AntichainNode} antichain_0 -- ['node4']
 01 = {AntichainNode} antichain_1 -- ['node5']
 02 = {AntichainNode} antichain_2 -- ['node6']
 03 = {AntichainNode} antichain_3 -- ['node7']
 04 = {AntichainNode} antichain_4 -- ['node8']
 05 = {AntichainNode} antichain_5 -- ['node8', 'node10']
 06 = {AntichainNode} antichain_7 -- ['node8', 'node11']
 07 = {AntichainNode} antichain_10 -- ['node8', 'node12']
 08 = {AntichainNode} antichain_6 -- ['node14']
 09 = {AntichainNode} antichain_8 -- ['node14', 'node15']
 10 = {AntichainNode} antichain_11 -- ['node14', 'node16']
 11 = {AntichainNode} antichain_13 -- ['node14', 'node17']
 12 = {AntichainNode} antichain_9 -- ['node19']
 13 = {AntichainNode} antichain_12 -- ['node20', 'node23']
 14 = {AntichainNode} antichain_18 -- ['node23', 'node20', 'node26']
 15 = {AntichainNode} antichain_17 -- ['node23', 'node20', 'node24']
 16 = {AntichainNode} antichain_32 -- ['node23', 'node20', 'node28']
 17 = {AntichainNode} antichain_31 -- ['node23', 'node20', 'node26', 'node24']
 18 = {AntichainNode} antichain_63 -- ['node23', 'node20', 'node26', 'node28']
 19 = {AntichainNode} antichain_33 -- ['node20', 'node26', 'node29']
 20 = {AntichainNode} antichain_16 -- ['node20', 'node43', 'node23']
 21 = {AntichainNode} antichain_30 -- ['node23', 'node20', 'node43', 'node26']
 22 = {AntichainNode} antichain_29 -- ['node23', 'node20', 'node43', 'node24']
 23 = {AntichainNode} antichain_59 -- ['node23', 'node20', 'node43', 'node28']

Мы также можем сравнить со следующей расширенной антицепью и увидеть, что состояния являются результатом топологической сортировки расширенной антицепи DAG, Логично тренироваться в таком порядке.

_augmented_antichains = {dict: 99} 
 ('node4',) = {list: 1} ['node4']
 ('node5',) = {list: 1} ['node5']
 ('node6',) = {list: 1} ['node6']
 ('node7',) = {list: 1} ['node7']
 ('node8',) = {list: 1} ['node8']
 ('node10',) = {list: 2} ['node8', 'node10']
 ('node14',) = {list: 1} ['node14']
 ('node11',) = {list: 2} ['node8', 'node11']
 ('node15',) = {list: 2} ['node14', 'node15']
 ('node19',) = {list: 1} ['node19']
 ('node12',) = {list: 2} ['node8', 'node12']
 ('node16',) = {list: 2} ['node14', 'node16']
 ('node23',) = {list: 2} ['node20', 'node23']
 ('node17',) = {list: 2} ['node14', 'node17']
 ('node23', 'node30') = {list: 3} ['node20', 'node30', 'node23']
 ('node23', 'node36') = {list: 3} ['node20', 'node36', 'node23']
 ('node23', 'node43') = {list: 3} ['node20', 'node43', 'node23']
 ('node24',) = {list: 3} ['node23', 'node20', 'node24']
 ('node26',) = {list: 3} ['node23', 'node20', 'node26']
 ('node23', 'node30', 'node36') = {list: 4} ['node20', 'node36', 'node30', 'node23']
 ('node23', 'node30', 'node43') = {list: 4} ['node20', 'node43', 'node30', 'node23']
 ('node31',) = {list: 3} ['node20', 'node26', 'node31']
 ('node24', 'node30') = {list: 4} ['node23', 'node20', 'node30', 'node24']
 ('node26', 'node30') = {list: 4} ['node23', 'node20', 'node30', 'node26']
 ('node23', 'node36', 'node43') = {list: 4} ['node20', 'node43', 'node36', 'node23']
 ('node37',) = {list: 4} ['node32', 'node20', 'node26', 'node37']
 ('node24', 'node36') = {list: 4} ['node23', 'node20', 'node36', 'node24']
 ('node26', 'node36') = {list: 4} ['node23', 'node20', 'node36', 'node26']
 ('node44',) = {list: 2} ['node40', 'node44']
 ('node24', 'node43') = {list: 4} ['node23', 'node20', 'node43', 'node24']
 ('node26', 'node43') = {list: 4} ['node23', 'node20', 'node43', 'node26']
 ('node24', 'node26') = {list: 4} ['node23', 'node20', 'node26', 'node24']

3.6 Резюме

Поскольку текущий алгоритм относительно сложен, мы временно подведем итоги работы:

  • Рассчитывается расширенная антицепочка каждого узла, и, наконец, получается усиленная комбинация антицепей._augmented_antichains.
  • Рассчитываются последующие обратные ссылки для каждого узла.Цель нахождения последующей обратной ссылки узла состоит в том, чтобы найти следующую точку разделения графа A, а затем, чтобы определить время работы (или другую информацию) узла A, необходимо найти расширенную обратную ссылку узла A (некоторое расширенное обратные ссылки - это некоторые состояния). _next_antichains — это комбинация последующих антицепей.
  • основа функции antichain_dag_next_antichainsи_augmented_antichainsОбработайте и создайте античейн DAG, который представляет собой переменную antichain_dag.
  • После получения усиленного антицепочечного DAG его необходимо топологически отсортировать, прежде чем его можно будет использовать. Цель получения топологического упорядочения: если в соответствии с порядком вершин топологической последовательности, до достижения определенного узла, можно гарантировать, что все его действия предварительного порядка были завершены, так что весь проект выполняется в порядке без конфликта.
  • состояний является результатом топологической сортировки усиленного антицепочечного DAG, и логично тренироваться в таком порядке. Таким образом, последующая работа должна выполняться на основе состояний.

0x04 Вычислительный раздел

До сих пор граф был разделен на несколько состояний в соответствии с последующими обратными ссылками, и важным атрибутом каждого состояния являются его расширенные обратные ссылки. Состояния являются результатом топологической сортировки усиленной антицепи, и логично тренироваться в таком порядке.

Алгоритм автоматического разбиения разделен на две части.

  • calculate_partitioning использует алгоритм динамического программирования для получения оптимального результата для этих состояний, но не выполняет конкретное разбиение.
  • Analyze_partitioning использует результаты оптимизации для создания конкретных разделов, и после сортировки получается результат частичного порядка.

Ниже мы проанализируем их один за другим.

4.1 Логика основной функции

Следующая логика, связанная с расчетным разделом основной функции, выглядит следующим образом:

  • Установите индекс для каждого состояния.
  • Рассчитайте значение активации вывода для каждого состояния, в частности, путем прохождения его анти цепочки (усиленная анти цепочка), которые можно рассматривать как вывод его необходимого узла предварительного порядка для себя.
  • Вычисление его информации для каждого состояния, такой как время расчета, размер активации, размер параметра и т. д., выполняется через предварительные узлы.
  • Получите общий выходной размер output_activation_sizes и все идентификаторы предварительного узла, которые потребуются позже при расчете разделов.
  • Время расчета внутри системы оценивается в соответствии с профилем, calculate_times_row — это время расчета от i-го узла до последующих узлов (i+1, i+2, ...), аналогично следующему.
  • По профилю оценивается значение активации внутри системы.
  • По профилю оценивается размер параметра внутри системы.
  • Комбинации набора машин и пропускной способности сети. Конвейер может быть прямым (число 1) или параллельным (число num_machines).В соответствии с текущей информацией, а также количеством машин, пропускной способностью сети и т. д. для расчета раздела используется алгоритм динамического программирования . Если есть две комбинации машинного набора и пропускной способности сети, каждая комбинация будет использоваться для выполнения алгоритма динамического программирования, и, наконец, all_As.append(A) Вот результат двух динамических программ, который является оптимальным после рассмотрения различных необходимых факторы результат.

Конкретный код выглядит следующим образом:

def main(all_num_machines, profile_filename, network_bandwidths, memory_size,
         straight_pipeline, use_memory_constraint, use_fewer_machines,
         activation_compression_ratio, output_directory,
         print_configuration=True, verbose=False):
    gr = graph.Graph.from_str(open(profile_filename, 'r').read())
​
    # Zero out all metadata associated with inputs in graph, since the optimizer
    # shouldn't really get a choice with where to place the input (should always
    # be in the first stage).
    # 排除干扰,因为input必然在第一层,没必要让优化器再来选择把输入放在哪里,所以先去除,后续会再加上。
    sources = gr.sources() # 对图的输入进行处理
    nodes_to_remove = OrderedDict()
    for source in sources:
        if source.node_desc.startswith("Input"): # 只处理input
            source.forward_compute_time = 0.0
            source.backward_compute_time = 0.0
            source.activation_size = 0.0
            source.parameter_size = 0.0
            nodes_to_remove[source] = []
            for out_node in gr.edges[source.node_id]:
                nodes_to_remove[source].append(out_node) # 记录这些删除source对应了哪些out节点,因为后续还要处理
            gr.remove_node(source) # 在图中移除这些input source
​
    # Remove all unneeded sinks that are not used, makes code generation and
    # optimization easier.
    sinks = gr.sinks() # 对图的输出进行处理,移除没有用到的输出
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            gr.remove_node(sink)
​
    antichain_gr = gr.antichain_dag() # 得到反链DAG
    states = antichain_gr.topological_sort() # 拓扑排序,得到一个排序好的节点列表
​
    ###########################################################################
    # 之前代码在上节分析过,我们本节从这里继续分析
    ###########################################################################
    
    states_indices = {} # 为每个状态设置index
    for i in range(len(states)):
        states_indices[states[i]] = i
        
##################################### 运行时如下        
#states_indices = {dict: 99} 
# antichain_0 -- ['node4'] = {int} 0
# antichain_1 -- ['node5'] = {int} 1
# antichain_2 -- ['node6'] = {int} 2
# antichain_3 -- ['node7'] = {int} 3
# antichain_4 -- ['node8'] = {int} 4
# ......
         
    # 给每个状态计算出输出激活值大小,具体是通过遍历其反链(增强反链),可以认为就是其必要前序节点给自己的输出
    for i in range(len(states)):
        for antichain_node in states[i].antichain:
            states[i].output_activation_size += gr.nodes[antichain_node].activation_size
       
    # 给每个状态计算其信息,比如计算时间,激活大小,参数大小等等,都是通过前置节点完成的      
    for i in range(len(states)):
        antichain = states[i].antichain
        all_predecessors = gr.all_predecessors(antichain)
        states[i].compute_time = 0.0
        states[i].activation_size = 0.0
        states[i].parameter_size = 0.0
        for predecessor in all_predecessors: # 计算所有前置节点的信息
            states[i].compute_time += ((predecessor.forward_compute_time +
                                        predecessor.backward_compute_time) / 1000.0)
            states[i].activation_size += predecessor.activation_size
            states[i].parameter_size += predecessor.parameter_size
    gr.reset()
​
    # 得到总体输出大小 & 所有前置节点id,后面计算分区时候需要
    output_activation_sizes = [state.output_activation_size for state in states]
    all_predecessor_ids = [[states_indices[predecessor] for predecessor in
                            antichain_gr.predecessors(states[i].node_id)]
                           for i in range(len(states))]
​
##################################### 运行时如下      
# output_activation_sizes = {list: 99} 
# 00 = {float} 6291456.0
# 01 = {float} 12582912.0
# 02 = {float} 12582912.0
# 03 = {float} 6553600.0    
# .....
# all_predecessor_ids = {list: 99} 
#  00 = {list: 0} []
#  01 = {list: 1} [0]
#  02 = {list: 2} [0, 1]
#  03 = {list: 3} [0, 1, 2]
#  04 = {list: 4} [0, 1, 2, 3]
#  05 = {list: 5} [2, 3, 4, 0, 1]
#  06 = {list: 6} [2, 3, 4, 0, 1, 5]
#  07 = {list: 7} [6, 2, 3, 4, 0, 1, 5]
# ......
    
    compute_times = [] # 初始化计算时间
    activation_sizes = [] # 初始化激活值大小
    parameter_sizes = [] # 初始化参数值大小
    for i in range(len(states)+1): # 具体计算每一个节点的信息,去除他之前节点的影响
        compute_times_row = []
        activation_sizes_row = []
        parameter_sizes_row = []
        for j in range(len(states)): # 去除之前的节点
            if i == 0: # 列表中第一个节点
                compute_times_row.append(states[j].compute_time) # i 到 j 的计算时间
                activation_sizes_row.append(states[j].activation_size)
                parameter_sizes_row.append(states[j].parameter_size)
            else: # 列表中后续节点
                if j > (i-1):
                    compute_times_row.append(states[j].compute_time -
                        states[i-1].compute_time) # i 到 j 的计算时间
                    activation_sizes_row.append(states[j].activation_size -
                        states[i-1].activation_size)
                    parameter_sizes_row.append(states[j].parameter_size -
                        states[i-1].parameter_size)
                else:
                    compute_times_row.append(None)
                    activation_sizes_row.append(None)
                    parameter_sizes_row.append(None)
        compute_times.append(compute_times_row) # 依据profile估计出系统内部的计算时间,compute_times_row 是 i 节点到 后续节点(i+1, i+2, ...)的计算时间,下面类似
        activation_sizes.append(activation_sizes_row) # 依据profile估计出系统内部的激活值大小
        parameter_sizes.append(parameter_sizes_row) # 依据profile估计出系统内部的参数大小
​
##################################### 运行时如下  
# compute_times = {list: 100} 
# 000 = {list: 99} [0.0070220000000000005, 0.012285, 0.012558, 0.021096000000,...
# 001 = {list: 99} [None, 0.005263, 0.005535999999999999, 0.014074000000000003, ...
# 002 = {list: 99} [None, None, 0.00027299999999999894, 0.008811000000000003, ...
# 003 = {list: 99} [None, None, None, 0.008538000000000004, 0.008538, ...
# 004 = {list: 99} [None, None, None, None, -3.469446951953614e-18, 0.000191999999...
​
    counter = 1
    all_As = []
    num_machines_in_machine = 1 #第一个节点就是1
    # all_num_machines, network_bandwidths 是用户在输入中指定
    # 遍历机器集&网络带宽组合。流水线可以是straight(数目为1)或者并行(数目为num_machines)
    for num_machines, network_bandwidth in zip(all_num_machines, network_bandwidths):
        print("Solving optimization problem with %d machines with inter-machine bandwidth of %.2f GB/s" % (num_machines, network_bandwidth / 10**9))
        import numpy as np
        print(np.array(compute_times))
        # 依据目前的信息,以及机器数量,网络带宽等计算分区
        A = compute_partitioning(compute_times, activation_sizes, parameter_sizes,
                                 output_activation_sizes, all_predecessor_ids,
                                 num_machines, num_machines_in_machine,
                                 network_bandwidth,
                                 final_level=(counter==len(network_bandwidths)))
        num_machines_in_machine = num_machines # 因为计算完了,所以设置为本阶段的机器数目
        for i in range(len(compute_times)): # 遍历机器
            for j in range(len(compute_times[0])): # 后续机器
                compute_times[i][j] = A[i][j][-1][0] # 记录计算时间(本阶段最后一个机器的计算时间)
        counter += 1
        all_As.append(A) # 添加逻辑关系,就是里面包括了不同阶段的优化逻辑
    print(np.array(compute_times))
    
    # 省略后续代码

Среди них, calculate_times представляет собой двумерный массив времени вычислений, который также можно рассматривать как матрицу Конкретные примеры приведены ниже.

[w12,w13,w14,w15], // 第一个节点到后续节点的计算时间
​
[None, w23,w24,w25], // 第二个节点到后续节点的计算时间
​
[None, None, w34, w35], // 第三个节点到后续节点的计算时间
​
[None, None, None, w45], // 第四个节点到后续节点的计算时间

активация_размеры и параметры_размеры аналогичны.

4.2 Динамическое программирование

4.2.1 Общая идея

Вот некоторые алгоритмы динамического программирования для анализа.

Алгоритм сегментации пытается сократить общее время обучения модели. Для конвейерных систем эта проблема эквивалентна минимизации времени, затрачиваемого на самую медленную стадию конвейера. Проблема носит характер подзадачи оптимизации: при заданном количестве машин конвейер, максимизирующий пропускную способность, состоит из подканалов, каждый из которых максимизирует пропускную способность своего собственного подканала. Следовательно, мы можем использовать динамическое программирование, чтобы найти оптимальное решение этой проблемы.

Алгоритм разделения берет выходные данные шага профилирования и вычисляет:

1) Разделите слой на несколько этапов,

2) коэффициент репликации (количество рабочих) для каждого этапа,

3) Оптимальное количество динамических мини-пакетов, чтобы конвейер обучения был загружен.

Оптимизатор PipeDream предполагает, что топология машины является иерархической и может быть организована на несколько уровней, как показано на рисунке ниже. Пропускная способность внутри класса одинакова, а пропускная способность между классами различна. Предположим, что уровень k состоит из mk k-1 компонентов уровня, соединенных связями с пропускной способностью Bk. На рисунке ниже m2=2, m1=4. Кроме того, мы определяем m0 равным 1. То есть 4 m0s образуют m1, а 2 m1s образуют m2.

Уровень 0 — это зеленый прямоугольник, представляющий самое низкое вычислительное устройство, такое как GPU, 4 GPU составляют уровень 1 (пунктирный прямоугольник, представляющий сервер), а 2 уровня 1 составляют уровень 2 (то есть все модули в рисунок ниже).

Оптимизатор PipeDream шаг за шагом решает задачу динамического программирования от самого низкого уровня до самого высокого уровня. Интуитивно этот процесс находит лучшие разделы среди серверов, а затем использует эти разделы для оптимального разделения модели между серверами.

4.2.2 Специальный анализ

Пусть A(j, m) обозначает время, затраченное на самый медленный этап оптимального конвейера между уровнем 1 и уровнем j с использованием m машин.

Цель нашего алгоритма — найти A(N,M) и соответствующий раздел. Пусть T(i → j,m) обозначает время, необходимое для охвата одного уровня от слоев i до j, воспроизведенных на m машинах.

в:

  • Левый член в max — это общее время вычислений всех уровней в этой фазе, а правый член — это общее время связи всех уровней в этой фазе.
  • Поскольку вычисления и обмен данными могут перекрываться, добавлять их не нужно, просто возьмите максимальное значение.
  • Оптимальный конвейер из m машин от 1 до j может представлять собой одну стадию, воспроизводимую m раз, или состоять из нескольких стадий.

Когда оптимальный конвейер содержит несколько стадий, его можно разбить на оптимальный подконвейер (состоящий из m − m' машин от 1 до i) и последующую индивидуальную стадию (состоит из i+1 до j, состоящую из m' копируемых машин). ). Поэтому, используя свойства оптимальной подзадачи, получаем

Среди них макс.:

  • Первый член — это время, затрачиваемое самой медленной стадией оптимального субконвейера (состоящего из m-m' машин) между уровнем 1 и уровнем i.
  • Второй член — это время, необходимое для прохождения активаций и градиентов между слоями i и i + 1.
  • Третий член — это время последней одиночной стадии (состоящей из m' машин с параллельными данными).

Давайте посмотрим, как конкретно его рассчитать, предполагая, что логика графа следующая:

                       +----------------+
+-----+                |                +--------+
|     +------------->  |  k[m_prime]    |        |          +-----+
|  i  |                |                |        +--------->+     |
|     +----+           +----------------+                   |  j  |
+-----+    |                                      +-------->+     |
           |           +----------------+         |         +-----+
           |           |                |         |
           +-------->  |  k[m-m_prime]  +---------+
                       |                |
                       +----------------+

Выберите наибольшее из (A[i][k][m-m_prime][0], last_stage_time, output_transfer_time, input_transfer_time):

  • A [i] [k] [m-m_prime] [0] : время вычисления между i и k, которое является вычисляемой подзадачей.

  • last_stage_time : last_stage_time (время расчета от k до j) + время передачи.

    • Где calculate_times[k + 1] [j] — время вычисления от k до j, а calculate_times[k + 1] соответствует выходу k.
    • Время передачи рассчитывается на основе размеров параметров следующего этапа от k до j (parameter_sizes[k + 1][j]).
    • то есть: last_stage_time = время_вычислений[k + 1] + (размеры_параметров[k + 1][j])
  • input_transfer_time : время передачи, рассчитанное с использованием размера активации вывода k (то есть ввода j).

  • output_transfer_time : время передачи, рассчитанное с использованием размера активации выхода j.

Поскольку передача и вычисление могут перекрываться, таким образом можно взять максимальное значение.

Финал А является результатом оптимизации динамического программирования, где каждый элементA[i][j][m]это тройка(min_pipeline_time, optimal_split, optimal_num_machines).A[i][j][m]Представляет результат расчета между узлом i и узлом j. Тройка (минимальное время конвейера, оптимальная точка разделения между i и j, оптимальное количество машин).

Примерные этапы показаны на рисунке ниже:

                                                       +----------------+
                                                       | i              |
                                                       |                |
                                                       |                |
                                                       +--+------+------+
                                                          |      |
                                                          |      +----------+
                                  A[i][k][m+m_prime][0]   |                 |
                                                          |                 |
                                                          v                 v
                                        +-----------------+-------+    +----+--------+
                                        | k[m-m_prime]            |    | k[m_prime]  |
                                        |                         |    |             |
last_stage_time = compute_times[k+1][j] |                         |    |             |
            + (parameter_sizes[k+1][j]) | output_activation_sizes |    |             |
                                        |                         |    |             |
                                        |                         |    |             |
                                        +-----------------+-------+    +-----+-------+
                                     input_transfer_time  |                  |
                                                          |      +-----------+
                                                          |      |
                                                          |      |
                                                          v      v
                                             +------------+------+------+
                                             | j                        |
                                             |                          |
                                             |                          |
                                             |                          |
                                             |  output_activation_sizes |
                                             |                          |
                                             +------------------+-------+
                                          output_transfer_time  |
                                                                |
                                                                |
                                                                v

Конкретный код выглядит следующим образом:

def compute_partitioning(compute_times, activation_sizes, parameter_sizes,
                         output_activation_sizes, all_predecessor_ids,
                         num_machines, num_machines_within_machine,
                         bandwidth, final_level=True):
    # 初始化
    A = []
    for i in range(len(compute_times)): # 遍历所有节点
        row_A = []
        for j in range(len(compute_times[0])): # 所有后续节点(即第一个节点的所有后续节点)
            row_row_A = []
            for m in range(num_machines): # 机器数目
                row_row_A.append((None, None, None))
            row_A.append(row_row_A)
        A.append(row_A)
​
    # 得到计算时间
    for i in range(len(compute_times)): # 遍历所有节点
        for j in range(i, len(compute_times[0])): # 所有后续节点
            cum_compute_time = compute_times[i][j] # i --> j 的计算时间
            cum_activation_size = activation_sizes[i][j] # i --> j 的激活大小
            cum_parameter_size = parameter_sizes[i][j] # i --> j 的参数大小
            max_m = 1 if straight_pipeline else num_machines # 线性还是并行流水线
            for m in range(max_m): # 遍历流水线下一阶段的机器
                # 存储的数据大小
                stashed_data_size = math.ceil((num_machines - (m+1)) / (m+1)) * \
                                              (cum_activation_size + cum_parameter_size)
                # memory_size 是用户传进来的参数,就是每个机器有效的内存  
                # use_memory_constraint 也是用户传进来的参数,就是使用的内存限制
                if use_memory_constraint and stashed_data_size > memory_size:
                    continue
                # 数据并行通讯时间依据参数尺寸,带宽,下一阶段机器数量计算    
                data_parallel_communication_time = (4 * m * cum_parameter_size) / (bandwidth * (m+1))
                # 除以本阶段机器数量,如果本阶段机器多,当然就是分开计算了
                data_parallel_communication_time /= num_machines_within_machine
​
                if cum_compute_time is None:
                    # 需要计算下一阶段中,每个机器的计算时间,所以还要除以(m+1)
                    A[i][j][m] = (None, None, None) # 直接赋值
                else:
                    # 三元组,分别是[(计算时间 + 通信时间), None,(m+1)],对应的意义是 min_pipeline_time, optimal_split, optimal_num_machines,就对应了前面的公式 2
                    A[i][j][m] = (sum([cum_compute_time,
                                       data_parallel_communication_time]) / (m+1), None, (m+1))
​
    # 需要得到最小计算时间                
    min_machines = 1
    max_i = len(compute_times) if not final_level else 1
    for i in range(max_i): # 遍历节点
        for m in range(min_machines, num_machines): # 遍历下一阶段机器的可能选择
            for j in range(i+1, len(compute_times[0])): # 遍历 i 的后续节点
                (min_pipeline_time, optimal_split, optimal_num_machines) = A[i][j][m]
                if use_fewer_machines and m > 0 and ( # 如果设置了用尽量少的机器,则如果小于min_pipeline_time,就设置新的 min_pipeline_time
                    min_pipeline_time is None or A[i][j][m-1][0] < min_pipeline_time):
                    (min_pipeline_time, optimal_split, optimal_num_machines) = A[i][j][m-1]
                # 遍历 j 节点的前置机器 k,注意,j 是 i 的后续节点之一
                # 就是在 i --> k --> j 之间找到一个计算时间最小的,其中A[i][k][m-m_prime][0]已经是一个最优子问题了
                for k in all_predecessor_ids[j]:
                    # 如果k已经在之前计算过了,就跳过
                    if i > 0 and k in all_predecessor_ids[i-1]:
                        continue
                    # 设置质数    
                    max_m_prime = 2 if straight_pipeline else (m+1)
                    for m_prime in range(1, max_m_prime): # prime就是看看如何分割
                        # 输入传输时间 input_transfer_time 使用 k 的输出激活尺寸计算
                        input_transfer_time = (2.0 * output_activation_sizes[k]) / \
                            (bandwidth * m_prime)
                        # 输出传输时间 output_transfer_time 使用 j 的输出激活尺寸计算
                        output_transfer_time = None
                        if j < len(output_activation_sizes) -1:
                            output_transfer_time = (2.0 *
                                output_activation_sizes[j]) / (bandwidth * m_prime)
                        # last_stage_time 设置为 k 到 j 的计算时间, compute_times[k+1] 就对应了k的输出
                        last_stage_time = compute_times[k+1][j]
                        if last_stage_time is None:
                            continue
                        # 设置为 k 到 j 的下一阶段参数尺寸
                        last_stage_parameter_size = parameter_sizes[k+1][j]
                        # 设置为 k 到 j 的存储数据尺寸
                        stashed_data_size = (activation_sizes[k+1][j]) + last_stage_parameter_size
                        # 依据机器数据计算
                        stashed_data_size *= math.ceil((num_machines - (m+1)) / m_prime)
                        # 超过机器内存就跳过
                        if use_memory_constraint and stashed_data_size > memory_size:
                            continue
                        # 加上传输时间,所以 last_stage_time 是 (k 到 j 的计算时间) + 传输时间
                        last_stage_time = sum([last_stage_time,
                                               ((4 * (m_prime - 1) *
                                                last_stage_parameter_size) / (bandwidth * m_prime))])
                        last_stage_time /= m_prime
​
                        # 如果从i到k没有边,则跳过
                        if A[i][k][m-m_prime][0] is None:
                            continue
                        # 如果i到k已经有计算时间,则选一个较大的    
                        pipeline_time = max(A[i][k][m-m_prime][0], last_stage_time)
                        if activation_compression_ratio is not None: # 如果压缩
                            # 在(A[i][k][m-m_prime][0], last_stage_time, output_transfer_time, input_transfer_time 之中选一个最大的)
                            input_transfer_time /= activation_compression_ratio
                            # output_transfer_time 也压缩
                            if output_transfer_time is not None:
                                output_transfer_time /= activation_compression_ratio
                            # 选一个大的    
                            pipeline_time = max(pipeline_time, input_transfer_time)
                            if output_transfer_time is not None:
                                pipeline_time = max(pipeline_time, output_transfer_time)
                                
                        # 如果比min_pipeline_time小,则设定 min_pipeline_time,为了下一次循环
                        if min_pipeline_time is None or min_pipeline_time > pipeline_time:
                            optimal_split = (k, m-m_prime) # 选一个优化分割点
                            optimal_num_machines = m_prime
                            min_pipeline_time = pipeline_time
                # 设置            
                A[i][j][m] = (min_pipeline_time, optimal_split, optimal_num_machines)
​
    return A

all_As — результат динамического программирования, пример такой:

all_As = {list: 2}  
 0 = {list: 100} 
  000 = {list: 99} 
   00 = {list: 5} [(0.0070220000000000005, None, 1), (0.1689894, None, 2), (0.14943257777777777, None, 3), (0.1258643, None, 4), (0.107310576, None, 5)]
   01 = {list: 5} [(0.012285, None, 1), (0.0070220000000000005, (0, 0), 1), (0.0865995, (0, 0), 2), (0.07639255555555556, (0, 0), 3), (0.06429175000000001, (0, 0), 4)]
   02 = {list: 5} [(0.012558, None, 1), (0.0070220000000000005, (0, 0), 1), (0.0070220000000000005, (1, 1), 1), (0.0070220000000000005, (1, 1), 2), (0.0070220000000000005, (1, 1), 3)]
   03 = {list: 5} [(0.021096, None, 1), (0.012285, (1, 0), 1), (0.008538, (2, 1), 1), (0.008538, (2, 2), 1), (0.008538, (2, 3), 1)]
   ......
  __len__ = {int} 100
  
1 = {list: 100} 
 000 = {list: 99} 
  00 = {list: 5} [(0.107310576, None, 1), (0.080131832, None, 2), (0.05930489777777778, None, 3), (0.046685052000000005, None, 4), (0.03840710336000001, None, 5)]
  01 = {list: 5} [(0.06429175000000001, None, 1), (0.072057299, None, 2), (0.05690740466666667, None, 3), (0.0460065055, None, 4), (0.03840166136, None, 5)]
  02 = {list: 5} [(0.0070220000000000005, None, 1), (0.043422424, None, 2), (0.037817488, None, 3), (0.031689068, None, 4), (0.026947711359999998, None, 5)]
  03 = {list: 5} [(0.008538, None, 1), (0.0419991328, (2, 0), 1), (0.043422424, (2, 1), 1), (0.0396227304, None, 4), (0.033697556608, None, 5)]
 ......
  __len__ = {int} 100
 __len__ = {int} 2

4.2.3 Различия

Далее мы анализируем разницу между двумя одноименными переменными автора кода.

activation_sizes: Сумма активаций_размера всех предыдущих узлов узла.

for predecessor in all_predecessors:
    states[i].compute_time += ((predecessor.forward_compute_time +
                                predecessor.backward_compute_time) / 1000.0)
    states[i].activation_size += predecessor.activation_size
    states[i].parameter_size += predecessor.parameter_size

Используется для расчета размера спрятанных данных, чтобы увидеть, превышает ли он квоту памяти, настроенную узлом.

stashed_data_size = (activation_sizes[k+1][j]) + last_stage_parameter_size
stashed_data_size *= math.ceil((num_machines - (m+1)) / m_prime)
if use_memory_constraint and stashed_data_size > memory_size:
        continue

output_activation_sizes: Сумма активаций_размера всех расширенных антицепей узла.

for i in range(len(states)):
    for antichain_node in states[i].antichain:
        states[i].output_activation_size += gr.nodes[antichain_node].activation_size

Используется для расчета выходного времени в пути и входного времени в пути.

input_transfer_time = (2.0 * output_activation_sizes[k]) / \
    (bandwidth * m_prime)
output_transfer_time = None
if j < len(output_activation_sizes) -1:
    output_transfer_time = (2.0 *
        output_activation_sizes[j]) / (bandwidth * m_prime)

0x05 Раздел анализа

5.1 логика основной функции

Предыдущий раздел расчета получил только результат оптимизации динамического программирования, который необходимо проанализировать и разделить в analysis_partitioning и назначить каждому этапу.

Следующая логика, связанная с расчетным разделом основной функции, выглядит следующим образом:

  • состояния являются результатом работы DAG антицепи, а all_As — результат оптимизации, полученный с помощью динамического программирования, которых может быть несколько.
  • splits инициализируется только одним элементом из двух кортежей: начальным разбиением (0, len(states)).
  • Пройдитесь по результатам динамической оптимизации all_As, для каждого результата динамической оптимизации пройдитесь по его различным логическим взаимосвязям, вызовите analysis_partitioning для анализа раздела, пройдитесь по разбиениям, разбиения будут постепенно обновляться (точки разделения постепенно уточняются шаг за шагом) , analysis_partitioning возвращает partial_splits.
  • Перейдите partial_splits, для каждой точки разделения получите все предыдущие узлы его расширенной обратной цепи (состояния) и назначьте stage_id этим узлам. Здесь он проходится спереди назад, поэтому значение stage_id постепенно увеличивается.
  • Запишите изображение в файл. Последующий convert_graph_to_model.py преобразует этот файл в модель.
  • Сделайте анализ и сравнение.

Конкретный код выглядит следующим образом:

def main(all_num_machines, profile_filename, network_bandwidths, memory_size,
         straight_pipeline, use_memory_constraint, use_fewer_machines,
         activation_compression_ratio, output_directory,
         print_configuration=True, verbose=False):
    gr = graph.Graph.from_str(open(profile_filename, 'r').read())

    # Zero out all metadata associated with inputs in graph, since the optimizer
    # shouldn't really get a choice with where to place the input (should always
    # be in the first stage).
    # 排除干扰,因为input必然在第一层,没必要让优化器再来选择把输入放在哪里,所以先去除,后续会再加上。
    sources = gr.sources() # 对图的输入进行处理
    nodes_to_remove = OrderedDict()
    for source in sources:
        if source.node_desc.startswith("Input"): # 只处理input
            source.forward_compute_time = 0.0
            source.backward_compute_time = 0.0
            source.activation_size = 0.0
            source.parameter_size = 0.0
            nodes_to_remove[source] = []
            for out_node in gr.edges[source.node_id]:
                nodes_to_remove[source].append(out_node) # 记录这些删除source对应了哪些out节点,因为后续还要处理
            gr.remove_node(source) # 在图中移除这些input source

    # Remove all unneeded sinks that are not used, makes code generation and
    # optimization easier.
    sinks = gr.sinks() # 对图的输出进行处理,移除没有用到的输出
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            gr.remove_node(sink)

    antichain_gr = gr.antichain_dag() # 得到反链DAG
    states = antichain_gr.topological_sort() # 拓扑排序,得到一个排序好的节点列表

    ###########################################################################
    # 计算阶段
    ###########################################################################
    states_indices = {} # 为每个状态设置index
    for i in range(len(states)):
        states_indices[states[i]] = i
        
##################################### 运行时如下        
#states_indices = {dict: 99} 
# antichain_0 -- ['node4'] = {int} 0
# antichain_1 -- ['node5'] = {int} 1
# antichain_2 -- ['node6'] = {int} 2
# antichain_3 -- ['node7'] = {int} 3
# antichain_4 -- ['node8'] = {int} 4
# ......
         
    # 给每个状态计算出输出激活值大小,具体是通过遍历其反链(增强反链),可以认为就是其必要前序节点给自己的输出
    for i in range(len(states)):
        for antichain_node in states[i].antichain:
            states[i].output_activation_size += gr.nodes[antichain_node].activation_size
       
    # 给每个状态计算其信息,比如计算时间,激活大小,参数大小等等,都是通过前置节点完成的      
    for i in range(len(states)):
        antichain = states[i].antichain
        all_predecessors = gr.all_predecessors(antichain)
        states[i].compute_time = 0.0
        states[i].activation_size = 0.0
        states[i].parameter_size = 0.0
        for predecessor in all_predecessors: # 计算所有前置节点的信息
            states[i].compute_time += ((predecessor.forward_compute_time +
                                        predecessor.backward_compute_time) / 1000.0)
            states[i].activation_size += predecessor.activation_size
            states[i].parameter_size += predecessor.parameter_size
    gr.reset()

    # 得到总体输出大小 & 所有前置节点id,后面计算分区时候需要
    output_activation_sizes = [state.output_activation_size for state in states]
    all_predecessor_ids = [[states_indices[predecessor] for predecessor in
                            antichain_gr.predecessors(states[i].node_id)]
                           for i in range(len(states))]
                           

##################################### 运行时如下      
# output_activation_sizes = {list: 99} 
# 00 = {float} 6291456.0
# 01 = {float} 12582912.0
# 02 = {float} 12582912.0
# 03 = {float} 6553600.0    
# .....
# all_predecessor_ids = {list: 99} 
#  00 = {list: 0} []
#  01 = {list: 1} [0]
#  02 = {list: 2} [0, 1]
#  03 = {list: 3} [0, 1, 2]
#  04 = {list: 4} [0, 1, 2, 3]
#  05 = {list: 5} [2, 3, 4, 0, 1]
#  06 = {list: 6} [2, 3, 4, 0, 1, 5]
#  07 = {list: 7} [6, 2, 3, 4, 0, 1, 5]
# ......
    
    compute_times = [] # 初始化计算时间
    activation_sizes = [] # 初始化激活值大小
    parameter_sizes = [] # 初始化参数值大小
    for i in range(len(states)+1): # 具体计算每一个节点的信息,去除他之前节点的影响
        compute_times_row = []
        activation_sizes_row = []
        parameter_sizes_row = []
        for j in range(len(states)): # 去除之前的节点
            if i == 0: # 列表中第一个节点
                compute_times_row.append(states[j].compute_time) # i 到 j 的计算时间
                activation_sizes_row.append(states[j].activation_size)
                parameter_sizes_row.append(states[j].parameter_size)
            else: # 列表中后续节点
                if j > (i-1):
                    compute_times_row.append(states[j].compute_time -
                        states[i-1].compute_time) # i 到 j 的计算时间
                    activation_sizes_row.append(states[j].activation_size -
                        states[i-1].activation_size)
                    parameter_sizes_row.append(states[j].parameter_size -
                        states[i-1].parameter_size)
                else:
                    compute_times_row.append(None)
                    activation_sizes_row.append(None)
                    parameter_sizes_row.append(None)
        compute_times.append(compute_times_row) # 依据profile估计出系统内部的计算时间,compute_times_row 是 i 节点到 后续节点(i+1, i+2, ...)的计算时间,下面类似
        activation_sizes.append(activation_sizes_row) # 依据profile估计出系统内部的激活值大小
        parameter_sizes.append(parameter_sizes_row) # 依据profile估计出系统内部的参数大小

##################################### 运行时如下  
# compute_times = {list: 100} 
# 000 = {list: 99} [0.0070220000000000005, 0.012285, 0.012558, 0.021096000000,...
# 001 = {list: 99} [None, 0.005263, 0.005535999999999999, 0.014074000000000003, ...
# 002 = {list: 99} [None, None, 0.00027299999999999894, 0.008811000000000003, ...
# 003 = {list: 99} [None, None, None, 0.008538000000000004, 0.008538, ...
# 004 = {list: 99} [None, None, None, None, -3.469446951953614e-18, 0.000191999999...

    counter = 1
    all_As = []
    num_machines_in_machine = 1 #第一个节点就是1
    # all_num_machines, network_bandwidths 是用户在输入中指定
    # 遍历机器集&网络带宽组合。流水线可以是straight(数目为1)或者并行(数目为num_machines)
    for num_machines, network_bandwidth in zip(all_num_machines, network_bandwidths):
        print("Solving optimization problem with %d machines with inter-machine bandwidth of %.2f GB/s" % (num_machines, network_bandwidth / 10**9))
        import numpy as np
        print(np.array(compute_times))
        # 依据目前的信息,以及机器数量,网络带宽等计算分区
        A = compute_partitioning(compute_times, activation_sizes, parameter_sizes,
                                 output_activation_sizes, all_predecessor_ids,
                                 num_machines, num_machines_in_machine,
                                 network_bandwidth,
                                 final_level=(counter==len(network_bandwidths)))
        num_machines_in_machine = num_machines # 因为计算完了,所以设置为本阶段的机器数目
        for i in range(len(compute_times)): # 遍历机器
            for j in range(len(compute_times[0])): # 后续机器
                compute_times[i][j] = A[i][j][-1][0] # 记录计算时间(本阶段最后一个机器的计算时间)
        counter += 1
        all_As.append(A) # 添加逻辑关系,就是里面包括了不同阶段的优化逻辑
    print(np.array(compute_times))
    
    ###########################################################################
    # 我们从这里继续分析
    ###########################################################################
    
    # 分析阶段
    # 在 analyze_partitioning 内部做了具体分析
    # 这里最重要的是对 gr.all_predecessors 做设置,就是设置 gr 之中每个node的stage_id,这样就是利用stage_id把初始流水线重新划分
    splits = [(0, len(states))] # 如何分割,states是反链DAG的结果,所以 splits 初始化时候就只有一个二元组元素:最初的划分 (0, len(states))
    i = len(all_As) - 1 # all_As 就是动态规划得到的优化结果
    while i >= 0: # 遍历优化的出来的各个逻辑关系
        print("======================================")
        print("Level %d" % (i+1))
        print("======================================")
        new_splits = []
        stage_id = 0 # 在后续的convert_graph_to_model.py 之中会使用到
        for (start, end) in splits: # 在分割中遍历,splits会逐步更新
            # 依据新的splits中的二元组重新计算
            partial_splits = \
                analyze_partitioning(all_As[i], states, start, end,
                                     network_bandwidths[i], all_num_machines[i],
                                     activation_compression_ratio,
                                     print_configuration, verbose)
            start_point = start # 起始点
            for split in partial_splits: # 遍历分析得出的节点
                new_splits.append((start_point, split)) # 添加一个新的二元祖
                if i == 0:
                    predecessors = gr.all_predecessors(states[split-1].antichain)
                    for predecessor in predecessors:
                        if predecessor.stage_id is None:
                            predecessor.set_stage_id(stage_id) # 设置所在阶段
                start_point = split # 下一个阶段
                stage_id += 1 # 增加所在阶段
            new_splits.append((start_point, end)) # 添加一个新的二元祖
            if i == 0:                
                predecessors = gr.all_predecessors(states[end-1].antichain)
                for predecessor in predecessors:
                    if predecessor.stage_id is None:
                        predecessor.set_stage_id(stage_id) # 设置所在阶段
            stage_id += 1 # 增加所在阶段
        
        print("Total number of stages: %d" % stage_id)
        splits = new_splits # 加入新的分割
        i -= 1

    # 以下是为了把图写到文件之中。后续convert_graph_to_model.py会把这个文件转换成模型 
    for source in nodes_to_remove: # 之前移除了input节点,现在需要加回到图中
        for out_node in nodes_to_remove[source]: # input对应的哪些输出
            source.stage_id = 0
            gr.add_edge(source, out_node)

    if output_directory is not None:
        total_num_machines = 1
        for num_machines in all_num_machines:
            total_num_machines *= num_machines
        gr.to_dot(os.path.join(output_directory, "gpus=%d" % total_num_machines))
        gr_str = str(gr)
        with open(os.path.join(output_directory, "gpus=%d.txt" % total_num_machines), 'w') as f:
            f.write(gr_str)

    # 以下是为了做分析对比        
    # 计算数据并行需要的时间,以便接下来做比较,这个时间要比动态规划时间长。        
    total_time = states[-1].compute_time # 最后一个阶段的计算时间,是没有经过优化的最初计算时间
    total_parameter_size = states[-1].parameter_size
    data_parallel_total_time = total_time # 先赋值为最后一阶段的计算时间
    num_machines_in_machine = 1 # 本阶段的机器数目
    # 遍历流水线上各个阶段,因为没有优化,所以就是严格按照用户原始配置的流水线阶段来逐一计算
    for (num_machines, network_bandwidth) in zip(all_num_machines, network_bandwidths):
        # 计算传输时间。num_machines是下一阶段流水线机器数目,所以带宽需要乘以这个数字
        data_parallel_communication_time = (
            (4 * (num_machines - 1) * total_parameter_size) /
            (network_bandwidth * num_machines)) / num_machines_in_machine
        # 总时间需要加上传输时间
        data_parallel_total_time = sum(
            [data_parallel_total_time, data_parallel_communication_time]) / num_machines
        # 下个迭代中,本阶段的机器数目需要设置为num_machines
        num_machines_in_machine = num_machines

    # 这个是用动态规划算法得出来的优化时间    
    pipeline_parallel_total_time = A[0][len(states)-1][num_machines-1][0]

    # 可以看到用户需要注意哪些数据
    if verbose:
        print()
        print("Time taken by single-stage pipeline:", total_time)
        print("Time per stage in pipeline:", pipeline_parallel_total_time)
        print("Throughput increase (compared to single machine):",
              total_time / pipeline_parallel_total_time)
        dp_str = ",".join([str(elem) for elem in all_num_machines])
        print(("[Note that single-machine and (%s)-machine DP might not fit "
               "given memory constraints]") % dp_str)
        print("Throughput increase of (%s)-machine DP compared to single "
              "machine:" % dp_str, total_time / data_parallel_total_time)
        print("Throughput increase (compared to (%s)-machine DP):" % dp_str,
              data_parallel_total_time / pipeline_parallel_total_time)
    return pipeline_parallel_total_time, data_parallel_total_time                             
                           

5.2 Фаза анализа

Подробности этапа анализа можно найти в примечаниях ниже.

def analyze_partitioning(A, states, start, end, network_bandwidth, num_machines,
                         activation_compression_ratio, print_configuration, verbose):
    # start,end 是本组节点的起始点,终止点
    metadata = A[start][end-1][num_machines-1] # 这是个三元组  (min_pipeline_time, optimal_split, optimal_num_machines)
    next_split = metadata[1] # metadata[1] 是 optimal_split,即 (k, m-m_prime)
    remaining_machines_left = num_machines
    splits = []
    replication_factors = []
    prev_split = end - 1 # 前一个分割点
    
    while next_split is not None: #是否继续分割
        num_machines_used = metadata[2] # optimal_num_machines
        if verbose:
            print("-------------------------------------")
            print("Number of machines used: %d..." % num_machines_used)
            print("Split between layers %d and %d..." % (next_split[0], next_split[0] + 1))
            print("Split before antichain %s..." % (states[next_split[0]+1].antichain))
        splits.append(next_split[0]+1) # 得到了 k + 1,这是关键点,因为最后返回的是splits
        compute_time = states[prev_split-1].compute_time - \
            states[next_split[0]].compute_time
        parameter_size = states[prev_split-1].parameter_size - \
            states[next_split[0]].parameter_size
​
        dp_communication_time = (4 * (num_machines_used - 1) * parameter_size) \
            / (network_bandwidth * num_machines_used)
        pp_communication_time_input = ( # 下个阶段的数据输入时间
            2.0 * states[next_split[0]].output_activation_size *
            (1.0 / float(num_machines_used))) / network_bandwidth
        pp_communication_time_output = ( # 上个阶段的数据输出时间
            2.0 * states[prev_split-1].output_activation_size *
            (1.0 / float(num_machines_used))) / network_bandwidth
        # 如果需要压缩,就进行压缩
        if activation_compression_ratio is not None:
            pp_communication_time_input /= activation_compression_ratio
            pp_communication_time_output /= activation_compression_ratio
        if activation_compression_ratio is None:
            pp_communication_time_input = 0.0
            pp_communication_time_output = 0.0
​
        compute_time /= num_machines_used # 本阶段计算时间
        dp_communication_time /= num_machines_used # 数据并行时间
​
        if verbose:
            print(("Compute time = %f, Data-parallel communication time = %f, "
                   "Pipeline-parallel communication time = %f...") % (
                compute_time, dp_communication_time,
                max(pp_communication_time_input, pp_communication_time_output)))
        prev_split = splits[-1] # 设定新的前一分割点
        # next_split 格式是 (k, m-m_prime),就是 optimal_split 的格式
        # A[i][j][m] 格式是 (min_pipeline_time, optimal_split, optimal_num_machines)
        metadata = A[start][next_split[0]][next_split[1]]
        next_split = metadata[1] # 设定新的下一次分割点,就是 optimal_split
        replication_factors.append(num_machines_used) # 每个阶段的 replication factor
        remaining_machines_left -= num_machines_used # 剩余机器
    if verbose:
        print("-------------------------------------")
        print("Number of machines used: %d..." % metadata[2])
​
    #     
    num_machines_used = metadata[2]
    remaining_machines_left -= num_machines_used # 剩余的机器
    compute_time = states[prev_split-1].compute_time 
    parameter_size = states[prev_split-1].parameter_size
    dp_communication_time = ((4 * (num_machines_used - 1) * parameter_size) /
                             (network_bandwidth * num_machines_used)) 
    compute_time /= num_machines_used # 计算时间
    dp_communication_time /= num_machines_used # 数据并行通信时间
​
    if verbose:
        print("Compute time = %f, Data-parallel communication time = %f..." %
              (compute_time, dp_communication_time))
        print("-------------------------------------")
    if print_configuration:
        print("Number of machines in budget not used: %d..." %
              remaining_machines_left)
        print()
        print("(Split start, split end) / compute time taken per stage "
              "/ replication factor per stage:")
    # 下面就是打印 (Split start, split end) / compute time taken per stage / replication factor per stage    
    prev_split = start
    splits.reverse() # 
    splits.append(end)
    replication_factors.append(num_machines_used)
    replication_factors.reverse()
    for i in range(len(splits)):
        time = 0.0
        if prev_split > 0:
            time = states[splits[i]-1].compute_time - states[prev_split-1].compute_time
        else:
            time = states[splits[i]-1].compute_time
        if print_configuration:
            print((prev_split, splits[i]), time, replication_factors[i])
        prev_split = splits[i]
    if print_configuration:
        print()
    return splits[:-1] # 最后一个不返回

Мы по-прежнему используем пример для иллюстрации.

Вот сегментация со спины.Пример анализа выглядит следующим образом.Здесь общее количество машин установлено на 10:

Воспоминания в вычислительном разделе,A[i][j][m] = (min_pipeline_time, optimal_split, optimal_num_machines),optimal_split = (k, m-m_prime)является точкой оптимизации на данном этапе.

Итак, в этой функции start = 0, end = 99, поэтому метаданныеA[0][99][10], то есть (0,01903199999999998, (95, 8), 1), next_split = (95, 8), prev_split = end - 1 = 98.

next_split — следующая точка разделения, а splits — текущая последовательность разделения.

Первый цикл while:

Так как next_split = (95, 8), splits = append(next_split[0]+1) = [96], поэтому вычислить состояния[prev_split-1] - состояния[next_split[0]] = состояние[97] - состояние[95 ]. Это делит 0~99 на 0~95 и 96~99.

Затем prev_split = 96, перейдите к A[0] [95] [8], чтобы получить мета = (0,019031999999999993, (78, 7), 1), next_split = (78, 7).

Итак, следующий раунд начинается с точки разделения 78.

второй цикл:

Так как next_split = (78, 7), splits = [96, 79], что является новой разделенной последовательностью. , поэтому вычислить состояния[96-1] - состояния[next_split[0]] = состояние[96] - состояние[78]. Это разбивает 0~99 на 0~78, 79~95 и 96~99, используя splits = [96, 79].

Затем prev_split = 79, перейдите к A[0] [78] [7], чтобы получить meta = (0,011081, (48, 6), 1), next_split = (48, 6).

Таким образом, следующий раунд начинается с точки разделения 48 и так далее.

После цикла while мы получаем разделение = [96, 79, 49, 15, 12, 7, 5, 3, 1].

Поэтому следующий код должен изменить порядок.

prev_split = start
splits.reverse()
splits.append(end)
replication_factors.append(num_machines_used)
replication_factors.reverse()

Получилось: расщепления = {1, 3, 5, 7, 12, 15, 49, 79, 96}. Затем добавьте конец = 99.

Наконец, возвращается splits[:-1], то есть возвращается {1, 3, 5, 7, 12, 15, 49, 79, 96}, а только что добавленный конец удаляется.

И окончательная последовательность сегментации, полученная в соответствии с {1, 3, 5, 7, 12, 15, 49, 79, 96}, равна [(0, 1), (1, 3), (3, 5), (5, 7 ), (7, 12), (12, 15), (15, 49), (49, 79), (79, 96), (96, 99)], этот список будет приведен в последующем «наборе stage" будет использоваться в .

5.3 Установка сцены

На данный момент у нас есть идеальная последовательность сегментации, но дело не закончено.Вспомним назначение алгоритма разбиения: определить время работы всех слоев по результатам профиля, затем с помощью динамического программирования разбить модель, разделить модель на разные этапы и получить количество повторений для каждого этапа.

Поэтому конечной целью анализа является назначение этапа каждому подслою модели.Если некоторые подслои принадлежат одному и тому же этапу, эти подслои в конечном итоге будут назначены одному и тому же рабочему (узлу) для выполнения.

Поскольку задействовано несколько подсетей, мы по-прежнему используем примеры для анализа.

Если разделить на две подсети, предположим:

all_num_machines = [5,5]
network_bandwidths = [800000000, 1000000000]

Инициализировать расщепления = [0,99].

В первом раунде while i = 1,

Для результата разбиения [(0, 99)] обхода примените analysis_partitioning к каждому сегменту, в результате чего partial_splits будет [3, 6, 30, 75, 99].

Наконец, разбиения обновляются до: [(0, 3), (3, 6), (6, 30), (30, 75), (75, 99)].

stage_id в настоящее время не установлен.

Во втором раунде while, i = 0,

Для первого раунда сплитов результаты [(0, 3), (3, 6), (6, 30), (30, 75), (75, 99)] Траверс, для здеськаждый абзацТакже примените analysis_partitioning, например, примените analysis_partitioning к (0,3), примените analysis_partitioning к (3,6), примените Analyze_partitioning к (6,30) и, наконец, получите новые partial_splits как [1, 2 , 3, 4, 5 , 6, 8, 10, 13, 28, 30, 45, 49, 51, 75, 79, 96, 99].

Наконец, разбиения обновляются до: [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 8), (8, 10), (10, 13), (13, 28), (28, 30), (30, 45), (45, 49), (49, 51), (51, 75), (75 , 79), (79, 96), (96, 99)].

Этот списокидеальная последовательность сегментации.

В этом раунде, после получения partial_splits, он пройдетfor split in partial_splits:Затем для каждого разделения используйте

states[split-1].antichainПолучите все предыдущие узлы его расширенной антицепи и назначьте stage_id, соответствующий разделению, этим узлам.

Вспомните, что значит укреплять обратные ссылки:

  • Расширенная обратная цепочка каждого узла включает в себя: собственный узел + несколько узлов предварительного заказа.
  • Для концепции расширенной антицепи это можно понимать так: для узла A, только рассматривая узел Z вместе, он может однозначно определить время работы своего собственного узла.

Итак, для разделения = 1, 1 - 1 = 0, поэтому мы получаемstates[0].antichain, является 'node4', тогда 'node4' помечается stage_id=0, что указывает на то, что 'node4' назначен рабочему узлу, "соответствующему stage_id=0" для обучения.

Если есть сомнения, давайте вспомним, как строится состояние, как упорядоченная «композиция узлов».

antichain_gr = gr.antichain_dag()
states = antichain_gr.topological_sort()

детали следующим образом.

states = {list: 99} 
 00 = {AntichainNode} antichain_0 -- ['node4'] # states[0].antichain
 01 = {AntichainNode} antichain_1 -- ['node5']
 02 = {AntichainNode} antichain_2 -- ['node6']
 03 = {AntichainNode} antichain_3 -- ['node7']
 04 = {AntichainNode} antichain_4 -- ['node8']
 05 = {AntichainNode} antichain_5 -- ['node8', 'node10']
 06 = {AntichainNode} antichain_7 -- ['node8', 'node11']
 07 = {AntichainNode} antichain_10 -- ['node8', 'node12']
 08 = {AntichainNode} antichain_6 -- ['node14']
 09 = {AntichainNode} antichain_8 -- ['node14', 'node15']
 10 = {AntichainNode} antichain_11 -- ['node14', 'node16']
 11 = {AntichainNode} antichain_13 -- ['node14', 'node17']
 12 = {AntichainNode} antichain_9 -- ['node19']
 13 = {AntichainNode} antichain_12 -- ['node20', 'node23']
 14 = {AntichainNode} antichain_18 -- ['node23', 'node20', 'node26']
 15 = {AntichainNode} antichain_17 -- ['node23', 'node20', 'node24']
 16 = {AntichainNode} antichain_32 -- ['node23', 'node20', 'node28']
 17 = {AntichainNode} antichain_31 -- ['node23', 'node20', 'node26', 'node24']
 18 = {AntichainNode} antichain_63 -- ['node23', 'node20', 'node26', 'node28']
 19 = {AntichainNode} antichain_33 -- ['node20', 'node26', 'node29']
 20 = {AntichainNode} antichain_16 -- ['node20', 'node43', 'node23']
 21 = {AntichainNode} antichain_30 -- ['node23', 'node20', 'node43', 'node26']
 22 = {AntichainNode} antichain_29 -- ['node23', 'node20', 'node43', 'node24']
 23 = {AntichainNode} antichain_59 -- ['node23', 'node20', 'node43', 'node28']

Конкретный код для установки сцены выглядит следующим образом:

splits = [(0, len(states))]
i = len(all_As) - 1
while i >= 0:
    new_splits = []
    stage_id = 0
    for (start, end) in splits:
        partial_splits = \
            analyze_partitioning(all_As[i], states, start, end,
                                 network_bandwidths[i], all_num_machines[i],
                                 activation_compression_ratio,
                                 print_configuration, verbose)
        start_point = start
        for split in partial_splits: # 遍历这个偏序列表
            new_splits.append((start_point, split))
            if i == 0: # 最终的while
                # 针对每个节点,找到每个节点的所有反链
                predecessors = gr.all_predecessors(states[split-1].antichain)
                for predecessor in predecessors:
                    if predecessor.stage_id is None:
                        predecessor.set_stage_id(stage_id) # 打上stage id
            start_point = split
            stage_id += 1
        new_splits.append((start_point, end))
        if i == 0: # 最终的while
            predecessors = gr.all_predecessors(states[end-1].antichain)
            for predecessor in predecessors:
                if predecessor.stage_id is None:
                    predecessor.set_stage_id(stage_id) # 打上stage id
        stage_id += 1
    splits = new_splits
    i -= 1

5.4 Резюме

Подытожим, что делают вычислительное и аналитическое разбиение:

  • Граф DAG обратных ссылок был разделен на несколько состояний, и важным свойством каждого состояния является его расширенная обратная ссылка. Состояния являются результатом топологической сортировки усиленной антицепи, и логично тренироваться в таком порядке.
  • Compute_partitioning использует алгоритм динамического программирования для получения результата оптимизации для этих состояний, но этот вычислительный раздел получает только результат оптимизации динамического программирования, который необходимо проанализировать и разделить в analysis_partitioning и назначить каждому слою (этапу).
  • Analyze_partitioning использует результаты оптимизации алгоритма динамического программирования для создания определенных разделов.После сортировки получается результат частичного порядка, который является идеальной последовательностью разделов.
  • По результату analysis_partitioning назначить этап каждому подслою модели, если несколько подслоев принадлежат одному этапу, то эти подслои в конечном итоге будут назначены одному и тому же воркеру (узлу) для выполнения.

0x06 вывод

Выходной файл выглядит следующим образом (отрывок).Как видите, ключевым моментом является добавление этапа к каждому узлу.Как его использовать, мы разберем в следующей статье. Например:

stage_id=0 соответствует node4.

stage_id=1 соответствует node5, node6.

stage_id=2 соответствует node7.

stage_id=3 соответствует node8, node10, node11, node12.

......

детали следующим образом:

node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
node5 -- EmuBidirLSTM(  (bidir): LSTM(1024, 1024, bidirectional=True)  (layer1): LSTM(1024, 1024)  (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
node7 -- LSTM(2048, 1024) -- forward_compute_time=3.190, backward_compute_time=5.348, activation_size=6553600.0, parameter_size=50364416.000 -- stage_id=2
node8 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node10 -- Dropout(p=0.2) -- forward_compute_time=0.064, backward_compute_time=0.128, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node11 -- LSTM(1024, 1024) -- forward_compute_time=2.491, backward_compute_time=4.203, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=3
node12 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node14 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node15 -- Dropout(p=0.2) -- forward_compute_time=0.059, backward_compute_time=0.121, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node16 -- LSTM(1024, 1024) -- forward_compute_time=2.492, backward_compute_time=4.201, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=4
node17 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
node19 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
    node1 -- node4
    node4 -- node5
    node2 -- node5
    node5 -- node6
    node6 -- node7
    node7 -- node8
    node8 -- node10
    node10 -- node11
    node11 -- node12
    node12 -- node14
    node8 -- node14
    node14 -- node15
    node15 -- node16
    node16 -- node17
    node17 -- node19

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

[Анализ исходного кода] PipeDream (1) --- Этап профиля параллельного конвейера глубокого обучения