[Анализ исходного кода] Сервер параметров машинного обучения Paracel (3) ------- Обработка данных

машинное обучение

0x00 сводка

Paracel — это среда распределенных вычислений, разработанная Douban, основанная на парадигме сервера параметров и используемая для решения задач машинного обучения: логистическая регрессия, SVD, матричная факторизация (BFGS, sgd, als, cg), LDA, Lasso... .

Paracel поддерживает параллелизм данных и моделей, предоставляет пользователям простой в использовании коммуникационный интерфейс и является более гибким, чем системы в стиле mapreduce. Paracel также поддерживает асинхронный режим обучения, что ускоряет сходимость итерационных задач. Кроме того, структура программы Paracel очень похожа на структуру последовательной программы, пользователи могут больше сосредоточиться на самом алгоритме и не должны уделять слишком много внимания распределенной логике.

В предыдущей статье была представлена ​​часть обработки данных PyTorch, а в этой статье представлена ​​часть обработки данных Paracel, которую можно подтвердить с помощью PyTorch.

Для полноты некоторые базовые сведения из этой статьи повторяются с предыдущей статьей, а некоторый код, не относящийся к теме, будет удален во время синтаксического анализа.

Другие статьи из серии серверов параметров:

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite (1) ----- PostOffice

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite(2) ----- Коммуникационный модуль Van

[Анализ исходного кода] (3) сервера параметров машинного обучения ps-lite ----- Агент Заказчик

[анализ исходного кода] сервер параметров машинного обучения ps-lite(4) ----- реализация узла приложения

[Анализ исходного кода] Сервер параметров машинного обучения Paracel (1) ----- общая архитектура

[Анализ исходного кода] Сервер параметров машинного обучения Paracel (2) ----- Реализация SSP

[Анализ исходного кода] Распределенный PyTorch (1) --- DistributedSampler загрузки данных

[Анализ исходного кода] Распределенный PyTorch (2) --- DataLoader для загрузки данных

требуется сегментация 0x01

1.1 Преимущества сегментации

Характеристики области глубокого обучения: массивные данные + массивные вычисления. Поскольку время операции слишком велико или модель слишком велика, данные или модель будут сегментированы для решения проблемы параллельно и распределены, что мы часто слышим о параллелизме данных или параллелизме модели.

Проблема сегментации включает в себя сегментацию обучающих данных и обучающих моделей. То есть: сегментация модели для обработки больших моделей, нарезка данных для ускорения обучения.

1.2 Параллелизм данных

Например, на рисунке ниже у каждого узла есть полная копия модели, но обучающие данные каждого узла разные. Процесс обучения выполняется на каждом узле, который мы называем рабочим. Эти рабочие процессы считывают пакет данных, выполняют прямой расчет и обратное распространение, получают градиенты, а затем отправляют свои градиенты на сервер параметров.Сервер параметров выполняет операцию слияния/обновления параметров.Модель отправляется обратно на каждый узел, и каждый вычислительный узел отвечает за обновление параметров локальной модели. Выполните новый раунд итеративного обучения.

img

1.3 Параллелизм модели

Параллелизм модели теоретически возможен, если модель может быть осмысленно сегментирована, загружена в сегменты и отправлена ​​на сервер параметров, и если алгоритм также поддерживает сегментарную параллельную обработку. Сначала мы можем разделить модель на линейную разделимую модель и нелинейную модель (нейронную сеть).

1.3.1 Линейная модель

Для линейной модели мы можем разделить модель и данные в соответствии с измерением признаков и назначить их различным вычислительным узлам.Расчет параметров локальной модели каждого узла не зависит от признаков других измерений и относительно независим друг от друга. и не требует выполнения с другими узлами обмена параметрами. Таким образом, алгоритм оптимизации градиентного спуска можно использовать на каждом вычислительном узле для оптимизации и параллельной обработки моделей.

Некоторые задачи машинного обучения, такие как матричная факторизация, тематическое моделирование и линейная регрессия, часто позволяют сократить время обучения по сравнению с параллелизмом данных, поскольку размеры используемых мини-пакетов не очень велики, что повышает статистическую эффективность.

1.3.2 Нелинейные модели (нейронные сети)

Модель нейронной сети отличается от традиционной модели машинного обучения и имеет следующие характеристики:

  • Нейронная сеть имеет сильную нелинейность, а параметры имеют сильную корреляционную зависимость.
  • Вычисления глубокого обучения — это, по сути, матричные операции, и эти матрицы хранятся в памяти графического процессора.
  • Из-за своей сложности нейронные сети требуют высокой пропускной способности сети для обеспечения связи между узлами.

По этим характеристикам нейронные сети можно разделить на межуровневую сегментацию и внутриуровневую сегментацию:

  • Межуровневая сегментация: сеть делится по горизонтали по слоям или по вертикали по слоям. Каждый вычислительный узел вычисляет, а затем передает параметры другим узлам через RPC для слияния параметров. С точки зрения сети, это разделение структуры нейронной сети.
  • Внутрислойная сегментация: Если матрица слишком велика, видеокарта не может загрузить всю матрицу, что требует разбиения огромной матрицы и размещения ее на разных GPU для расчета, каждый GPU отвечает только за часть модели. С вычислительной точки зрения это разбить матрицу на блоки.

Для получения подробной информации см. следующий рисунок:

img

1.4 Смешанное использование

Иногда параллелизм данных и параллелизм моделей используются одновременно.

  • Для таких моделей, связанных с данными (таких как декомпозиция матрицы, pagerank, svd и т. д., другими словами, модель ключ-значение, а ключ соответствует объекту или человеку), мы можем управлять сегментацией, сегментируя данные , образ модели.
  • Некоторые модели не имеют прямого отношения к данным (например, LR, нейронная сеть и т. д.), в этом случае нужно только разделить данные и модель отдельно.

Например:

  • Сверточный слой в сверточной нейронной сети требует большого объема вычислений, но требует небольшого количества коэффициентов параметров W, что подходит для параллелизма данных.
  • Полносвязный слой требует меньше вычислений и требует большего количества коэффициентов параметров W. Подходит для использования модельного параллелизма.

нравится:img

0x02 Механизм сегментации и формат данных

2.1 Принцип сегментации

Нарезка данных означает сокращение объема вычислений, а то, как вы нарезаете модель, определяет топологию вычислений и связи. Различные методы разбиения могут привести к различиям в производительности вычислений. Итак, мы попытаемся найти некоторые принципы сегментации:

  • При разделении данных постарайтесь убедиться, что размер модели разделения сбалансирован, а связь лучше.

  • Чтобы обеспечить балансировку нагрузки сервера параметров, уменьшить одноточечное узкое место производительности сервера параметров и снизить стоимость передачи по сети (например, при передаче по сети параметров модели встраивания вся задержка и стоимость будут неприемлемыми ), поэтому принципы следующие:

    • Связанные данные/модели находятся на том же сервере параметров.
    • Попробуйте равномерно распределить модель по всем узлам сервера параметров.
    • Для очень маленьких моделей попробуйте разместить их на одном узле сервера параметров.
    • Для моделей с несколькими строками попробуйте разместить одну и ту же строку на одном узле сервера параметров.
  • Предоставьте индивидуальные требования, поскольку каждый алгоритм или различные реализации алгоритма имеют разные требования к методу деления.

2.2 Модель и формат данных

Потому что дистрибуция на самом деле включает в себя не только вычислительную дистрибуцию, но и дистрибуцию хранилища. Это требует, чтобы формат файлов моделей и файлов данных изначально поддерживал сегментацию.

Для моделей, связанных с данными (таких как декомпозиция матрицы, pagerank, svd и т. д., то есть модель выражается в формате "ключ-значение"), способ сегментации модели можно контролировать путем сегментации данных. В других случаях модель не связана напрямую с данными (например, LR, нейронная сеть и т. д.), если данные и модель сегментированы отдельно.

В этом отношении различные компании также приложили свои усилия. Например, модель ангела Tencent хранится в единицах матрицы. По умолчанию Angel делит модель (матрицу) на прямоугольные области одинакового размера, каждой матрице соответствует папка, названная в честь имени матрицы по пути сохранения модели, которая содержит файл метаданных и файл данных матрицы. Матрица имеет только один файл метаданных (метаданные в основном состоят из функций матрицы, индексов разделов и индексов, связанных со строками), но обычно существует несколько файлов данных.

2.3 Механизм данных Paracel

Paracel предоставляет множество методов сегментации данных, которые нам нужно объяснить с нескольких аспектов.

2.3.1 Представление данных

Paracel использует графики и матрицы для представления обучающих данных.

Существует четыре типа графиков:

  • bigraph
  • bigraph_continuous
  • digraph
  • undirected_graph

Paracel использует библиотеку Eigen3 для поддержки операций с матрицей/вектором, поэтому он поддерживает два типа матриц:

  • SparseMatrix
  • MatrixXd

Возьмем для примера биграф.

В математической области теории графов вершины биграфа можно разделить на два непересекающихся множества U и V (то есть U и V являются независимыми множествами), так что вершина в U соединена с вершиной в V.

Определяется следующим образом:

template <class T = paracel::default_id_type>
class bigraph {
 private:
  size_t v_sz = 0; 
  size_t e_sz = 0;
  paracel::dict_type<T, paracel::dict_type<T, double> > adj;
 
 public:
  MSGPACK_DEFINE(v_sz, e_sz, adj);    
 public:
  bigraph();
  bigraph(std::unordered_map<T, std::unordered_map<T, double> > edge_info);
  bigraph(std::vector<std::tuple<T, T> > tpls);
  bigraph(std::vector<std::tuple<T, T, double> > tpls);
  void add_edge(const T & v, const T & w);
  void add_edge(const T & v, const T & w, double wgt);
  // return bigraph data
  std::unordered_map<T, std::unordered_map<T, double> > get_data();
  // traverse bigraph edge using functor func
  template <class F>
  void traverse(F & func);
  // traverse vertex v’s related edges using functor func
  template <class F>
  void traverse(const T & v, F & func);
  // return U bag
  std::vector<T> left_vertex_bag();
  // return U set
  std::unordered_set<T> left_vertex_set();
  // out: tpls
  void dump2triples(std::vector<std::tuple<T, T, double> > & tpls);
  // out: dict
  void dump2dict(std::unordered_map > & dict);
  // return number of vertexes in U
  inline size_t v();
  // return number of edges in bigraph
  inline size_t e();
  // return adjacent info of vertex v
  std::unordered_map<T, double> adjacent(const T & v);
  // return outdegree of vertex u in U
  inline size_t outdegree(const T & u);
  // return indegree of vertex v in V
  inline size_t indegree(const T & v);
};

2.3.2 Загрузка данных

Paracel предоставляет различные интерфейсы для загрузки входных файлов. В последней версии все интерфейсы, связанные с загрузкой, поддерживают только файлы в текстовом формате, которые занимают больше памяти.

Пользователи могут параллельно читать строки, соответствующие разделу данных, а затем создавать собственную структуру данных или напрямую загружать входные данные в виде «графа» или «матрицы» Paracel. В последнем случае для описания структуры входного файла необходимо использовать переменные «pattern» и «mix_flag».PatternТакже определяет метод разделения входных данных.

Paracel определяет несколько шаблонов с помощью переменной «pattern»:

pattern structure line example
linesplit(default) Используйте строки для определения разделов all structures
fmap первое-второе значение (значение установлено на 1,0) первое-второе значение регистра разделяется по первому полю a,b a,b,0.2
smap второй-первый случай (значение установлено на 1,0) второй-первый случай разделения вторым полем a,b a,b,0.2
fsmap поддерживают ту же структуру, что и разделы fmap и smap с двумя полями вместе a,b or a,b,0.2
fvec id,feature1,…,feature k, разделенный по id 1001 0.1
fset attr1, attr2, attr3,… attr1, attr2 value2,attr3

Переменнаяmix_flagУказывает, определена ли связь связи графика/матрицы в одной строке. Как показано в примере ниже, когдаmix_flagЕсли задано значение false, все отношения связи для узла "a" раскрываются в три строки. Если «шаблон» равен «fvec» и «fset», тоmix_flagВсегда правда".

mix_flag example
true а, б, в, г, б, в, г…
true а, б а, в, г б, в б, д …
false(default) а, б а, в а, г б, в б, д …

Как указано выше,patternопределяет не только формат данных, но и стратегию разбиения, при этомmix_flagСообщает Paracel, смешаны ли отношения ссылок в одной строке.

0x03 загрузка данных

3.1 Параллельная обработка

Обработка данных фреймворка ИИ в основном представляет собой параллельную обработку следующим образом:

  • Загрузка/обработка данных использует ЦП.
  • Обучение использует GPU.

В идеале, перед каждой итерацией обучения загружается ЦП, и данные обучения готовы, чтобы обучение могло продолжаться беспрепятственно.

Однако вычислительная мощность графического процессора будет удваиваться каждый год, а скорость улучшения процессора намного отстает от скорости графического процессора, поэтому процессор будет отставать. Это не только проблема недостаточной мощности процессора, но и проблема недостаточной скорости чтения данных из хранилища.

Поэтому машинное обучение предъявляет все более высокие требования к загрузке и предварительной обработке данных: подготовка данных для следующей итерации должна быть завершена за время вычислений GPU, а GPU не может бездействовать в ожидании обучающих данных.

3.2 Трубопровод

Для обучения машинному обучению загрузку данных можно разделить на три этапа:

  • Загружать данные с диска или распределенного хранилища на хост (ЦП).
  • Перенос данных из памяти хоста с возможностью подкачки в закрепленную память хоста.
  • Переместите данные из закрепленной памяти хоста на хост-ГП.

Поэтому популярные фреймворки глубокого обучения выполняют конвейерную обработку в соответствии с характеристиками шагов загрузки и характеристиками разнородного оборудования, тем самым увеличивая пропускную способность процесса обработки данных.

Конвейер обычно включает в себя несколько операторов. Каждый оператор состоит из очереди данных для формирования буфера. После того, как вышестоящий оператор завершит обработку, он будет передан нижестоящему оператору для обработки. Таким образом, каждая задача оператора будет независимой друг от друга.Оператор может использовать мелкозернистую многопоточность/многопроцессность для параллельного ускорения.Каждый оператор может независимо контролировать скорость обработки и память, чтобы адаптироваться к требованиям скорости обработки разные сети.

Если внутренняя очередь данных оператора не пуста, модель будет продолжать получать данные непрерывно, и не будет узких мест из-за ожидания обучающих данных.

Ниже приведена логика последовательной обработки:

+------+            +-----------+           +---------------------------+
|      |            |           |           |                           |
| Data +----------> | Load Data +---------> | Transfer to Pinned Memory |
|      |            |           |           |                           |
+------+            +-----------+           +---------------------------+

Вот логика параллельного конвейера:

                    +------------+
+--------+          |            |
|        |          | Process 1  |
| Data 1 +--------> |            +------+
|        |          | Load Data  |      |
+--------+          |            |      |
                    +------------+      |
                                        |
                                        |
                                        |
                    +------------+      |
+--------+          |            |      |        +-----------------------------+
|        |          | Process 2  |      +------> | Pin-memory process          |
| Data 2 +--------> |            |               |                             |
|        |          | Load Data  +-------------> |                             |
+--------+          |            |               |  Transfer to Pinned Memory  |
                    +------------+       +-----> +-----------------------------+
                                         |
                                         |
                                         |
+--------+          +------------+       |
|        |          |            |       |
| Data 3 +--------> | Process 3  +-------+
|        |          |            |
+--------+          | Load Data  |
                    |            |
                    +------------+

3.3 GPU

Эта статья предназначена для решения проблемы передачи данных на стороне процессора, то есть: загрузка данных с диска, из страничной памяти в фиксированную.

Однако передача данных из закрепленной памяти в GPU (tensor.cuda()) также может быть конвейеризирован с использованием потоков CUDA.

Кроме того, для приложений глубокого обучения требуются сложные многоступенчатые конвейеры обработки данных, включая загрузку, декодирование, кадрирование, изменение размера и многие другие усовершенствования. Эти конвейеры обработки данных, которые в настоящее время выполняются на ЦП, стали узкими местами, ограничивая производительность и масштабируемость обучения и логического вывода.

Nvidia DALI решает проблему узкого места ЦП, помещая предварительную обработку данных в обработку ГП.Пользователи могут создавать конвейеры на основе ГП или ЦП в соответствии с характеристиками своих моделей.

image.png

0x04 Загрузка данных Paracel

Как упоминалось ранее, Paracel использует для загрузки данных графики, матрицы и т. д. Далее давайте посмотрим, как это реализовать.

4.1 Пример кода

Мы выбираем образец из исходного кода, в котором есть слова модельный раздел и раздел данных.

На самом деле смысл здесь такой: параллельная загрузка моделей и параллельная загрузка данных.

class adjust_ktop_s : public paracel::paralg {
​
 public:
  adjust_ktop_s(paracel::Comm comm,
                std::string hosts_dct_str,
                std::string _rating_input,
                std::string _fmt,
                std::string _sim_input,
                int _low_limit,
                std::string _output) : 
      paracel::paralg(hosts_dct_str, comm, _output),
      rating_input(_rating_input),
      fmt(_fmt),
      sim_input(_sim_input),
      low_limit(_low_limit) {}
​
  virtual void solve() {
​
    // load sim_G, model partition 并行加载模型
    auto local_parser = [] (const std::string & line) {
      auto tmp = paracel::str_split(line, '\t');
      auto adj = paracel::str_split(tmp[1], '|');
      std::vector<std::string> stuff = {tmp[0]};
      stuff.insert(stuff.end(), adj.begin(), adj.end());
      return stuff;
    };
    auto parser_func = paracel::gen_parser(local_parser);
    paracel_load_as_graph(sim_G,
                          sim_input,
                          parser_func,
                          "fset");
​
    // load rating_G, data partition 并行加载数据
    auto local_parser_rating = [] (const std::string & line) {
      return paracel::str_split(line, ',');
    };
    auto local_parser_rating_sfv = [] (const std::string & line) {
      std::vector<std::string> tmp = paracel::str_split(line, ',');
      std::vector<std::string> r({tmp[1], tmp[0], tmp[2]});
      return r;
    };
    auto rating_parser_func = paracel::gen_parser(local_parser_rating);
    if(fmt == "sfv") {
      rating_parser_func = paracel::gen_parser(local_parser_rating_sfv);
    }
    paracel_load_as_graph(rating_G,
                          rating_input,
                          rating_parser_func,
                          fmt);
​
    // init rating_G 
    paracel::dict_type<std::string, double> tmp_msg;
    auto init_lambda = [&] (const node_t & uid,
                            const node_t & iid,
                            double v) {
      std::string key = std::to_string(uid) + "_" + std::to_string(iid);
      tmp_msg[key] = v;
    };
    rating_G.traverse(init_lambda);
    paracel_write_multi(tmp_msg);
    paracel_sync();
​
    // learning
    cal_low_peak();
  }
​
 private:
  std::string rating_input, fmt;
  std::string sim_input;
  int low_limit = 1;
  paracel::bigraph<node_t> sim_G;
  paracel::bigraph<node_t> rating_G;
  paracel::dict_type<node_t, int> ktop_result;
  double training_rmse = 0., original_rmse = 0.;
}; // class adjust_ktop_s

4.2 Схема загрузки

Для графически структурированных данных или моделей сначала каждый рабочий процесс будет загружать файлы параллельно через fixload, а затем синхронизировать через paracel_sync.

Примечание. Каждый работник будет выполнять следующие функции, которые будут координироваться и унифицироваться внутри через MPI.

  template <class T, class G>
  void paracel_load_as_graph(paracel::bigraph<G> & grp,
                             const T & fn,
                             parser_type & parser,
                             const paracel::str_type & pattern = "fmap",
                             bool mix_flag = false) {
    if(pattern == "fset") {
      mix_flag = true;
    }
    // TODO: check pattern 
    // load lines
    paracel::loader<T> ld(fn, worker_comm, parser, pattern, mix_flag);
    // 并行加载,fixload 里面有一个all2all交换
    paracel::list_type<paracel::str_type> lines = ld.fixload();
    paracel_sync(); //这里进行同步,确保所有worker都完成加载
    // create graph 
    ld.create_graph(lines, grp); // 此时才开始建立图
    set_decomp_info(pattern);
    lines.resize(0); lines.shrink_to_fit(); paracel::cheat_to_os();
  }

4.3 Загрузка файлов

Данные (модель) Paracel, которые могут состоять из нескольких файлов данных, можно загружать параллельно:

  • Во-первых, разделите файлы в соответствии со списком файлов и размером мира, чтобы гарантировать отсутствие дисбаланса рабочей нагрузки между несколькими загруженными рабочими процессами.
  • Во-вторых, вызовитеstructure_load, чтобы выполнить параллельную загрузку,Совпало с pytorch.
  paracel::list_type<paracel::str_type> fixload() {
    
    paracel::scheduler scheduler(m_comm, pattern, mix);
    auto fname_lst = paracel::expand(filenames); //文件名字列表
    
    // 依据文件列表和world size来分区,确保多个加载的worker之间不会出现workload不均衡的情况
    paracel::partition partition_obj(fname_lst,
                                     m_comm.get_size(),
                                     pattern);
    partition_obj.files_partition();
    // parallel loading lines 此时才并行加载
    auto linelst = scheduler.structure_load(partition_obj);
    m_comm.synchronize();
    if(m_comm.get_rank() == 0) std::cout << "lines got" << std::endl;
    
    return linelst;
  }

4.3.1 Разделение

Так вот возникает вопрос.Если есть 6 воркеров и 12 файлов, как каждый воркер может грузиться параллельно?

Некоторые студенты могут сказать: каждый воркер загружает два файла. Однако эта ситуация применима только к случаю, когда размеры файлов в основном одинаковы.Если размеры файлов несовместимы, например, файл с 15 000 строк, файл с 50 строками и файл с 20 000 строк..., то нагрузка рабочих будет неравномерной, в результате чего эффект параллельной загрузки не может быть достигнут.

Поэтому его нужно распределить по общему количеству строк во всех файлах. Например, 12 файлов содержат в общей сложности 120 000 строк, и каждый рабочий процесс отвечает за загрузку 10 000 строк.

Первый воркер может отвечать за загрузку 10000 строк первого файла, второй воркер отвечает за загрузку последних 5000 строк первого файла и 50 строк второго файла, а также строку xxx третьего файла.... .

4.3.2 Определение раздела

Давайте посмотрим, как определяются разделы:

  • namelst : список имен файлов моделей или файлов данных.
  • slst, где i-й элемент — это начальный номер строки i-го раздела.
  • elst : i-й элемент является конечным номером строки i-го раздела.
  • np : количество всех рабочих.
  • displs : i-й элемент — это номер начальной строки i-го файла во всех строках файла. Например: в первом файле 5 строк, во втором файле 6 строк, в третьем файле 6 строк, тогда displs[0] = 0, displs[1] = 5, displs[2] = 11...

детали следующим образом:

class partition {
​
 public:
  partition(paracel::list_type<paracel::str_type> namelst_in,
            int np_in, paracel::str_type pattern_in)
      : namelst(namelst_in), np(np_in), pattern(pattern_in) {}
​
 private:
  paracel::list_type<paracel::str_type> namelst;
  int np;  // world size
  paracel::str_type pattern;
  paracel::list_type<long> slst, elst, displs;
​
}; // class partition

4.3.3 Сбалансированное разделение

Цель состоит в том, чтобы подсчитать общее количество строк во всех файлах, а затем распределить их поровну между воркерами.

const int BLK_SZ = 32;
​
void files_partition(int blk_sz = paracel::BLK_SZ) {
    if(pattern == "linesplit" || pattern == "fvec") {
      blk_sz = 1;
    }
    slst.resize(0);
    elst.resize(0);
    displs.resize(0);
    displs.resize(namelst.size() + 1, 0); // 扩展为文件个数
    for(size_t i = 0; i < displs.size() - 1; ++i) {
      std::ifstream f(namelst[i], std::ios::ate); // ate作用是写入的数据被加入到文件末尾
      long tmp = f.tellg(); // 得到某个文件的行数
      f.close();
      displs[i + 1] = displs[i] + tmp; // 计算每个文件在总行数中的位置
    }
    long sz = displs.back(); //得到所有文件的总行数
    int nbk = np * blk_sz; // 每个worker负责的范围
    long bk_sz = sz / static_cast<long>(nbk); //每个partition的大小
    long s, e;
    for(int i = 0; i < nbk; ++i) { //  nbk是每个worker负责的范围,其中每个范围是s, e,s和e之间大小是BLK_SZ。
      s = static_cast<long>(i) * bk_sz; // 加载起始行
      if(i == nbk - 1) {
        e = sz;
      } else {
        e = (i + 1) * bk_sz; // 加载终止行
      }
      assert(s < e);
      slst.push_back(s); //插入起始行
      elst.push_back(e); //插入终止行
    }
  }

4.4 Параллельная загрузка

Вспомните предыдущий код, когда мы используем разделы для балансировки нагрузки, мы можем использовать планировщик для реализации параллельной загрузки:

    partition_obj.files_partition();
    // parallel loading lines 此时才并行加载
    auto linelst = scheduler.structure_load(partition_obj);

Планировщик можно рассматривать как планировщик, отвечающий за планирование параллельной загрузки нескольких процессов.

Например, если воркер имеет ранг = 2, то он будет рассчитываться по собственному рангу, чтобы получить начало загрузки этого воркера, а конечная позиция: ст = 64, эн = 96. Затем используйте files_load_lines_impl для конкретной загрузки.

paracel::list_type<paracel::str_type>
scheduler::structure_load(partition & partition_obj) {
  paracel::list_type<paracel::str_type> result;
  int blk_sz = paracel::BLK_SZ;
  if(pattern == "fvec" || pattern == "linesplit") {
    blk_sz = 1;
  }
  int st = m_comm.get_rank() * blk_sz; // 依据自己的rank来计算,看看自己这个进程从哪里加载。
  int en = (m_comm.get_rank() + 1) * blk_sz; // 加载到哪里结束
  auto slst = partition_obj.get_start_list();
  auto elst = partition_obj.get_end_list();
  for(int i = st; i < en; ++i) { // 遍历 64 ~ 96
    // 去找 slst[64 ~ 96], elst[64 ~ 96]的来逐一加载
    auto lines = partition_obj.files_load_lines_impl(slst[i], elst[i]); // 自己应该加载什么
    result.insert(result.end(), lines.begin(), lines.end());
  }
  return result;
}

files_load_lines_impl завершает функцию загрузки определенных файлов.

  template <class F>
  void files_load_lines_impl(long st, long en, F & func) {
    // to locate files index to load from
    int fst = 0;
    int fen = 0;
    long offset;
    // 找到st, en分别属于哪个文件,即在 displs 的位置,找到哪些files
    for(size_t i = 0; i < namelst.size(); ++i) {
      if(st >= displs[i]) {
        fst = i; // st所在文件的idx
      }
      if(en > displs[i + 1]) {
        fen = i + 1; // en所在文件的idx
      }
    }
    assert(fst <= fen);
    bool flag = false;
    // load from files
    for(auto fi = fst; fi < fen + 1; ++fi) { // 遍历加载 fst, fen之间的文件
      if(flag) { 
        offset = 0;
      } else {
        offset = st - displs[fi];
      }
      assert(offset >= 0);
      
      std::ifstream f(namelst[fi]); // 加载某个file
      // 依据文件行数,找到对应在哪个文件之中,然后加载
      if(offset) {
        f.seekg(offset - 1);
        paracel::str_type l;
        std::getline(f, l);
        offset += l.size();
      }
      if(fi == fen) {
        while(offset + displs[fi] < en) {
          paracel::str_type l;
            std::getline(f, l);
            offset += l.size() + 1;
          func(l);
        }
      } else {
        flag = true;
        while(1) {
          paracel::str_type l;
            std::getline(f, l);
            if(l.size() == 0) {
            break;
          }
          func(l);
        }
      }
      f.close();
    } // end of for
  }

4.5 Создание графика

После завершения загрузки будет вызван create_graph для завершения построения графа.

  void create_graph(paracel::list_type<paracel::str_type> & linelst,
                    paracel::bigraph<paracel::default_id_type> & grp) {
    
    paracel::scheduler scheduler(m_comm, pattern, mix); 
    
    // hash lines into slotslst,每个worker构建自己负责的部分
    paracel::list_type<paracel::list_type<paracel::compact_triple_type> > result;
    scheduler.lines_organize(linelst, 
                             parserfunc, 
                             result);
    linelst.resize(0); linelst.shrink_to_fit(); paracel::cheat_to_os();
    m_comm.synchronize();
    
    // alltoall exchange,让每个worker都拥有全部的数据
    paracel::list_type<paracel::compact_triple_type> stf;
    scheduler.exchange(result, stf);
    result.resize(0); result.shrink_to_fit(); paracel::cheat_to_os();
    m_comm.synchronize();
​
    for(auto & tpl : stf) {
      grp.add_edge(std::get<0>(tpl), 
                   std::get<1>(tpl), 
                   std::get<2>(tpl));
    }
    stf.resize(0); stf.shrink_to_fit(); paracel::cheat_to_os();
  }

В процессе построения используйте lines_organize для завершения обработки определенных строк данных.В частности, он анализируется в соответствии с форматом строк в файле.Например, тип файла fset? или фсв? Или bfs и т. д., которые обрабатываются по-разному для каждого формата.

  template <class F = std::function< paracel::list_type<paracel::str_type>(paracel::str_type) > >
  listlistriple_type 
  lines_organize(const paracel::list_type<paracel::str_type> & lines,
                 F && parser_func = default_parser) {
​
    listlistriple_type line_slot_lst(m_comm.get_size());
    paracel::str_type delimiter("[:| ]*");
    for(auto & line : lines) { 
      auto stf = parser_func(line);
      if(stf.size() == 2) {
        // bfs or part of fset case
          // ['a', 'b'] or ['a', 'b:0.2']
          auto tmp = paracel::str_split(stf[1], delimiter);
          if(tmp.size() == 1) {
            paracel::triple_type tpl(stf[0], stf[1], 1.);
            line_slot_lst[h(stf[0], stf[1], npx, npy)].push_back(tpl);
          } else {
            paracel::triple_type tpl(stf[0], tmp[0], std::stod(tmp[1]));
            line_slot_lst[h(stf[0], tmp[0], npx, npy)].push_back(tpl);
          }
      } else if(mix) {
        // fset case
          // ['a', 'b', 'c'] or ['a', 'b|0.2', 'c|0.4']
        // but ['a', '0.2', '0.4'] is not supported here
        for(paracel::default_id_type i = 1; i < stf.size(); ++i) {
            auto item = stf[i];
            auto tmp = paracel::str_split(item, delimiter);
            if(tmp.size() == 1) {
              paracel::triple_type tpl(stf[0], item, 1.);
              line_slot_lst[h(stf[0], item, npx, npy)].push_back(tpl);
            } else {
              paracel::triple_type tpl(stf[0], tmp[0], std::stod(tmp[1]));
              line_slot_lst[h(stf[0], tmp[0], npx, npy)].push_back(tpl);
            }
          } // end of for
      } else {
        // fsv case
        paracel::triple_type tpl(stf[0], stf[1], std::stod(stf[2]));
          line_slot_lst[h(stf[0], stf[1], npx, npy)].push_back(tpl);
      } // end of if
    } // end of for
    return line_slot_lst;
  }

0x05 Сводка

Теперь общая логика резюмируется следующим образом, мы предполагаем, что есть два воркера для загрузки нескольких файлов параллельно. В конце концов каждый рабочий процесс загружает данные и модели в свой собственный процесс.

+------------------------------------------------------------------------------------------------------------------------------------------------------+
| worker 1          +------------------+                                                                                                               |
|                   | partition        |                             +-----------------+                                                               |
|                   |                  |      3 structure_load       | scheduler       |                                                               |
|                   |         slst     +---------------------------> |                 |         5                       6                 8           |
|   1 fixload       |                  |                             |                 +----> paracel_sync +-----> create_graph +----> lines_organize  |
|  +------------->  |         elst     | <---------------------------+                 |                                                               |
|                   |                  |   4 files_load_lines_impl   +-----------------+          ^                        ^                           |
|                   |         displs   |                                                          |                        |                           |
|                   |                  |                                                          |                        |                           |
|                   +------------------+                                                          |                        |                           |
|                                                                                                 |                        |                           |
|                             ^                                                                   |                        |                           |
|                             |                                                                   |                        |                           |
|                             |2 files_partition                                                  |                        |                           |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
                              |                                                                   |                        |
               +------------------------------+                                                   |                        |
               |              |               |                                                   |                        |
               |              |               |                                                   |                        |
          +----+----+     +---+----+     +----+---+                                               |                    7   +
          | File 1  |     | File 2 |     | File n |                                               |            scheduler.exchange
          +----+----+     +---+----+     +----+---+                                               |                        +
               |              |               |                                                   |                        |
               |              |               |                                                   |                        |
               +------------------------------+                                                   |                        |
                              |                                                                   |                        |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
| worker 2                    |2 files_partition                                                  |                        |                           |
|                             v                                                                   |                        |                           |
|                                                                                                 |                        |                           |
|                   +-------------------+                                                         |                        |                           |
|                   | partition         |                                                         |                        |                           |
|                   |                   |                           +------------------+          |                        |                           |
|    1 fixload      |          slst     |   3  structure_load       | scheduler        |          v                        v                           |
|   +-------------> |                   +------------------------>  |                  |                                                    8          |
|                   |          elst     |                           |                  +----> paracel_sync +-----> create_graph +-----> lines_organize |
|                   |                   | <-------------------------+                  |          5                       6                            |
|                   |          displs   |  4 files_load_lines_impl  +------------------+                                                               |
|                   |                   |                                                                                                              |
|                   +-------------------+                                                                                                              |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
​

Телефон такой:

img

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Модель распараллеливания сверточных нейронных сетей — один странный трюк для распараллеливания сверточных нейронных сетей

Проблемы и решения обработки данных в рамках ИИ

torch.utils.data интерпретации исходного кода PyTorch: весь процесс парсинга обработки данных

Расскажите о своем понимании и осведомленности в области крупномасштабного машинного обучения?

Nvidia-DALI от отказа до входа

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel