[Анализ исходного кода] Распределенный PyTorch (1) --- DistributedSampler загрузки данных

машинное обучение глубокое обучение

0x00 сводка

Чтобы лучше ввести загрузку данных сервера параметров Paracel, мы временно вставляем две загрузки данных PyTorch (поскольку количество слов слишком велико, поэтому оно разделено на два), в основном с точки зрения распределенной. Эта статья — просто закуска, и в будущем будет специальная серия для анализа распространения PyTorch.

Другие статьи из серии серверов параметров:

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite (1) ----- PostOffice

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite(2) ----- Коммуникационный модуль Van

[Анализ исходного кода] (3) сервера параметров машинного обучения ps-lite ----- Агент Заказчик

[анализ исходного кода] сервер параметров машинного обучения ps-lite(4) ----- реализация узла приложения

[Анализ исходного кода] Сервер параметров машинного обучения Paracel (1) ----- общая архитектура

[Анализ исходного кода] Сервер параметров машинного обучения Paracel (2) ----- Реализация SSP

0x01 загрузка данных

1.1 Путь ускорения

При распределенном обучении есть три уровня работы, которые необходимо выполнить, чтобы ускорить обучение.

  • Уровень загрузки данных
  • Многомашинный уровень связи
  • уровень кода

существуетплоскость данных, вы можете использовать многопроцессорную параллельную загрузку, чтобы ускорить процесс предварительной обработки данных, а также использовать функции графического процессора для ускорения, такие как Nvidia DALI, чтобы решить проблему узкого места процессора, поместив предварительную обработку данных на обработку графического процессора.

существуетМногомашинный уровень связи, доступны различные библиотеки связи коллекций, такие как NCCL, OpenMPI, Gloo и т. д.

существуетуровень кода, вы можете использовать распределенный API, предоставляемый платформой, или использовать Horovod для преобразования автономного кода для поддержки распределенных задач.

Далее рассмотрим, как ускоряется уровень данных.

1.2 Параллельная обработка

Обработка данных фреймворка ИИ в основном представляет собой параллельную обработку следующим образом:

  • Загрузка/обработка данных использует ЦП.
  • Обучение использует GPU.

В идеале, перед каждой итерацией обучения загружается ЦП, и данные обучения готовы, чтобы обучение могло продолжаться беспрепятственно.

Однако вычислительная мощность графического процессора будет удваиваться каждый год, а скорость улучшения процессора намного отстает от скорости графического процессора, поэтому процессор будет отставать. Это не только проблема недостаточной вычислительной мощности процессора, но и проблема недостаточной скорости чтения данных в деревенском хранилище.

Поэтому машинное обучение предъявляет все более высокие требования к загрузке и предварительной обработке данных: подготовка данных для следующей итерации должна быть завершена за время вычислений GPU, а GPU не может бездействовать в ожидании обучающих данных.

1.3 Трубопровод

Для обучения машинному обучению загрузку данных можно разделить на три этапа:

  • Загружать данные с диска или распределенного хранилища на хост (ЦП).
  • Перенос данных из памяти хоста с возможностью подкачки в закрепленную память хоста.
  • Переместите данные из закрепленной памяти хоста на хост-ГП.

Поэтому популярные фреймворки глубокого обучения выполняют конвейерную обработку в соответствии с характеристиками шагов загрузки и характеристиками разнородного оборудования, тем самым увеличивая пропускную способность процесса обработки данных.

Конвейер обычно включает в себя несколько операторов. Каждый оператор состоит из очереди данных для формирования буфера. После того, как вышестоящий оператор завершит обработку, он будет передан нижестоящему оператору для обработки. Таким образом, каждая задача оператора будет независимой друг от друга.Оператор может использовать мелкозернистую многопоточность/многопроцессность для параллельного ускорения.Каждый оператор может независимо контролировать скорость обработки и память, чтобы адаптироваться к требованиям скорости обработки разные сети.

Если внутренняя очередь данных оператора не пуста, модель будет продолжать получать данные непрерывно, и не будет узких мест из-за ожидания обучающих данных.

Ниже приведена логика последовательной обработки:

+------+            +-----------+           +---------------------------+
|      |            |           |           |                           |
| Data +----------> | Load Data +---------> | Transfer to Pinned Memory |
|      |            |           |           |                           |
+------+            +-----------+           +---------------------------+

Вот логика параллельного конвейера:

                    +------------+
+--------+          |            |
|        |          | Process 1  |
| Data 1 +--------> |            +------+
|        |          | Load Data  |      |
+--------+          |            |      |
                    +------------+      |
                                        |
                                        |
                                        |
                    +------------+      |        +-----------------------------------+
+--------+          |            |      |        |                                   |
|        |          | Process 2  |      +------> | Pin-memory process                |
| Data 2 +--------> |            |               |                                   |
|        |          | Load Data  +-------------> |                                   |
+--------+          |            |               |        Transfer to Pinned Memory  |
                    +------------+       +-----> |                                   |
                                         |       |                                   |
                                         |       +-----------------------------------+
                                         |
+--------+          +------------+       |
|        |          |            |       |
| Data 3 +--------> | Process 3  +-------+
|        |          |            |
+--------+          | Load Data  |
                    |            |
                    +------------+
​

1.4 GPU

Эта статья предназначена для решения проблемы передачи данных на стороне процессора, то есть: загрузка данных с диска, из страничной памяти в фиксированную.

Однако передача данных из закрепленной памяти в GPU (tensor.cuda()) также может быть конвейеризирован с использованием потоков CUDA.

Кроме того, для приложений глубокого обучения требуются сложные многоступенчатые конвейеры обработки данных, включая загрузку, декодирование, кадрирование, изменение размера и многие другие усовершенствования. Эти конвейеры обработки данных, которые в настоящее время выполняются на ЦП, стали узкими местами, ограничивая производительность и масштабируемость обучения и логического вывода.

Nvidia DALI решает проблему узкого места ЦП, помещая предварительную обработку данных в обработку ГП.Пользователи могут создавать конвейеры на основе ГП или ЦП в соответствии с характеристиками своих моделей.

image.png

Далее мы представим загрузку данных PyTorch и в основном с точки зрения распределенности.

0x02 Распределенная загрузка PyTorch

2.1 DDP

pytorch предоставляет множество вариантов обучения с распределенными данными. По мере того, как приложения переходят от простого к сложному, от прототипа к производству, общие траектории разработки могут быть следующими:

  • Если данные и модель можно поместить в один графический процессор и обучить на одном устройстве, в настоящее время не нужно беспокоиться о скорости обучения;
  • Используйте DataParallel с несколькими графическими процессорами для одной машины, если у вас на сервере несколько графических процессоров и вы хотите ускорить обучение с минимальными изменениями кода;
  • Если вы хотите еще больше ускорить обучение и готовы написать немного кода для начала, используйте одну машину с несколькими графическими процессорами DistributedDataParallel;
  • Используйте многомашинные сценарии DistributedDataParallel и запуска, если приложение масштабируется за пределы машин;
  • Используйте torchelastic, чтобы инициировать распределенное обучение, если ожидаются ошибки (например, OOM) или если ресурсы можно динамически подключать и отключать во время обучения.

Наиболее важной частью этой статьи является DDP, распределенное параллельное обучение данных (DDP) — широко распространенный метод обучения с несколькими данными для одной программы. При использовании DDP модель копируется в каждый процесс, а затем каждой копии модели передается разное подмножество выборки данных. DDP отвечает за градиентную связь, чтобы поддерживать синхронизацию реплик модели, и наложение ее на вычисление градиента для ускорения обучения.

2.2 Распределенная нагрузка

Давайте сначала посмотрим на общую структуру распределенной загрузки.

Учитывая пример кода, вы можете видеть, что в основном используются три сущности DataSet, DistributedSampler и DataLoader.

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
        train(loader)

Логическая связь этих трех понятий такова:

  • Dataset: Как вы можете понять из названия, это означает набор данных. Отвечает за инкапсуляцию исходных обучающих данных в структуру данных, распознаваемую Python.Производный класс набора данных должен предоставлять интерфейс для получения отдельных данных.
  • Sampler: Как видно из названия, это сэмплер, который отвечает за метод выборки или стратегию выборки и реализует определенную стратегию извлечения/выборки для получения индекса данных из набора данных для использования в DataLoade. Можно считать, что Сэмплер — командир, отвечающий за решение, где воевать.
  • DataLoader: Отвечает за загрузку данных из набора данных в соответствии с индексом. Поддерживает наборы данных как в стиле Map, так и в стиле Iterable, а также поддерживает загрузку одного или нескольких процессов. Загрузчик - боец ​​для определенного боя, ответственный за бой по приказу Сэмплера.

Конкретная картина такова, вкратце:

  1. DataSet отправляет количество наборов данных в DistributedSampler.
  2. Sampler отправляет индексы данных в Loader по определенным правилам.
  3. Загрузчик загружает данные по индексам.
  4. Загрузчик отправляет данные в модель для обучения.
+------------------------+                     +-----------+
|DistributedSampler      |                     |DataLoader |
|                        |     2 indices       |           |
|    Some strategy       +-------------------> |           |
|                        |                     |           |
|-------------+----------|                     |           |
              ^                                |           |  4 data  +-------+
              |                                |       -------------->+ train |
            1 | length                         |           |          +-------+
              |                                |           |
+-------------+----------+                     |           |
|DataSet                 |                     |           |
|        +---------+     |      3 Load         |           |
|        |  Data   +-------------------------> |           |
|        +---------+     |                     |           |
|                        |                     |           |
+------------------------+                     +-----------+

Поскольку набор данных не является предметом распределенного обучения, в следующей части этой статьи в основном анализируется Sampler.

Суть Sampler заключается в следующем: как позволить каждому воркеру загружать только свою часть набора данных и реализовать ортогональное распределение набора данных между воркерами.

0x03 DistributedSampler

За параллелизм данных и распределенное обучение DistributedSampler отвечает за свою задачу выборки данных.

DistributedSampler — это производный класс от Sampler. Когда DistributedDataParallel использует DistributedSampler, каждый параллельный процесс получает экземпляр DistributedSampler, и этот экземпляр DistributedSampler отправляет инструкции DataLoader, чтобы DataLoader загружал определенные данные.

Стратегия загрузки DistributedSampler отвечает за предоставление только подмножества загруженного набора данных, а подмножества, предоставляемые DistributedSampler, не перекрываются и не пересекаются.

3.1 Инициализация

__init__Код инициализации в основном устанавливает различную информацию о рабочем узле, такую ​​как набор данных набора данных, ранг (глобальный серийный номер графического процессора) и количество копий num_replicas. И вычислить total_size всех выборок.

Вот несколько параметров:

  • набор данных: набор данных выборки.
  • num_replicas: количество процессов, участвующих в распределенном обучении. Если не задано, world_size получается из группы как количество процессов.
  • rank : Порядковый номер текущего процесса, если не задан, то будет получен из группы.
  • shuffle : необходимо ли при выборке перетасовывать индексы.
  • seed : если требуется перетасовка, установите случайное семя.
  • drop_last : если данные не могут быть разделены поровну, следует ли удалить нераспределенные хвостовые данные.
  • эпоха : Набор данных будет перемешиваться каждую эпоху. Как сохранить согласованность набора данных после перемешивания? Это делается через эпохи.

Конкретный код выглядит следующим образом, без обработки исключений.

class DistributedSampler(Sampler[T_co]):
​
    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
​
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        # If the dataset length is evenly divisible by # of replicas, then there
        # is no need to drop any data, since the dataset will be split equally.
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                # `type:ignore` is required because Dataset cannot provide a default __len__
                # see NOTE in pytorch/torch/utils/data/sampler.py
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle
        self.seed = seed

3.2 Итерационные методы

DistributedSampler реализован как итератор (похожий на цикл), таким образом, используя магические методы абстрактного класса python:

  • __len__(self): когдаlen()Поведение при вызове функции, обычно возвращающее количество элементов в итераторе.
  • __iter__(self): поведение при переборе элементов в контейнере фактически возвращает итератор (обычно сам итератор), и результат каждой итерации будет использоваться в качестве начального значения для следующей итерации.

__iter__Техническая деталь кода:

indices = indices[self.rank:self.total_size:self.num_replicas]

Когда в списке есть двойные кавычки, напримерlist[start:end:step], что значит:

  • старт: исходное положение
  • конец: конечное положение
  • шаг: размер шага

Давайте рассмотрим пример, такой как:

a = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
print(a[0:15:3])
print(a[1:15:3])
print(a[2:15:3])

получить:

[1, 4, 7, 10, 13]
[2, 5, 8, 11, 14]
[3, 6, 9, 12, 15]

так какindices[self.rank:self.total_size:self.num_replicas]Среди них num_replicas — это на самом деле общее количество рангов, поэтому здесь каждый воркер будет строго возвращать часть порядкового номера данных, соответствующую его собственному рангу.

Подводя итог, метод распределения DistributedSampler таков: каждый сегмент непрерывен.num_replicasДанные разбиваются на части один за другим и распределяются поnum_replicasЭто процесс, и данные получаются через ранг каждого работника, так что достигается цель неперекрытия и непересечения, но следует также отметить, что данные, полученные каждым процессом, являются прерывистыми.

Конкретный код выглядит следующим образом:

class DistributedSampler(Sampler[T_co]):
​
    def __iter__(self) -> Iterator[T_co]:
        
        if self.shuffle: # 如果需要shuffle,则会基于epoch和seed进行处理
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
        else: # 否则直接返回数据集长度序列
            indices = list(range(len(self.dataset)))  # type: ignore[arg-type]
​
        # 是否需要补齐数据
        if not self.drop_last:
            # add extra samples to make it evenly divisible
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # remove tail of data to make it evenly divisible.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size
​
        # subsample
        # 依据自己的rank,依次返回自己的数据序号
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples
​
        return iter(indices)
​
    def __len__(self) -> int:
        return self.num_samples
​
    def set_epoch(self, epoch: int) -> None:
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.
​
        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch

Логика между внутренними переменными следующая:

  1. Получить длину длины из набора данных;
  2. Получить num_replicas (с несколькими рангами) из конфигурации, свой ранг;
  3. Получите num_samples и total_size в соответствии с длиной набора данных и num_replicas;
  4. Наконец, укажите index = index[rank: total_size: num_replicas];
  5. Вернуть индексы в DataLoader
+-----------------------------------------------------------+
| DistributedSampler                                        |
|                                                           |
|    2                 2                                    |
|  rank   +---+  num_replicas                               |
|    +    |            +                                    |
|    |    |            | 3                                  |
|    |    |            |                                    |
|    |    |            v                                    |
|    |    |  num_samples = ceil(len(dataset)/ num_replicas) |
|    |    |            +                                    |
|    |    |            |                                    |
|    |    |            | 3                                  |
|    |    |            v                                    |
|    |    |      total_size = num_samples * num_replicas    |
|    |    |            +                                    |
|    |4   |4           |4                                   |
|    |    |            |                                    |
|    v    v            v                                    |
|  +-+----+------------+--------------------------------+   |          +-------------+
|  |                                                    |   | indices  |             |
|  | indices = indices[rank: total_size: num_replicas]  +------------->+  DataLoader |
|  |              ^                                     |   |    5     |             |
|  |              |                                     |   |          +-------------+
|  |              |                                     |   |
|  +----------------------------------------------------+   |
+-----------------------------------------------------------+
                  |
                1 | length
           +------+--------+
           |   DataSet     |
           +---------------+

3.3 Перемешать набор данных

Набор данных перемешивается каждую эпоху, ноКак различные процессы поддерживают согласованность набора данных после перемешивания?

DistributedSampler использует текущую эпоху в качестве начального числа случайных чисел и настраивает ее перед вычислением индекса, чтобы гарантировать, что разные процессы используют одно и то же начальное число случайных чисел, чтобы перемешанные данные могли быть согласованными.

3.3.1 Использование

Как видно из кода ниже, если требуется распределенное обучение, установите эпоху на сэмплере.

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), ...,
                            sampler=sampler)
    for epoch in range(start_epoch, n_epochs):
        if is_distributed:
            sampler.set_epoch(epoch) # 这设置epoch
        train(loader)

3.3.2 python

Конкретно соответствует реализации DistributedSampler.

Установить эпоху очень просто, просто настройте ее.

    def set_epoch(self, epoch: int) -> None:
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.
​
        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch

Конкретное использование установки случайного начального числа находится в функции итерации:

    def __iter__(self) -> Iterator[T_co]:
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch) # 这里设置随机种子
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
        else:
            indices = list(range(len(self.dataset)))  # type: ignore[arg-type]
            
        # 省略其他代码    

3.3.3 C++

Мы также можем заранее взглянуть на DistributedRandomSampler в коде C++, который представляет собой C++ API, который также играет ту же роль в python.

Мы можем увидеть настройку seed и shuffle следующим образом:

void DistributedRandomSampler::reset(optional<size_t> new_size) {
  size_ = new_size.value_or(size_);
  populate_indices();
​
  std::mt19937 rand(epoch_);
  std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
  sample_index_ = begin_index_;
}

3.3.4 Резюме

Мы расширяем текущую логику следующим образом:

  1. Получить длину длины из набора данных;
  2. Получить num_replicas (с несколькими рангами) из конфигурации, свой ранг, эпоха;
  3. Используйте эпоху, чтобы установить случайное начальное число;
  4. Используйте случайное начальное число для зашифровки индексов набора данных, и зашифрованные индексы будут использоваться все время;
  5. Получите num_samples и total_size в соответствии с длиной набора данных и num_replicas;
  6. В сочетании с приведенными выше различными условиями данных, наконец, дайте index = index[rank: total_size: num_replicas];
  7. Вернуть индексы в DataLoader
+-----------------------------------------------------------------+
| DistributedSampler                                              |
|                                                                 |
|                                                                 |
|    2       3                                                    |
|   epoch +------>  manual_seed(seed + epoch) +---------> indices |
|                                                              +  |
|                                                              |  |
|                                                              |  |
|    2                 2                                       |  |
|  rank   +---+  num_replicas                                4 |  |
|    +    |            +                                       |  |
|    |    |            | 5                                     |  |
|    |    |            |                                       |  |
|    |    |            v                                       |  |
|    |    |  num_samples = ceil(len(dataset)/ num_replicas)    |  |
|    |    |            +                                       |  |
|    |    |            |                                       |  |
|    |    |            | 5                                     |  |
|    |    |            v                                       |  |
|    |    |      total_size = num_samples * num_replicas       |  |
|    |    |            +                                       |  |
|    |6   |6           |6                                      |  |
|    |    |            |                                       |  |
|    v    v            v                                       |  |
|  +-+----+------------+--------------------------------+      |  |
|  |                                                    |      |  |
|  | indices = indices[rank: total_size: num_replicas]  | <----+  |
|  |              ^                          +          |         |
|  |              |                          |          |         |
|  |              |                          |          |         |
|  +----------------------------------------------------+         |
+-----------------------------------------------------------------+
                  |                          |
                1 | length                7  v indices
                  |
          +-------+--------+             +-------------+
          |                |             |             |
          |    DataSet     |             |  DataLoader |
          |                |             |             |
          +----------------+             +-------------+

3.4 Sampler in C++

Поскольку некоторые компании разрабатывают на C++, им также срочно необходимо использовать pytorch, поэтому pytorch также предоставляет C++ API, далее мы увидим, как его реализовать.

3.4.1 Определения

Его класс определен в: torch\csrc\api\include\torch\data\samplers\distributed.h

Мы видим, что DistributedSampler — это базовый класс, а основные переменные-члены:

  • size_t size_ размер файла
  • size_t num_replicas_ количество реплик
  • size_t rank_ Какой процесс или GPU соответствует этому семплеру
  • size_t epoch Эпоха данного обучения
  • bool allow_duplicates_ разрешить ли резервное копирование

Далее идут два производных класса: DistributedRandomSampler и DistributedSequentialSampler.

/// A `Sampler` that selects a subset of indices to sample from and defines a
/// sampling behavior. In a distributed setting, this selects a subset of the
/// indices depending on the provided num_replicas and rank parameters. The
/// `Sampler` performs a rounding operation based on the `allow_duplicates`
/// parameter to decide the local sample count.
template <typename BatchRequest = std::vector<size_t>>
class DistributedSampler : public Sampler<BatchRequest> {
 public:
  DistributedSampler(
      size_t size,
      size_t num_replicas = 1,
      size_t rank = 0,
      bool allow_duplicates = true)
      : size_(size),
        num_replicas_(num_replicas),
        rank_(rank),
        epoch_(0),
        allow_duplicates_(allow_duplicates) {}
​
  /// Set the epoch for the current enumeration. This can be used to alter the
  /// sample selection and shuffling behavior.
  void set_epoch(size_t epoch) {
    epoch_ = epoch;
  }
​
  size_t epoch() const {
    return epoch_;
  }
​
 protected:
  size_t local_sample_count() {
    if (allow_duplicates_) {
      return (size_ + num_replicas_ - 1) / num_replicas_;
    } else {
      return size_ / num_replicas_;
    }
  }
​
  size_t size_;
  size_t num_replicas_;
  size_t rank_;
  size_t epoch_;
  bool allow_duplicates_;
};
​
​
/// Select samples randomly. The sampling order is shuffled at each `reset()`
/// call.
class TORCH_API DistributedRandomSampler : public DistributedSampler<> {
 public:
  DistributedRandomSampler(
      size_t size,
      size_t num_replicas = 1,
      size_t rank = 0,
      bool allow_duplicates = true);
​
  /// Resets the `DistributedRandomSampler` to a new set of indices.
  void reset(optional<size_t> new_size = nullopt) override;
​
  /// Returns the next batch of indices.
  optional<std::vector<size_t>> next(size_t batch_size) override;
​
  /// Serializes the `DistributedRandomSampler` to the `archive`.
  void save(serialize::OutputArchive& archive) const override;
​
  /// Deserializes the `DistributedRandomSampler` from the `archive`.
  void load(serialize::InputArchive& archive) override;
​
  /// Returns the current index of the `DistributedRandomSampler`.
  size_t index() const noexcept;
​
 private:
  void populate_indices();
​
  size_t begin_index_;
  size_t end_index_;
  size_t sample_index_;
  std::vector<size_t> all_indices_;
};
​
/// Select samples sequentially.
class TORCH_API DistributedSequentialSampler : public DistributedSampler<> {
 public:
  DistributedSequentialSampler(
      size_t size,
      size_t num_replicas = 1,
      size_t rank = 0,
      bool allow_duplicates = true);
​
  /// Resets the `DistributedSequentialSampler` to a new set of indices.
  void reset(optional<size_t> new_size = nullopt) override;
​
  /// Returns the next batch of indices.
  optional<std::vector<size_t>> next(size_t batch_size) override;
​
  /// Serializes the `DistributedSequentialSampler` to the `archive`.
  void save(serialize::OutputArchive& archive) const override;
​
  /// Deserializes the `DistributedSequentialSampler` from the `archive`.
  void load(serialize::InputArchive& archive) override;
​
  /// Returns the current index of the `DistributedSequentialSampler`.
  size_t index() const noexcept;
​
 private:
  void populate_indices();
​
  size_t begin_index_;
  size_t end_index_;
  size_t sample_index_;
  std::vector<size_t> all_indices_;
};

3.4.2 Реализация

Конкретная реализация класса находится в: torch\csrc\api\src\data\samplers\distributed.cpp

3.4.2.1 DistributedRandomSampler

Давайте сначала посмотрим на DistributedRandomSampler.

Его функция состоит в том, чтобы получить зашифрованный индекс в соответствии с rank_ этого работника. Мы объясняем функции в логическом порядке.

  • Во время инициализации будет вызываться reset(size_) для перемешивания.

  • Функция сброса предназначена для того, чтобы сэмплер указывал на новый набор индексов:

    • Во-первых, вызовите populate_indices(), чтобы установить начальный индекс, соответствующий этому рангу, и завершить индекс.
    • В функции populate_indices задается индекс данных, то есть настраивается all_indices_, а также настраивается начальный индекс и конечный индекс этого ранга.
    • Затем перетасуйте all_indices_.
  • Следующая функция относительно проста, потому что основная работа выполняется сбросом, поэтому данные в это время были случайным образом нарушены, поэтому найдите начальную позицию и верните количество строк в данных.

Поскольку функция йота используется ниже, некоторые учащиеся могут быть с ней не знакомы.Вот роль йоты:

std::vector<int> test;
test.resize(10);        
std::iota(test.begin(), test.end(), 5);// 将从 5 开始的 10 次递增值赋值给 test
​
//test结果:5 6 7 8 9 10 11 12 13 14

Конкретный код выглядит следующим образом:

DistributedRandomSampler::DistributedRandomSampler(
    size_t size,
    size_t num_replicas,
    size_t rank,
    bool allow_duplicates)
    : DistributedSampler(size, num_replicas, rank, allow_duplicates),
      begin_index_(0),
      end_index_(0),
      sample_index_(0) {
  // shuffle first time.
  reset(size_);
}
​
// 每次加载新epoch时候,都要调用reset
void DistributedRandomSampler::reset(optional<size_t> new_size) {
  size_ = new_size.value_or(size_);
  populate_indices();
​
  std::mt19937 rand(epoch_);
  // 对于数据进行shuffle
  std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
  sample_index_ = begin_index_;
}
​
void DistributedRandomSampler::populate_indices() {
  size_t num_local_samples = local_sample_count();
  // 得到样本数量
  size_t sample_count =
      num_replicas_ == 1 ? size_ : num_local_samples * num_replicas_;
  all_indices_.resize(sample_count);
    
  // std::iota 的作用是用顺序递增的值赋值指定范围内的元素
  // 这里是给all_indices_设置从0开始到sample_count这些数值
  std::iota(std::begin(all_indices_), std::end(all_indices_), 0);
  // 如果sample count大于size_,则需要给多出来的那些index再赋一些数值
  for (size_t i = size_; i < sample_count; ++i) {
    // we may have added duplicate samples to make all
    // replicas to have the same number of samples.
    all_indices_[i] = i - size_;
  }
  begin_index_ = rank_ * num_local_samples; // 对应本rank的起始index
  end_index_ = begin_index_ + num_local_samples; // 对应本rank的终止index
  sample_index_ = begin_index_;
}
​
size_t DistributedRandomSampler::index() const noexcept {
  return sample_index_;
}
​
// 注意,每次加载新epoch时候,都要调用reset,因此对于next函数来说,工作已经很小
optional<std::vector<size_t>> DistributedRandomSampler::next(
    size_t batch_size) {
  if (sample_index_ == end_index_) { // 已经提取完数据
    return nullopt;
  }
​
  size_t end = sample_index_ + batch_size; // 本次迭代的终止位置
  if (end > end_index_) {
    end = end_index_;
  }
​
  auto iter = all_indices_.begin(); // 因为此时数据已经被随机打乱了,找到起始位置即可
  std::vector<size_t> res(iter + sample_index_, iter + end); // 从所有数据中提取前面若干行
  sample_index_ = end;
  return res;
}
3.4.2.2 DistributedSequentialSampler

Затем взгляните на DistributedSequentialSampler.

Его функция состоит в том, чтобы получить индекс заказа в соответствии с rank_ этого работника. Мы объясняем функции в логическом порядке.

  • Функция сброса намного проще, просто используйте populate_indices, чтобы привести индекс в порядок.
  • Следующая функция относительно сложна не только для последовательного возврата индекса, но и для установки следующей начальной позиции.
DistributedSequentialSampler::DistributedSequentialSampler(
    size_t size,
    size_t num_replicas,
    size_t rank,
    bool allow_duplicates)
    : DistributedSampler(size, num_replicas, rank, allow_duplicates),
      begin_index_(0),
      end_index_(0),
      sample_index_(0) {
  populate_indices(); // 这里会设定本rank对应的起始位置
}
​
void DistributedSequentialSampler::reset(optional<size_t> new_size) {
  size_t size = new_size.value_or(size_);
  if (size != size_) {
    size_ = size;
    populate_indices();
  } else {
    sample_index_ = begin_index_;
  }
}
​
void DistributedSequentialSampler::populate_indices() {
  begin_index_ = rank_ * local_sample_count(); // 本rank对应的起始位置
  end_index_ = begin_index_ + local_sample_count();
  sample_index_ = begin_index_;
}
​
size_t DistributedSequentialSampler::index() const noexcept {
  return sample_index_;
}
​
optional<std::vector<size_t>> DistributedSequentialSampler::next(
    size_t batch_size) {
  if (sample_index_ == end_index_) { // 已经循环结束
    return nullopt;
  }
​
  size_t end = sample_index_ + batch_size; // 本次的终止行
  if (end > end_index_) {
    end = end_index_;
  }
​
  std::vector<size_t> res(end - sample_index_); // 返回的vector大小
  // 给res设置从sample_index_开始递增(end - sample_index_)这么大的这些数值,这就是顺序返回了index
  std::iota(std::begin(res), std::end(res), sample_index_);
  if (end >= size_) {
    for (size_t& index : res) { //遍历 vector,得到本次的index
      index = index % size_;
    }
  }
  sample_index_ = end; // 设置下次开始行
  return res;
}

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Модель распараллеливания сверточных нейронных сетей — один странный трюк для распараллеливания сверточных нейронных сетей

Проблемы и решения обработки данных в рамках ИИ

torch.utils.data интерпретации исходного кода PyTorch: весь процесс парсинга обработки данных

Расскажите о своем понимании и осведомленности в области крупномасштабного машинного обучения?

Nvidia-DALI от отказа до входа

pytorch (распределенные) данные параллельной личной практики - DataParallel/DistributedDataParallel