[Анализ исходного кода] Распределенное эластичное обучение PyTorch (3) --- агент

глубокое обучение PyTorch

0x00 сводка

В предыдущих статьях мы изучили основные распространяемые модули PyTorch и представили несколько официальных примеров.Далее мы представим эластичное обучение PyTorch.Эта статья является третьей статьей, посвященной основным функциям эластичных агентов.

Другие статьи из этой серии:

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (1) --- общая идея

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (2) --- запуск и процесс с одним узлом

0x01 Общий фон

Давайте сначала резюмируем, что наиболее важными концепциями TE являются две концепции Агента и Рандеву.

  • Агент — это независимый фоновый процесс, работающий на одном узле. Его можно рассматривать как менеджера рабочих процессов или супервайзеров процессов. Он отвечает за запуск рабочих процессов, мониторинг операций рабочих процессов и захват исключений рабочих процессов.rendezvousРеализовать взаимное обнаружение между рабочими и нести ответственность за изменения при смене участников.rendezvousСинхронизируйте изменения.
  • Для достижения эластичного обучения необходим механизм, позволяющий узлам/процессам обнаруживать друг друга. рандеву — это механизм обнаружения или компонент синхронизации. Когда система запускается или изменяется членство, все рабочие (повторно) сходятся (встречаются) для создания новой группы процессов.

1.1 Функциональное разделение

TE состоит из нескольких эластичных агентов на основе Rendezvous. Это разделение функций. Давайте сравним их.

  • Агент фокусируется на логике на конкретных узлах.
    • Агент отвечает за определенные операции, связанные с бизнес-логикой, такие как запуск процесса для выполнения пользовательской программы, мониторинг состояния выполнения пользовательской программы и уведомление Rendezvous в случае возникновения исключения.
    • Агент — это менеджер рабочих процессов, отвечающий за запуск и управление рабочими процессами, формирование рабочей группы, отслеживание рабочего состояния рабочих процессов, сбор данных об отказавших рабочих процессах и перезапуск группы рабочих процессов в случае сбоя или нового добавленного рабочего процесса.
    • Агент отвечает за сохранение информации WORLD_SIZE и RANK. Пользователю не нужно предоставлять его вручную, и Агент сделает это автоматически.
    • Агент — это фоновый процесс на определенном узле и независимый индивидуум. Сам агент не может реализовать общее эластичное обучение, поэтому ему нужен механизм для завершения взаимного обнаружения между рабочими, синхронизации изменений и т. д. (информация WORLD_SIZE и RANK фактически должна быть синхронизирована несколькими узлами для определения), это следующая концепция Rendezvous.
  • Rendezvous отвечает за логику кластера, гарантируя, что узлы достигнут строгого консенсуса в отношении того, «какие узлы участвуют в обучении».
    • Каждый Агент включает в себя обработчик Rendezvous, и эти обработчики составляют кластер Rendezvous в целом, формируя, таким образом, кластер Агента.
    • После завершения Rendezvous создается общее хранилище ключей и значений, которое реализуетtorch.distributed.StoreAPI. Это хранилище используется совместно только участниками, завершившими Rendezvous, и оно предназначено для того, чтобы позволить Torch Distributed Elastic обмениваться управляющей информацией и данными во время заданий инициализации.
    • Rendezvous отвечает за сохранение всей необходимой информации о текущей группе поверх каждого агента. Поверх каждого агента происходит рандеву, они будут общаться друг с другом и поддерживать набор информации в целом, которая хранится в упомянутом выше Магазине.
    • Rendezvous отвечает за логику кластера, такую ​​как добавление новых узлов, удаление узлов, присвоение рангов и т. д.

Давайте сначала взглянем на принципиальную схему из исходного кода.Сначала у нас есть общая концепция.

1.2 Rendezvous

Мы лишь кратко представляем рандеву в этой статье, сосредоточив внимание на представлении агентов.

В контексте Torch Distributed Elastic мы используем термин рандеву для обозначения конкретной функции: примитива распределенной синхронизации в сочетании с обнаружением одноранговых узлов.

Rendezvous используется Torch Distributed Elastic для сбора участников (узлов) для задания обучения, чтобы участники могли согласовать список участников и роль каждого участника, а также согласовать коллективное решение о начале/возобновлении обучения.

Рандеву разделяет и разделяет функции, а бизнес-логика абстрагируется в ряд операторов, таких как_RendevzousJoinOp. Rendezvous поддерживает набор конечных автоматов внутри, и оператор решает следующую операцию. Например_RendezvousOpExecutorдля выполнения различных операторов и получения Действия, которое должно быть выполнено на следующем шаге в соответствии с результатами оператора, чтобы работать над собой.

например, в_DistributedRendezvousOpExecutorСреди них, если текущее действие окажется ADD_TO_WAIT_LIST, оно будет выполнено_add_to_wait_list, а затем позвонитеself._state.wait_list.add(self._node)

if action == _Action.KEEP_ALIVE:
    self._keep_alive()
elif action == _Action.ADD_TO_PARTICIPANTS:
    self._add_to_participants()
elif action == _Action.ADD_TO_WAIT_LIST: # 发现当前Action
    self._add_to_wait_list() # 然后执行
elif action == _Action.REMOVE_FROM_PARTICIPANTS:
    self._remove_from_participants()
elif action == _Action.REMOVE_FROM_WAIT_LIST:
    self._remove_from_wait_list()
elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
    self._mark_rendezvous_complete()
elif action == _Action.MARK_RENDEZVOUS_CLOSED:
    self._mark_rendezvous_closed()

0x02 Общая логика агента

2.1 Функция

Агент Elastic — это плоскость управления torchelastic. Это независимый процесс, отвечающий за запуск и управление базовыми рабочими процессами. Агент отвечает за:

  • Работает в тандеме с собственным распределенным PyTorch: позволяет каждому работнику получать всю информацию, необходимую для успешного вызова.torch.distributed.init_process_group().
  • Отказоустойчивость: Контролируйте каждого воркера, прекращайте работу всех воркеров и вовремя перезапускайте их при возникновении ошибки или исключения.
  • Отказоустойчивость: реагируйте на изменения участников и перезапускайте все рабочие процессы с новыми участниками.

Следующее изображение от Zhihu, которое является уточнением предыдущего изображения.

img

2.2 Основа работы

Агенты Torchelast и пользовательские воркеры работают в соответствии с отказоустойчивым контрактом:

  • TE (torchelastic) ожидает, что рабочие-пользователи выполнят работу в течение 5 минут.
  • При разработке приложения DDP лучше всего, чтобы все рабочие процессы вышли из строя, а не только один рабочий процесс.
  • TE не синхронизирует количество перезапусков между прокси.
  • Повторное рандеву TE не уменьшает количество перезапусков.
  • Когда один агент выполнит свою работу (успех или неудача), он закроет рандеву. Если у других агентов все еще есть рабочие, они будут уволены.
  • Исходя из вышеизложенного, уменьшение масштаба не работает, если хотя бы один агент выполнил задачу.
  • Когда агент обнаруживает увеличение масштаба, он не уменьшает «max_restarts».
  • Агенты Torchelast работают вместе через etcd или аналогичные серверные части.

2.3 Развертывание

Простые агенты развернуты на каждом узле и работают с локальными процессами. Более продвинутые агенты могут запускать и управлять работниками удаленно. Агенты могут быть полностью децентрализованы, общаться и координировать свои действия с другими агентами (работниками, выполняющими ту же работу) для принятия коллективного решения, основанного на условиях управляемых им работников.

Как настроить, пример также приведен в исходном коде.Если вы начнете обучать работу с 8 тренерами (по одному тренеру на GPU) на GPU, мы можем сделать следующую конфигурацию.

1. Use 8 x single GPU instances, place an agent per instance, managing 1 worker per agent.
2. Use 4 x double GPU instances, place an agent per instance, managing 2 workers per agent.
3. Use 2 x quad GPU instances, place an agent per instance, managing 4 workers per agent.
4. Use 1 x 8 GPU instance, place an agent per instance, managing 8 workers per agent.

2.4 Базовый класс

базовый классElasticAgentЭто абстрактный класс, из которого должны быть получены реальные работающие агенты. Судя по комментариям ElasticAgent, процесс агента отвечает за управление одним или несколькими рабочими процессами. Предполагается, что рабочие процессы представляют собой обычные распределенные сценарии PyTorch. Когда рабочий процесс создается агентом, агент предоставляет рабочему процессу необходимую информацию для правильной инициализации группы процессов факела. При развертывании точная топология и соотношение агентов и рабочих зависят от конкретной реализации агента и предпочтений размещения рабочих мест пользователей.

class ElasticAgent(abc.ABC):
    """
    Agent process responsible for managing one or more worker processes.
    The worker processes are assumed to be regular distributed PyTorch scripts.
    When the worker process is created by the agent, the agent provides the
    necessary information for the worker processes to properly initialize
    a torch process group.

    The exact deployment topology and ratio of agent-to-worker is dependent
    on the specific implementation of the agent and the user's job placement
    preferences. 

    Usage
    ::

     group_result = agent.run()
      if group_result.is_failed():
        # workers failed
        failure = group_result.failures[0]
        log.exception(f"worker 0 failed with exit code : {failure.exit_code}")
      else:
        return group_result.return_values[0] # return rank 0's results

    """

    @abc.abstractmethod
    def run(self, role: str = DEFAULT_ROLE) -> RunResult:
        """
        Runs the agent, retrying the worker group on failures up to
        ``max_restarts``.

        Returns:
            The result of the execution, containing the return values or
            failure details for each worker mapped by the worker's global rank.

        Raises:
            Exception - any other failures NOT related to worker process
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_worker_group(self, role: str = DEFAULT_ROLE) -> WorkerGroup:
        """
        Returns:
            The ``WorkerGroup`` for the given ``role``.
            Note that the worker group is a mutable object and hence in a
            multi-threaded/process environment it may change state.
            Implementors are encouraged (but not required) to return
            a defensive read-only copy.
        """
        raise NotImplementedError()

ElasticAgent имеет два производных класса:

  • SimpleElasticAgentНекоторые функции базового класса реализованы для облегчения расширения реализации новых агентов.
  • LocalElasticAgentполученныйSimpleElasticAgent, является конечным агентом, используемым для эластичного обучения. Он в основном используется для локальных операций и отвечает за управление всеми рабочими процессами на одной машине.

0x03 Worker

Мы начнем с рассмотрения рабочих процессов, которые являются принципалами, управляемыми агентами.

3.1 Определение работника

Класс Worker представляет рабочий экземпляр, который мы представили выше.WorkerSpec, Рабочий основан наWorkerSpecПостроен, его ключевые переменные-члены следующие:

  • id (произвольный): однозначно идентифицирует воркера, что объясняется спецификой реализации ElasticAgent, для локальных агентов может быть воркеромpid(int), для удаленных прокси может быть закодировано как ``хост:порт(строка)`.

  • local_rank : локальный ранг работника.

  • global_rank: глобальный ранг работника.

  • role_rank: ранг всех работников с одинаковой ролью.

  • world_size: количество глобальных рабочих.

  • role_world_size: количество воркеров с одинаковой ролью.

class Worker:
    """
    Represents a worker instance. Contrast this with ``WorkerSpec`` that
    represents the specifications of a worker. A ``Worker`` is created from
    a ``WorkerSpec``. A ``Worker`` is to a ``WorkerSpec`` as an object is to
    a class.

    The ``id`` of the worker is interpreted
    by the specific implementation of ``ElasticAgent``. For a local
    agent, it could be the ``pid (int)`` of the worker, for a remote
    agent it could be encoded as ``host:port (string)``.

    Args:
        id (Any): uniquely identifies a worker (interpreted by the agent)
        local_rank (int): local rank of the worker
        global_rank (int): global rank of the worker
        role_rank (int): rank of the worker across all workers that have the same role
        world_size (int): number of workers (globally)
        role_world_size (int): number of workers that have the same role
    """

    def __init__(
        self,
        local_rank: int,
        global_rank: int = -1,
        role_rank: int = -1,
        world_size: int = -1,
        role_world_size: int = -1,
    ):
        # unique identifier for this worker
        self.id: Any = None

        # rank of the worker among workers with the same role being monitored
        # by the same ``agent`` instance.
        self.local_rank: int = local_rank

        #  rank of the worker among all the workers across all roles
        #  across all ``agent`` instances.
        #  Global rank is not stable between re-rendezvous.
        self.global_rank: int = global_rank

        #  rank of the worker among all the workers with the same role
        #  across all ``agent`` instances.
        #  Global rank is not stable between re-rendezvous.
        self.role_rank: int = role_rank

        # total number of workers (globally). Due to elasticity
        # the world size may change between re-rendezvous.
        self.world_size: int = world_size

        # total number of workers that share the same role. Due to elasticity
        # the role world size may change between re-rendezvous.
        self.role_world_size: int = role_world_size

3.2 WorkerGroup

WorkerGroup представляет собой рабочую группу и управляет несколькими работниками в целом для пакетной обработки.

class WorkerGroup:
    """
    Represents the set of ``Worker`` instances for the given ``WorkerSpec``
    managed by ``ElasticAgent``. Whether the worker group contains cross
    instance workers or not depends on the implementation of the agent.
    """
    def __init__(self, spec: WorkerSpec):
        self.spec = spec
        self.workers = [Worker(local_rank=i) for i in range(self.spec.local_world_size)]

        # assigned after rdzv
        self.store = None
        self.group_rank = None
        self.group_world_size = None

        self.state = WorkerState.INIT

Во время инициализации SimpleElasticAgent создается WorkerGroup.

class SimpleElasticAgent(ElasticAgent):
    """
    An ``ElasticAgent`` that manages workers (``WorkerGroup``)
    for a single ``WorkerSpec`` (e.g. one particular type of worker role).
    """

    def __init__(self, spec: WorkerSpec, exit_barrier_timeout: float = 300):
        self._worker_group = WorkerGroup(spec)
        self._remaining_restarts = self._worker_group.spec.max_restarts
        self._store = None
        self._exit_barrier_timeout = exit_barrier_timeout
        self._total_execution_time = 0

3.3 WorkerState

Представление WorkerStateWorkerGroupстатус. Все воркеры в рабочей группе сохраняют/изменяют состояние как единое целое. Если один работник в рабочей группе выходит из строя, вся рабочая группа считается сбойной:

  UNKNOWN - agent lost track of worker group state, unrecoverable
  INIT - worker group object created not yet started
  HEALTHY - workers running and healthy
  UNHEALTHY - workers running and unhealthy
  STOPPED - workers stopped (interruped) by the agent
  SUCCEEDED - workers finished running (exit 0)
  FAILED - workers failed to successfully finish (exit !0)

Конкретные значения этих состояний следующие:

  • НЕИЗВЕСТНО — агент потерял отслеживание состояния рабочей группы и не может восстановить

  • INIT - Созданный объект рабочей группы не запущен

  • ЗДОРОВЫЙ работник работает здоровым

  • НЕЗДОРОВЫЙ-рабочий работает, но не здоров

  • STOPPED - агент останавливает (прерывает) воркер

  • SUCCEEDED-worker завершил работу (значение выхода равно 0)

  • FAILED-worker не удалось успешно завершить (значение выхода не равно 0)

Рабочая группа с начальногоINITСостояние начинается, затем переходит в «здоровое» или «нездоровое» состояние и, наконец, достигает конечного состояния «успех» или «неудача». Рабочие группы могут быть прерваны агентами и временно переведены в «остановленное» состояние. Рабочие процессы в состоянии «остановлено» могут быть запланированы для перезапуска в ближайшем будущем.已停止的Примеры состояний:

  • Наблюдается сбой рабочей группы | нездоровый
  • Обнаружено изменение участника

Когда операция в рабочей группе (запуск, остановка, rdzv, повторная попытка и т. д.) завершается сбоем и приводит к частичному применению операции к рабочей группе, статус будет «неизвестен». Обычно это происходит, когда во время изменения состояния возникает исключение, которое не перехвачено/не обработано. Когда рабочая группа находится в «неизвестном» состоянии, агент не возобновит работу рабочей группы, поэтому лучше отменить задание и дать возможность узлу повторить попытку с помощью диспетчера заданий.

WorkerState определяется следующим образом:

class WorkerState(str, Enum):
    """
    State of the ``WorkerGroup``. Workers in a worker group change state as a unit.
    If a single worker in a worker group fails the entire set is considered
    failed::

      UNKNOWN - agent lost track of worker group state, unrecoverable
      INIT - worker group object created not yet started
      HEALTHY - workers running and healthy
      UNHEALTHY - workers running and unhealthy
      STOPPED - workers stopped (interruped) by the agent
      SUCCEEDED - workers finished running (exit 0)
      FAILED - workers failed to successfully finish (exit !0)


    A worker group starts from an initial ``INIT`` state,
    then progresses to ``HEALTHY`` or ``UNHEALTHY`` states,
    and finally reaches a terminal ``SUCCEEDED`` or ``FAILED`` state.

    Worker groups can be interrupted and temporarily put into ``STOPPED`` state
    by the agent. Workers in ``STOPPED`` state are scheduled to be restarted
    in the near future by the agent. Some examples of workers being put into
    ``STOPPED`` state are:

    1. Worker group failure|unhealthy observed
    2. Membership change detected

    When actions (start, stop, rdzv, retry, etc) on worker group fails
    and results in the action being partially applied to the worker group
    the state will be ``UNKNOWN``. Typically this happens on uncaught/unhandled
    exceptions during state change events on the agent. The agent is not
    expected to recover worker groups in ``UNKNOWN`` state and is better off
    self terminating and allowing the job manager to retry the node.
    """

    UNKNOWN = "UNKNOWN"
    INIT = "INIT"
    HEALTHY = "HEALTHY"
    UNHEALTHY = "UNHEALTHY"
    STOPPED = "STOPPED"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"

    @staticmethod
    def is_running(state: "WorkerState") -> bool:
        """
        Returns:
             True if the worker state represents workers still running
             (e.g. that the process exists but not necessarily healthy).
        """
        return state in {WorkerState.HEALTHY, WorkerState.UNHEALTHY}

0x04 SimpleElasticAgent

SimpleElasticAgent — это один из классов реализации агента. Эта абстракция предназначена для облегчения расширения новых реализаций агентов. Как видно сзади, текущий встроенныйLocalElasticAgentОтвечает за управление всеми рабочими процессами на одной машине. Если пользователи хотят управлять всеми рабочими процессами на нескольких машинах с помощью только одного агента, а не только локальными рабочими процессами, они могут расширитьSimpleElasticAgentдля реализации пользовательского агента.

class SimpleElasticAgent(ElasticAgent):
    """
    An ``ElasticAgent`` that manages workers (``WorkerGroup``)
    for a single ``WorkerSpec`` (e.g. one particular type of worker role).
    """

    def __init__(self, spec: WorkerSpec, exit_barrier_timeout: float = 300):
        self._worker_group = WorkerGroup(spec)
        self._remaining_restarts = self._worker_group.spec.max_restarts
        self._store = None
        self._exit_barrier_timeout = exit_barrier_timeout
        self._total_execution_time = 0

4.1 Общая работа

Основной цикл SimpleElasticAgent _invoke_run — это основная логика (здесь агент по умолчанию и рабочий процесс находятся на одном компьютере), который выполняет следующие действия:

  • использоватьself._initialize_workers(self._worker_group)Полная работа по инициализации, такая как запуск воркеров, присвоение рангов каждому воркеру и т. д.
  • Затем войдите в цикл while True, в котором _monitor_workers используется для периодического обучения статусу выполнения пользовательской программы для получения результатов выполнения рабочего процесса, а затем выполняет различные обработки в зависимости от ситуации.
    • Возвращает, если программа завершается нормально.
    • Если программа не удалась, повторите попытку, если количество попыток достигнуто, завершите рабочий процесс.
    • Если членство в узле изменится, например, при масштабировании, будут ожидать новые узлы, и в это время все рабочие процессы будут перезапущены.
    def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
        # NOTE: currently only works for a single role

        spec = self._worker_group.spec
        role = spec.role

        self._initialize_workers(self._worker_group) # 启动worker
        monitor_interval = spec.monitor_interval
        rdzv_handler = spec.rdzv_handler

        while True:
            assert self._worker_group.state != WorkerState.INIT
            # 定期监控
            time.sleep(monitor_interval)
            # 监控客户程序运行情况
            run_result = self._monitor_workers(self._worker_group) # 得到进程运行结果
            state = run_result.state
            self._worker_group.state = state

            put_metric(f"workers.{role}.remaining_restarts", self._remaining_restarts)
            put_metric(f"workers.{role}.{state.name.lower()}", 1)

            if state == WorkerState.SUCCEEDED:
                # 程序正常结束
                self._exit_barrier()
                return run_result
            elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
                # 程序出错
                if self._remaining_restarts > 0: # 重试
                    self._remaining_restarts -= 1
                    self._restart_workers(self._worker_group)
                else:
                    self._stop_workers(self._worker_group) # 重试次数达到,结束workers
                    self._worker_group.state = WorkerState.FAILED
                    self._exit_barrier()
                    return run_result
            elif state == WorkerState.HEALTHY:
                # 节点成员关系有变化,比如scale up,就会有新节点waiting
                # membership changes do not count as retries
                num_nodes_waiting = rdzv_handler.num_nodes_waiting()
                group_rank = self._worker_group.group_rank
                # 如果有新的节点在waiting,就重启所有workers
                if num_nodes_waiting > 0:
                    self._restart_workers(self._worker_group)
            else:
                raise Exception(f"[{role}] Worker group in {state.name} state")

Вышеизложенное является лишь обзором всего процесса, и мы проанализируем весь процесс один за другим.

4.2 Инициализация воркеров

В основном цикле прокси сначала используйтеself._initialize_workers(self._worker_group)запустить рабочий. существует_initialize_workersСреди:

  • первое использованиеself._rendezvous(worker_group)Выполнение синхронных операций консенсуса между узлами и ранговой обработки и т. д.
  • Следующий звонок_start_workersЗапуск рабочих. здесь_start_workersЭто виртуальная функция и должна быть реализована производным классом.
    @prof
    def _initialize_workers(self, worker_group: WorkerGroup) -> None:
        r"""
        Starts a fresh set of workers for the worker_group.
        Essentially a rendezvous followed by a start_workers.

        The caller should first call ``_stop_workers()`` to stop running workers
        prior to calling this method.

        Optimistically sets the state of the worker group that
        just started as ``HEALTHY`` and delegates the actual monitoring
        of state to ``_monitor_workers()`` method
        """
        role = worker_group.spec.role

        # TODO after stopping workers, wait at least monitor_interval*2 for
        # workers on different nodes to fail on a collective op before waiting
        # on the rdzv barrier, this way we ensure that nodes enter rdzv
        # at around the same time and reduce false positive rdzv timeout errors
        self._rendezvous(worker_group) # 同步共识操作 

        worker_ids = self._start_workers(worker_group) # 启动worker
        for local_rank, w_id in worker_ids.items():
            worker = worker_group.workers[local_rank]
            worker.id = w_id

        worker_group.state = WorkerState.HEALTHY

4.2.1 _rendezvous

Давайте сначала посмотрим на _rendezvous, который делает следующее:

  • Вызовите next_rendezvous() для обработки изменений членства, которые вернут размер мира, хранилище и т. д.
  • Хранилище будет настроено в рабочую группу, и последующие воркеры смогут общаться через этот kvstore.
  • Вызов _assign_worker_ranks создаст рабочих и установит для них ранги.Возвращенные рабочие присваиваются worker_group.workers агента.

Вышеупомянутые две точки обрабатываются с использованием информации о рандеву, такой как извлечение рангов из рандеву.

    @prof
    def _rendezvous(self, worker_group: WorkerGroup) -> None:
        r"""
        Runs rendezvous for the workers specified by worker spec.
        Assigns workers a new global rank and world size.
        Updates the rendezvous store for the worker group.
        """

        spec = worker_group.spec

        # 处理成员关系变化,注意,这里得到的是 group rank!
        store, group_rank, group_world_size = spec.rdzv_handler.next_rendezvous()
        self._store = store # store被设置到 Agent之中,store可以被认为是远端KV存储

        # 依据 group rank 为 worker 建立 ranks
        workers = self._assign_worker_ranks(store, group_rank, group_world_size, spec)
        worker_group.workers = workers
        worker_group.store = store
        worker_group.group_rank = group_rank
        worker_group.group_world_size = group_world_size

        if group_rank == 0:
            self._set_master_addr_port(store, spec.master_addr, spec.master_port)
        master_addr, master_port = self._get_master_addr_port(store)
        restart_count = spec.max_restarts - self._remaining_restarts
4.2.2.1 Обработка изменений членства

Elastic вызывает rdzv_handler.next_rendezvous() для обработки изменений членства, чтобы начать следующий раунд операций рандеву (поскольку рабочий процесс уже запущен и должен присоединиться к кластеру).

Обратите внимание, что next_rendezvous — это внутренняя функция RendezvousHandler. Вызов этой функции будет заблокирован до тех пор, пока количество воркеров не будет соответствовать требованиям. Эта функция вызывается при инициализации или перезапуске Worker. Когда функция возвращается, разныеworker groupРанг в возврате будет использоваться как единственный идентификатор. Его внутренняя логика такова:

  • использовать сначала_RendezvousExitOpДайте узлу выйти.
  • затем используйте_RendezvousJoinOpВоссоединитесь с узлом.
  • Наконец, запустите сердцебиение и верните размер мира, магазин и т. д.
    def next_rendezvous(self) -> Tuple[Store, int, int]:
        """See base class."""

        self._stop_heartbeats()

        # Delay the execution for a small random amount of time if this is our
        # first run. This will slightly skew the rendezvous attempts across the
        # nodes and reduce the load on the backend.
        if self._state_holder.state.round == 0:
            _delay(seconds=(0, 0.3))

        exit_op = _RendezvousExitOp()
        join_op = _RendezvousJoinOp()

        deadline = self._get_deadline(self._settings.timeout.join)

        self._op_executor.run(exit_op, deadline)
        self._op_executor.run(join_op, deadline)

        self._start_heartbeats()

        rank, world_size = self._get_world()
        store = self._get_store()

        return store, rank, world_size # 返回的是 worker group 的rank
4.2.3.2 Присвоение рангов работникам

Затем вызовите _assign_worker_ranks, чтобы создать ранги для рабочих. Алгоритм присвоения рангов следующий:

  1. Каждый агент записывает свою конфигурацию (group_rank, group_world_size, num_workers) в общедоступное хранилище.
  2. Каждый агент получает конфигурацию всех агентов и выполняет двухуровневую сортировку по роли и рангу.
  3. Определить глобальный ранг: глобальный ранг текущего агента — это смещение group_rank этого агента в массиве информации. Смещение рассчитывается как сумма local_worlds всех агентов с рангом ниже group_rank. Уровень воркеров: [смещение, смещение+local_world_size].
  4. Определить ранг роли: Определите ранг роли, используя алгоритм из пункта 3, с той разницей, что расчет смещения начинается с первого агента с той же ролью, что и текущая роль, и с наименьшим групповым рангом.
  5. Поскольку все агенты используют один и тот же алгоритм, их вычисляемый массив рангов одинаков.

Затем создайте воркеров и назначьте воркеров в worker_group.workers.

@prof
def _assign_worker_ranks(
    self, store, group_rank: int, group_world_size: int, spec: WorkerSpec
) -> List[Worker]:
    """
    Determines proper ranks for worker processes. The rank assignment
    is done according to the following algorithm:

    1. Each agent writes its configuration(group_rank, group_world_size
       , num_workers) to the common store.
    2. Each agent retrieves configuration for all agents
       and performs two level sort using role and rank.
    3. Determine the global rank: the global rank of the workers for the current
       agent is the offset of the infos array up to group_rank of the agent.
       The offset is computed as a sum of local_world_size of all agents that
       have rank less than the group_rank. The workers would have the ranks:
       [offset, offset+local_world_size)
    4. Determine the role rank: The role rank is determined using the algorithms
       in the point 3 with the exception that the offset is done from the first
       agent that has the same role as current one and has the minimum group rank.
    """

    # 每个代理将其配置(group_rank, group_world_size, num_workers)写入公共存储。
    role_infos = self._share_and_gather(store, group_rank, group_world_size, spec)
    # 每个代理检索所有代理的配置,并使用角色和rank执行两级排序。
    my_role_info = role_infos[group_rank]
    # 确定全局rank:当前代理的global rank是 本代理 的 group_rank 在infos数组的偏移量(offset)。偏移量的计算方法是,排名低于group_rank的所有代理的local_world之和。workers 的等级为:[offset, offset+local_world_size]。
    worker_world_size, worker_global_ranks = self._get_ranks(role_infos, group_rank)
    role_infos = sorted(
        role_infos, key=functools.cmp_to_key(_RoleInstanceInfo.compare)
    )
    role_start_idx, role_end_idx = _RoleInstanceInfo.find_role_boundaries(
        role_infos, my_role_info.role
    )
    role_pos = next(
        idx
        for idx, role_info in enumerate(role_infos)
        if _RoleInstanceInfo.compare(role_info, my_role_info) == 0
    )
    # 确定role rank:使用第3点中的算法确定role rank,不同之处是:偏移量计算是从与当前角色相同且具有最小 group rank 的第一个代理开始。
    role_world_size, role_ranks = self._get_ranks(
        role_infos, role_pos, role_start_idx, role_end_idx + 1
    )
    # 生成 workers,把 worker 都赋值在 worker_group.workers 之中。
    workers = []
    for ind in range(spec.local_world_size):
        worker = Worker(
            local_rank=ind,
            global_rank=worker_global_ranks[ind],
            role_rank=role_ranks[ind],
            world_size=worker_world_size,
            role_world_size=role_world_size,
        )
        workers.append(worker)
    return workers

4.2.4 Запуск рабочего процесса

Вызовите _start_workers производного класса, чтобы запустить рабочий процесс, поэтому базовый класс здесь не реализован, позже мы увидим, как производный класс реализует его.

    @abc.abstractmethod
    def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
        r"""
        Starts ``worker_group.spec.local_world_size`` number of workers
        according to worker spec for the worker group .

        Returns a map of ``local_rank`` to worker ``id``.
        """
        raise NotImplementedError()

Текущая логика выглядит следующим образом, в частности:

  1. Вызовите rdzv_handler.next_rendezvous для синхронизации с другими узлами.
  2. rdzv_handler.next_rendezvous возвращает ранги и другую информацию в _assign_worker_ranks.
  3. _assign_worker_ranks генерирует несколько рабочих, где каждому рабочему автоматически присваивается ранг. На этих рабочих указывает worker_group.workers агента.
+--------------------------------------------------+
| LocalElasticAgent                                |         _initialize_workers
|                                                  |                 +
|                                                  |                 |
|                                                  |                 |
|   +----------------------+                       |                 v
|   |WorkerGroup           |                       |         _rendezvous(worker_group)
|   |                      |                       |                 +
|   |     spec             |                       |                 |
|   |                      |                       |                 | 1
|   |     group_world_size |                       |                 v
|   |                      |                       |        rdzv_handler.next_rendezvous()
|   |     store            |                       |                 +
|   |                      |    +----------------+ |                 |
|   |     group_rank       |    | Worker0(rank 0)| |               2 | ranks
|   |                      |    | Worker1(rank 1)| |  Workers        v
|   |     workers  +----------> | ...            | | <----+ _assign_worker_ranks
|   |                      |    | Workern(rank n)| |    3
|   +----------------------+    +----------------+ |
|                                                  |
+--------------------------------------------------+

Далее функции, связанные с рангом и работником, будут перечислены отдельно, чтобы все могли лучше понять.

4.3 связанные ранги

Предыдущий _assign_worker_ranks устанавливает ранги для рабочих, но есть некоторые внутренние детали, в которых нам нужно разобраться.

4.3.1 _RoleInstanceInfo

Здесь мы представим структуру данных _RoleInstanceInfo. Агенты используют этот класс для обмена информацией с другими агентами. Эта информация используется для определения ранга этого прокси-воркера. Эти агенты работают в гетерогенной среде, и у разных агентов может быть разное количество рабочих. Его параметры сборки:

  • role (str) : определяемая пользователем роль.
  • rank (int): Ранг агента.
  • local_world_size (int): количество местных рабочих.
class _RoleInstanceInfo:
    """
    The class is used by the agent to exchange the information with other agents.
    The information is used to determine the rank of the workers that agent
    manages in heterogeneous environments, where different agents can have
    different number of workers.
    """

    __slots__ = ["role", "rank", "local_world_size"]

    def __init__(self, role: str, rank: int, local_world_size: int):
        r"""

        Args:
            role (str): user-defined role for the workers with this spec
            rank (int): the rank of the agent
            local_world_size (int): number of local workers to run
        """

        self.role = role
        self.rank = rank
        self.local_world_size = local_world_size

    def serialize(self) -> bytes:
        dict_data = {
            "role": self.role,
            "rank": self.rank,
            "local_world_size": self.local_world_size,
        }
        return json.dumps(dict_data).encode(encoding="UTF-8")

    @staticmethod
    def deserialize(data: bytes):
        dict_data = json.loads(data.decode(encoding="UTF-8"))
        return _RoleInstanceInfo(
            dict_data["role"], dict_data["rank"], dict_data["local_world_size"]
        )

    @staticmethod
    def compare(obj1, obj2) -> int:
        if obj1.role == obj2.role:
            return obj1.rank - obj2.rank
        elif obj1.role > obj2.role:
            return 1
        else:
            return -1

    @staticmethod
    def find_role_boundaries(roles_infos: List, role: str) -> Tuple[int, int]:
        start_idx, end_idx = -1, -1
        for idx, role_info in enumerate(roles_infos):
            if role_info.role == role:
                if start_idx == -1:
                    start_idx = idx
                end_idx = idx
        return (start_idx, end_idx)

4.3.2 _share_and_gather

Роль _share_and_gather - это синхронизировать между агентами, чтобы получить общую информацию о роли.Каждый агент настроен (group_rank, group_world_size, num_workers) на общедоступное хранилище.. Вот использование магазина, возвращенного Rendezvous ранее для обмена информацией.

    def _share_and_gather(
        self, store, group_rank: int, group_world_size: int, spec: WorkerSpec
    ) -> List:
        agent_role_info = _RoleInstanceInfo(
            spec.role, group_rank, spec.local_world_size
        )
        key_prefix = "torchelastic/role_info"
        agent_config_enc = agent_role_info.serialize()
        role_infos_bytes = store_util.synchronize(
            store, agent_config_enc, group_rank, group_world_size, key_prefix
        )
        role_infos = [
            _RoleInstanceInfo.deserialize(role_info_bytes)
            for role_info_bytes in role_infos_bytes
        ]
        return role_infos

4.3.3 _get_ranks

Определить глобальный ранг в соответствии с информацией о роли: глобальный ранг текущего агента — это смещение group_rank этого агента в массиве информации. Смещение рассчитывается как сумма local_worlds всех агентов с рангом ниже group_rank. Уровень воркеров: [смещение, смещение+local_world_size].

def _get_ranks(
    self,
    role_infos: List[_RoleInstanceInfo],
    role_idx: int,
    start_idx: int = 0,
    end_idx: int = -1,
) -> Tuple[int, List[int]]:
    if end_idx == -1:
        end_idx = len(role_infos)
    prefix_sum = 0
    total_sum = 0
    for idx in range(start_idx, end_idx):
        if role_idx > idx:
            prefix_sum += role_infos[idx].local_world_size
        total_sum += role_infos[idx].local_world_size
    return (
        total_sum,
        list(range(prefix_sum, prefix_sum + role_infos[role_idx].local_world_size)),
    )

Текущая логика расширяется следующим образом:

  1. Вызовите rdzv_handler.next_rendezvous() для синхронизации с другими узлами и получения информации.
  2. Получить в информации магазин (его можно считать удаленным хранилищем KV), group_world_size, group_rank и передать Агенту.
  3. Такая информация, как ранги, передается методу _assign_worker_ranks.
  4. Среди _assign_worker_ranks,Вызовите _share_and_gather для синхронизации между агентами, чтобы получить общую информацию о ролях. Каждый агент записывает свою конфигурацию (group_rank, group_world_size, num_workers) в общедоступное хранилище KV.
  5. Определить глобальный ранг в соответствии с информацией о роли: глобальный ранг текущего агента — это смещение group_rank этого агента в массиве информации. Смещение рассчитывается как сумма local_worlds всех агентов с рангом ниже group_rank.
  6. Создайте серию рабочих, используя различную информацию.
  7. Воркеры копируются в WorkerGroup агента.
                                                              _initialize_workers
                                                                      +
                                                                      |
                                                                      |
                                                                      v
                                                              _rendezvous(worker_group)
                                                                      +
+----------------------------------------------+                      |
| LocalElasticAgent                            |                      | 1
|                                              |   2                  v
|                                         +--------------+  rdzv_handler.next_rendezvous()
| +--------------------+                  |    |                      +
| | WorkerGroup        |                  |    |                      |
| |                    |                  |    |                    3 | ranks
| |                    |                  |    |                      v
| |  spec              |                  |    |       +--------------+------------------+
| |                    |                  |    |       | _assign_worker_ranks            |
| |                    |                  |    |       |                                 |
| |  store   <----------------------------+    |       |                        4        |
| |                    |                  |    |       | role_infos = _share_and_gather( |
| |                    |                  |    |       |               +          store) |
| |  group_world_size<--------------------+    |       |               | 5               |
| |                    |                  |    |       |               |                 |
| |                    |                  |    |       |               v                 |
| |  group_rank <-------------------------+    |       |          _get_ranks(world...)   |
| |                    |                       |       |          _get_ranks(role...)    |
| |                    |   +----------------+  |       |               +                 |
| |  workers  +----------->+ Worker0(rank 0)|  |       |               |                 |
| |                    |   | Worker1(rank 1)|  |       |               | 6               |
| |                    |   | ...            |  |Workers|               v                 |
| |                    |   | Workern(rank n)+<------------+ new Worker(local_rank,       |
| +--------------------+   +----------------+  |    7  |               global_rank,      |
|                                              |       |               role_rank,        |
+----------------------------------------------+       |               world_size,       |
                                                       |               role_world_size)  |
                                                       |                                 |
                                                       +---------------------------------+

После операции _rendezvous был сгенерирован экземпляр Worker, поэтому давайте посмотрим, как сгенерировать процесс Worker. Но поскольку эти методы не реализованы в SimpleElasticAgent, нам необходимо проанализировать подраздел его производного класса LocalElasticAgent, чтобы продолжить расширение нашей логической схемы.

4.4 Связанные с работником

Давайте сначала посмотрим на оставшиеся две функции SimpleElasticAgent, связанные с рабочими процессами.

4.4.1 Перезагрузка

_restart_workers — перезапустить воркеры.

# pyre-fixme[56]: Pyre was not able to infer the type of the decorator
#  `torch.distributed.elastic.metrics.prof`.
@prof
def _restart_workers(self, worker_group: WorkerGroup) -> None:
    """
    Restarts (stops, rendezvous, starts) all local workers in the group.
    """

    role = worker_group.spec.role
    self._stop_workers(worker_group)
    worker_group.state = WorkerState.STOPPED
    self._initialize_workers(worker_group)

4.4.2 barrier

На самом деле практически невозможно гарантировать, что все воркеры DDP могут быть гарантированно завершены в одно и то же время, поэтому TE предоставляет барьер финализации Роль этого барьера заключается в реализации тайм-аута (5 минут) для финализации воркера.

    def _exit_barrier(self):
        """
        Wait for ``exit_barrier_timeout`` seconds for all agents to finish
        executing their local workers (either successfully or not). This
        acts as a safety guard against user scripts that terminate at different
        times. This barrier keeps the agent process alive until all workers finish.
        """
        start = time.time()
        try:
            store_util.barrier(
                self._store,
                self._worker_group.group_rank,
                self._worker_group.group_world_size,
                key_prefix=_TERMINAL_STATE_SYNC_ID,
                barrier_timeout=self._exit_barrier_timeout,
            )
        except Exception:
            log.exception(
                f"Error waiting on exit barrier. Elapsed: {time.time() - start} seconds"
            )

0x05 LocalElasticAgent

LocalElasticAgent — это конечный агент, используемый для обучения эластичности. Он в основном используется для локальных операций и отвечает за управление всеми рабочими процессами на одном компьютере. Он является производным от SimpleElasticAgent.

Этот агент развертывается на каждом хосте и настроен для созданияnрабочий процесс. При использовании графических процессоров n — это количество графических процессоров, доступных на хосте. Локальные агенты не взаимодействуют с другими локальными агентами, развернутыми на других хостах, даже если рабочие процессы могут обмениваться данными между хостами. Идентификатор рабочего процесса интерпретируется как локальный процесс. Агент запускает и останавливает все рабочие процессы на машине в целом.

Функции и аргументы, передаваемые рабочему процессу, должны быть совместимы с многопроцессорной обработкой Python. Чтобы передать структуру данных многопроцессорной обработки рабочему процессу, пользователь может создать структуру данных в той же многопроцессорной многопроцессорной обработке, что и указанный start_method, и передать ее в качестве параметра функции.

exit_barrier_timeout используется для указания времени (в секундах) ожидания завершения работы других агентов. Это действует как защитная сетка для обработки ситуаций, когда рабочие заканчивают работу в разное время, чтобы агент не рассматривал работника, который заканчивает работу раньше, как событие масштабирования. Пользовательскому коду настоятельно рекомендуется обеспечивать синхронное завершение рабочих операций, а не полагаться на exit_barrier_timeout.

SimpleElasticAgent в основном обеспечивает инициализацию и общий режим работы, но оставляет некоторые абстрактные функции, которые не были реализованы, такие как_start_workers,_stop_workers,_monitor_workers,_shutdown. LocalElasticAgent дополняет эти функции.

class LocalElasticAgent(SimpleElasticAgent):
    """
    An implementation of :py:class:`torchelastic.agent.server.ElasticAgent`
    that handles host-local workers.
    This agent is deployed per host and is configured to spawn ``n`` workers.
    When using GPUs, ``n`` maps to the number of GPUs available on the host.

    The local agent does not communicate to other local agents deployed on
    other hosts, even if the workers may communicate inter-host. The worker id
    is interpreted to be a local process. The agent starts and stops all worker
    processes as a single unit.


    The worker function and argument passed to the worker function must be
    python multiprocessing compatible. To pass multiprocessing data structures
    to the workers you may create the data structure in the same multiprocessing
    context as the specified ``start_method`` and pass it as a function argument.

    The ``exit_barrier_timeout`` specifies the amount of time (in seconds) to wait
    for other agents to finish. This acts as a safety net to handle cases where
    workers finish at different times, to prevent agents from viewing workers
    that finished early as a scale-down event. It is strongly advised that the
    user code deal with ensuring that workers are terminated in a synchronous
    manner rather than relying on the exit_barrier_timeout.
    """

    def __init__(
        self,
        spec: WorkerSpec,
        start_method="spawn",
        exit_barrier_timeout: float = 300,
        log_dir: Optional[str] = None,
    ):
        super().__init__(spec, exit_barrier_timeout)
        self._start_method = start_method
        self._pcontext: Optional[PContext] = None
        rdzv_run_id = spec.rdzv_handler.get_run_id()
        self._log_dir = self._make_log_dir(log_dir, rdzv_run_id)

    def _make_log_dir(self, log_dir: Optional[str], rdzv_run_id: str):
        base_log_dir = log_dir or tempfile.mkdtemp(prefix="torchelastic_")
        os.makedirs(base_log_dir, exist_ok=True)
        dir = tempfile.mkdtemp(prefix=f"{rdzv_run_id}_", dir=base_log_dir)
        return dir

5.1 Использование

Давайте сначала извлечем код из его комментариев и посмотрим, как его использовать. Вот как начать с функции в качестве записи.

    def trainer(args) -> str:
        return "do train"

    def main():
        start_method="spawn"
        shared_queue= multiprocessing.get_context(start_method).Queue()
        spec = WorkerSpec(
                    role="trainer",
                    local_world_size=nproc_per_process,
                    entrypoint=trainer,
                    args=("foobar",),
                    ...<OTHER_PARAMS...>)
        agent = LocalElasticAgent(spec, start_method)
        results = agent.run()

        if results.is_failed():
            print("trainer failed")
        else:
            print(f"rank 0 return value: {results.return_values[0]}")
            # prints -> rank 0 return value: do train

Вот как начать с двоичного кода в качестве записи.

    def main():
        spec = WorkerSpec(
                    role="trainer",
                    local_world_size=nproc_per_process,
                    entrypoint="/usr/local/bin/trainer",
                    args=("--trainer_args", "foobar"),
                    ...<OTHER_PARAMS...>)
        agent = LocalElasticAgent(spec)
        results = agent.run()

        if not results.is_failed():
            print("binary launches do not have return values")

После операции _rendezvous был сгенерирован экземпляр Worker, поэтому давайте посмотрим, как сгенерировать процесс Worker.

5.2 Стоп

Следующая функция останавливает рабочих.

    @prof
    def _stop_workers(self, worker_group: WorkerGroup) -> None:
        self._shutdown()
        
    def _shutdown(self) -> None:
        if self._pcontext:
            self._pcontext.close()        

5.3 Инициализация

Продолжим предыдущую статью.После операции _rendezvous был сгенерирован экземпляр Worker.Далее посмотрим, как сгенерировать процесс Worker. Поскольку эти методы ранее не были реализованы в SimpleElasticAgent, мы продолжаем расширять нашу логическую схему в этом обзоре.

Давайте сначала взглянем на инициализацию воркеров. В _initialize_workers сначала используйте _rendezvous для создания экземпляра воркеров, а затем вызовите _start_workers для запуска воркеров.

    @prof
    def _initialize_workers(self, worker_group: WorkerGroup) -> None:
        r"""
        Starts a fresh set of workers for the worker_group.
        Essentially a rendezvous followed by a start_workers.

        The caller should first call ``_stop_workers()`` to stop running workers
        prior to calling this method.

        Optimistically sets the state of the worker group that
        just started as ``HEALTHY`` and delegates the actual monitoring
        of state to ``_monitor_workers()`` method
        """
        role = worker_group.spec.role

        # TODO after stopping workers, wait at least monitor_interval*2 for
        # workers on different nodes to fail on a collective op before waiting
        # on the rdzv barrier, this way we ensure that nodes enter rdzv
        # at around the same time and reduce false positive rdzv timeout errors
        self._rendezvous(worker_group) # Worker实例已经生成了

        worker_ids = self._start_workers(worker_group) # 启动Worker进程
        for local_rank, w_id in worker_ids.items():
            worker = worker_group.workers[local_rank]
            worker.id = w_id # 得到进程ID

        worker_group.state = WorkerState.HEALTHY

5.4 Запуск рабочего процесса

_start_workersМетод вызовет start_processes для запуска рабочего процесса, _start_method по умолчанию — «spawn». То есть запускается несколько процессов, и программа пользователя выполняется параллельно. В то же время будут контролироваться текущие результаты этих процессов. Среди параметров start_processesentrypointиargsЭто пользовательская команда и параметры, точка входа может быть функцией или строкой.

_start_workersСохраните результат запуска многопоточности методом start_processes в _pcontext, а затем используйте _pcontext для продолжения управления.Например, для завершения worker'а нужно напрямую вызвать метод close _pcontext.

    @prof
    def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
        spec = worker_group.spec
        store = worker_group.store
        assert store is not None
        master_addr, master_port = super()._get_master_addr_port(store)
        restart_count = spec.max_restarts - self._remaining_restarts

        use_agent_store = spec.rdzv_handler.get_backend() == "static"

        args: Dict[int, Tuple] = {}
        envs: Dict[int, Dict[str, str]] = {}
        for worker in worker_group.workers:
            local_rank = worker.local_rank
            worker_env = {
                "LOCAL_RANK": str(local_rank),
                "RANK": str(worker.global_rank),
                "GROUP_RANK": str(worker_group.group_rank),
                "ROLE_RANK": str(worker.role_rank),
                "ROLE_NAME": spec.role,
                "LOCAL_WORLD_SIZE": str(spec.local_world_size),
                "WORLD_SIZE": str(worker.world_size),
                "GROUP_WORLD_SIZE": str(worker_group.group_world_size),
                "ROLE_WORLD_SIZE": str(worker.role_world_size),
                "MASTER_ADDR": master_addr,
                "MASTER_PORT": str(master_port),
                "TORCHELASTIC_RESTART_COUNT": str(restart_count),
                "TORCHELASTIC_MAX_RESTARTS": str(spec.max_restarts),
                "TORCHELASTIC_RUN_ID": spec.rdzv_handler.get_run_id(),
                "TORCHELASTIC_USE_AGENT_STORE": str(use_agent_store),
                "NCCL_ASYNC_ERROR_HANDLING": str(1),
            }
            if "OMP_NUM_THREADS" in os.environ:
                worker_env["OMP_NUM_THREADS"] = os.environ["OMP_NUM_THREADS"]
            envs[local_rank] = worker_env
            worker_args = list(spec.args)
            worker_args = macros.substitute(worker_args, str(local_rank))
            args[local_rank] = tuple(worker_args)

        # scaling events do not count towards restarts (gets same attempt #)
        # remove existing log dir if this restart is due to a scaling event
        attempt_log_dir = os.path.join(self._log_dir, f"attempt_{restart_count}")
        shutil.rmtree(attempt_log_dir, ignore_errors=True)
        os.makedirs(attempt_log_dir)

        self._pcontext = start_processes( # 把启动多线程的结果保存在 _pcontext 之中。
            name=spec.role,
            entrypoint=spec.entrypoint,
            args=args,
            envs=envs,
            log_dir=attempt_log_dir,
            start_method=self._start_method,
            redirects=spec.redirects,
            tee=spec.tee,
        )

        return self._pcontext.pids()

5.5 Мониторинг

После запуска TE позвонит_monitor_workersКонтролируйте рабочих. Перед сохранением результата запуска многопоточности в_pcontext, используйте его сейчас_pcontextСледите за работой.

    @prof
    def _monitor_workers(self, worker_group: WorkerGroup) -> RunResult:
        role = worker_group.spec.role
        worker_pids = {w.id for w in worker_group.workers}
        assert self._pcontext is not None
        pc_pids = set(self._pcontext.pids().values())
        if worker_pids != pc_pids:
            return RunResult(state=WorkerState.UNKNOWN)

        result = self._pcontext.wait(0) # 对运行结构进行监控
        if result:
            if result.is_failed(): # 如果进程失败
                # map local rank failure to global rank
                worker_failures = {}
                #  返回的结果内部就包括每个进程的运行结果
                for local_rank, failure in result.failures.items():
                    worker = worker_group.workers[local_rank]
                    worker_failures[worker.global_rank] = failure
                return RunResult(
                    state=WorkerState.FAILED,
                    failures=worker_failures, # 返回运行结果
                )
            else:
                # copy ret_val_queue into a map with a global ranks
                workers_ret_vals = {}
                for local_rank, ret_val in result.return_values.items():
                    worker = worker_group.workers[local_rank]
                    workers_ret_vals[worker.global_rank] = ret_val
                return RunResult(
                    state=WorkerState.SUCCEEDED,
                    return_values=workers_ret_vals, # 返回运行结果
                )
        else:
            return RunResult(state=WorkerState.HEALTHY)

Поскольку запуск и мониторинг включают в себя общую логику работы системы, которую необходимо лучше понять вместе со рандеву, поэтому мы откладываем эту часть анализа и ждем, пока Рандеву проведет общий анализ.

Текущая общая логика выглядит следующим образом:

  1. Вызовите rdzv_handler.next_rendezvous() для синхронизации с другими узлами и получения информации.
  2. Получить в информации магазин (его можно считать удаленным хранилищем KV), group_world_size, group_rank и передать Агенту.
  3. Такая информация, как ранги, передается методу _assign_worker_ranks.
  4. В _assign_worker_ranks вызовите _share_and_gather для синхронизации между агентами и получения общей информации о ролях. Каждый агент записывает свою конфигурацию (group_rank, group_world_size, num_workers) в общедоступное хранилище KV.
  5. Определить глобальный ранг в соответствии с информацией о роли: глобальный ранг текущего агента — это смещение group_rank этого агента в массиве информации. Смещение рассчитывается как сумма local_worlds всех агентов с рангом ниже group_rank.
  6. Создайте серию рабочих, используя различную информацию.
  7. Воркеры копируются в WorkerGroup агента.
  8. Используйте _start_workers для запуска рабочих процессов.
  9. Назначьте идентификатор рабочего процесса для worker.id Агента, чтобы этот worker.id можно было использовать для управления процессом в будущем.
  10. Используйте _monitor_workers для мониторинга рабочих процессов.
  11. Используйте _exit_barrier, чтобы дождаться завершения рабочего процесса.
                                                              _initialize_workers
                                                                      +
                                                                      |
                                                                      |
                                                                      v
                                                              _rendezvous(worker_group)
                                                                      +
+----------------------------------------------+                      |
| LocalElasticAgent                            |                      | 1
|                                              |   2                  v
|                                         +--------------+  rdzv_handler.next_rendezvous()
| +--------------------+                  |    |                      +
| | WorkerGroup        |                  |    |                      |
| |                    |                  |    |                    3 | ranks
| |                    |                  |    |                      v
| |  spec              |                  |    |       +--------------+------------------+
| |                    |                  |    |       | _assign_worker_ranks            |
| |                    |                  |    |       |                                 |
| |  store   <----------------------------+    |       |                        4        |
| |                    |                  |    |       | role_infos = _share_and_gather( |
| |                    |                  |    |       |               +          store) |
| |  group_world_size<--------------------+    |       |               | 5               |
| |                    |                  |    |       |               |                 |
| |                    |                  |    |       |               v                 |
| |  group_rank <-------------------------+    |       |          _get_ranks(world...)   |
| |                    |                       |       |          _get_ranks(role...)    |
| |                    |   +----------------+  |       |               +                 |
| |  workers  +----------->+ Worker0(rank 0)|  |       |               |                 |
| |                    |   | Worker1(rank 1)|  |       |               | 6               |
| |                    |   | ...            |  |Workers|               v                 |
| |                    |   | Workern(rank n)+<------------+ new Worker(local_rank,       |
| +--------------------+   +---------+------+  |    7  |               global_rank,      |
|                                    ^         |       |               role_rank,        |
|                                    |         |       |               world_size,       |
|                                    |         |       |               role_world_size)  |
+----------------------------------------------+       |                                 |
                                     |                 +---------------+-----------------+
                                     |                                 |
                                     |                                 | 8
                                     |              9                  v
                                     +-----------------------+   _start_workers
                                                                       +
                                                                       | 10
                                                                       |
                                                                       v
                                                       +---------------+--------------+
                                                       | state = _monitor_workers     |
                                                  +--> |                              +-->
                                                  |    +---------------+--------------+  |
                                                  |                    |                 |
                                                  <--------------------------------------+
                                                     LOOP  Every 30S   |
                                                                       | 11
                                                                       v
                                                                    _exit_barrier

Телефон такой:

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

TorchElastic — эластичное отказоустойчивое распределенное обучение