[Анализ исходного кода] Распределенное эластичное обучение PyTorch (5) --- Механизм рандеву

машинное обучение PyTorch исходный код

0x00 сводка

В предыдущих статьях мы изучили основные распространяемые модули PyTorch и представили несколько официальных примеров. Далее мы представим эластичное обучение PyTorch. Эта статья пятая. Давайте посмотрим на внутренний движок Rendezvous. , например, как обрабатывать присоединение к узлу, выход из узла, ожидание, пульс и т. д.

Статьи о тренировках на эластичность следующие:

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (1) --- общая идея

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (2) --- запуск и процесс с одним узлом

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (3) --- агент

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (4) --- Архитектура и логика рандеву

0x01 Предисловие

1.1 Общая система

Эластичный тренинг можно понимать как систему бега на основе Rendezvous.

  • Агент предпочитает логику на определенных узлах

    • Агент отвечает за определенные операции, связанные с бизнес-логикой, такие как запуск процесса для выполнения пользовательской программы, мониторинг состояния выполнения пользовательской программы и уведомление Rendezvous в случае возникновения исключения.
    • Агент — это менеджер рабочих процессов, отвечающий за запуск и управление рабочими процессами, формирование рабочей группы, отслеживание рабочего состояния рабочих процессов, сбор данных об отказавших рабочих процессах и перезапуск группы рабочих процессов в случае сбоя или нового добавленного рабочего процесса.
    • Агент отвечает за сохранение информации WORLD_SIZE и RANK. Пользователю не нужно предоставлять его вручную, и Агент сделает это автоматически.
    • Агент — это фоновый процесс на определенном узле и независимый индивидуум. Сам агент не может реализовать общее эластичное обучение, поэтому ему нужен механизм для завершения взаимного обнаружения между рабочими, синхронизации изменений и т. д. (информация WORLD_SIZE и RANK фактически должна быть синхронизирована несколькими узлами для определения), это следующая концепция Rendezvous.
  • Рандеву несет ответственность

    кластерная логика

    , чтобы убедиться, что узлы достигли строгого консенсуса в отношении того, «какие узлы участвуют в обучении».

    • Каждый Агент включает в себя обработчик Rendezvous, и эти обработчики составляют кластер Rendezvous в целом, формируя, таким образом, кластер Агента.
    • После завершения Rendezvous создается общее хранилище ключей и значений, которое реализуетtorch.distributed.StoreAPI. Это хранилище используется совместно только участниками, завершившими Rendezvous, и оно предназначено для того, чтобы позволить Torch Distributed Elastic обмениваться управляющей информацией и данными во время заданий инициализации.
    • Rendezvous отвечает за сохранение всей необходимой информации о текущей группе поверх каждого агента. Поверх каждого агента происходит рандеву, они будут общаться друг с другом и поддерживать набор информации в целом, которая хранится в упомянутом выше Магазине.
    • Rendezvous отвечает за логику кластера, такую ​​как добавление новых узлов, удаление узлов, присвоение рангов и т. д.

1.2 Rendezvous

Пока что информация Rendezvous выглядит следующим образом: DynamicRendezvousHandler относится к динамической логике, среди которых_RendezvousStateHolderЭто состояние и другое хранилище метаинформации (статическая структура). Вы обнаружите, что на рисунке также есть _RendezvousOpExecutor, который не представлен. Это механизм выполнения, поэтому в этой статье мы увидим, как _RendezvousOpExecutor обрабатывает его.

+-----------------------------+      +------------------------------------------------+
| LocalElasticAgent           |      | WorkerSpec                                     |
|                             |      |                                                |
| +------------------------+  |      |   rdzv_handler = {DynamicRendezvousHandler} -------+
| |WorkerGroup             |  |      |                                                |   |
| |            spec +--------------> |   entry = worker_fn                            |   |
| |            workers     |  |      |                                                |   |
| |            store       |  |      |   role = {str} 'trainer'                       |   |
| |            group_rank  |  |      |                                                |   |
| |       group_world_size |  |      +------------------------------------------------+   |
| |                        |  |                                                           |
| +------------------------+  |                                                           |
|                             |                                                           |
| rdzv_run_id                 |                                                           |
| store                       |            +-----------------------------------------+    |
|                             |            |DynamicRendezvousHandler                 |    |
+-----------------------------+            |                                         |    |
                                           |                                         |    |
                                           |   _settings: RendezvousSettings         | <--+
                                           |                                         |
                                           |   _store: Store                         |
                                           |                                         |
                                           |   _state_holder: _RendezvousStateHolder |
                                           |                                         |
                                           |   _op_executor: _RendezvousOpExecutor   |
                                           |                                         |
                                           +-----------------------------------------+

1.3 Развязка

_RendezvousOpExecutor разделяет сегментацию функций:

  • Бизнес-логика абстрагируется в ряд операторов, таких как_RendevzousJoinOp.
  • Rendezvous внутренне поддерживает набор конечных автоматов, состоящих из бизнес-функций, таких как функция _add_to_participants для добавления участников.
  • _RendezvousOpExecutorМеханизм выполняет различные операторы, получает действие на основе результата оператора, а затем использует действие для вызова бизнес-функций для работы.

В этой статье в основном представлен движок Rendezvous, соответствующий серверной части C10d.

Реализация движка 0x02

2.1 Базовый класс

_RendezvousOpExecutor — это базовый класс движка, он просто определяет виртуальную функцию запуска.

class _RendezvousOpExecutor(ABC):
    """Executes rendezvous operations."""

    @abstractmethod
    def run(
        self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
    ) -> None:
        """Executes a rendezvous operation.

        An operation is run inside a state machine and is expected to transition
        the rendezvous from one state to another.

        Args:
            state_handler:
                A callable that is expected to return the next state transition
                action based on the current state of the rendezvous.
            deadline:
                The time, in seconds, at which the operation will be considered
                timed-out.
        """

Здесь используется _RendezvousContext, и его функция состоит в том, чтобы инкапсулировать различную информацию Rendezvous и предоставить ее операционной системе. Вот вам и использование _RendezvousState и RendezvousSettings.

class _RendezvousContext:
    """Holds the context of the rendezvous.

    Attributes:
        node:
            The node descriptor associated with the current rendezvous handler
            instance.
        state:
            The current state of the rendezvous.
        settings:
            The rendezvous settings.
    """

    node: _NodeDesc
    state: _RendezvousState
    settings: RendezvousSettings

    def __init__(
        self, node: _NodeDesc, state: _RendezvousState, settings: RendezvousSettings
    ) -> None:
        self.node = node
        self.state = state
        self.settings = settings

2.2 Механизм распределенной работы

_DistributedRendezvousOpExecutor расширяет _RendezvousOpExecutor и является фактическим исполнителем ElasticTorch. Подобно Looper, он отвечает за распространение сообщений, звонки и поддержание состояния.

2.2.1 Определения

По сравнению со своим базовым классом _DistributedRendezvousOpExecutor добавляет переменные-члены, такие как информация об узле, состояние и конфигурация.

class _DistributedRendezvousOpExecutor(_RendezvousOpExecutor):
    """Executes rendezvous operations using a shared state.

    Args:
        node:
            The node descriptor associated with the current rendezvous handler
            instance.
        state_holder:
            The ``RendezvousStateHolder`` to use to sync the rendezvous state
            with other nodes.
        settings:
            The rendezvous settings.
    """

    _node: _NodeDesc
    _state: _RendezvousState
    _state_holder: _RendezvousStateHolder
    _settings: RendezvousSettings

    def __init__(
        self,
        node: _NodeDesc,
        state_holder: _RendezvousStateHolder,
        settings: RendezvousSettings,
    ) -> None:
        self._node = node
        self._state_holder = state_holder
        self._settings = settings

Логика следующая:

+---------------------------------------------------------------+
| _DistributedRendezvousOpExecutor                              |
|                                                               |
|                     +------------------------+                |
|        _state +---> | _RendezvousState       |                |
|                     |                        |                |
|                     |       participants     |                |
|                     |       wait_list        |                |
|                     |       last_heartbeats  |                |
|                     |       deadline         |                |
|                     +------------------------+                |
|                                                               |
|                     +-------------------------+               |
|      _settings +--> | RendezvousSettings      |               |
|                     |                         |               |
|                     +-------------------------+               |
|                                                               |
|                     +--------------------------------------+  |
| _state_holder +---> | _BackendRendezvousStateHolder        |  |
|                     |                                      |  |
|                     |        _backend: RendezvousBackend   |  |
|                     |        _state: _RendezvousState      |  |
|                     |        _settings: RendezvousSettings |  |
|                     |                                      |  |
|                     +--------------------------------------+  |
|                     +--------------------------------------+  |
|                     | _NodeDesc                            |  |
|     _node +-------> |              fqdn: str               |  |
|                     |              pid: int                |  |
|                     |              local_id: int           |  |
|                     |                                      |  |
|                     +--------------------------------------+  |
+---------------------------------------------------------------+

2.2.2 Вызов

Давайте возьмем несколько примеров, чтобы увидеть, как вызвать движок.Вы можете видеть, что сначала устанавливается оператор, а затем вызывается функция запуска движка.

2.2.2.1 _RendezvousKeepAliveOp
def _keep_alive(self) -> None:
    self._heartbeat_lock.acquire()
    op = _RendezvousKeepAliveOp() # 设置算子
    deadline = self._get_deadline(self._settings.timeout.heartbeat)
    self._op_executor.run(op, deadline) # 调用
2.2.2.2 _RendezvousCloseOp
def _close(self) -> None:
    op = _RendezvousCloseOp() # 设置算子
    deadline = self._get_deadline(self._settings.timeout.close)
    self._op_executor.run(op, deadline) # 调用
2.2.2.3 _RendezvousJoinOp
def next_rendezvous(self) -> Tuple[Store, int, int]:
    """See base class."""

    self._stop_heartbeats()

    # Delay the execution for a small random amount of time if this is our
    # first run. This will slightly skew the rendezvous attempts across the
    # nodes and reduce the load on the backend.
    if self._state_holder.state.round == 0:
        _delay(seconds=(0, 0.3))

    exit_op = _RendezvousExitOp() # 设置算子
    join_op = _RendezvousJoinOp() # 设置算子

    deadline = self._get_deadline(self._settings.timeout.join)

    self._op_executor.run(exit_op, deadline) # 这里会进行调用
    self._op_executor.run(join_op, deadline) # 调用

    self._start_heartbeats()

    rank, world_size = self._get_world()
    store = self._get_store()

    return store, rank, world_size

2.2.3 Функция

В _DistributedRendezvousOpExecutor функция запуска реализует базовую логику, заключающуюся в выполнении различных операций в зависимости от типа действия.

2.2.3.1 Основной цикл

Конкретный код запуска выглядит следующим образом:

    def run(
        self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
    ) -> None:
        """See base class."""
        action = None

        while action != _Action.FINISH: # 循环,一直到获得一个FINISH action 为止
            # Reads or writes the latest rendezvous state shared by all nodes in
            # the rendezvous. Note that our local changes might get overridden
            # by another node if that node synced its changes before us.
            
            # 这里很重要,在所有node之间做信息同步
            has_set = self._state_holder.sync() # 因为最新状态在 rendezvous。

            self._state = self._state_holder.state

            ctx = _RendezvousContext(self._node, self._state, self._settings)

            # Determine the next action to take based on the current state of
            # the rendezvous.
            action = state_handler(ctx, deadline) # 决定下一个操作,state_handler 就是算子

            if action == _Action.FINISH:
                continue

            if action == _Action.ERROR_CLOSED:
                raise RendezvousClosedError()

            if action == _Action.ERROR_TIMEOUT:
                raise RendezvousTimeoutError()

            if action == _Action.SYNC:
                # Delay the execution by one second to avoid overloading the
                # backend if we are asked to poll for state changes.
                _delay(seconds=1)
            else:
                if action == _Action.KEEP_ALIVE:
                    self._keep_alive()
                elif action == _Action.ADD_TO_PARTICIPANTS:
                    self._add_to_participants()
                elif action == _Action.ADD_TO_WAIT_LIST:
                    self._add_to_wait_list()
                elif action == _Action.REMOVE_FROM_PARTICIPANTS:
                    self._remove_from_participants()
                elif action == _Action.REMOVE_FROM_WAIT_LIST:
                    self._remove_from_wait_list()
                elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
                    self._mark_rendezvous_complete()
                elif action == _Action.MARK_RENDEZVOUS_CLOSED:
                    self._mark_rendezvous_closed()

                # Attempt to sync our changes back to other nodes.
                self._state_holder.mark_dirty()

Детали показаны ниже.

+-----------------------------------------+                          +---------------------------------------------------------------+
|DynamicRendezvousHandler                 |                          | _DistributedRendezvousOpExecutor                              |
|                                         |                          |                                                               |
|                                         |                          |                     +------------------------+                |
|   _settings: RendezvousSettings         |                          |        _state +---> | _RendezvousState       |                |
|                                         |                          |                     |                        |                |
|                                         |                          |                     |       participants     |                |
|   _store: Store                         |                          |                     |       wait_list        |                |
|                                         |                          |                     |       last_heartbeats  |                |
|                                         |                          |                     |       deadline         |                |
|   _state_holder: _RendezvousStateHolder |                          |                     +------------------------+                |
|                                         | run(_RendezvousJoinOp()) |                     +-------------------------+               |
|                                         |                          |      _settings +--> | RendezvousSettings      |               |
|   _op_executor  +------------------------------------------------> |                     |                         |               |
|                                         |                          |                     +-------------------------+               |
|                                         |                          |                     +--------------------------------------+  |
+-----------------------------------------+                          | _state_holder +---> | _BackendRendezvousStateHolder        |  |
                                                                     |                     |                                      |  |
                                                                     |                     |        _backend: RendezvousBackend   |  |
                                                                     |                     |        _state: _RendezvousState      |  |
                                                                     |                     |        _settings: RendezvousSettings |  |
                                                                     |                     |                                      |  |
                                                                     |                     +--------------------------------------+  |
                                                                     |                     +--------------------------------------+  |
                                                                     |                     | _NodeDesc                            |  |
                                                                     |     _node +-------> |              fqdn: str               |  |
                                                                     |                     |              pid: int                |  |
                                                                     |                     |              local_id: int           |  |
                                                                     |                     |                                      |  |
                                                                     |                     +--------------------------------------+  |
                                                                     +---------------------------------------------------------------+

Телефон такой:

2.2.3.2 Синхронизация

В функции запуска следует отметить, что:Перед выполнением различных операций оператора, вызовет self._state_holder.sync() для выполнения синхронизации состояния между каждым рабочим потоком для достижения консенсуса.

def sync(self) -> Optional[bool]:
    """See base class."""
    state_bits: Optional[bytes] = None
    token = None
    has_set: Optional[bool]

    if self._dirty: # 如果本node状态变化了
        has_set = False
        state_bits = pickle.dumps(self._state)
        # 把自己的状态设置到backend之中
        set_response = self._backend.set_state(state_bits, self._token)
        if set_response is not None:
            state_bits, token, has_set = set_response
    else: # 自己没变化,只能从后端获取
        has_set = None
        if self._cache_duration > 0:
            # Avoid overloading the backend if we are asked to retrieve the
            # state repeatedly. Try to serve the cached state.
            if self._last_sync_time >= max(time.monotonic() - self._cache_duration, 0):
                return None
        get_response = self._backend.get_state() # 从backend获取其他节点最新状态
        if get_response is not None:
            state_bits, token = get_response

    if state_bits is not None:
        try:
            self._state = pickle.loads(state_bits) # 用后端状态更新本身的状态
        except pickle.PickleError as exc:
            raise RendezvousStateError(
                "The rendezvous state is corrupt. See inner exception for details."
            ) from exc
    else:
        self._state = _RendezvousState()

    if has_set and self._dead_nodes and log.isEnabledFor(logging.DEBUG):
        node_list = ", ".join(f"'{dead_node}'" for dead_node in self._dead_nodes)
        msg = (
            f"As part of the sync operation the node(s) {node_list} have been removed from the "
            f"rendezvous '{self._settings.run_id}' since they had no heartbeat."
        )
        self._record(message=msg)

    self._token = token
    self._dirty = False
    self._last_sync_time = time.monotonic()
    self._sanitize()

    return has_set
задняя часть

Torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py — это соответствующий серверный код.

Бэкенд здесь использует хранилище как централизованное хранилище, которое является мастером. Каждый узел является клиентом и обращается к мастеру для обновления своего состояния и получения состояния других узлов. Таким образом, все узлы будут общаться друг с другом и достигать консенсуса. Сюда же периодически удаляются клиенты, которые не обновляют свои метаданные.

get_state просто извлекается из хранилища.

def get_state(self) -> Optional[Tuple[bytes, Token]]:
    """See base class."""
    base64_state: bytes = self._call_store("get", self._key)

    return self._decode_state(base64_state)

set_state выполнит набор сравнения, который вернет новое состояние и было ли оно обновлено.

def set_state(
    self, state: bytes, token: Optional[Token] = None
) -> Optional[Tuple[bytes, Token, bool]]:
    """See base class."""
    base64_state_str: str = b64encode(state).decode()

    if token:
        # Shortcut if we know for sure that the token is not valid.
        if not isinstance(token, bytes):
            result = self.get_state()
            if result is not None:
                tmp = *result, False
                # Python 3.6 does not support tuple unpacking in return
                # statements.
                return tmp
            return None

        token = token.decode()
    else:
        token = self._NULL_SENTINEL

    base64_state: bytes = self._call_store("compare_set", self._key, token, base64_state_str)

    state_token_pair = self._decode_state(base64_state)
    if state_token_pair is None:
        return None

    new_state, new_token = state_token_pair

    # C10d Store's compare_set method does not offer an easy way to find out
    # whether our write attempt was successful. As a brute-force solution we
    # perform a bitwise comparison of our local state and the remote state.
    return new_state, new_token, new_state == state
_sanitize

Метод _sanitize используется для обработки сообщений от других узлов, например для очистки отказавших узлов. То есть, если время последнего пульса превышает определенный пороговый диапазон, эти узлы будут помечены как dead_node, и эти узлы будут удалены из списка участников или списка ожидания.

def _sanitize(self) -> None:
    state = self._state

    expire_time = datetime.utcnow() - (
        self._settings.keep_alive_interval * self._settings.keep_alive_max_attempt
    )

    # Filter out the dead nodes.
    self._dead_nodes = [
        node
        for node, last_heartbeat in state.last_heartbeats.items()
        if last_heartbeat < expire_time
    ]

    participant_removed = False

    for dead_node in self._dead_nodes:
        del state.last_heartbeats[dead_node] # 移除故障节点

        try:
            del state.participants[dead_node] # 移除故障节点

            participant_removed = True
        except KeyError:
            pass

        try:
            state.wait_list.remove(dead_node) # 移除故障节点
        except KeyError:
            pass

    if participant_removed:
        # Common epilogue shared with the _remove_from_participants()
        # function of _DistributedRendezvousOpExecutor.
        _remove_participant_epilogue(state, self._settings)

После знакомства с тем, как запустить двигатель, давайте взглянем на конкретных операторов.

0x03 оператор

_RendezvousOpExecutorБизнес-логика движка разделена на два уровня: пользовательские операции и внутренняя бизнес-логика. Разделение между пользовательскими операциями и внутренними бизнес-механизмами.

  • Пользовательские операции делятся на различные операторы, в том числе: Heartbeat, Join, Close, End. Например, оператор соединения_RendevzousJoinOp.

  • Внутренняя бизнес-логика разделена на различные бизнес-функции, такие как метод _add_to_participants для удаления узла из списка ожидания и добавления этого узла к участникам.

  • Операторы и внутренняя бизнес-логика не соответствуют один в один, и для управления ими нужен механизм, похожий на конечный автомат.

    • Например, результатом оператора операции пульса может быть: тайм-аут/поддержание активности/нормальное завершение, поэтому в соответствии с этим результатом должны вызываться различные внутренние бизнес-функции. Эта логика соответствия осуществляется через Действие.
    • Различные операторы объединяются в конечный автомат.
    • Внутри оператора находится генерация различных действий, которые определяют следующий шаг конечного автомата.
  • Внутри движка конкретная бизнес-логика выполняется в соответствии с Action, или можно сказать, что она отделена через Action.

В частности, движок можно логически разделить на три уровня: верхний — это уровень оператора, средний — уровень действий, а нижний — уровень бизнес-функций.

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| _RendezvousKeepAliveOp    _RendezvousCloseOp    _RendezvousExitOp    _RendezvousJoinOp  |
|                                                                                         |
+-------------+---------------------+--------------------+------------------+-------------+
              |                     |                    |                  |
              |                     |                    |                  |
              |                     |                    |                  |
              |                     |                    |                  |
              v                     v                    v                  v

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| KEEP_ALIVE   ADD_TO_PARTICIPANTS   ADD_TO_WAIT_LIST   REMOVE_FROM_WAIT_LIST   ......    |
|                                                                                         |
+-------------+----------+----------+----------+---------+---------+---------+------------+
              |          |          |          |         |         |         |
              |          |          |          |         |         |         |
              |          |          |          |         |         |         |
              |          |          |          |         |         |         |
              v          v          v          v         v         v         v

+-----------------------------------------------------------------------------------------+
|                                                                                         |
| _add_to_participants    _remove_from_participants     _add_to_wait_list        ......   |
|                                                                                         |
|                                                                                         |
+-----------------------------------------------------------------------------------------+

Мы анализируем их один за другим.

3.1 Эксплуатация

Давайте сначала проанализируемПромежуточное действиечтобы увидеть, сколько действий есть. В зависимости от состояния рандеву действия двигателя следующие. Код находится в torch/distributed/elastic/rendezvous/dynamic_rendezvous.py.

class _Action(Enum):
    """Specifies the possible actions based on the state of the rendezvous."""

    KEEP_ALIVE = 1
    ADD_TO_PARTICIPANTS = 2
    ADD_TO_WAIT_LIST = 3
    REMOVE_FROM_PARTICIPANTS = 4
    REMOVE_FROM_WAIT_LIST = 5
    MARK_RENDEZVOUS_COMPLETE = 6
    MARK_RENDEZVOUS_CLOSED = 7
    SYNC = 8
    ERROR_CLOSED = 9
    ERROR_TIMEOUT = 10
    FINISH = 11

3.2 Операторы

Некоторые операторы реализованы в движке.В основном операция соответствует оператору.Приведем несколько примеров операторов операции.Оператор должен установить тип операции в соответствии с состоянием рандеву.

3.2.1 Сердцебиение

3.2.1.1 Проверка сердцебиения

Роль _RendezvousKeepAliveOp заключается в определении следующего действия на основе текущего состояния и времени. Основная цель — регулярно проверять, неисправен ли узел.

class _RendezvousKeepAliveOp:
    """Represents a rendezvous keep-alive update operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        if _should_keep_alive(ctx):
            if time.monotonic() > deadline:
                return _Action.ERROR_TIMEOUT
            return _Action.KEEP_ALIVE
        return _Action.FINISH

Метод _should_keep_alive:

def _should_keep_alive(ctx: _RendezvousContext) -> bool:
    """Determines whether a keep-alive heartbeat should be sent."""
    try:
        last_heartbeat = ctx.state.last_heartbeats[ctx.node]
    except KeyError:
        return False

    return last_heartbeat <= datetime.utcnow() - ctx.settings.keep_alive_interval
3.2.1.2 Периодический вызов

Здесь следует отметить, что операция синхронизации должна быть вызвана до выполнения любого оператора, и синхронизация будет синхронизировать состояние между узлами.Поскольку сердцебиение является регулярным, состояние синхронизации также является регулярным.

DynamicRendezvousHandlerзапустит таймер,звонил регулярно_keep_alive_weak метод.

def _start_heartbeats(self) -> None:
    self._keep_alive_timer = _PeriodicTimer(
        self._settings.keep_alive_interval, self._keep_alive_weak, weakref.ref(self)
    )

    self._keep_alive_timer.set_name(f"RendezvousKeepAliveTimer_{self._this_node.local_id}")
    self._keep_alive_timer.start()

Второй,_keep_alive_weakпозвонюself._keep_alive().

@staticmethod
def _keep_alive_weak(weak_self) -> None:
    self = weak_self()
    if self is not None:
        self._keep_alive()

_keep_alive вызывает _RendezvousKeepAliveOp.

def _keep_alive(self) -> None:
    self._heartbeat_lock.acquire()
    op = _RendezvousKeepAliveOp()
    deadline = self._get_deadline(self._settings.timeout.heartbeat)

    try:
        self._op_executor.run(op, deadline)
        msg = (
            f"The node '{self._this_node}' has sent a keep-alive heartbeat to the rendezvous "
            f"'{self._settings.run_id}'."
        )
        self._record(message=msg)
        log.debug(msg)
    except RendezvousError as ex:
        msg = (
            f"The node '{self._this_node}' has failed to send a keep-alive heartbeat to the "
            f"rendezvous '{self._settings.run_id}' due to an error of type {type(ex).__name__}."
        )
        self._record(message=msg, node_state=NodeState.FAILED)
    finally:
        self._heartbeat_lock.release()
3.2.1.2 Установка пульса

Кроме того, в _DistributedRendezvousOpExecutor есть одноименная функция _keep_alive, которая используется для реализации внутренней логики, о которой мы поговорим позже.

3.2.2 Закрыть

_RendezvousCloseOp определит следующее действие на основе текущего состояния и времени.

class _RendezvousCloseOp:
    """Represents a rendezvous close operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        if ctx.state.closed:
            return _Action.FINISH
        if time.monotonic() > deadline:
            return _Action.ERROR_TIMEOUT
        return _Action.MARK_RENDEZVOUS_CLOSED

3.2.3 Конец

_RendezvousExitOp определяет следующее действие в соответствии с текущим состоянием и временем. Если этого узла нет в участниках, он не будет обработан. В противном случае вернуть следующее действие, удаленное из списка участников. Если время истекло, будет возвращено соответствующее действие.

class _RendezvousExitOp:
    """Represents a rendezvous exit operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        if ctx.node in ctx.state.participants:
            if time.monotonic() > deadline:
                return _Action.ERROR_TIMEOUT
            return _Action.REMOVE_FROM_PARTICIPANTS
        return _Action.FINISH

3.2.4 Join

_RendezvousJoinOp Здесь, в зависимости от состояния системы, выполняются разные обработки, такие как попытка добавить этот узел в список участников, или список ожидания, или продолжение ожидания.Подробности см. в комментариях к коду.

  • Извлеките состояние _RendezvousState из контекста и сохраните результат в состоянии.
  • Если статус закрыт, значит рандеву на данный момент закончился, и возвращается _Action.ERROR_CLOSED.
  • Чтобы узнать, является ли он участником, сохраните результат в is_participant.
  • Если состояние завершилось, а узел уже является участником, значит, рандеву может заканчиваться, и возвращается _Action.FINISH.
  • Получить текущее время сейчас.
  • Если сейчас > крайний срок, это означает, что время истекло.
    • Если еще есть время на откат, значит, этот узел должен вернуться в предыдущее состояние.
      • Если этот узел уже является участником, это означает, что общее количество узлов не достигло минимума.Хотя он уже является участником, его нужно удалить из списка участников, поэтому возвращается _Action.REMOVE_FROM_PARTICIPANTS.
      • Если этот узел находится в списке ожидания, это означает, что общее количество узлов на данный момент не достигло максимального значения.Хотя он находится в списке ожидания, его необходимо удалить из списка ожидания, поэтому возвращается _Action.REMOVE_FROM_WAIT_LIST.
    • В противном случае вернуть _Action.ERROR_TIMEOUT.
  • В противном случае тайм-аута нет, и обработка продолжается.
    • Если state.complete и этот узел не является участником (если узел является участником, он уже был обработан ранее), то рандеву завершилось, если не достигнуто максимальное количество узлов, а текущий узел не в списке ожидания его необходимо добавить в Ожидание списка узлов, дождаться наступления следующего периода мониторинга и снова выполнить рандеву, и вы можете добавить узлы в списке ожидания в список участия. Поэтому верните _Action.ADD_TO_WAIT_LIST.
    • Если этот узел является участником и состояние не завершено (если оно завершено, то оно было обработано ранее), если достигнуто минимальное количество узлов и истекло время ожидания, это означает, что рандеву закончилось, и _Action. MARK_RENDEZVOUS_COMPLETE возвращается.
    • В противном случае описание не заканчивается, а узел не является участником, поэтому он сразу добавляется в список участников и возвращает _Action.ADD_TO_PARTICIPANTS.
  • Возвращает _Action.KEEP_ALIVE, если необходимо сохранить пульс.
  • В противном случае верните _Action.SYNC.
class _RendezvousJoinOp:
    """Represents a rendezvous join operation."""

    def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
        state = ctx.state # 从上下文之中提取 _RendezvousState 状态

        # A closed rendezvous means that it no longer accepts new nodes.
        if state.closed:
            return _Action.ERROR_CLOSED # 如果已经结束,就返回 _Action.ERROR_CLOSED

        is_participant = ctx.node in state.participants # 看看是不是参与者

        # If we are part of the rendezvous and it is already complete there is
        # no further action to take.
        if state.complete and is_participant: # 如果是参与者且状态是结束,就返回 _Action.FINISH
            return _Action.FINISH

        now = time.monotonic()
        if now > deadline: # 如果已经超时
            rollback_period = 5  # 5 seconds

            # If we still have time to rollback (a short period on top of the
            # operation deadline), try to remove ourself from the rendezvous.
            # It is okay if we can't though as our keep-alive will eventually
            # expire.
            if now <= deadline + rollback_period: # 如果还有时间来 rollback
                # If we are part of the rendezvous, it means we couldn't find
                # enough participants to complete it on time.
                if is_participant: # 此时尚未达到min,虽然已经是参与者,但是需要移除
                    return _Action.REMOVE_FROM_PARTICIPANTS # 需要从参与者列表移除
                # If we are in the wait list, it means we couldn't wait till the
                # next round of the rendezvous.
                if ctx.node in state.wait_list: # 此时已经达到 max,虽然已经在等待列表之中,需要移除
                    return _Action.REMOVE_FROM_WAIT_LIST # 需要从等待列表移除
            return _Action.ERROR_TIMEOUT # 返回超时

        if state.complete: # 如果 rendezvous 已经结束
            # If we are here, it means we are not part of the rendezvous. In
            # case the rendezvous has capacity for additional participants add
            # ourself to the wait list for the next round.
            if len(state.participants) < ctx.settings.max_nodes: # 如果还没有达到最大节点数
                if ctx.node not in state.wait_list: # 如果当前node不在等待列表之中
                    return _Action.ADD_TO_WAIT_LIST # 就加入到等待列表,发送一个等待action
        elif is_participant: # 如果已经在参与者列表
            # If the rendezvous has enough number of participants including us,
            # check whether we have passed the rendezvous deadline. If yes,
            # complete it.
            if len(state.participants) >= ctx.settings.min_nodes: # 如果达到了最小节点数
                if cast(datetime, state.deadline) < datetime.utcnow(): # 如果达到了超时
                    return _Action.MARK_RENDEZVOUS_COMPLETE # 标示 rendezvous 已经结束
        else: # 否则就直接加入到参与者
            # The rendezvous is not complete yet and we are not part of it. Try
            # to join.
            return _Action.ADD_TO_PARTICIPANTS

        if _should_keep_alive(ctx): # 如果需要保持心跳,就返回 _Action.KEEP_ALIVE
            return _Action.KEEP_ALIVE

        # At this point either the rendezvous is not complete, but we are part
        # of it, which means we have to wait for other participants to join; or
        # the rendezvous is complete, but we are not part of it, which means we
        # have to wait for the next round.
        return _Action.SYNC # 否则返回同步状态 _Action.SYNC

Конкретная логика выглядит следующим образом:

                           state.closed
                        +-------------------------->   _Action.ERROR_CLOSED
                        |
                        |
                        |  complete & participant
                        +-------------------------->   _Action.FINISH
                        |
                        |
                        |  timeout & participant
                        +-------------------------->   _Action.REMOVE_FROM_PARTICIPANTS
                        |
                        |
                        |  timeout & wait
                        +-------------------------->   _Action.REMOVE_FROM_WAIT_LIST
                        |
+-------------------+   |
|                   |   |  timeout
| _RendezvousJoinOp +------------------------------>   _Action.ERROR_TIMEOUT
|                   |   |
+-------------------+   |  complete & < max & not wait
                        |
                        +-------------------------->   _Action.ADD_TO_WAIT_LIST
                        |
                        |  complete & participant & > min & deadline
                        |
                        +-------------------------->   _Action.MARK_RENDEZVOUS_COMPLETE
                        |
                        |  not complete & not participant
                        |
                        +-------------------------->   _Action.ADD_TO_PARTICIPANTS
                        |
                        |  _should_keep_alive
                        |
                        +-------------------------->   _Action.KEEP_ALIVE
                        |
                        |  else
                        |
                        +-------------------------->   _Action.SYNC

Ниже приводится описание состояния Rendezvous бэкэнда ETCD в исходном коде, мы можем грубо сослаться на состояние c10d.

Видно, что джойн бэкэнда etcd можно разделить на 4 этапа:

  • На этапе настройки значение будет записано в фиксированный каталог, который является эксклюзивной блокировкой.Если запись не удалась, это означает, что в данный момент существуетrendezvousИдет процесс.
  • присоединяемая (присоединяемая) фаза. Если значение записи успешно, перейдите к фазе соединения. Если время ожидания истечет или узлы, участвующие в обучении, достигнут максимального значения, он перейдет в замороженную стадию.
  • замороженная (подтверждающая) фаза. Все узлы должны подтвердить и войти в финальный финальный этап.
  • Последняя стадия. присвоить звание,RANK 0Экземпляр становится ведущим.

Следуя рисунку выше, мы расширяем c10d следующим образом.

      +
      |
      |
      v
+-----+------+
|            |
|   closed   +---------------> ERROR_CLOSED
|            |
+-----+------+
      |
      |
      v
+-----+------+  is_participant
|            |
|  complete  +---------------> FINISH
|            |
+-----+------+
      |                                                                                 is_participant
      |
      v                                                                                +----> REMOVE_FROM_PARTICIPANTS
+-----+-------+  now > deadline  +-----------+    now < rollback     +-----------+     |
|             |                  |           |                       |           |     |
|    join     +----------------> |  timeout  +---------------------->+ rollback  +-----+
|             |                  |           |                       |           |     |
+-----+-------+                  +----+------+                       +-----------+     |
      |                               |                                                | in state.wait_list
      |                               |    now > rollback                              |
      |  now < deadline               |                                                +----> REMOVE_FROM_WAIT_LIST
      |                               +---------->  ERROR_TIMEOUT
      |
      |   complete && not is_participant && < max && not in state.wait_list
      |
      +------------------------------------------------------------------>  ADD_TO_WAIT_LIST
      |
      |   not complete && is_participant && > min && > deadline
      |
      +------------------------------------------------------------------>  MARK_RENDEZVOUS_COMPLETE
      |
      |   not complete && not is_participant
      |
      +----------------------------------------->  ADD_TO_PARTICIPANTS
      |
      |   _should_keep_alive
      |
      +--------------------------->  KEEP_ALIVE
      |
      |
      v
     SYNC

Телефон такой:

0x04 Деловая операция

Внутри _DistributedRendezvousOpExecutor.run можно выбрать различные бизнес-функции для выполнения в соответствии с действием.

            if action == _Action.KEEP_ALIVE:
                self._keep_alive()
            elif action == _Action.ADD_TO_PARTICIPANTS:
                self._add_to_participants()
            elif action == _Action.ADD_TO_WAIT_LIST:
                self._add_to_wait_list()
            elif action == _Action.REMOVE_FROM_PARTICIPANTS:
                self._remove_from_participants()
            elif action == _Action.REMOVE_FROM_WAIT_LIST:
                self._remove_from_wait_list()
            elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
                self._mark_rendezvous_complete()
            elif action == _Action.MARK_RENDEZVOUS_CLOSED:
                self._mark_rendezvous_closed()

Давайте взглянем на конкретную внутреннюю логику функции.

4.1 Присоединяйтесь к участникам

После получения ADD_TO_PARTICIPANTS вызовите _add_to_participants, чтобы удалить узел из списка ожидания и добавить этот узел к участникам.

    def _add_to_participants(self) -> None:

        state = self._state

        try:
            state.wait_list.remove(self._node)
        except KeyError:
            pass

        # The ranks of the participants will be set once the rendezvous is
        # complete.
        state.participants[self._node] = 0

        self._keep_alive()

        if len(state.participants) == self._settings.min_nodes:
            state.deadline = datetime.utcnow() + self._settings.timeout.last_call

        if len(state.participants) == self._settings.max_nodes:
            self._mark_rendezvous_complete()

4.2 Удаление участников

После получения REMOVE_FROM_PARTICIPANTS вызовите _remove_from_participants, чтобы удалить участников из участников и last_heartbeats.

    def _remove_from_participants(self) -> None:

        state = self._state
        del state.participants[self._node]
        del state.last_heartbeats[self._node]

        if state.complete:
            # If we do not have any participants left, move to the next round.
            if not state.participants:
                state.complete = False
                state.round += 1
        else:
            if len(state.participants) < self._settings.min_nodes:
                state.deadline = None

4.3 Присоединяйтесь к последовательности ожидания

После получения ADD_TO_WAIT_LIST вызовите _add_to_wait_list, чтобы добавить узлы в список ожидания.

    def _add_to_wait_list(self) -> None:
        self._state.wait_list.add(self._node)
        self._keep_alive()

4.4 Удаление последовательности ожидания

После получения REMOVE_FROM_WAIT_LIST вызовите _remove_from_wait_list, чтобы удалить узлы из списка ожидания.

    def _remove_from_wait_list(self) -> None:
        self._state.wait_list.remove(self._node)
        del self._state.last_heartbeats[self._node]

4.5 Конец настройки

После получения MARK_RENDEZVOUS_COMPLETE, когда операция агрегации рандеву завершится, установите ранг для каждого участника.

Каждый узел сортируется по одному и тому же алгоритму, поэтому ранг на каждом узле одинаков.

    def _mark_rendezvous_complete(self) -> None:
        state = self._state

        state.complete = True
        state.deadline = None

        # Assign the ranks.
        for rank, node in enumerate(sorted(state.participants)):
            state.participants[node] = rank

    def _mark_rendezvous_closed(self) -> None:
        self._state.closed = True

4.6 Сердцебиение

После получения действия KEEP_ALIVE будет вызван _keep_alive для поддержания пульса. Кроме того, keep_alive также будет вызываться в таких методах, как _add_to_participants, которые будут обновлять последние пульсации в локальном состоянии.При следующей синхронизации last_heartbeats будут записаны в хранилище ключ-значение, чтобы другие узлы могли знать статус этого узла. Локальный файл будет обработан в соответствии с last_heartbeats в _sanitize, о котором мы упоминали ранее.

def _keep_alive(self) -> None:
    msg = (
        f"The node '{self._node}' updated its keep-alive heartbeat time for the rendezvous "
        f"'{self._settings.run_id}'. Pending sync."
    )
    self._record(message=msg)
    self._state.last_heartbeats[self._node] = datetime.utcnow()

Метод _record выглядит следующим образом:

def _record(self, message: str, node_state: NodeState = NodeState.RUNNING) -> None:
    construct_and_record_rdzv_event(
        name=f"{self.__class__.__name__}.{get_method_name()}",
        run_id=self._settings.run_id,
        message=message,
        node_state=node_state,
        hostname=self._node.fqdn,
        pid=self._node.pid,
        local_id=self._node.local_id,
    )

Это вызов следующего кода для записи журнала.

def record_rdzv_event(event: RdzvEvent) -> None:
    _get_or_create_logger("dynamic_rendezvous").info(event.serialize())

def construct_and_record_rdzv_event(
    run_id: str,
    message: str,
    node_state: NodeState,
    name: str = "",
    hostname: str = "",
    pid: Optional[int] = None,
    master_endpoint: str = "",
    local_id: Optional[int] = None,
    rank: Optional[int] = None,
) -> None:
    # We don't want to perform an extra computation if not needed.
    if isinstance(get_logging_handler("dynamic_rendezvous"), logging.NullHandler):
        return

    # Set up parameters.
    if not hostname:
        hostname = socket.getfqdn()
    if not pid:
        pid = os.getpid()

    # Determines which file called this function.
    callstack = inspect.stack()
    filename = "no_file"
    if len(callstack) > 1:
        stack_depth_1 = callstack[1]
        filename = os.path.basename(stack_depth_1.filename)
        if not name:
            name = stack_depth_1.function

    # Delete the callstack variable. If kept, this can mess with python's
    # garbage collector as we are holding on to stack frame information in
    # the inspect module.
    del callstack

    # Set up error trace if this is an exception
    if node_state == NodeState.FAILED:
        error_trace = traceback.format_exc()
    else:
        error_trace = ""

    # Initialize event object
    event = RdzvEvent(
        name=f"{filename}:{name}",
        run_id=run_id,
        message=message,
        hostname=hostname,
        pid=pid,
        node_state=node_state,
        master_endpoint=master_endpoint,
        rank=rank,
        local_id=local_id,
        error_trace=error_trace,
    )

    # Finally, record the event.
    record_rdzv_event(event)

До сих пор была проанализирована и часть двигателя.В следующей статье мы посмотрим, сможем ли мы сделать всесторонний обзор с общей точки зрения.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

ссылка 0xFF

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (1) --- общая идея

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (2) --- запуск и процесс с одним узлом

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (3) --- агент

[Анализ исходного кода] Распределенное эластичное обучение PyTorch (4) --- Архитектура и логика рандеву