[Анализ исходного кода] Распределенное эластичное обучение PyTorch (6) --- мониторинг/отказоустойчивость

машинное обучение PyTorch

0x00 сводка

Что касается эластичного обучения PyTorch, мы до сих пор представляли агента и рандеву отдельно, но некоторые части не были подробно рассмотрены, например, мониторинг.Эта статья объединяет их и проводит общую логическую сортировку эластичного обучения.

Статьи о тренировках на эластичность следующие:

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (1) --- общая идея

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (2) --- запуск и процесс с одним узлом

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (3) --- агент

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (4) --- Архитектура и логика рандеву

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (5) --- Механизм рандеву

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (5) --- Механизм рандеву

0x01 Общая логика

Нужно смотреть на системную логику с нескольких сторон, примерно сверху вниз, от целого к локальному.

1.1 Перспектива кластера узлов

Давайте сначала посмотрим на это с точки зрения кластера узлов, который можно рассматривать как вид эластичной системы сверху вниз с высоты птичьего полета. С этой точки зрения агент работает на каждом узле.Агент включает рандеву, который отвечает за распределенное согласование.Агент также отвечает за запуск рабочих процессов и мониторинг рабочих процессов.

1.2 Общая логическая схема Агента

Затем мы углубляемся в агентство.Из предыдущей статьи мы знаем, что текущая общая логика выглядит следующим образом.

  • 1) позвонить_initialize_workersЗапустить рабочий процесс, то есть запустить несколько процессов для параллельного выполнения пользовательской программы для обучения.
    • 2) Вызов _rendezvous, который внутри:
      • Вызовите next_rendezvous для обработки изменений членства,
      • Вызовите _assign_worker_ranks, чтобы установить ранги для рабочих.
    • 3) Вызвать _start_workers для запуска рабочих.
  • 4) Вызовите _monitor_workers для мониторинга результатов выполнения этих процессов.

1.3 Угол наблюдения

Ядром эластичной тренировки является мониторинг/динамическая обработка, поэтому мы углубимся в модуль мониторинга для анализа. С точки зрения мониторинга конкретная логика основного цикла _invoke_run агента агента выглядит следующим образом:

  • Вызовите _initialize_workers, чтобы запустить воркеры.
    • Вызовите _rendezvous, который внутренне:
      • Вызовите next_rendezvous для обработки изменений членства,
      • Вызовите _assign_worker_ranks, чтобы установить ранги для рабочих.
    • Вызовите _start_workers, чтобы запустить рабочих.
  • Программа входит в цикл while, а затем периодически опрашивает и отслеживает статус выполнения пользовательской программы с помощью _monitor_ worker-ов и выносит суждения в зависимости от ситуации.
  • Если рабочий процесс дает сбой или неработоспособен, перейдите в состояние elif в {WorkerState.UNHEALTHY, WorkerState.FAILED}: здесь.
    • Сначала вызовите _restart_workers для перезапуска, чтобы начать новое рандеву и перезапустить рабочий процесс.
    • При превышении максимального количества перезапусков задача закрывается.
  • Если программа работает нормально, перейдите в состояние == WorkerState.HEALTHY здесь.
    • При масштабировании ожидается новый узел, и все рабочие процессы будут перезапущены.

Конкретный код выглядит следующим образом:

    def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
        # NOTE: currently only works for a single role

        spec = self._worker_group.spec
        role = spec.role

        self._initialize_workers(self._worker_group) # 启动worker
        monitor_interval = spec.monitor_interval
        rdzv_handler = spec.rdzv_handler

        while True:
            assert self._worker_group.state != WorkerState.INIT
            # 定期监控
            time.sleep(monitor_interval)
            # 监控客户程序运行情况
            run_result = self._monitor_workers(self._worker_group)
            state = run_result.state # 进程运行情况
            self._worker_group.state = state

            if state == WorkerState.SUCCEEDED:
                # 程序正常结束
                self._exit_barrier() # 有一个成功了就全部结束
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                # 程序出错
                if self._remaining_restarts > 0: # 重试
                    self._remaining_restarts -= 1
                    self._restart_workers(self._worker_group) # 进行重启
                else:
                    self._stop_workers(self._worker_group) # 重试次数达到,结束workers
                    self._worker_group.state = WorkerState.FAILED
                    self._exit_barrier()
                    return run_result
            elif state == WorkerState.HEALTHY:
								# 程序正常运行
                # 节点成员关系有变化,比如scale up
                # membership changes do not count as retries
                num_nodes_waiting = rdzv_handler.num_nodes_waiting()
                group_rank = self._worker_group.group_rank
                # 如果有新的节点在waiting,就重启所有workers
                if num_nodes_waiting > 0:
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")

Уточняем его снова, как показано в скетче:

  _initialize_workers  <---------------------------------+                 Node 1    +   Node 2                  _initialize_workers
           +                                             |                           |                                   +
           |                                             |                           |                                   |
           |                                             |  +-----------------+      |      +-----------------+          |
           v                                             |  |RendezvousHandler|    sync     |RendezvousHandler|          v
      _rendezvous +---------------------------------------->+                 | <----+----> |                 +<---+ _rendezvous
           +                          next_rendezvous    |  |                 |      |      |                 |          +
           |                                             |  |                 |      |      |                 |          |
    _assign_worker_ranks                                 |  |                 |  heartbeat  |                 |          |
           |                                             |  |                 | <----+----> |                 |
           v                                             |  +-----------------+      |      +-----------------+          v
     _start_workers                                      |                           |                              _start_workers
           +                                             |                           |                                   +
           |                                             |                           |                                   |
           |                                             |                           |                                   |
           v                                             |                           |                                   v
     +-----+-------------------------------------------------------+                 |                          +--------+---------+
     |                                                   |         |                 |                          |                  |
     |state = _monitor_workers                           |         |                 |                          |                  |
     |   +                                               |         |                 |                          |                  |
     |   |                                               |         |                 |                          |                  |
     |   | UNHEALTHY,FAILED   1. Process fail            |         |                 |                          |                  |
+--> |   +-----------------> _restart_workers +--+       |         +-->              |                          |                  |
|    |   |                                       |       +         |  |              |                          |                  |
|    |   |                                       +--> _stop_workers|  |              |                          |  LOOP Every 30S  |
|    |   | HEALTHY            2. Node change     |                 |  |              |                          |                  |
|    |   +-----------------> _restart_workers +--+                 |  |              |                          |                  |
|    |   |                                                         |  |              |                          |                  |
|    |   |                                                         |  |              |                          |                  |
|    |   | SUCCEEDED                                               |  |              |                          |                  |
|    |   |                                                         |  |              |                          |                  |
|    |   | 3. exit                                                 |  |              |                          |                  |
|    |   |                                                         |  |              |                          |                  |
|    +-------------------------------------------------------------+  |              |                          |                  |
|        |                                                            |              |                          |                  |
<---------------------------------------------------------------------+              |                          +--------+---------+
         |        LOOP  Every 30S                                                    |                                   |
         |                                                                           |                                   |
         v                                                                           |                                   v
       _exit_barrier                                                                 +                             _exit_barrier

Мобильный телефон показан на рисунке:

Или вы можете увидеть изображение ниже, изображение взято изКолонка Calling.com/afraid/408382623…

0x02 Мультипроцесс

Механизм мониторинга заключается в наблюдении за несколькими работающими обучающими работниками, что включает в себя запуск и мониторинг нескольких процессов.Нам нужно ввести несколько процессов. Это зависит от записи запуска рабочего процесса.

2.1 Запуск рабочих

_start_workersВызовите start_processes для запуска рабочих процессов, _start_method по умолчанию — «spawn». То есть запускается несколько процессов, и программа пользователя выполняется параллельно. В то же время будут контролироваться текущие результаты этих процессов. Среди параметров start_processesentrypointиargsЭто пользовательская команда и параметры, точка входа может быть функцией или строкой.

Затем _start_workers сохраняет результат многопоточности, запущенной методом start_processes, в _pcontext, а затем использует _pcontext для продолжения управления.Например, чтобы завершить работу, нужно напрямую вызвать метод close _pcontext.

    @prof
    def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
        spec = worker_group.spec        store = worker_group.store
        assert store is not None
        master_addr, master_port = super()._get_master_addr_port(store)
        restart_count = spec.max_restarts - self._remaining_restarts

        use_agent_store = spec.rdzv_handler.get_backend() == "static"

        args: Dict[int, Tuple] = {}
        envs: Dict[int, Dict[str, str]] = {}
        for worker in worker_group.workers:
            local_rank = worker.local_rank
            worker_env = {
                "LOCAL_RANK": str(local_rank),
                "RANK": str(worker.global_rank),
                "GROUP_RANK": str(worker_group.group_rank),
                "ROLE_RANK": str(worker.role_rank),
                "ROLE_NAME": spec.role,
                "LOCAL_WORLD_SIZE": str(spec.local_world_size),
                "WORLD_SIZE": str(worker.world_size),
                "GROUP_WORLD_SIZE": str(worker_group.group_world_size),
                "ROLE_WORLD_SIZE": str(worker.role_world_size),
                "MASTER_ADDR": master_addr,
                "MASTER_PORT": str(master_port),
                "TORCHELASTIC_RESTART_COUNT": str(restart_count),
                "TORCHELASTIC_MAX_RESTARTS": str(spec.max_restarts),
                "TORCHELASTIC_RUN_ID": spec.rdzv_handler.get_run_id(),
                "TORCHELASTIC_USE_AGENT_STORE": str(use_agent_store),
                "NCCL_ASYNC_ERROR_HANDLING": str(1),
            }
            if "OMP_NUM_THREADS" in os.environ:
                worker_env["OMP_NUM_THREADS"] = os.environ["OMP_NUM_THREADS"]
            envs[local_rank] = worker_env
            worker_args = list(spec.args)
            worker_args = macros.substitute(worker_args, str(local_rank))
            args[local_rank] = tuple(worker_args)

        # scaling events do not count towards restarts (gets same attempt #)
        # remove existing log dir if this restart is due to a scaling event
        attempt_log_dir = os.path.join(self._log_dir, f"attempt_{restart_count}")
        shutil.rmtree(attempt_log_dir, ignore_errors=True)
        os.makedirs(attempt_log_dir)

        assert spec.entrypoint is not None
        self._pcontext = start_processes( # 把启动多线程的结果保存在 _pcontext 之中。
            name=spec.role,
            entrypoint=spec.entrypoint, # 训练代码入口
            args=args, # 这里重要的是local rank
            envs=envs,
            log_dir=attempt_log_dir,
            start_method=self._start_method,
            redirects=spec.redirects,
            tee=spec.tee,
        )

        return self._pcontext.pids()

2.1.1 start_processes

Обратите внимание, что код для start_processes здесь находится в torch/distributed/elastic/multiprocessing/api.py, который отличается от start_processes из mp, используемого позже. start_processes будет извлекать локальный рейтинг из args, а затем работать в соответствии с local_rank, например, создавать файл журнала для каждого процесса. ТотСмысл такой: связать каждый рабочий процесс с local_rank, один local_rank соответствует одному рабочему процессу.

def start_processes(
    name: str,
    entrypoint: Union[Callable, str],
    args: Dict[int, Tuple],
    envs: Dict[int, Dict[str, str]],
    log_dir: str,
    start_method: str = "spawn",
    redirects: Union[Std, Dict[int, Std]] = Std.NONE,
    tee: Union[Std, Dict[int, Std]] = Std.NONE,
) -> PContext:
    """
    Starts ``n`` copies of ``entrypoint`` processes with the provided options.
    ``entrypoint`` is either a ``Callable`` (function) or a ``str`` (binary).
    The number of copies is determined by the number of entries for ``args`` and
    ``envs`` arguments, which need to have the same key set.

    ``args`` and ``env`` parameters are the arguments and environment variables
    to pass down to the entrypoint mapped by the replica index (local rank).
    All local ranks must be accounted for.
    That is, the keyset should be ``{0,1,...,(nprocs-1)}``.

    Args:
        name: a human readable short name that describes what the processes are
              (used as header when tee'ing stdout/stderr outputs)
        entrypoint: either a ``Callable`` (function) or ``cmd`` (binary)
        args: arguments to each replica
        envs: env vars to each replica
        log_dir: directory used to write log files
        nprocs: number of copies to create (one on each process)
        start_method: multiprocessing start method (spawn, fork, forkserver)
                      ignored for binaries
        redirects: which std streams to redirect to a log file
        tees: which std streams to redirect + print to console

    """

    # listdir raises FileNotFound or NotADirectoryError so no need to check manually
    if os.listdir(log_dir):
        raise RuntimeError(
            f"log_dir: {log_dir} is not empty, please provide an empty log_dir"
        )

    nprocs = len(args)
    _validate_full_rank(args, nprocs, "args")
    _validate_full_rank(envs, nprocs, "envs")

    # create subdirs for each local rank in the logs_dir
    redirs = to_map(redirects, nprocs)
    ts = to_map(tee, nprocs)

    # to tee stdout/stderr we first redirect into a file
    # then tail -f stdout.log/stderr.log so add tee settings to redirects
    for local_rank, tee_std in ts.items():
        redirect_std = redirs[local_rank]
        redirs[local_rank] = redirect_std | tee_std

    stdouts = {local_rank: "" for local_rank in range(nprocs)}
    stderrs = {local_rank: "" for local_rank in range(nprocs)}
    tee_stdouts: Dict[int, str] = {}
    tee_stderrs: Dict[int, str] = {}
    error_files = {}

    # 大量使用了local_rank
    for local_rank in range(nprocs):
        clogdir = os.path.join(log_dir, str(local_rank))
        os.mkdir(clogdir)

        rd = redirs[local_rank]
        if (rd & Std.OUT) == Std.OUT:
            stdouts[local_rank] = os.path.join(clogdir, "stdout.log")
        if (rd & Std.ERR) == Std.ERR:
            stderrs[local_rank] = os.path.join(clogdir, "stderr.log")

        t = ts[local_rank]
        if t & Std.OUT == Std.OUT:
            tee_stdouts[local_rank] = stdouts[local_rank]
        if t & Std.ERR == Std.ERR:
            tee_stderrs[local_rank] = stderrs[local_rank]

        error_file = os.path.join(clogdir, "error.json")
        error_files[local_rank] = error_file
        envs[local_rank]["TORCHELASTIC_ERROR_FILE"] = error_file

    context: PContext
    if isinstance(entrypoint, str):
        context = SubprocessContext(
            name=name,
            entrypoint=entrypoint,
            args=args,
            envs=envs,
            stdouts=stdouts,
            stderrs=stderrs,
            tee_stdouts=tee_stdouts,
            tee_stderrs=tee_stderrs,
            error_files=error_files,
        )
    else:
        context = MultiprocessContext(
            name=name,
            entrypoint=entrypoint,
            args=args,
            envs=envs,
            stdouts=stdouts,
            stderrs=stderrs,
            tee_stdouts=tee_stdouts,
            tee_stderrs=tee_stderrs,
            error_files=error_files,
            start_method=start_method,
        )

    try:
        context.start()
        return context
    except Exception:
        context.close()
        raise

2.1.2 RunResult

Текущий результат рабочего процесса указывается RunResult. RunResult — это результат, возвращаемый рабочим потоком. Результат выполнения соответствует стратегии «все или ничего», при которой запуск завершается успешно только тогда и только тогда, когда все локальные рабочие процессы, управляемые этим агентом, завершаются успешно.

Как было сказано ранее, каждому рабочему процессу присваивается local_rank, что тоже правильно, при наличии 5 графических процессоров, естественно, для обучения запускается 5 рабочих процессов, и эти 5 рабочих процессов соответствуют локальному рангу 0~4.

Но аннотация RunResult гласит: если результат успешен (например,is_failed() = False),ноreturn_valuesПоле содержит выходные данные (возвращаемое значение) рабочих процессов, управляемых этим агентом, сопоставленные по их ГЛОБАЛЬНЫМ рангам. который,result.return_values[0]является возвращаемым значением глобального ранга 0. Итак, в _monitor_workers будет отображение локального ранга на глобальный ранг, о котором мы поговорим позже.

@dataclass
class RunResult:
    """
    Results returned by the worker executions. Run results follow an "all-or-nothing" policy
    where the run is successful if and only if ALL local workers managed by this agent
    complete successfully.

    If the result is successful (e.g. ``is_failed() = False``) then the ``return_values``
    field contains the outputs (return values) of the workers managed by THIS agent mapped
    by their GLOBAL ranks. That is ``result.return_values[0]`` is the return value of
    global rank 0.

    .. note:: ``return_values`` are only meaningful for when the worker entrypoint
              is a function. Workers specified as a binary entrypoint do not canonically
              have a return value and the ``return_values`` field is meaningless and
              may be empty.

    If ``is_failed()`` returns ``True`` then the ``failures`` field contains the
    failure information, again, mapped by the GLOBAL rank of the worker that failed.

    The keys in ``return_values`` and ``failures`` are mutually exclusive, that is,
    a worker's final state can only be one of: succeeded, failed. Workers intentionally
    terminated by the agent according to the agent's restart policy, are not represented
    in either ``return_values`` nor ``failures``.
    """

    state: WorkerState
    return_values: Dict[int, Any] = field(default_factory=dict)
    failures: Dict[int, ProcessFailure] = field(default_factory=dict)

    def is_failed(self) -> bool:
        return self.state == WorkerState.FAILED

2.1 Использование ТЭ

TE использует torch.mp и пакет subprocess для многопроцессорной обработки. При запуске нескольких процессов сохраните результат в _pcontext, который является экземпляром типа PContext.

    self._pcontext = start_processes( # 把启动多线程的结果保存在 _pcontext 之中。
        name=spec.role,
        entrypoint=spec.entrypoint,
        args=args,
        envs=envs,
        log_dir=attempt_log_dir,
        start_method=self._start_method,
        redirects=spec.redirects,
        tee=spec.tee,
    )

Среди них start_processes, PContext происходят из следующих:

from torch.distributed.elastic.multiprocessing import start_processes, PContext

_monitor_workers используют _pcontext для мониторинга при мониторинге. Во время мониторинга он будет преобразован в WorkerState.FAILED в соответствии с результатом потока, WorkerState.HEALTHY или WorkerState.SUCCEEDED будет возвращен на верхний уровень.

@prof
def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
    role = worker_group.spec.role
    worker_pids = {w.id for w in worker_group.workers}
    assert self._pcontext is not None
    pc_pids = set(self._pcontext.pids().values())
    
    result = self._pcontext.wait(0) # 对运行结果进行监控
    if result:
        if result.is_failed():
            # map local rank failure to global rank
            worker_failures = {}
            for local_rank, failure in result.failures.items():
                worker = worker_group.workers[local_rank]
                worker_failures[worker.global_rank] = failure
            return RunResult(
                state=WorkerState.FAILED, # 进程出错,返回 WorkerState.FAILED
                failures=worker_failures,
            )
        else:
            # copy ret_val_queue into a map with a global ranks
            workers_ret_vals = {}
            for local_rank, ret_val in result.return_values.items():
                worker = worker_group.workers[local_rank]
                workers_ret_vals[worker.global_rank] = ret_val
            return RunResult(
                state=WorkerState.SUCCEEDED,
                return_values=workers_ret_vals,
            )
    else:
        return RunResult(state=WorkerState.HEALTHY)

Видно, что ключом является PContext, так что давайте взглянем на этот класс.

2.2 PContext

PContext — это абстрактный класс, который на самом деле является некоторой базовой конфигурацией.

class PContext(abc.ABC):
    """
    The base class that standardizes operations over a set of processes
    that are launched via different mechanisms. The name ``PContext``
    is intentional to disambiguate with ``torch.multiprocessing.ProcessContext``.

    .. warning:: stdouts and stderrs should ALWAYS be a superset of
                 tee_stdouts and tee_stderrs (respectively) this is b/c
                 tee is implemented as a redirect + tail -f <stdout/stderr.log>
    """
    def __init__(
        self,
        name: str,
        entrypoint: Union[Callable, str],
        args: Dict[int, Tuple],
        envs: Dict[int, Dict[str, str]],
        stdouts: Dict[int, str],
        stderrs: Dict[int, str],
        tee_stdouts: Dict[int, str],
        tee_stderrs: Dict[int, str],
        error_files: Dict[int, str],
    ):
        self.name = name
        # validate that all mappings have the same number of keys and
        # all local ranks are accounted for
        nprocs = len(args)
        _validate_full_rank(stdouts, nprocs, "stdouts")
        _validate_full_rank(stderrs, nprocs, "stderrs")

        self.entrypoint = entrypoint
        self.args = args
        self.envs = envs
        self.stdouts = stdouts
        self.stderrs = stderrs
        self.error_files = error_files
        self.nprocs = nprocs

        self._stdout_tail = TailLog(name, tee_stdouts, sys.stdout)
        self._stderr_tail = TailLog(name, tee_stderrs, sys.stderr)    

Но очень важно, чтобы он имел два производных класса: MultiprocessContext и SubprocessContext. Как упоминалось ранее, среди параметров start_processes естьentrypointиargsЭто пользовательская команда и параметры, точка входа может быть функцией или строкой. Если точка входа является функцией, используется MultiprocessContext. Если это строковый тип, используйте SubprocessContext.

def start_processes(
    name: str,
    entrypoint: Union[Callable, str],
    args: Dict[int, Tuple],
    envs: Dict[int, Dict[str, str]],
    log_dir: str,
    start_method: str = "spawn",
    redirects: Union[Std, Dict[int, Std]] = Std.NONE,
    tee: Union[Std, Dict[int, Std]] = Std.NONE,
) -> PContext:
  
    context: PContext
    if isinstance(entrypoint, str): # 如果是字符串
        context = SubprocessContext(
            name=name,
            entrypoint=entrypoint,
            args=args,
            envs=envs,
            stdouts=stdouts,
            stderrs=stderrs,
            tee_stdouts=tee_stdouts,
            tee_stderrs=tee_stderrs,
            error_files=error_files,
        )
    else:
        context = MultiprocessContext( # 函数则来到这里
            name=name,
            entrypoint=entrypoint,
            args=args,
            envs=envs,
            stdouts=stdouts,
            stderrs=stderrs,
            tee_stdouts=tee_stdouts,
            tee_stderrs=tee_stderrs,
            error_files=error_files,
            start_method=start_method,
        )

    try:
        context.start() # 调用到这里
        return context
    except Exception:
        context.close()
        raise  

В частности, основания двух производных классов различны.

  • MultiprocessContextиспользоватьtorch.multiprocessing.start_processesчтобы начать процесс.
  • SubprocessContextиспользоватьsubprocess.Popenчтобы начать процесс.

Далее мы будем использовать MultiprocessContext для анализа.

2.3 MultiprocessContext

Определение MultiprocessContext выглядит следующим образом, наиболее значимой из которых является переменная-член _pc, которая на самом деле является переменной ProcessContext.

import torch.multiprocessing as mp

class MultiprocessContext(PContext):
    """
    ``PContext`` holding worker processes invoked as a function.
    """

    def __init__(
        self,
        name: str,
        entrypoint: Callable,
        args: Dict[int, Tuple],
        envs: Dict[int, Dict[str, str]],
        stdouts: Dict[int, str],
        stderrs: Dict[int, str],
        tee_stdouts: Dict[int, str],
        tee_stderrs: Dict[int, str],
        error_files: Dict[int, str],
        start_method: str,
    ):
        super().__init__(
            name,
            entrypoint,
            args,
            envs,
            stdouts,
            stderrs,
            tee_stdouts,
            tee_stderrs,
            error_files,
        )

        self.start_method = start_method
        # each ret_val queue will always contain a single element.
        self._ret_vals = {
            local_rank: mp.get_context(self.start_method).SimpleQueue()
            for local_rank in range(self.nprocs)
        }

        # see comments in ``join()`` for what this is
        self._return_values: Dict[int, Any] = {}
        self._pc: Optional[mp.ProcessContext] = None # 这里是关键
        self._worker_finished_event = mp.get_context(self.start_method).Event()

2.3.1 start

Запуск MultiprocessContext заключается в вызове mp.start_processes с последующим сохранением результата.

import torch.multiprocessing as mp

		def _start(self):
        if self._pc:
            raise ValueError(
                "The process context already initialized."
                " Most likely the start method got called twice."
            )
        self._pc = mp.start_processes( # 这里返回了 mp.ProcessContext
            fn=_wrap,
            args=(
                self.entrypoint,
                self.args,
                self.envs,
                self.stdouts,
                self.stderrs,
                self._ret_vals,
                self._worker_finished_event,
            ),
            nprocs=self.nprocs,
            join=False,
            daemon=False,
            start_method=self.start_method,
        )

2.3.2 wait

Метод ожидания находится в своем базовом классе class PContext(abc.ABC): . этоЦиклические вызовы функции _poll для периодической проверки.

    def wait(self, timeout: float = -1, period: float = 1) -> Optional[RunProcsResult]:
        """
        Waits for the specified ``timeout`` seconds, polling every ``period`` seconds
        for the processes to be done. Returns ``None`` if the processes are still running
        on timeout expiry. Negative timeout values are interpreted as "wait-forever".
        A timeout value of zero simply queries the status of the processes (e.g. equivalent
        to a poll).
        """
        if timeout == 0:
            return self._poll()
        if timeout < 0:
            timeout = sys.maxsize

        expiry = time.time() + timeout
        while time.time() < expiry: # 定期操作
            pr = self._poll() # 用poll来检测
            if pr:
                return pr
            time.sleep(period)

        return None

2.3.3 _poll

Функция _poll предназначена специально для обнаружения, а для обнаружения вызывается torch.mp.ProcessContext.join. torch.mp.ProcessContext вызывает исключение, когда некоторые/все рабочие процессы завершаются сбоем. Если время ожидания истекло, статус рабочего процесса проверяется и возвращается немедленно. Поскольку мы используем synchronize.Event для ожидания завершения всех процессов, Join никогда не вернет успех.

PyTorch использует multiprocessing.Queue, чтобы вернуть возвращаемое значение рабочего процесса родительскому процессу, а окончательный возвращаемый результат включает в себя текущий результат каждого процесса.

def _poll(self) -> Optional[RunProcsResult]:

    try:
        # torch.mp.ProcessContext Throws an Exception if some/all of
        # worker processes failed
        # timeout < 0 checks worker status and return immediately
        # Join will never return success since we use synchronize.Event to wait
        # for all processes to finish.
        self._pc.join(-1)

        # IMPORTANT: we use multiprocessing.Queue to carry worker return values
        # back to the parent, the worker process will wait before terminating
        # until all the buffered items are fed by the feeder thread to the underlying
        # pipe. Hence to prevent deadlocks on large return values,
        # we opportunistically try queue.get on each join call
        # See: https://docs.python.org/2/library/multiprocessing.html#all-platforms
        
        for local_rank in range(0, self.nprocs): # 遍历自己下面的进程
            return_queue = self._ret_vals[local_rank]
            if not return_queue.empty():
                # save the return values temporarily into a member var
                self._return_values[local_rank] = return_queue.get() # 得到进程运行结果

        if self._is_done():
            # we should ALWAYS have ALL the return values when all the processes are done
            self._worker_finished_event.set()
            # Wait untill all processes are finished. At this point workers finished executing user function
            self._pc.join()
            self.close()
            return RunProcsResult(
                return_values=self._return_values, # 返回进程结果
                stdouts=self.stdouts,
                stderrs=self.stderrs,
            )
        else:
            return None
          
    except (mp.ProcessRaisedException, mp.ProcessExitedException) as e:
        failed_local_rank = e.error_index

        # entrypoint for MultiprocessContext will always be a Callable
        fn_name = self.entrypoint.__qualname__  # type: ignore[union-attr]
        failed_proc = self._pc.processes[failed_local_rank]
        error_filepath = self.error_files[failed_local_rank]

        self.close()
        return RunProcsResult( # 返回进程结果
            failures={
                failed_local_rank: ProcessFailure(
                    local_rank=failed_local_rank,
                    pid=e.pid,
                    exitcode=failed_proc.exitcode,
                    error_file=error_filepath,
                )
            },
            stdouts=self.stdouts,
            stderrs=self.stderrs,
        )

2.4 ProcessContext

Как видно спереди, ключевой переменной MultiprocessContext является: _pc: Необязательный[mp.ProcessContext], эта переменная-член создается start_processes, поэтому нам нужно взглянуть на torch.mp.ProcessContext.

2.4.1 start_processes

start_processes находится в torch/multiprocessing/spawn.py и возвращает ProcessContext. Обратите внимание, что с этого момента процесс обучения будет запускать собственный обучающий код, как если бы агента не было, поскольку агент уже выполнил работу torch.distributed.launch.

def start_processes(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn'):
    mp = multiprocessing.get_context(start_method)
    error_queues = []
    processes = []
    for i in range(nprocs):
        error_queue = mp.SimpleQueue()
        process = mp.Process(
            target=_wrap,
            args=(fn, i, args, error_queue), # 训练进程开始跑训练代码
            daemon=daemon,
        )
        process.start()
        error_queues.append(error_queue)
        processes.append(process)

    context = ProcessContext(processes, error_queues)
    if not join:
        return context

    # Loop on join until it returns True or raises an exception.
    while not context.join():
        pass

2.4.2 ProcessContext

torch.mp.ProcessContext — это класс, который, наконец, вступает в игру. На самом деле нас не волнует внутренняя реализация torch.mp.ProcessContext и то, как его запустить, потому что через метод start_processes фактически запущен torch.mp.ProcessContext, и мы можем рассматривать его как функциональный черный ящик. Мы действительно заботимся о том, как использовать torch.mp.ProcessContext для мониторинга.

Из его комментариев мы знаем, что torch.mp.ProcessContext выдает исключение при сбое некоторых/всех рабочих процессов. Если время ожидания истекло, статус рабочего процесса проверяется и возвращается немедленно. Поскольку мы используем synchronize.Event для ожидания завершения всех процессов, Join никогда не вернет успех.

# torch.mp.ProcessContext Throws an Exception if some/all of
# worker processes failed
# timeout < 0 checks worker status and return immediately
# Join will never return success since we use synchronize.Event to wait
# for all processes to finish.

2.5 Резюме

Текущие отношения следующие:

  • Во время создания LocalElasticAgent создает MultiprocessContext, а MultiprocessContext создает ProcessContext.
  • LocalElasticAgent._pcontextсохраненMultiprocessContext,MultiprocessContext._pcсохраненProcessContext.
  • Во время мониторинга LocalElasticAgent._monitor_workers вызывает MultiprocessContext.wait, MultiprocessContext вызывает ProcessContext.join, а ProcessContext.join специально отслеживает состояние выполнения процесса, тем самым завершая общую логику мониторинга.
  • После изменения или истечения времени ожидания дочернего процесса ProcessContext.join возвращает результат процесса, MultiprocessContext.wait пересылает результат процесса обратно, а _monitor_workers преобразует результат процесса в WorkerState.SUCCEEDED или WorkerState.FAILED.

Как показано на рисунке:

+--------------------------------------------------------------------------------------+   +------------------------------------+   +----------------+
| LocalElasticAgent                                                                    |   | MultiprocessContext                |   | ProcessContext |
|                                                                                      |   |                                    |   |                |
|                                                                                      |   |                                    |   |                |
|  +----------------------------------------+       MultiprocessContext _pcontext      |   |       ProcessContext _pc           |   |                |
|  | _invoke_run                            |                                          |   |                                    |   |                |
|  |                                        |                                          |   |                                    |   |                |
|  |   _initialize_workers  +-------------------->  _pcontext = start_processes  +-------------->  start():                     |   |                |
|  |                                        |                                          |   |         _pc = mp.start_processes +----------->          |
|  |                                        |                                          |   |                                    |   |                |
|  |   while True:                          |      +--------------------------------+  |   |                                    |   |                |
|  |       _monitor_workers(_worker_group)+------> | _monitor_workers               |  |   |                                    |   |                |
|  |                                        |      |                                |  |   |                                    |   |                |
|  |                                        |      |             _pcontext.wait +--------------->  wait +---> poll:             |   |                |
|  |                                        |      |                                |  |   |                    _pc.join  +--------------->          |
|  +----------------------------------------+      +--------------------------------+  |   |                                    |   |                |
|                                                                                      |   |                                    |   |                |
+--------------------------------------------------------------------------------------+   +------------------------------------+   +----------------+

Телефон такой:

0x03 Механизм мониторинга

Как видно из предыдущего кода _monitor_workers, _monitor_workersОн преобразует текущий результат дочернего процесса в определенное состояние WorkerState. Когда агент получает результат мониторинга _monitor_workers, он обрабатывает его в зависимости от ситуации.

            # 监控客户程序运行情况
            run_result = self._monitor_workers(self._worker_group)
            state = run_result.state # 进程运行情况
            self._worker_group.state = state

            if state == WorkerState.SUCCEEDED:
                # 程序正常结束
                self._exit_barrier() # 有一个成功了就全部结束
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                # 程序出错
                if self._remaining_restarts > 0: # 重试
                    self._remaining_restarts -= 1
                    self._restart_workers(self._worker_group) # 进行重启
                else:
                    self._stop_workers(self._worker_group) # 重试次数达到,结束workers
                    self._worker_group.state = WorkerState.FAILED
                    self._exit_barrier()
                    return run_result
            elif state == WorkerState.HEALTHY:
								# 程序正常运行
                # 节点成员关系有变化,比如scale up
                # membership changes do not count as retries
                num_nodes_waiting = rdzv_handler.num_nodes_waiting()
                group_rank = self._worker_group.group_rank
                # 如果有新的节点在waiting,就重启所有workers
                if num_nodes_waiting > 0:
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")

3.1 Мониторинг

позвоню сюда_pcontext.wait(0)чтобы получить состояние текущих рабочих подпроцессов, а затем преобразовать различные рабочие состояния обратно в вызывающий объект в соответствии с возвращенным результатом. Как упоминалось выше, RunResult должен быть сопоставлен с глобальным рангом, поэтому _monitor_workers имеет сопоставление локального ранга с глобальным рангом.

Зачем использовать глобальный рейтинг в качестве индикатора состояния процесса? Поскольку требуется связь между узлами, в настоящее время требуется глобальный ранг.

    @prof
    def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
        role = worker_group.spec.role
        worker_pids = {w.id for w in worker_group.workers} # 拿到本agent所有worker的pid
        pc_pids = set(self._pcontext.pids().values())
        if worker_pids != pc_pids:
            return RunResult(state=WorkerState.UNKNOWN)

        result = self._pcontext.wait(0) # 对运行结构进行监控
        if result:
            if result.is_failed(): # 如果进程失败
                # map local rank failure to global rank
                worker_failures = {}
                #  返回的结果内部就包括每个进程的运行结果
                for local_rank, failure in result.failures.items(): # local_rank是进程index
                    worker = worker_group.workers[local_rank] # 拿到对应的worker
                    worker_failures[worker.global_rank] = failure # 拿到其 global_rank,进而设置worker状态
                return RunResult(
                    state=WorkerState.FAILED,
                    failures=worker_failures, # 返回运行结果
                )
            else:
                # copy ret_val_queue into a map with a global ranks
                workers_ret_vals = {}
                for local_rank, ret_val in result.return_values.items():
                    worker = worker_group.workers[local_rank] # 
                    workers_ret_vals[worker.global_rank] = ret_val
                return RunResult(
                    state=WorkerState.SUCCEEDED,
                    return_values=workers_ret_vals, # 返回运行结果
                )
        else:
            return RunResult(state=WorkerState.HEALTHY)

3.2 Обращение

В зависимости от статуса возврата будет разная обработка:

  • Если WorkerState.SUCCEEDED, это означает, что обучение завершено и возвращается в обычном режиме.
  • Если WorkerState.HEALTHY, это означает, что обучение проходит нормально. В это время он проверит, добавлен ли новый узел. Мы подробно объясним это позже.
  • Если WorkerState.UNHEALTHY, WorkerState.FAILED, значит проблема с обучением, здесь возможны две ситуации.
    • Одна ошибка программы, TE повторит попытку.
    • Одним из них является выход узла, который мы анализируем ниже, но его процесс обработки согласуется с ошибкой программы.

Далее разберем, как бороться с окончанием обучения и программными ошибками.

0x04 конец тренировки

        if state == WorkerState.SUCCEEDED:
            # 程序正常结束
            self._exit_barrier() # 有一个成功了就全部结束
            return run_result

Вышеупомянутая обработка в обычном конце обучения, особенность заключается в использовании _exit_barrier.

4.1 Единое завершение

В настоящее время Torchelastic поддерживает приложения в стиле DDP. То есть TE ожидает, что все рабочие закончат работу примерно в одно и то же время. На самом деле практически невозможно гарантировать, что все воркеры DDP могут быть гарантированно завершены в одно и то же время, поэтому TE предоставляет барьер финализации, роль которого заключается в реализации тайм-аута ожидания (5 минут) для финализации воркера. То есть, если обучен один воркер, TE (torchelastic) ожидает, что все воркеры пользователя завершатся с 5-минутной ошибкой.

def _exit_barrier(self):
    """
    Wait for ``exit_barrier_timeout`` seconds for all agents to finish
    executing their local workers (either successfully or not). This
    acts as a safety guard against user scripts that terminate at different
    times. This barrier keeps the agent process alive until all workers finish.
    """

    start = time.time()
    try:
        store_util.barrier(
            self._store,
            self._worker_group.group_rank,
            self._worker_group.group_world_size,
            key_prefix=_TERMINAL_STATE_SYNC_ID,
            barrier_timeout=self._exit_barrier_timeout,
        )
    except Exception:
        log.exception(
            f"Error waiting on exit barrier. Elapsed: {time.time() - start} seconds"
        )

Значение по умолчанию для exit_barrier_timeout — 300 секунд, то есть 5 минут.

exit_barrier_timeout: float = 300,

4.2 Синхронизация

В torch/distributed/elastic/utils/store.py барьер будет вызывать синхронизацию для синхронизации.

def barrier(
    store, rank: int, world_size: int, key_prefix: str, barrier_timeout: float = 300
) -> None:
    """
    A global lock between agents.

    Note: Since the data is not removed from the store, the barrier can be used
        once per unique ``key_prefix``.
    """
    data = f"{rank}".encode(encoding="UTF-8")
    synchronize(store, data, rank, world_size, key_prefix, barrier_timeout)

синхронизировать - синхронизировать через магазин.

def get_all(store, prefix: str, size: int):
    r"""
    Given a store and a prefix, the method goes through the array of keys
    of the following format: ``{prefix}{idx}``, where idx is in a range
    from 0 to size, and tries to retrieve the data.

    Usage

    ::

     values = get_all(store, 'torchelastic/data', 3)
     value1 = values[0] # retrieves the data for key torchelastic/data0
     value2 = values[1] # retrieves the data for key torchelastic/data1
     value3 = values[2] # retrieves the data for key torchelastic/data2

    """
    data_arr = []
    for idx in range(size):
        data = store.get(f"{prefix}{idx}")
        data_arr.append(data)
    return data_arr

def synchronize(
    store,
    data: bytes,
    rank: int,
    world_size: int,
    key_prefix: str,
    barrier_timeout: float = 300,
) -> List[bytes]:
    """
    Synchronizes ``world_size`` agents between each other using the underlying c10d store.
    The ``data`` will be available on each of the agents.

    Note: The data on the path is not deleted, as a result there can be stale data if
        you use the same key_prefix twice.
    """
    store.set_timeout(timedelta(seconds=barrier_timeout))
    store.set(f"{key_prefix}{rank}", data)
    agent_data = get_all(store, key_prefix, world_size)
    return agent_data

Обработка ошибок 0x05

5.1 Типы ошибок

Каждый хост в распределенном задании PyTorch запускает агент TorchElastic и несколько рабочих процессов (как дочерние процессы агента TorchElastic). Поскольку рабочие процессы предоставляются пользователем (скрипты/задания PyTorch), TorchElastic может распространять ошибки через прокси-серверы к тренеру, вплоть до планировщика, который в конечном итоге информирует конечного пользователя о состоянии этих заданий и применяет некоторую стратегию повторных попыток.

TE классифицирует ошибки по следующим категориям.

+----------------+----------------+--------------------------------------------------------------+
| Category       | Sub-Category   |  Description                                                 |
+================+================+==============================================================+
| User Error     | Input Error    | invalid inputs to TorchElastic APIs (e.g. min > max nodes)   |
|                +----------------+--------------------------------------------------------------+
|                | Worker Failure | any failures on the worker child process                     |
+----------------+----------------+--------------------------------------------------------------+
| Platform Error |      n/a       | failures caused by the agent                                 |
+----------------+----------------+--------------------------------------------------------------+
| Infra Error    |      n/a       | failures outside the domain of the agent and workers         |
|                |                | (e.g. host failures)                                         |
+----------------+----------------+--------------------------------------------------------------+

5.1 Режим обработки ошибок

Соответствующий режим обработки ошибок выглядит следующим образом, мы смотрим на уровень неисправности от малого к большому:

  • User Error: В частности, он делится на следующие методы обработки:
    • Ошибка пользователя: например, неправильный ввод, поэтому можно выполнить прямой захват программы.
    • Ошибка работника:
      • Сбои рабочих процессов являются особыми, поскольку исключение/сбой происходит из процесса, отличного от агента, поэтому ошибки необходимо распространять между процессами (например, агент не может простоtry-catchисключение, вызванное рабочим процессом).
        • Использование прокси-сервера TorchElastictorch.distributed.elastic.multiprocessing.start_processesЗапускает воркер, который имеет встроенную простую файловую систему распространения ошибок между процессами.
        • любое использованиеrecordДекорированные функции или двоичные точки входа будут записывать неперехваченные исключения (с информацией о трассировке) в переменные среды.TORCHELASTIC_ERROR_FILEуказанный файл. Родительский процесс (например, агент) устанавливает эту переменную среды поверх каждого запускаемого им дочернего процесса, затем объединяет файлы ошибок для всех дочерних процессов и распространяет файл ошибки с наименьшей меткой времени (например, первой ошибкой).
      • В документации указано следующее: для обучающего задания с «n» рабочими процессами, если «k
      • Сбой одного рабочего процесса приведет к сбою всего кластера: если один рабочий процесс продолжит сбой, это приведет к тому, что переменная max_restarts агента TE станет равной нулю. Это заставит агента выполнить свою работу и закрыть рандеву. Если на других брокерах есть другие работники, они также будут уволены.
  • Ошибка платформы (то есть сбой прокси):
    • Все ошибки, кроме Worker Failures, обычно выдаются из процесса агента, явно или неявно приводя к сбою процесса агента. Поэтому можно применять стратегии обработки исключений, предоставляемые стандартным языком (python).
    • Сбой прокси-сервера также может привести к сбою локальной рабочей группы. То, как это обрабатывается, зависит от диспетчера заданий, например, сбой всего задания (бандитная семантика) или попытка заменить узел. Оба поведения поддерживаются прокси.
  • Infra Error (то есть сбой узла): обрабатывается так же, как и сбои прокси.

Давайте подробно рассмотрим, как бороться с «Отказом работника».

5.2 Механизм обработки

Конкретный механизм обработки ошибок следующий: если максимальное количество повторных попыток не достигнуто, он попытается перезапустить рабочие процессы. Если достигнуто максимальное количество раз, остановите рабочие процессы.

        elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
            # 程序出错
            if self._remaining_restarts > 0: # 重试
                self._remaining_restarts -= 1
                self._restart_workers(self._worker_group) # 进行重启
            else:
                self._stop_workers(self._worker_group) # 重试次数达到,结束workers
                self._worker_group.state = WorkerState.FAILED
                self._exit_barrier()
                return run_result

5.2.1 Перезагрузка

_restart_workers остановит всех рабочих, а затем снова встретится.

@prof
def _restart_workers(self, worker_group: WorkerGroup) -> None:
    """
    Restarts (stops, rendezvous, starts) all local workers in the group.
    """

    role = worker_group.spec.role
    self._stop_workers(worker_group)
    worker_group.state = WorkerState.STOPPED
    self._initialize_workers(worker_group)

5.2.2 Остановка

Остановка рабочих процессов закрывает контекст.

def _shutdown(self) -> None:
    if self._pcontext:
        self._pcontext.close()
        
@prof
def _stop_workers(self, worker_group: WorkerGroup) -> None:
    self._shutdown()

В MultiprocessContext метод close закрывает все дочерние процессы, а затем ожидает их полной остановки.

    def _close(self) -> None:
        if self._pc:
            for proc in self._pc.processes:
                proc.terminate()
                proc.join()

5.4 Перезапуск других агентов

Как видно из комментариев к исходному коду: новый раунд рандеву заставит и других агентов перезапустить своих воркеров.

When worker fails, TE will check the number of restarts available, if there is more than 0 restarts, TE will start a new rendezvous round and restart the worker process. New rendezvous round will other TE agents to terminate their workers.

Как это делается? детали следующим образом:

  1. **Агент 0 (Агент, вызвавший ошибку)** обнаружил ошибку посредством мониторинга.
  2. Агент 0 вызывает _restart_workers для перезапуска рабочих процессов.
  3. Агент 0 вызовет next_rendezvous, чтобы инициировать новый раунд рандеву.
  4. Прежде чем агент 0 выполнит какую-либо операцию, например, операцию поддержания активности, он вызовет синхронизацию для получения информации о кластере из kvstore, что может гарантировать, что агент получит самое последнее состояние кластера.
  5. Агент 0 добавит себя в локальный список ожидания.
  6. Агент 0 также вызовет mark_dirty, что означает, что мой статус обновился и мне нужно написать в KVStore.
  7. Агент 0 вызовет синхронизацию, чтобы записать свой собственный список ожидания в KVStore.
  8. **Агент 1 (другие нормально работающие агенты)** вызовет операцию синхронизации, чтобы получить последнюю информацию из KVStore перед выполнением какой-либо операции, такой как операция поддержания активности.
  9. Агент 1 использует эту информацию для обновления своего состояния, чтобы обновить локальный список ожидания.
  10. Цикл поезда агента 1 находится в состоянии работоспособности после мониторинга каждые 30 секунд, поскольку система работает нормально.
  11. Поэтому агент 1 вызывает num_nodes_waiting(), чтобы увидеть номер списка ожидания.
  12. Агент 1 получит номер местного списка ожидания.
  13. Также вызывает _restart_workers, если список ожидания не пуст.
  14. В конечном итоге он вызовет next_rendezvous.

детали следующим образом:

 Agent 0                                      Agent 1
+---------------------------+                 +--------------------------------------------+
|    _invoke_run            |                 |                       _invoke_run          |
|          +                |                 |                           +                |
|          |                |                 |                           |                |
|          | 1              |                 |                           |                |
|          v                |                 |                           |                |
| Worker Process Error      |                 |                           |                |
|          +                |                 |                           |                |
|          |                |                 |                           | 10             |
|          | 2              |                 |                           v                |
|          v                |                 |                        HEALTHY             |
|  _restart_workers         |                 |                           +                |
|          +                |                 |                           | 11             |
|          |                |                 |                           |                |
|          | 3              |                 |                           v                |
|          v                |                 |              +-->  num_nodes_waiting() > 0 |
|   next_rendezvous         |                 |              |            +                |
|          +                |                 |              |            |                |
|          | 4              |                 |              | 12         | 13             |
|          |                +   +----------+  |              |            v                |
|          v      cluster info  |          |  |              |       _restart_workers      |
|        sync  <------------+-> | KV Store |  |              |            +                |
|          +                |   |          |  |              |            |                |
|          | 5              |   |          |  |              |            | 14             |
|          v                |   |          |  |              |            v                |
|  Add to local waiting_list|   |          |  |              |        next_rendezvous      |
|          +                |   |          |  |              |                             |
|          |                |   |          |  |              |                             |
|          | 6              |   |          |  |              v                             |
|          v                |   |          |  |                                            |
|     mark_dirty            |   |          |  |  Add to local waiting_list                 |
|          +                |   |          |  |              ^                             |
|          |                |   |          |  |              |                             |
|          | 7              |   |          |  |            9 | waiting_list                |
|          v         7      |   |          |  |    8         +                             |
|        sync +---------------> |          +--------------> sync                           |
|              waiting_list |   |          |  |waiting_list                                |
|                           |   +----------+  |                                            |
+---------------------------+                 +--------------------------------------------+


На этом наше первоначальное знакомство с механизмом мониторинга завершено. Из-за нехватки места мы продолжим рассказывать о том, как управлять масштабированием вверх/вниз в следующей статье.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

Cloud Native Elastic AI Training Series II: проектирование и внедрение PyTorch 1.9.0 Elastic Distributed Training

Чтение исходного кода PyTorch Elastic