0x00 сводка
В предыдущих статьях мы изучили основные распространяемые модули PyTorch и представили несколько официальных примеров. Далее мы представим эластичное обучение PyTorch. Эта статья пятая. Давайте посмотрим на внутренний движок Rendezvous. , например, как обрабатывать присоединение к узлу, выход из узла, ожидание, пульс и т. д.
Статьи о тренировках на эластичность следующие:
[Анализ исходного кода] Распределенное эластичное обучение PyTorch (1) --- общая идея
[Анализ исходного кода] Распределенное эластичное обучение PyTorch (3) --- агент
0x01 Предисловие
1.1 Общая система
Эластичный тренинг можно понимать как систему бега на основе Rendezvous.
-
Агент предпочитает логику на определенных узлах
- Агент отвечает за определенные операции, связанные с бизнес-логикой, такие как запуск процесса для выполнения пользовательской программы, мониторинг состояния выполнения пользовательской программы и уведомление Rendezvous в случае возникновения исключения.
- Агент — это менеджер рабочих процессов, отвечающий за запуск и управление рабочими процессами, формирование рабочей группы, отслеживание рабочего состояния рабочих процессов, сбор данных об отказавших рабочих процессах и перезапуск группы рабочих процессов в случае сбоя или нового добавленного рабочего процесса.
- Агент отвечает за сохранение информации WORLD_SIZE и RANK. Пользователю не нужно предоставлять его вручную, и Агент сделает это автоматически.
- Агент — это фоновый процесс на определенном узле и независимый индивидуум. Сам агент не может реализовать общее эластичное обучение, поэтому ему нужен механизм для завершения взаимного обнаружения между рабочими, синхронизации изменений и т. д. (информация WORLD_SIZE и RANK фактически должна быть синхронизирована несколькими узлами для определения), это следующая концепция Rendezvous.
-
Рандеву несет ответственность
кластерная логика
, чтобы убедиться, что узлы достигли строгого консенсуса в отношении того, «какие узлы участвуют в обучении».
- Каждый Агент включает в себя обработчик Rendezvous, и эти обработчики составляют кластер Rendezvous в целом, формируя, таким образом, кластер Агента.
- После завершения Rendezvous создается общее хранилище ключей и значений, которое реализует
torch.distributed.Store
API. Это хранилище используется совместно только участниками, завершившими Rendezvous, и оно предназначено для того, чтобы позволить Torch Distributed Elastic обмениваться управляющей информацией и данными во время заданий инициализации. - Rendezvous отвечает за сохранение всей необходимой информации о текущей группе поверх каждого агента. Поверх каждого агента происходит рандеву, они будут общаться друг с другом и поддерживать набор информации в целом, которая хранится в упомянутом выше Магазине.
- Rendezvous отвечает за логику кластера, такую как добавление новых узлов, удаление узлов, присвоение рангов и т. д.
1.2 Rendezvous
Пока что информация Rendezvous выглядит следующим образом: DynamicRendezvousHandler относится к динамической логике, среди которых_RendezvousStateHolder
Это состояние и другое хранилище метаинформации (статическая структура). Вы обнаружите, что на рисунке также есть _RendezvousOpExecutor, который не представлен. Это механизм выполнения, поэтому в этой статье мы увидим, как _RendezvousOpExecutor обрабатывает его.
+-----------------------------+ +------------------------------------------------+
| LocalElasticAgent | | WorkerSpec |
| | | |
| +------------------------+ | | rdzv_handler = {DynamicRendezvousHandler} -------+
| |WorkerGroup | | | | |
| | spec +--------------> | entry = worker_fn | |
| | workers | | | | |
| | store | | | role = {str} 'trainer' | |
| | group_rank | | | | |
| | group_world_size | | +------------------------------------------------+ |
| | | | |
| +------------------------+ | |
| | |
| rdzv_run_id | |
| store | +-----------------------------------------+ |
| | |DynamicRendezvousHandler | |
+-----------------------------+ | | |
| | |
| _settings: RendezvousSettings | <--+
| |
| _store: Store |
| |
| _state_holder: _RendezvousStateHolder |
| |
| _op_executor: _RendezvousOpExecutor |
| |
+-----------------------------------------+
1.3 Развязка
_RendezvousOpExecutor разделяет сегментацию функций:
- Бизнес-логика абстрагируется в ряд операторов, таких как
_RendevzousJoinOp
. - Rendezvous внутренне поддерживает набор конечных автоматов, состоящих из бизнес-функций, таких как функция _add_to_participants для добавления участников.
-
_RendezvousOpExecutor
Механизм выполняет различные операторы, получает действие на основе результата оператора, а затем использует действие для вызова бизнес-функций для работы.
В этой статье в основном представлен движок Rendezvous, соответствующий серверной части C10d.
Реализация движка 0x02
2.1 Базовый класс
_RendezvousOpExecutor — это базовый класс движка, он просто определяет виртуальную функцию запуска.
class _RendezvousOpExecutor(ABC):
"""Executes rendezvous operations."""
@abstractmethod
def run(
self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
) -> None:
"""Executes a rendezvous operation.
An operation is run inside a state machine and is expected to transition
the rendezvous from one state to another.
Args:
state_handler:
A callable that is expected to return the next state transition
action based on the current state of the rendezvous.
deadline:
The time, in seconds, at which the operation will be considered
timed-out.
"""
Здесь используется _RendezvousContext, и его функция состоит в том, чтобы инкапсулировать различную информацию Rendezvous и предоставить ее операционной системе. Вот вам и использование _RendezvousState и RendezvousSettings.
class _RendezvousContext:
"""Holds the context of the rendezvous.
Attributes:
node:
The node descriptor associated with the current rendezvous handler
instance.
state:
The current state of the rendezvous.
settings:
The rendezvous settings.
"""
node: _NodeDesc
state: _RendezvousState
settings: RendezvousSettings
def __init__(
self, node: _NodeDesc, state: _RendezvousState, settings: RendezvousSettings
) -> None:
self.node = node
self.state = state
self.settings = settings
2.2 Механизм распределенной работы
_DistributedRendezvousOpExecutor расширяет _RendezvousOpExecutor и является фактическим исполнителем ElasticTorch. Подобно Looper, он отвечает за распространение сообщений, звонки и поддержание состояния.
2.2.1 Определения
По сравнению со своим базовым классом _DistributedRendezvousOpExecutor добавляет переменные-члены, такие как информация об узле, состояние и конфигурация.
class _DistributedRendezvousOpExecutor(_RendezvousOpExecutor):
"""Executes rendezvous operations using a shared state.
Args:
node:
The node descriptor associated with the current rendezvous handler
instance.
state_holder:
The ``RendezvousStateHolder`` to use to sync the rendezvous state
with other nodes.
settings:
The rendezvous settings.
"""
_node: _NodeDesc
_state: _RendezvousState
_state_holder: _RendezvousStateHolder
_settings: RendezvousSettings
def __init__(
self,
node: _NodeDesc,
state_holder: _RendezvousStateHolder,
settings: RendezvousSettings,
) -> None:
self._node = node
self._state_holder = state_holder
self._settings = settings
Логика следующая:
+---------------------------------------------------------------+
| _DistributedRendezvousOpExecutor |
| |
| +------------------------+ |
| _state +---> | _RendezvousState | |
| | | |
| | participants | |
| | wait_list | |
| | last_heartbeats | |
| | deadline | |
| +------------------------+ |
| |
| +-------------------------+ |
| _settings +--> | RendezvousSettings | |
| | | |
| +-------------------------+ |
| |
| +--------------------------------------+ |
| _state_holder +---> | _BackendRendezvousStateHolder | |
| | | |
| | _backend: RendezvousBackend | |
| | _state: _RendezvousState | |
| | _settings: RendezvousSettings | |
| | | |
| +--------------------------------------+ |
| +--------------------------------------+ |
| | _NodeDesc | |
| _node +-------> | fqdn: str | |
| | pid: int | |
| | local_id: int | |
| | | |
| +--------------------------------------+ |
+---------------------------------------------------------------+
2.2.2 Вызов
Давайте возьмем несколько примеров, чтобы увидеть, как вызвать движок.Вы можете видеть, что сначала устанавливается оператор, а затем вызывается функция запуска движка.
2.2.2.1 _RendezvousKeepAliveOp
def _keep_alive(self) -> None:
self._heartbeat_lock.acquire()
op = _RendezvousKeepAliveOp() # 设置算子
deadline = self._get_deadline(self._settings.timeout.heartbeat)
self._op_executor.run(op, deadline) # 调用
2.2.2.2 _RendezvousCloseOp
def _close(self) -> None:
op = _RendezvousCloseOp() # 设置算子
deadline = self._get_deadline(self._settings.timeout.close)
self._op_executor.run(op, deadline) # 调用
2.2.2.3 _RendezvousJoinOp
def next_rendezvous(self) -> Tuple[Store, int, int]:
"""See base class."""
self._stop_heartbeats()
# Delay the execution for a small random amount of time if this is our
# first run. This will slightly skew the rendezvous attempts across the
# nodes and reduce the load on the backend.
if self._state_holder.state.round == 0:
_delay(seconds=(0, 0.3))
exit_op = _RendezvousExitOp() # 设置算子
join_op = _RendezvousJoinOp() # 设置算子
deadline = self._get_deadline(self._settings.timeout.join)
self._op_executor.run(exit_op, deadline) # 这里会进行调用
self._op_executor.run(join_op, deadline) # 调用
self._start_heartbeats()
rank, world_size = self._get_world()
store = self._get_store()
return store, rank, world_size
2.2.3 Функция
В _DistributedRendezvousOpExecutor функция запуска реализует базовую логику, заключающуюся в выполнении различных операций в зависимости от типа действия.
2.2.3.1 Основной цикл
Конкретный код запуска выглядит следующим образом:
def run(
self, state_handler: Callable[[_RendezvousContext, float], _Action], deadline: float
) -> None:
"""See base class."""
action = None
while action != _Action.FINISH: # 循环,一直到获得一个FINISH action 为止
# Reads or writes the latest rendezvous state shared by all nodes in
# the rendezvous. Note that our local changes might get overridden
# by another node if that node synced its changes before us.
# 这里很重要,在所有node之间做信息同步
has_set = self._state_holder.sync() # 因为最新状态在 rendezvous。
self._state = self._state_holder.state
ctx = _RendezvousContext(self._node, self._state, self._settings)
# Determine the next action to take based on the current state of
# the rendezvous.
action = state_handler(ctx, deadline) # 决定下一个操作,state_handler 就是算子
if action == _Action.FINISH:
continue
if action == _Action.ERROR_CLOSED:
raise RendezvousClosedError()
if action == _Action.ERROR_TIMEOUT:
raise RendezvousTimeoutError()
if action == _Action.SYNC:
# Delay the execution by one second to avoid overloading the
# backend if we are asked to poll for state changes.
_delay(seconds=1)
else:
if action == _Action.KEEP_ALIVE:
self._keep_alive()
elif action == _Action.ADD_TO_PARTICIPANTS:
self._add_to_participants()
elif action == _Action.ADD_TO_WAIT_LIST:
self._add_to_wait_list()
elif action == _Action.REMOVE_FROM_PARTICIPANTS:
self._remove_from_participants()
elif action == _Action.REMOVE_FROM_WAIT_LIST:
self._remove_from_wait_list()
elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
self._mark_rendezvous_complete()
elif action == _Action.MARK_RENDEZVOUS_CLOSED:
self._mark_rendezvous_closed()
# Attempt to sync our changes back to other nodes.
self._state_holder.mark_dirty()
Детали показаны ниже.
+-----------------------------------------+ +---------------------------------------------------------------+
|DynamicRendezvousHandler | | _DistributedRendezvousOpExecutor |
| | | |
| | | +------------------------+ |
| _settings: RendezvousSettings | | _state +---> | _RendezvousState | |
| | | | | |
| | | | participants | |
| _store: Store | | | wait_list | |
| | | | last_heartbeats | |
| | | | deadline | |
| _state_holder: _RendezvousStateHolder | | +------------------------+ |
| | run(_RendezvousJoinOp()) | +-------------------------+ |
| | | _settings +--> | RendezvousSettings | |
| _op_executor +------------------------------------------------> | | | |
| | | +-------------------------+ |
| | | +--------------------------------------+ |
+-----------------------------------------+ | _state_holder +---> | _BackendRendezvousStateHolder | |
| | | |
| | _backend: RendezvousBackend | |
| | _state: _RendezvousState | |
| | _settings: RendezvousSettings | |
| | | |
| +--------------------------------------+ |
| +--------------------------------------+ |
| | _NodeDesc | |
| _node +-------> | fqdn: str | |
| | pid: int | |
| | local_id: int | |
| | | |
| +--------------------------------------+ |
+---------------------------------------------------------------+
Телефон такой:
2.2.3.2 Синхронизация
В функции запуска следует отметить, что:Перед выполнением различных операций оператора, вызовет self._state_holder.sync() для выполнения синхронизации состояния между каждым рабочим потоком для достижения консенсуса.
def sync(self) -> Optional[bool]:
"""See base class."""
state_bits: Optional[bytes] = None
token = None
has_set: Optional[bool]
if self._dirty: # 如果本node状态变化了
has_set = False
state_bits = pickle.dumps(self._state)
# 把自己的状态设置到backend之中
set_response = self._backend.set_state(state_bits, self._token)
if set_response is not None:
state_bits, token, has_set = set_response
else: # 自己没变化,只能从后端获取
has_set = None
if self._cache_duration > 0:
# Avoid overloading the backend if we are asked to retrieve the
# state repeatedly. Try to serve the cached state.
if self._last_sync_time >= max(time.monotonic() - self._cache_duration, 0):
return None
get_response = self._backend.get_state() # 从backend获取其他节点最新状态
if get_response is not None:
state_bits, token = get_response
if state_bits is not None:
try:
self._state = pickle.loads(state_bits) # 用后端状态更新本身的状态
except pickle.PickleError as exc:
raise RendezvousStateError(
"The rendezvous state is corrupt. See inner exception for details."
) from exc
else:
self._state = _RendezvousState()
if has_set and self._dead_nodes and log.isEnabledFor(logging.DEBUG):
node_list = ", ".join(f"'{dead_node}'" for dead_node in self._dead_nodes)
msg = (
f"As part of the sync operation the node(s) {node_list} have been removed from the "
f"rendezvous '{self._settings.run_id}' since they had no heartbeat."
)
self._record(message=msg)
self._token = token
self._dirty = False
self._last_sync_time = time.monotonic()
self._sanitize()
return has_set
задняя часть
Torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py — это соответствующий серверный код.
Бэкенд здесь использует хранилище как централизованное хранилище, которое является мастером. Каждый узел является клиентом и обращается к мастеру для обновления своего состояния и получения состояния других узлов. Таким образом, все узлы будут общаться друг с другом и достигать консенсуса. Сюда же периодически удаляются клиенты, которые не обновляют свои метаданные.
get_state просто извлекается из хранилища.
def get_state(self) -> Optional[Tuple[bytes, Token]]:
"""See base class."""
base64_state: bytes = self._call_store("get", self._key)
return self._decode_state(base64_state)
set_state выполнит набор сравнения, который вернет новое состояние и было ли оно обновлено.
def set_state(
self, state: bytes, token: Optional[Token] = None
) -> Optional[Tuple[bytes, Token, bool]]:
"""See base class."""
base64_state_str: str = b64encode(state).decode()
if token:
# Shortcut if we know for sure that the token is not valid.
if not isinstance(token, bytes):
result = self.get_state()
if result is not None:
tmp = *result, False
# Python 3.6 does not support tuple unpacking in return
# statements.
return tmp
return None
token = token.decode()
else:
token = self._NULL_SENTINEL
base64_state: bytes = self._call_store("compare_set", self._key, token, base64_state_str)
state_token_pair = self._decode_state(base64_state)
if state_token_pair is None:
return None
new_state, new_token = state_token_pair
# C10d Store's compare_set method does not offer an easy way to find out
# whether our write attempt was successful. As a brute-force solution we
# perform a bitwise comparison of our local state and the remote state.
return new_state, new_token, new_state == state
_sanitize
Метод _sanitize используется для обработки сообщений от других узлов, например для очистки отказавших узлов. То есть, если время последнего пульса превышает определенный пороговый диапазон, эти узлы будут помечены как dead_node, и эти узлы будут удалены из списка участников или списка ожидания.
def _sanitize(self) -> None:
state = self._state
expire_time = datetime.utcnow() - (
self._settings.keep_alive_interval * self._settings.keep_alive_max_attempt
)
# Filter out the dead nodes.
self._dead_nodes = [
node
for node, last_heartbeat in state.last_heartbeats.items()
if last_heartbeat < expire_time
]
participant_removed = False
for dead_node in self._dead_nodes:
del state.last_heartbeats[dead_node] # 移除故障节点
try:
del state.participants[dead_node] # 移除故障节点
participant_removed = True
except KeyError:
pass
try:
state.wait_list.remove(dead_node) # 移除故障节点
except KeyError:
pass
if participant_removed:
# Common epilogue shared with the _remove_from_participants()
# function of _DistributedRendezvousOpExecutor.
_remove_participant_epilogue(state, self._settings)
После знакомства с тем, как запустить двигатель, давайте взглянем на конкретных операторов.
0x03 оператор
_RendezvousOpExecutor
Бизнес-логика движка разделена на два уровня: пользовательские операции и внутренняя бизнес-логика. Разделение между пользовательскими операциями и внутренними бизнес-механизмами.
-
Пользовательские операции делятся на различные операторы, в том числе: Heartbeat, Join, Close, End. Например, оператор соединения
_RendevzousJoinOp
. -
Внутренняя бизнес-логика разделена на различные бизнес-функции, такие как метод _add_to_participants для удаления узла из списка ожидания и добавления этого узла к участникам.
-
Операторы и внутренняя бизнес-логика не соответствуют один в один, и для управления ими нужен механизм, похожий на конечный автомат.
- Например, результатом оператора операции пульса может быть: тайм-аут/поддержание активности/нормальное завершение, поэтому в соответствии с этим результатом должны вызываться различные внутренние бизнес-функции. Эта логика соответствия осуществляется через Действие.
- Различные операторы объединяются в конечный автомат.
- Внутри оператора находится генерация различных действий, которые определяют следующий шаг конечного автомата.
-
Внутри движка конкретная бизнес-логика выполняется в соответствии с Action, или можно сказать, что она отделена через Action.
В частности, движок можно логически разделить на три уровня: верхний — это уровень оператора, средний — уровень действий, а нижний — уровень бизнес-функций.
+-----------------------------------------------------------------------------------------+
| |
| _RendezvousKeepAliveOp _RendezvousCloseOp _RendezvousExitOp _RendezvousJoinOp |
| |
+-------------+---------------------+--------------------+------------------+-------------+
| | | |
| | | |
| | | |
| | | |
v v v v
+-----------------------------------------------------------------------------------------+
| |
| KEEP_ALIVE ADD_TO_PARTICIPANTS ADD_TO_WAIT_LIST REMOVE_FROM_WAIT_LIST ...... |
| |
+-------------+----------+----------+----------+---------+---------+---------+------------+
| | | | | | |
| | | | | | |
| | | | | | |
| | | | | | |
v v v v v v v
+-----------------------------------------------------------------------------------------+
| |
| _add_to_participants _remove_from_participants _add_to_wait_list ...... |
| |
| |
+-----------------------------------------------------------------------------------------+
Мы анализируем их один за другим.
3.1 Эксплуатация
Давайте сначала проанализируемПромежуточное действиечтобы увидеть, сколько действий есть. В зависимости от состояния рандеву действия двигателя следующие. Код находится в torch/distributed/elastic/rendezvous/dynamic_rendezvous.py.
class _Action(Enum):
"""Specifies the possible actions based on the state of the rendezvous."""
KEEP_ALIVE = 1
ADD_TO_PARTICIPANTS = 2
ADD_TO_WAIT_LIST = 3
REMOVE_FROM_PARTICIPANTS = 4
REMOVE_FROM_WAIT_LIST = 5
MARK_RENDEZVOUS_COMPLETE = 6
MARK_RENDEZVOUS_CLOSED = 7
SYNC = 8
ERROR_CLOSED = 9
ERROR_TIMEOUT = 10
FINISH = 11
3.2 Операторы
Некоторые операторы реализованы в движке.В основном операция соответствует оператору.Приведем несколько примеров операторов операции.Оператор должен установить тип операции в соответствии с состоянием рандеву.
3.2.1 Сердцебиение
3.2.1.1 Проверка сердцебиения
Роль _RendezvousKeepAliveOp заключается в определении следующего действия на основе текущего состояния и времени. Основная цель — регулярно проверять, неисправен ли узел.
class _RendezvousKeepAliveOp:
"""Represents a rendezvous keep-alive update operation."""
def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
if _should_keep_alive(ctx):
if time.monotonic() > deadline:
return _Action.ERROR_TIMEOUT
return _Action.KEEP_ALIVE
return _Action.FINISH
Метод _should_keep_alive:
def _should_keep_alive(ctx: _RendezvousContext) -> bool:
"""Determines whether a keep-alive heartbeat should be sent."""
try:
last_heartbeat = ctx.state.last_heartbeats[ctx.node]
except KeyError:
return False
return last_heartbeat <= datetime.utcnow() - ctx.settings.keep_alive_interval
3.2.1.2 Периодический вызов
Здесь следует отметить, что операция синхронизации должна быть вызвана до выполнения любого оператора, и синхронизация будет синхронизировать состояние между узлами.Поскольку сердцебиение является регулярным, состояние синхронизации также является регулярным.
DynamicRendezvousHandlerзапустит таймер,звонил регулярно_keep_alive_weak метод.
def _start_heartbeats(self) -> None:
self._keep_alive_timer = _PeriodicTimer(
self._settings.keep_alive_interval, self._keep_alive_weak, weakref.ref(self)
)
self._keep_alive_timer.set_name(f"RendezvousKeepAliveTimer_{self._this_node.local_id}")
self._keep_alive_timer.start()
Второй,_keep_alive_weak
позвонюself._keep_alive()
.
@staticmethod
def _keep_alive_weak(weak_self) -> None:
self = weak_self()
if self is not None:
self._keep_alive()
_keep_alive вызывает _RendezvousKeepAliveOp.
def _keep_alive(self) -> None:
self._heartbeat_lock.acquire()
op = _RendezvousKeepAliveOp()
deadline = self._get_deadline(self._settings.timeout.heartbeat)
try:
self._op_executor.run(op, deadline)
msg = (
f"The node '{self._this_node}' has sent a keep-alive heartbeat to the rendezvous "
f"'{self._settings.run_id}'."
)
self._record(message=msg)
log.debug(msg)
except RendezvousError as ex:
msg = (
f"The node '{self._this_node}' has failed to send a keep-alive heartbeat to the "
f"rendezvous '{self._settings.run_id}' due to an error of type {type(ex).__name__}."
)
self._record(message=msg, node_state=NodeState.FAILED)
finally:
self._heartbeat_lock.release()
3.2.1.2 Установка пульса
Кроме того, в _DistributedRendezvousOpExecutor есть одноименная функция _keep_alive, которая используется для реализации внутренней логики, о которой мы поговорим позже.
3.2.2 Закрыть
_RendezvousCloseOp определит следующее действие на основе текущего состояния и времени.
class _RendezvousCloseOp:
"""Represents a rendezvous close operation."""
def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
if ctx.state.closed:
return _Action.FINISH
if time.monotonic() > deadline:
return _Action.ERROR_TIMEOUT
return _Action.MARK_RENDEZVOUS_CLOSED
3.2.3 Конец
_RendezvousExitOp определяет следующее действие в соответствии с текущим состоянием и временем. Если этого узла нет в участниках, он не будет обработан. В противном случае вернуть следующее действие, удаленное из списка участников. Если время истекло, будет возвращено соответствующее действие.
class _RendezvousExitOp:
"""Represents a rendezvous exit operation."""
def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
if ctx.node in ctx.state.participants:
if time.monotonic() > deadline:
return _Action.ERROR_TIMEOUT
return _Action.REMOVE_FROM_PARTICIPANTS
return _Action.FINISH
3.2.4 Join
_RendezvousJoinOp Здесь, в зависимости от состояния системы, выполняются разные обработки, такие как попытка добавить этот узел в список участников, или список ожидания, или продолжение ожидания.Подробности см. в комментариях к коду.
- Извлеките состояние _RendezvousState из контекста и сохраните результат в состоянии.
- Если статус закрыт, значит рандеву на данный момент закончился, и возвращается _Action.ERROR_CLOSED.
- Чтобы узнать, является ли он участником, сохраните результат в is_participant.
- Если состояние завершилось, а узел уже является участником, значит, рандеву может заканчиваться, и возвращается _Action.FINISH.
- Получить текущее время сейчас.
- Если сейчас > крайний срок, это означает, что время истекло.
- Если еще есть время на откат, значит, этот узел должен вернуться в предыдущее состояние.
- Если этот узел уже является участником, это означает, что общее количество узлов не достигло минимума.Хотя он уже является участником, его нужно удалить из списка участников, поэтому возвращается _Action.REMOVE_FROM_PARTICIPANTS.
- Если этот узел находится в списке ожидания, это означает, что общее количество узлов на данный момент не достигло максимального значения.Хотя он находится в списке ожидания, его необходимо удалить из списка ожидания, поэтому возвращается _Action.REMOVE_FROM_WAIT_LIST.
- В противном случае вернуть _Action.ERROR_TIMEOUT.
- Если еще есть время на откат, значит, этот узел должен вернуться в предыдущее состояние.
- В противном случае тайм-аута нет, и обработка продолжается.
- Если state.complete и этот узел не является участником (если узел является участником, он уже был обработан ранее), то рандеву завершилось, если не достигнуто максимальное количество узлов, а текущий узел не в списке ожидания его необходимо добавить в Ожидание списка узлов, дождаться наступления следующего периода мониторинга и снова выполнить рандеву, и вы можете добавить узлы в списке ожидания в список участия. Поэтому верните _Action.ADD_TO_WAIT_LIST.
- Если этот узел является участником и состояние не завершено (если оно завершено, то оно было обработано ранее), если достигнуто минимальное количество узлов и истекло время ожидания, это означает, что рандеву закончилось, и _Action. MARK_RENDEZVOUS_COMPLETE возвращается.
- В противном случае описание не заканчивается, а узел не является участником, поэтому он сразу добавляется в список участников и возвращает _Action.ADD_TO_PARTICIPANTS.
- Возвращает _Action.KEEP_ALIVE, если необходимо сохранить пульс.
- В противном случае верните _Action.SYNC.
class _RendezvousJoinOp:
"""Represents a rendezvous join operation."""
def __call__(self, ctx: _RendezvousContext, deadline: float) -> _Action:
state = ctx.state # 从上下文之中提取 _RendezvousState 状态
# A closed rendezvous means that it no longer accepts new nodes.
if state.closed:
return _Action.ERROR_CLOSED # 如果已经结束,就返回 _Action.ERROR_CLOSED
is_participant = ctx.node in state.participants # 看看是不是参与者
# If we are part of the rendezvous and it is already complete there is
# no further action to take.
if state.complete and is_participant: # 如果是参与者且状态是结束,就返回 _Action.FINISH
return _Action.FINISH
now = time.monotonic()
if now > deadline: # 如果已经超时
rollback_period = 5 # 5 seconds
# If we still have time to rollback (a short period on top of the
# operation deadline), try to remove ourself from the rendezvous.
# It is okay if we can't though as our keep-alive will eventually
# expire.
if now <= deadline + rollback_period: # 如果还有时间来 rollback
# If we are part of the rendezvous, it means we couldn't find
# enough participants to complete it on time.
if is_participant: # 此时尚未达到min,虽然已经是参与者,但是需要移除
return _Action.REMOVE_FROM_PARTICIPANTS # 需要从参与者列表移除
# If we are in the wait list, it means we couldn't wait till the
# next round of the rendezvous.
if ctx.node in state.wait_list: # 此时已经达到 max,虽然已经在等待列表之中,需要移除
return _Action.REMOVE_FROM_WAIT_LIST # 需要从等待列表移除
return _Action.ERROR_TIMEOUT # 返回超时
if state.complete: # 如果 rendezvous 已经结束
# If we are here, it means we are not part of the rendezvous. In
# case the rendezvous has capacity for additional participants add
# ourself to the wait list for the next round.
if len(state.participants) < ctx.settings.max_nodes: # 如果还没有达到最大节点数
if ctx.node not in state.wait_list: # 如果当前node不在等待列表之中
return _Action.ADD_TO_WAIT_LIST # 就加入到等待列表,发送一个等待action
elif is_participant: # 如果已经在参与者列表
# If the rendezvous has enough number of participants including us,
# check whether we have passed the rendezvous deadline. If yes,
# complete it.
if len(state.participants) >= ctx.settings.min_nodes: # 如果达到了最小节点数
if cast(datetime, state.deadline) < datetime.utcnow(): # 如果达到了超时
return _Action.MARK_RENDEZVOUS_COMPLETE # 标示 rendezvous 已经结束
else: # 否则就直接加入到参与者
# The rendezvous is not complete yet and we are not part of it. Try
# to join.
return _Action.ADD_TO_PARTICIPANTS
if _should_keep_alive(ctx): # 如果需要保持心跳,就返回 _Action.KEEP_ALIVE
return _Action.KEEP_ALIVE
# At this point either the rendezvous is not complete, but we are part
# of it, which means we have to wait for other participants to join; or
# the rendezvous is complete, but we are not part of it, which means we
# have to wait for the next round.
return _Action.SYNC # 否则返回同步状态 _Action.SYNC
Конкретная логика выглядит следующим образом:
state.closed
+--------------------------> _Action.ERROR_CLOSED
|
|
| complete & participant
+--------------------------> _Action.FINISH
|
|
| timeout & participant
+--------------------------> _Action.REMOVE_FROM_PARTICIPANTS
|
|
| timeout & wait
+--------------------------> _Action.REMOVE_FROM_WAIT_LIST
|
+-------------------+ |
| | | timeout
| _RendezvousJoinOp +------------------------------> _Action.ERROR_TIMEOUT
| | |
+-------------------+ | complete & < max & not wait
|
+--------------------------> _Action.ADD_TO_WAIT_LIST
|
| complete & participant & > min & deadline
|
+--------------------------> _Action.MARK_RENDEZVOUS_COMPLETE
|
| not complete & not participant
|
+--------------------------> _Action.ADD_TO_PARTICIPANTS
|
| _should_keep_alive
|
+--------------------------> _Action.KEEP_ALIVE
|
| else
|
+--------------------------> _Action.SYNC
Ниже приводится описание состояния Rendezvous бэкэнда ETCD в исходном коде, мы можем грубо сослаться на состояние c10d.
Видно, что джойн бэкэнда etcd можно разделить на 4 этапа:
- На этапе настройки значение будет записано в фиксированный каталог, который является эксклюзивной блокировкой.Если запись не удалась, это означает, что в данный момент существует
rendezvous
Идет процесс. - присоединяемая (присоединяемая) фаза. Если значение записи успешно, перейдите к фазе соединения. Если время ожидания истечет или узлы, участвующие в обучении, достигнут максимального значения, он перейдет в замороженную стадию.
- замороженная (подтверждающая) фаза. Все узлы должны подтвердить и войти в финальный финальный этап.
- Последняя стадия. присвоить звание,
RANK 0
Экземпляр становится ведущим.
Следуя рисунку выше, мы расширяем c10d следующим образом.
+
|
|
v
+-----+------+
| |
| closed +---------------> ERROR_CLOSED
| |
+-----+------+
|
|
v
+-----+------+ is_participant
| |
| complete +---------------> FINISH
| |
+-----+------+
| is_participant
|
v +----> REMOVE_FROM_PARTICIPANTS
+-----+-------+ now > deadline +-----------+ now < rollback +-----------+ |
| | | | | | |
| join +----------------> | timeout +---------------------->+ rollback +-----+
| | | | | | |
+-----+-------+ +----+------+ +-----------+ |
| | | in state.wait_list
| | now > rollback |
| now < deadline | +----> REMOVE_FROM_WAIT_LIST
| +----------> ERROR_TIMEOUT
|
| complete && not is_participant && < max && not in state.wait_list
|
+------------------------------------------------------------------> ADD_TO_WAIT_LIST
|
| not complete && is_participant && > min && > deadline
|
+------------------------------------------------------------------> MARK_RENDEZVOUS_COMPLETE
|
| not complete && not is_participant
|
+-----------------------------------------> ADD_TO_PARTICIPANTS
|
| _should_keep_alive
|
+---------------------------> KEEP_ALIVE
|
|
v
SYNC
Телефон такой:
0x04 Деловая операция
Внутри _DistributedRendezvousOpExecutor.run можно выбрать различные бизнес-функции для выполнения в соответствии с действием.
if action == _Action.KEEP_ALIVE:
self._keep_alive()
elif action == _Action.ADD_TO_PARTICIPANTS:
self._add_to_participants()
elif action == _Action.ADD_TO_WAIT_LIST:
self._add_to_wait_list()
elif action == _Action.REMOVE_FROM_PARTICIPANTS:
self._remove_from_participants()
elif action == _Action.REMOVE_FROM_WAIT_LIST:
self._remove_from_wait_list()
elif action == _Action.MARK_RENDEZVOUS_COMPLETE:
self._mark_rendezvous_complete()
elif action == _Action.MARK_RENDEZVOUS_CLOSED:
self._mark_rendezvous_closed()
Давайте взглянем на конкретную внутреннюю логику функции.
4.1 Присоединяйтесь к участникам
После получения ADD_TO_PARTICIPANTS вызовите _add_to_participants, чтобы удалить узел из списка ожидания и добавить этот узел к участникам.
def _add_to_participants(self) -> None:
state = self._state
try:
state.wait_list.remove(self._node)
except KeyError:
pass
# The ranks of the participants will be set once the rendezvous is
# complete.
state.participants[self._node] = 0
self._keep_alive()
if len(state.participants) == self._settings.min_nodes:
state.deadline = datetime.utcnow() + self._settings.timeout.last_call
if len(state.participants) == self._settings.max_nodes:
self._mark_rendezvous_complete()
4.2 Удаление участников
После получения REMOVE_FROM_PARTICIPANTS вызовите _remove_from_participants, чтобы удалить участников из участников и last_heartbeats.
def _remove_from_participants(self) -> None:
state = self._state
del state.participants[self._node]
del state.last_heartbeats[self._node]
if state.complete:
# If we do not have any participants left, move to the next round.
if not state.participants:
state.complete = False
state.round += 1
else:
if len(state.participants) < self._settings.min_nodes:
state.deadline = None
4.3 Присоединяйтесь к последовательности ожидания
После получения ADD_TO_WAIT_LIST вызовите _add_to_wait_list, чтобы добавить узлы в список ожидания.
def _add_to_wait_list(self) -> None:
self._state.wait_list.add(self._node)
self._keep_alive()
4.4 Удаление последовательности ожидания
После получения REMOVE_FROM_WAIT_LIST вызовите _remove_from_wait_list, чтобы удалить узлы из списка ожидания.
def _remove_from_wait_list(self) -> None:
self._state.wait_list.remove(self._node)
del self._state.last_heartbeats[self._node]
4.5 Конец настройки
После получения MARK_RENDEZVOUS_COMPLETE, когда операция агрегации рандеву завершится, установите ранг для каждого участника.
Каждый узел сортируется по одному и тому же алгоритму, поэтому ранг на каждом узле одинаков.
def _mark_rendezvous_complete(self) -> None:
state = self._state
state.complete = True
state.deadline = None
# Assign the ranks.
for rank, node in enumerate(sorted(state.participants)):
state.participants[node] = rank
def _mark_rendezvous_closed(self) -> None:
self._state.closed = True
4.6 Сердцебиение
После получения действия KEEP_ALIVE будет вызван _keep_alive для поддержания пульса. Кроме того, keep_alive также будет вызываться в таких методах, как _add_to_participants, которые будут обновлять последние пульсации в локальном состоянии.При следующей синхронизации last_heartbeats будут записаны в хранилище ключ-значение, чтобы другие узлы могли знать статус этого узла. Локальный файл будет обработан в соответствии с last_heartbeats в _sanitize, о котором мы упоминали ранее.
def _keep_alive(self) -> None:
msg = (
f"The node '{self._node}' updated its keep-alive heartbeat time for the rendezvous "
f"'{self._settings.run_id}'. Pending sync."
)
self._record(message=msg)
self._state.last_heartbeats[self._node] = datetime.utcnow()
Метод _record выглядит следующим образом:
def _record(self, message: str, node_state: NodeState = NodeState.RUNNING) -> None:
construct_and_record_rdzv_event(
name=f"{self.__class__.__name__}.{get_method_name()}",
run_id=self._settings.run_id,
message=message,
node_state=node_state,
hostname=self._node.fqdn,
pid=self._node.pid,
local_id=self._node.local_id,
)
Это вызов следующего кода для записи журнала.
def record_rdzv_event(event: RdzvEvent) -> None:
_get_or_create_logger("dynamic_rendezvous").info(event.serialize())
def construct_and_record_rdzv_event(
run_id: str,
message: str,
node_state: NodeState,
name: str = "",
hostname: str = "",
pid: Optional[int] = None,
master_endpoint: str = "",
local_id: Optional[int] = None,
rank: Optional[int] = None,
) -> None:
# We don't want to perform an extra computation if not needed.
if isinstance(get_logging_handler("dynamic_rendezvous"), logging.NullHandler):
return
# Set up parameters.
if not hostname:
hostname = socket.getfqdn()
if not pid:
pid = os.getpid()
# Determines which file called this function.
callstack = inspect.stack()
filename = "no_file"
if len(callstack) > 1:
stack_depth_1 = callstack[1]
filename = os.path.basename(stack_depth_1.filename)
if not name:
name = stack_depth_1.function
# Delete the callstack variable. If kept, this can mess with python's
# garbage collector as we are holding on to stack frame information in
# the inspect module.
del callstack
# Set up error trace if this is an exception
if node_state == NodeState.FAILED:
error_trace = traceback.format_exc()
else:
error_trace = ""
# Initialize event object
event = RdzvEvent(
name=f"{filename}:{name}",
run_id=run_id,
message=message,
hostname=hostname,
pid=pid,
node_state=node_state,
master_endpoint=master_endpoint,
rank=rank,
local_id=local_id,
error_trace=error_trace,
)
# Finally, record the event.
record_rdzv_event(event)
До сих пор была проанализирована и часть двигателя.В следующей статье мы посмотрим, сможем ли мы сделать всесторонний обзор с общей точки зрения.
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
[Анализ исходного кода] Распределенное эластичное обучение PyTorch (1) --- общая идея
[Анализ исходного кода] Распределенное эластичное обучение PyTorch (3) --- агент