[Анализ исходного кода] Распределенный PyTorch (6) ---DistributedDataParallel -- инициализация и хранение

машинное обучение PyTorch

0x00 сводка

Эта статья является шестой в серии распределенных PyTorch и знакомит с двумя концепциями метода инициализации и хранилища, на которые опирается DistributedDataParallel.

Другие статьи о распространении PyTorch:

Другие связанные статьи:

Автоматическая дифференциация инструментов глубокого обучения (1)

Автоматическая дифференциация инструментов глубокого обучения (2)

Автоматическая дифференциация оружия глубокого обучения (3) --- Пример интерпретации

[Анализ исходного кода] Как PyTorch реализует прямое распространение (1) --- Базовый класс (1)

[Анализ исходного кода] Как PyTorch реализует прямое распространение (2) --- Базовый класс (ниже)

[Анализ исходного кода] Как PyTorch реализует прямое распространение (3) --- конкретная реализация

[Анализ исходного кода] Как Pytorch реализует обратное распространение (1) ---- вызов движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (2) ---- Статическая структура движка

[Анализ исходного кода] Как Pytorch реализует обратное распространение (3) ---- Динамическая логика движка

[Анализ исходного кода] Как PyTorch реализует обратное распространение (4) ---- конкретный алгоритм

[Анализ исходного кода] Распределенный PyTorch (5) ------ Обзор DistributedDataParallel и способы его использования

0x01 Обзор

1.1 Основные понятия

Что касается распределенной связи, PyTorch предоставляет несколько концепций: группа процессов, серверная часть, инициализация, хранилище.

  • группа процессов: DDP — это настоящее распределенное обучение, которое может использовать несколько машин для формирования задачи параллельной работы. Чтобы обеспечить связь между различными рабочими DDP, PyTorch устанавливает концепцию групп процессов.
  • задняя часть: Концепция бэкенда — логичная концепция. По сути, серверная часть представляет собой механизм связи IPC.
  • инициализация: Хотя у нас есть концепция бэкенда и группы процессов, как сделать так, чтобы рабочие обнаруживали друг друга перед созданием группы процессов? Для этого требуется метод инициализации, сообщающий всем о передаче сообщения: как связаться с процессом на других машинах.
  • Store: можно рассматривать как распределенное хранилище «ключ-значение», которое позволяет обмениваться информацией между процессами в группе и инициализировать распределенные пакеты (путем явного создания хранилища какinit_methodзаменять).

1.2 Инициализировать группу процессов

Перед вызовом любого другого метода DDP вам необходимо использоватьtorch.distributed.init_process_group()для инициализации. Этот метод инициализирует распределенную группу процессов по умолчанию и распределенный пакет. Этот метод блокируется, пока все процессы не присоединятся.Функция определяется следующим образом:

init_process_group ( backend , 
                       init_method = None , 
                       timeout = default_pg_timeout , 
                       world_size =- 1 , 
                       rank =- 1 , 
                       store = None , 
                       group_name = '' , 
                       pg_options = None )

Существует два основных способа инициализации группы процессов:

  1. Явно укажите store, rank и world_size.
  2. Укажите init_method (строка URL), который указывает, где и как обнаруживать одноранговые узлы.

Если ни один из них не указан,init_methodпредполагается "env://". Итак, вы можете видеть, что store и init_method являются взаимоисключающими.

Параметры init_process_group следующие:

  • задняя часть- Бэкенд для использования. Допустимые значения включаютmpi,gloonccl. Поле должно быть представлено в виде строчной строки (например,"gloo") дано, а также может бытьBackendсвойства (напр.Backend.GLOO) для доступа . если вncclСерверная часть использует несколько процессов на машине, каждый процесс должен иметь эксклюзивный доступ к каждому используемому графическому процессору, поскольку совместное использование графических процессоров между процессами может привести к взаимоблокировкам.
  • init_method– URL-адрес, указывающий, как инициализировать группу процессов. если не указаноinit_methodилиstoreЕсли указано, по умолчанию используется "env://". иstoreвзаимоисключающий.
  • world_size– Количество процессов, участвующих в задании. еслиstoreуказан, требуется world_size.
  • rank– уровень текущего процесса (должно быть значение от 0 доworld_sizeчисло между -1). еслиstoreуказано, требуется ранг.
  • store– Хранилище ключей/значений, доступное для всех работников для обмена информацией о соединении/адресе. иinit_methodвзаимоисключающий.
  • timeout– Время ожидания операции, выполняемой над группой процессов, истекло. Значение по умолчанию равно 30 минутам. Это относится кglooзадняя часть. заnccl, который находится только в переменной окруженияNCCL_BLOCKING_WAITилиNCCL_ASYNC_ERROR_HANDLINGПрименяется при установке на 1.
  • group_name- Название группы.
  • pg_options ( Process Group Options , optional) — параметры группы процессов, указывающие, какие дополнительные параметры необходимо передать при построении конкретной группы процессов.

0x02 инициализация

2.1 Метод инициализации

В настоящее время модуль DDP поддерживает три метода инициализации:

  • Environment variable initialization
  • Инициализация общей файловой системы: init_method**=**'file:///mnt/nfs/sharedfile'
  • Инициализация TCP: init_method**=**'tcp://10.1.1.20:23456'

переменная среды

Этот метод считывает конфигурацию из переменных среды и позволяет полностью настроить способ получения информации. Установив следующие четыре переменные среды на всех машинах, все процессы могут нормально подключаться к главному (то есть процессу ранга 0) для получения информации о других процессах и, наконец, обмениваться с ними рукопожатиями.

  • MASTER_PORT: Порт на машине ранга 0 процесса.
  • MASTER_ADDR: IP-адрес на машине процесса ранга 0.
  • WORLD_SIZE: общее количество процессов, чтобы мастер знал, сколько рабочих ждать.
  • RANK: ранг каждого процесса, чтобы процесс знал, является ли он ведущим.

общая файловая система

Общая файловая система требует, чтобы все процессы имели доступ к общей файловой системе и координировали их через общие файлы. Это означает, что каждый процесс откроет файл, запишет в него информацию и будет ждать, пока каждый процесс сделает это. После этого вся необходимая информация будет доступна всем процессам. Чтобы избежать состояния гонки, файловая система должна пройтиfcntlПоддержка блокировки.

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP

Метод инициализации TCP реализуется путем предоставления IP-адреса и порта процесса ранга 0, где все рабочие процессы могут подключаться к процессу ранга 0 и обмениваться информацией о том, как связаться друг с другом.

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

2.2 init_method VS store

Нам интересно, а почему там два параметра init_method и store?

Глядя на код init_process_group, мы можем найти следующие правила.

  • При MPI метод init_method бесполезен.
  • На бэкэндах без MPI, если параметр хранилища отсутствует, используйте метод init_method для создания хранилища.

Так что в итоге все равно падает на магазин, магазин и есть сущность его роли.

        if store is None:
            rendezvous_iterator = rendezvous(
                init_method, rank, world_size, timeout=timeout
            )
            store, rank, world_size = next(rendezvous_iterator)
            store.set_timeout(timeout)

Код init_process_group выглядит следующим образом:

def init_process_group(backend,
                       init_method=None,
                       timeout=default_pg_timeout,
                       world_size=-1,
                       rank=-1,
                       store=None,
                       group_name='',
                       pg_options=None):

    global _pg_group_ranks
    global _backend
    global _default_pg_init_method

    if store is not None:
        assert world_size > 0, 'world_size must be positive if using store'
        assert rank >= 0, 'rank must be non-negative if using store'
    elif init_method is None:
        init_method = "env://"

    backend = Backend(backend)

    if backend == Backend.MPI:
          default_pg = _new_process_group_helper(
            -1,
            -1,
            [],
            Backend.MPI,
            None,
            group_name=group_name,
            timeout=timeout)
        _update_default_pg(default_pg)
    else:
        # backward compatible API
        if store is None:
            # 如果没有store,还是要用init_method构建一个store。
            rendezvous_iterator = rendezvous(
                init_method, rank, world_size, timeout=timeout
            )
            store, rank, world_size = next(rendezvous_iterator)
            store.set_timeout(timeout)

        default_pg = _new_process_group_helper(
            world_size,
            rank,
            [],
            backend,
            store,
            pg_options=pg_options,
            group_name=group_name,
            timeout=timeout)
        _update_default_pg(default_pg)

    _pg_group_ranks[GroupMember.WORLD] = {i: i for i in range(GroupMember.WORLD.size())}  # type: ignore[attr-defined, index]
    _backend = _pg_map[GroupMember.WORLD][0]  # type: ignore[index]
    _default_pg_init_method = init_method

    # 省略

2.3 rendezvous

Rendezvous упоминается в приведенном выше коде, давайте взглянем на эту концепцию.

Прежде чем мы сможем запустить ансамблевый алгоритм, участвующие процессы должны найти друг друга и обменяться информацией, чтобы иметь возможность общаться. Мы называем этот процесс рандеву. Результатом процесса рандеву является тройка, содержащая общее хранилище ключей/значений, ранг процесса и общее количество участвующих процессов. Если ни один из встроенных методов рандеву не доступен для вашей среды выполнения, вы можете зарегистрировать свой собственный обработчик рандеву. вызовrendezvousфункции, выберите уникальное имя и используйте схему URL для его идентификации.

Метод рандеву заключается в выборе различных обработчиков для обработки в соответствии с параметрами.

def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):

    # Append node-specific arguments.
    result = urlparse(url)
    if rank != -1 or world_size != -1:
        query_dict: Dict[str, Union[int, str]] = dict(
            # mypy doesn't allow dict() to accept List of values (#257)
            pair.split("=") for pair in filter(None, result.query.split("&"))  # type: ignore[arg-type, misc]
        )
        if rank != -1:
            query_dict["rank"] = rank
        if world_size != -1:
            query_dict["world_size"] = world_size

        result = result._replace(
            query="{}".format("&".join(["{}={}".format(k, v) for k, v in query_dict.items()]))
        )
        url = urlunparse(result)

    return _rendezvous_handlers[result.scheme](url, **kwargs)
    

Обработчик выглядит следующим образом, вы обнаружите, что фактически обработчик соответствует трем методам инициализации:

register_rendezvous_handler("tcp", _tcp_rendezvous_handler)
register_rendezvous_handler("env", _env_rendezvous_handler)
register_rendezvous_handler("file", _file_rendezvous_handler)

2.4 Резюме

  • init_method наконец падает на хранилище, а хранилище — это сущность, которая работает.
  • Участвующие процессы должны находить друг друга и обмениваться информацией, чтобы иметь возможность общаться. Этот процесс называется рандеву.

0x03 Store

Даем формальное понятие. Store — это распределенное хранилище ключей и значений, предоставляемое распределенным пакетом, к которому обращаются все рабочие процессы для обмена информацией и инициализации распределенных пакетов. Пользователи могут явно создавать хранилище какinit_methodзаменять. В настоящее время существует 3 хранилища ключей-значений:TCPStore,FileStoreHashStore.

Мы продолжаем рассматривать концепцию обработчика из предыдущего раздела.

3.1 _rendezvous_handlers

В PyTorch определена глобальная переменная _rendezvous_handlers для сохранения метода возврата в хранилище, который можно рассматривать как фабричный метод.

_rendezvous_handlers = {}

Конкретный метод регистрации:

register_rendezvous_handler("tcp", _tcp_rendezvous_handler)
register_rendezvous_handler("env", _env_rendezvous_handler)
register_rendezvous_handler("file", _file_rendezvous_handler)

Код регистрации выглядит следующим образом: он должен вставить обработчик в глобальную переменную.

def register_rendezvous_handler(scheme, handler):
    """Registers a new rendezvous handler.
    Args:
        scheme (str): URL scheme to identify your rendezvous handler.
        handler (function): Handler that is invoked when the
            `rendezvous()` function is called with a URL that uses
            the corresponding scheme. It must be a generator function
            that yields the triplet.
    """
    global _rendezvous_handlers
    if scheme in _rendezvous_handlers:
        raise RuntimeError(
            "Rendezvous handler for {}:// already registered".format(scheme)
        )
    _rendezvous_handlers[scheme] = handler

3.2 handlers

Если вы внимательно посмотрите на код обработчиков, то обнаружите, что он возвращает другое хранилище, такое как _tcp_rendezvous_handler, которое использует различную информацию для создания TCPStore и возврата.

Все следующие коды удаляют некритический код.

3.2.1 _file_rendezvous_handler

Здесь возвращается FileStore.

def _file_rendezvous_handler(url: str, **kwargs):

    result = urlparse(url)
    path = result.path
    query: Dict[str, str]
    # mypy doesn't allow dict() to accept List of values (#257)
    query = dict(pair.split("=") for pair in filter(None, result.query.split("&")))  # type: ignore[misc, arg-type]

    rank = int(query["rank"])
    world_size = int(query["world_size"])
    store = FileStore(path, world_size)
    yield (store, rank, world_size)

    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform rerendezvous using file:// method")

3.2.2 _tcp_rendezvous_handler

Здесь возвращается TCPStore.

def _tcp_rendezvous_handler(url: str, timeout: timedelta = default_pg_timeout, **kwargs):
    result = urlparse(url)
    query: Dict[str, Union[int, str]]
    # mypy doesn't allow dict() to accept List of values (#257)
    query = dict(pair.split("=") for pair in filter(None, result.query.split("&")))  # type: ignore[misc, arg-type]
​
    rank = int(query["rank"])
    world_size = int(query["world_size"])
    start_daemon = rank == 0
    assert result.hostname is not None
    store = TCPStore(result.hostname, result.port, world_size, start_daemon, timeout)
    yield (store, rank, world_size)
​
    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform rerendezvous using tcp:// method")

3.2.3 _env_rendezvous_handler

Он также возвращает TCPStore, но извлекает необходимую информацию из переменных среды.

def _env_rendezvous_handler(url: str, timeout: timedelta = default_pg_timeout, **kwargs):
​
    result = urlparse(url)
    query: Dict[str, Union[int, str]]
    query = dict(pair.split("=") for pair in filter(None, result.query.split("&"))) 
    rank: Optional[Union[str, int]]
    world_size: Optional[Union[str, int]]
    master_port: Optional[Union[str, int]]
​
    if "rank" in query:
        rank = int(query["rank"])
    else:
        rank = int(_get_env_or_raise("RANK"))
​
    if "world_size" in query:
        world_size = int(query["world_size"])
    else:
        world_size = int(_get_env_or_raise("WORLD_SIZE"))
​
    master_addr = _get_env_or_raise("MASTER_ADDR")
    master_port = int(_get_env_or_raise("MASTER_PORT"))
​
    use_torchelastic_store = os.environ.get("TORCHELASTIC_USE_AGENT_STORE", None)
​
    if use_torchelastic_store == str(True):
        worker_process_prefix = "/worker"
        # When TORCHELASTIC_USE_AGENT_STORE is set up, the worker process is assumed
        # to be invoked by the torchelastic agent. Torchelastic agent creates a tcp daemon thread
        # on the GROUP_RANK=0, as a result all user worker processes should create store with: daemon=False
        tcp_store = TCPStore(master_addr, master_port, world_size, False, timeout)
        yield (PrefixStore(worker_process_prefix, tcp_store), rank, world_size)
    else:
        # Start the TCP store daemon on the rank 0
        start_daemon = rank == 0
        store = TCPStore(master_addr, master_port, world_size, start_daemon, timeout)
        yield (store, rank, world_size)
​
    # If this configuration is invalidated, there is nothing we can do about it
    raise RuntimeError("Unable to perform rerendezvous using env:// method")

3.3 Использование

3.3.1 Использование обработчиков

Как использовать обработчик? Среди init_process_group есть:

rendezvous_iterator = rendezvous(
    init_method, rank, world_size, timeout=timeout
)
store, rank, world_size = next(rendezvous_iterator)

В частности, рандеву выбирает _rendezvous_handler на основе метода init_method, а затем _rendezvous_handler возвращается в хранилище.

def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):
    # Append node-specific arguments.
    result = urlparse(url)
    if rank != -1 or world_size != -1:
        query_dict: Dict[str, Union[int, str]] = dict(
            # mypy doesn't allow dict() to accept List of values (#257)
            pair.split("=") for pair in filter(None, result.query.split("&"))  # type: ignore[arg-type, misc]
        )
        if rank != -1:
            query_dict["rank"] = rank
        if world_size != -1:
            query_dict["world_size"] = world_size

        result = result._replace(
            query="{}".format("&".join(["{}={}".format(k, v) for k, v in query_dict.items()]))
        )
        url = urlunparse(result)

    return _rendezvous_handlers[result.scheme](url, **kwargs)

3.3.2 Использование магазина

Давайте продолжим смотреть, как использовать магазин. В коде init_process_group рядом с инициализацией группы процессов используется store.

default_pg = _new_process_group_helper(
    world_size,
    rank,
    [],
    backend,
    store,
    pg_options=pg_options,
    group_name=group_name,
    timeout=timeout)
_update_default_pg(default_pg)
3.3.2.1 _new_process_group_helper

Чтобы перейти к _new_process_group_helper, давайте сначала рассмотрим несколько глобальных переменных. Информация о следующих переменных ProcessGroup хранится глобально, например _pg_map[pg] = (Backend.NCCL, store).

# Cached process groups
# For NCCL and GLOO pg, it is a map from ProcessGroup to (Backend, Store)
# For MPI pg, it is a map from ProcessGroup to (Backend, None)
_pg_map: Dict[ProcessGroup, Tuple[str, Optional[Store]]] = {}
# Process group's names, map from ProcessGroup to str
_pg_names: Dict[ProcessGroup, str] = {}
# Process group's global rank to local rank mapping
_pg_group_ranks: Dict[ProcessGroup, Dict[int, int]] = {}

_new_process_group_helperПосле получения параметра хранилища на основе этого создается prefix_store, а затем на основе этого pre_store генерируется ProcessGroupGloo._new_process_group_helperКод выглядит следующим образом:

def _new_process_group_helper(world_size,
                              rank,
                              group_ranks,
                              backend,
                              store,
                              pg_options=None,
                              group_name=None,
                              timeout=default_pg_timeout):
    """
    Create a new distributed process group.

    This function must be called by ALL processes in the global group, even if
    the calling process is not part of the newly created group. In that case,
    this function returns GroupMember.NON_GROUP_MEMBER.

    This function is called with ``group_ranks == []`` for the default group.
    """
    global _pg_map
    global _group_count
    global _pg_names

    if not group_name:
        group_name = str(_group_count)
        _group_count += 1

    # The list of group ranks is empty if we're creating the default group.
    is_default_group = (len(group_ranks) == 0)

    backend = Backend(backend)
    pg: Union[ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL]
    if backend == Backend.MPI: # 没有使用store
        pg = ProcessGroupMPI.create(group_ranks)
        if not pg:
            return GroupMember.NON_GROUP_MEMBER
        _pg_map[pg] = (Backend.MPI, None)
        _pg_names[pg] = group_name
    else:
      	# 这里会使用store
      
        # If this is a subgroup (which means group_ranks is specified),
        # we check if the current process is a member of the new group.
        if not is_default_group:
            global_rank = _get_default_group().rank()
            if global_rank not in group_ranks:
                return GroupMember.NON_GROUP_MEMBER

        # Use the group name as prefix in the default store, such that
        # a single store can be reused by multiple groups.
        
        prefix_store = PrefixStore(group_name, store) # 构建了 PrefixStore

        if backend == Backend.GLOO:
            pg = ProcessGroupGloo(
                prefix_store, # 使用PrefixStore构建进程组
                rank,
                world_size,
                timeout=timeout)
            _pg_map[pg] = (Backend.GLOO, store)
            _pg_names[pg] = group_name
        elif backend == Backend.NCCL:
            if pg_options is not None:
                assert isinstance(pg_options, ProcessGroupNCCL.Options), \
                    "Expected pg_options argument to be of type ProcessGroupNCCL.Options"
            else:
                # default pg_options for NCCL
                pg_options = ProcessGroupNCCL.Options()
                pg_options.is_high_priority_stream = False
                pg_options._timeout = timeout

            pg = ProcessGroupNCCL(
                prefix_store, # 使用PrefixStore构建进程组
                rank,
                world_size,
                pg_options)
            _pg_map[pg] = (Backend.NCCL, store)
            _pg_names[pg] = group_name
        else:
            pg = getattr(Backend, backend.upper())(
                prefix_store,
                rank,
                world_size,
                timeout)
            _pg_map[pg] = (backend, store)
            _pg_names[pg] = group_name

    return pg
3.3.2.2 ProcessGroupGloo

В ProcessGroupGloo есть специфические применения, такие как создание GlooStore в PrefixStore, использование PrefixStore для создания сети и так далее.

ProcessGroupGloo::ProcessGroupGloo(
    const c10::intrusive_ptr<Store>& store,
    int rank,
    int size,
    c10::intrusive_ptr<Options> options)
    : ProcessGroup(rank, size),
      store_(new GlooStore(store)), // 在PrefixStore之上生成了一个GlooStore
      options_(options),
      stop_(false),
      collectiveCounter_(0) {
  auto& devices = options->devices;

  contexts_.reserve(options->devices.size());
  for (size_t i = 0; i < options->devices.size(); i++) {
    auto context = std::make_shared<::gloo::rendezvous::Context>(rank_, size_);
    // 又生成了一个PrefixStore
    auto store = ::gloo::rendezvous::PrefixStore(std::to_string(i), *store_);
    context->setTimeout(options->timeout);
    // 利用 PrefixStore 建立网络
    context->connectFullMesh(store, options->devices[i]);
    contexts_.push_back(std::move(context));
  }

  // Every worker thread stores the AsyncWork object it's currently
  // working on in the workInProgress_ vector. It must have size equal
  // to the number of workers such that they can simply index into it
  // using the worker index they are started with.
  workInProgress_.resize(options->threads);

  threads_.resize(options->threads);
  for (size_t i = 0; i < threads_.size(); i++) {
    threads_[i] = std::thread(&ProcessGroupGloo::runLoop, this, i);
  }
}

В следующем коде также используются функции store_, такие как ожидание, доступ.

void ProcessGroupGloo::setSequenceNumberForGroup() {
  if (rank_ == 0) {
    // Create and broadcast sequence number
    auto seq = 1 + rand();
    sequenceNum_ = c10d::SequenceNum(seq);
    std::vector<char> values = c10d::toVec<char>(seq, kBytes);
    store_->set(kSeqNumStoreKey, values); // 存value
  } else {
    // Read rank 0's sequence number from store.
    sequenceNum_ = c10d::SequenceNum();
    store_->wait({kSeqNumStoreKey}, options_->timeout); // 等待
    std::vector<char> values = store_->get(kSeqNumStoreKey); // 取value
    uint64_t num = c10d::fromVec<char>(values);
    sequenceNum_->set(num);
  }
}  

3.4 Резюме

Исходя из текущих результатов анализа, мы расширяем наши выводы следующим образом:

  • init_method наконец падает на хранилище, а хранилище — это сущность, которая работает.
  • Участвующие процессы должны находить друг друга и обмениваться информацией, чтобы иметь возможность общаться. Этот процесс называется рандеву.
  • рандеву фактически возвращает определенный магазин для последующего общения.
  • В группе процессов хранилище используется для построения связи, ожидания, доступа и т. д.

Затем мы выбираем TCPStore для анализа убеждений.

0x04 TCPStore

TCPStore — это реализация распределенного хранилища ключей и значений, основанная на TCP. Сервер хранит/сохраняет данные, а клиент хранилища может подключаться к серверному хранилищу через TCP и выполнять такие задачи, какset()вставить пары ключ-значение,get()Получить одноранговые операции "ключ-значение". В системе должен быть инициализирован сервер хранения TCPStore, так как клиенты хранилища будут ждать, пока эта служба хранилища установит соединение.

Параметры TCPStore следующие:

  • host_name (str) — имя хоста или IP-адрес. Сервер хранения работает на нем.

  • порт (целое) — порт, на котором сервер хранения прослушивает входящие запросы.

  • world_size ( int , необязательно) — общее количество пользователей.

    • world_size = количество клиентов + 1, где 1 — это сервер.
    • Значение по умолчанию равно -1 (отрицательные значения указывают на переменное количество пользователей).
  • is_master (bool, необязательный) — true при инициализации сервера хранения, false при инициализации клиента хранилища. Значение по умолчанию — ложь.

  • тайм-аут ( timedelta , необязательный) — тайм-аут, используемый хранилищем во время инициализации, а также методами get() и wait(). По умолчанию используется timedelta (секунды = 300).

  • wait_for_worker (bool, необязательный) — следует ли ждать, пока все рабочие процессы подключатся к серверу хранения. Это применимо только в том случае, если world_size является фиксированным значением. Значение по умолчанию верно.

Пример использования следующий:

import torch.distributed as dist
from datetime import timedelta
# Run on process 1 (server)
server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30))
# Run on process 2 (client)
client_store = dist.TCPStore("127.0.0.1", 1234, 2, False)
# Use any of the store methods from either the client or server after initialization
server_store.set("first_key", "first_value")
client_store.get("first_key")

или

    >>> import torch.distributed as dist
    >>> from datetime import timedelta
    >>> # Using TCPStore as an example, other store types can also be used
    >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
    >>> # This will throw an exception after 10 seconds
    >>> store.wait(["bad_key"], timedelta(seconds=10))

В примере это простые отношения между сервером, клиентом или мастером, работником, мы подробно проанализируем их далее.

4.1 TCPStore in python

В мире Python хост и порт просто устанавливаются.

class TCPStore(Store):
    def __init__(self, host_name, port, world_size=-1, is_master=False, timeout=None, *args, **kwargs): # real signature unknown; NOTE: unreliably restored from __doc__ 
        pass
​
    host = property(lambda self: object(), lambda self, v: None, lambda self: None)  # default
    """Gets the hostname on which the store listens for requests."""
​
    port = property(lambda self: object(), lambda self, v: None, lambda self: None)  # default
    """Gets the port number on which the store listens for requests."""

Нам нужно заглянуть вглубь мира C++.

4.2 TCPStore in CPP

4.2.1 API-интерфейс

Прежде всего, TCPStore в C++ можно рассматривать как API-интерфейс, который определяется следующим образом:

class TCPStore : public Store {
 public:
  explicit TCPStore(
      const std::string& masterAddr,
      PortType masterPort,
      c10::optional<int> numWorkers = c10::nullopt_t(-1),
      bool isServer = false,
      const std::chrono::milliseconds& timeout = kDefaultTimeout,
      bool waitWorkers = true);

  virtual ~TCPStore();

  void set(const std::string& key, const std::vector<uint8_t>& value) override;
  std::vector<uint8_t> compareSet(
      const std::string& key,
      const std::vector<uint8_t>& expectedValue,
      const std::vector<uint8_t>& desiredValue) override;
  std::vector<uint8_t> get(const std::string& key) override;
  int64_t add(const std::string& key, int64_t value) override;
  bool deleteKey(const std::string& key) override;

  // NOTE: calling other TCPStore APIs inside the callback is NOT threadsafe
  // watchKey() is a blocking operation. It will register the socket on
  // TCPStoreMasterDaemon and the callback on TCPStoreWorkerDaemon. It will
  // return once it has verified the callback is registered on both background
  // threads. Only one thread can call watchKey() at a time.
  void watchKey(const std::string& key, WatchKeyCallback callback) override;
  bool check(const std::vector<std::string>& keys) override;
  int64_t getNumKeys() override;
  void wait(const std::vector<std::string>& keys) override;
  void wait(
      const std::vector<std::string>& keys,
      const std::chrono::milliseconds& timeout) override;
  // Waits for all workers to join.
  void waitForWorkers();
  // Returns the hostname used by the TCPStore.
  const std::string& getHost() const noexcept;
  // Returns the port used by the TCPStore.
  PortType getPort() const noexcept;

 private:
  int64_t addHelper_(const std::string& key, int64_t value);
  std::vector<uint8_t> getHelper_(const std::string& key);
  void waitHelper_(
      const std::vector<std::string>& keys,
      const std::chrono::milliseconds& timeout);

  std::mutex watchKeyMutex_;
  bool isServer_;
  int storeSocket_ = -1; // 
  int listenSocket_ = -1; // 
  int masterListenSocket_ = -1; // master 在这里监听

  std::string tcpStoreAddr_;
  PortType tcpStorePort_;

  c10::optional<int> numWorkers_;
  const std::string initKey_;
  const std::string regularPrefix_;

  std::unique_ptr<TCPStoreMasterDaemon> tcpStoreMasterDaemon_ = nullptr;
  std::unique_ptr<TCPStoreWorkerDaemon> tcpStoreWorkerDaemon_ = nullptr;
};


4.2.2 Использование сокета

Наиболее важными среди его переменных-членов являются три сокета, или они суть (сложность) хранилища.

  int storeSocket_ = -1; // 
  int listenSocket_ = -1; // 
  int masterListenSocket_ = -1; // master 在这里监听
4.2.2.1 Бизнес-подразделение

Конкретное объяснение выглядит следующим образом (анализ будет продолжен в сочетании с кодом позже):

  • masterListenSocket_ прослушивается поверх masterPort.

    • tcpStoreMasterDaemon_Это сам мастер, сервер, который предоставляет услуги для всего TCPStore.
    • tcpStoreMasterDaemon_использоватьtcputil::addPollfd(fds, storeListenSocket_, POLLIN)контролироватьmasterListenSocket_.
    • Ключ-значение: std::unordered_map<:string std::vector>> tcpStore.
  • storeSocket_Поверх tcpStoreWorkerDaemon_, который подключается кmasterListenSocket_ : masterPortвыше.

    • storeSocket_Функция состоит в том, чтобы инкапсулировать операции, обращенные к главному порту.Пользователям нужно только выполнять установку, получение и другие операции, не зная главного порта.
    • Роль набора (ключ, данные) заключается в передачеstoreSocket_Отправьте запрос мастеру, чтобы установить ключ: значение.
    • tcpStoreMasterDaemon_При прослушивании изменений сокета он начинает реагировать.
    • tcpStoreMasterDaemon_Внутренне добавляет ключ: значение кstd::unordered_map<std::string, std::vector<uint8_t>> tcpStore_выше.
  • listenSocket_ поверх tcpStoreWorkerDaemon_, также подключенный кmasterListenSocket_: masterPortвыше. Ниже есть развязка, как отмечено в комментариях,It will register the socket on TCPStoreMasterDaemon and the callback on TCPStoreWorkerDaemon.

    • listenSocket_ инкапсулирует обработку watchKey. Использование клиента магазинаwatchKey(const std::string& key, WatchKeyCallback callback)Запросить регистрацию, т.е.:

      • Работник запрашивает регистрацию. использоватьtcpStoreWorkerDaemon_->setCallback(regKey, callback)придти дляtcpStoreWorkerDaemon_изstd::unordered_map<std::string, WatchKeyCallback> keyToCallbacks_Добавьте обратный вызов сверху.
      • Работник отправляет запрос. Отправьте мастеру сообщение (ключ, WATCH_KEY) через listenSocket_, чтобы указать мастеру вызвать этот обратный вызов, если значение ключа изменится.
    • Мастер выполняет регистрацию. Мастер регистрируется после получения сообщения WATCH_KEY, вызывает watchHandler, использует watchSockets_[key].push_back(socket) для настройки и говорит себе, что если ключ изменится, отправить сообщение в этот сокет.

    • Мастер уведомляет работника. В TCPStoreMasterDaemon::setHandler, после установки нового значения, вызовите sendKeyUpdatesToClients, он будет проходить по WatchSockets_[key], если есть сокет, он отправит уведомление об изменении сообщения в сокет.

    • Рабочий выполняет обратный вызов. Поэтому, если ключ меняется, этот обратный вызов вызывается в tcpStoreWorkerDaemon_.

4.2.2.2 Пример установки

Давайте сначала рассмотрим пример Set следующим образом, то есть Worker устанавливает значение на Master через сокет.

                                                                          +
+----------------------------------------------------------------------+  |  +----------------------------------------------+
| TCPStore                                                      Master |  |  | TCPStore                              Worker |
|                                                                      |  |  |                                              |
|                                                                      |  |  |                                              |
|                                                                      |  |  |                                              |
|   +------------------------------------------------------------+     |  |  |                                              |
|   | TcpStoreMasterDaemon_                            MasterPort|     |  |  |                                              |
|   |                                                            |     |  |  |                                              |
|   |    TCPStore.masterListenSocket_                            |     |  |  |      +---------------------------------+     |
|   |                                                            |     |  |  |      | set(key, value)                 |     |
|   |                                                            |     |  |  |      |                                 |     |
|   |    tcpStore_[key] = value  <------------------------------------------------+ |    storeSocket_                 |     |
|   |                                                            |     |  |  |      |                                 |     |
|   |                                                            |     |  |  |      +---------------------------------+     |
|   |                                                            |     |  |  |                                              |
|   +------------------------------------------------------------+     |  |  |                                              |
|                                                                      |  |  |                                              |
+----------------------------------------------------------------------+  |  +----------------------------------------------+
                                                                          +


Телефон такой:

4.2.2.3 Комбинация Set и watchKey

Схематическая диаграмма комбинации Set и watchKey выглядит следующим образом (воркер запрашивает регистрацию и выполняет обратный вызов; мастер выполняет регистрацию и сообщает рабочему процессу выполнить обратный вызов):

  1. Работник запрашивает регистрацию. Использование клиента магазинаwatchKey(const std::string& key, WatchKeyCallback callback)это использоватьtcpStoreWorkerDaemon_->setCallback(regKey, callback)придти дляtcpStoreWorkerDaemon_изstd::unordered_map<std::string, WatchKeyCallback> keyToCallbacks_Добавьте обратный вызов сверху.
  2. Работник отправляет запрос. Рабочий процесс отправляет мастеру сообщение (ключ, WATCH_KEY) через listenSocket_, говоря мастеру вызвать этот обратный вызов, если значение ключа изменится.
  3. Мастер выполняет регистрацию. После того, как Мастер получает сообщение WATCH_KEY, он вызывает watchHandler, использует WatchSockets_[key].push_back(socket) для настройки и говорит себе, что если ключ изменится, он отправит сообщение в этот сокет.
  4. Ниже мы предполагаем, что у Store Client (предполагается, что это тот же самый рабочий параметр, но на самом деле это может быть другой рабочий процесс) установлено значение.
  5. Мастер уведомляет работника. В TCPStoreMasterDaemon::setHandler, если Мастер вызывает sendKeyUpdatesToClients после установки нового значения, он будет проходить по WatchSockets_[key], и если есть сокет, он отправит уведомление об изменении сообщения в сокет.
  6. Рабочий выполняет обратный вызов. Этот обратный вызов вызывается в tcpStoreWorkerDaemon_ при изменении ключа.
+----------------------------------------------------------------------+  +  +------------------------------------------------------------------------+
| TCPStore                                                      Master |  |  | TCPStore                                                        Worker |
|                                                                      |  |  |                                                                        |
|   +------------------------------------------------------------+     |  |  |                                                                        |
|   | TcpStoreMasterDaemon_                            MasterPort|     |  |  |      +---------------------------------+                               |
|   |                                                            |     |  |  |      |                                 |                               |
|   |                                                  2         |     |  |  |      | watchKey(key, callback) +----------------------+                |
|   |           TCPStore.masterListenSocket_   <----------------------------------+ |                                 |              |                |
|   |                       +                                    |     |  |  |      |    listenSocket_                |              |                |
|   |                       | 3                                  |     |  |  |      |                                 |            1 |                |
|   |                       v                                    |     |  |  |      |                                 |              |                |
|   |           watchedSockets_[key] = socket                    |     |  |  |      +---------------------------------+              |                |
|   |                                                            |     |  |  |                                                       |                |
|   |  +-------------------------------------------------+       |     |  |  |                                                       |                |
|   |  |                                                 |       |     |  |  |                                                       |                |
|   |  |    setHandler                                   |       |     |  |  |   +----------------------------------------------------------------+   |
|   |  |                                                 |       |     |  |  |   | TCPStoreWorkerDaemon                              |            |   |
|   |  |                                                 |       |     |  |  |   |                                                   v            |   |
|   |  |       tcpStore_[key] = newData                  |       |     |  |  |   |   unordered_map<string, WatchKeyCallback> keyToCallbacks_      |   |
|   |  |                   +                             |       |     |  |  |   |                                                                |   |
|   |  |                   |                             |       |     |  |  |   |   TCPStore.listenSocket_                                       |   |
|   |  |                   |                             |       |     |  |  |   |                                                                |   |
|   |  |                   v                             |       |     |  |  |   |  +----------------------------------------------------------+  |   |
|   |  |       sendKeyUpdatesToClients                   |       |     |  |  |   |  | run                                                      |  |   |
|   |  |                   +                             |       |  5  |  |  |   |  |                                                          |  |   |
|   |  |                   |                             |  +---------------------->+                                        6                 |  |   |
|   |  |                   |                             |  |    |     |  |  |   |  |       callbackHandler +-----> keyToCallbacks_(callback)  |  |   |
|   |  |                   v                             |  |    |     |  |  |   |  |                                                          |  |   |
|   |  |                                                 |  |    |     |  |  |   |  +----------------------------------------------------------+  |   |
|   |  |    for (int socket : watchedSockets_[key]){     |  |    |     |  |  |   +----------------------------------------------------------------+   |
|   |  |       tcputil::sendString(socket, key, true) +-----+    |     |  |  |                                                                        |
|   |  |    }                                            |       |     |  |  |                                                                        |
|   |  |                                                 |       |     |  |  |       +------------------------+                                       |
|   |  |                                                 |       |  4  |  |  |       | set(key, newData)      |                                       |
|   |  |                                                 | <-----------------------+ |                        |                                       |
|   |  +-------------------------------------------------+       |     |  |  |       |                        |                                       |
|   |                                                            |     |  |  |       +------------------------+                                       |
|   +------------------------------------------------------------+     |  |  |                                                                        |
|                                                                      |  |  |                                                                        |
+----------------------------------------------------------------------+  +  +------------------------------------------------------------------------+


Телефон такой:

4.2.3 Функциональная функция

TCPStore предоставляет несколько функциональных функций.

void TCPStore::set(const std::string& key, const std::vector<uint8_t>& data) {
  std::string regKey = regularPrefix_ + key;
  tcputil::sendValue<QueryType>(storeSocket_, QueryType::SET);
  tcputil::sendString(storeSocket_, regKey, true);
  tcputil::sendVector<uint8_t>(storeSocket_, data);
}

std::vector<uint8_t> TCPStore::get(const std::string& key) {
  std::string regKey = regularPrefix_ + key;
  return getHelper_(regKey);
}

int64_t TCPStore::add(const std::string& key, int64_t value) {
  std::string regKey = regularPrefix_ + key;
  return addHelper_(regKey, value);
}

int64_t TCPStore::addHelper_(const std::string& key, int64_t value) {
  tcputil::sendValue<QueryType>(storeSocket_, QueryType::ADD);
  tcputil::sendString(storeSocket_, key, true);
  tcputil::sendValue<int64_t>(storeSocket_, value);
  return tcputil::recvValue<int64_t>(storeSocket_);
}

Эти функциональные функции вызывают следующие основные функции для отправки и получения.

// this is only for convenience when sending rvalues
template <typename T>
void sendValue(int socket, const T& value, bool moreData = false) {
  sendBytes<T>(socket, &value, 1, moreData);
}
​
template <typename T>
T recvValue(int socket) {
  T value;
  recvBytes<T>(socket, &value, 1);
  return value;
}

4.2.4 Построение функций

Мы можем видеть из функции сборки:

  • Для роли сервера хранения главное запуститьtcpStoreMasterDaemon_, обратите внимание, что после запуска демона сервер переходит в состояние ожидания для воркеров,tcpStoreWorkerDaemon_ в следующем коде не будет запущен.
  • Для клиентов хранилища запускается tcpStoreWorkerDaemon_.
// TCPStore class methods
TCPStore::TCPStore(
    const std::string& masterAddr,
    PortType masterPort,
    c10::optional<int> numWorkers,
    bool isServer,
    const std::chrono::milliseconds& timeout,
    bool waitWorkers)
    : Store(timeout),
      isServer_(isServer),
      tcpStoreAddr_(masterAddr),
      tcpStorePort_(masterPort),
      numWorkers_(numWorkers),
      initKey_("init/"),
      regularPrefix_("/") {
  tcputil::socketInitialize();
  if (isServer_) { // 如果设置了是server,就在masterPort上监听
    // Opening up the listening socket
    std::tie(masterListenSocket_, tcpStorePort_) = tcputil::listen(masterPort);
  }
  try {
    if (isServer_) { // 如果设置了是server,就启动 tcpStoreMasterDaemon_
      // Now start the daemon
      tcpStoreMasterDaemon_ =
          std::make_unique<TCPStoreMasterDaemon>(masterListenSocket_);
    }
    // Connect to the daemon
    // worker 会与 master port 建立联系
    storeSocket_ = tcputil::connect(
        tcpStoreAddr_, tcpStorePort_, /* wait= */ true, timeout_);
    if (numWorkers.value_or(-1) >= 0 && waitWorkers) {
      waitForWorkers(); // server 等待 worker
    }

    // socket to handle requests from server,因为 master 也会给 worker 发消息
    listenSocket_ = tcputil::connect(
        tcpStoreAddr_, tcpStorePort_, /* wait= */ true, timeout_);
    // 启动 worker daemon
    tcpStoreWorkerDaemon_ =
        std::make_unique<TCPStoreWorkerDaemon>(listenSocket_);
  } catch (const std::exception&) {
    if (isServer_) {
      tcpStoreMasterDaemon_ = nullptr;
      tcputil::closeSocket(masterListenSocket_);
    }
    tcpStoreWorkerDaemon_ = nullptr;
    if (listenSocket_ != -1) {
      tcputil::closeSocket(listenSocket_);
    }
    if (storeSocket_ != -1) {
      tcputil::closeSocket(storeSocket_);
    }
    throw;
  }
}

Сервер будет использовать следующую функцию для ожидания работника.

void TCPStore::waitForWorkers() {
  addHelper_(initKey_, 1);
  // Let server block until all workers have completed, this ensures that
  // the server daemon thread is always running until the very end
  if (isServer_) {
    const auto start = std::chrono::steady_clock::now();
    while (true) {
      std::vector<uint8_t> value = getHelper_(initKey_);
      auto buf = reinterpret_cast<const char*>(value.data());
      auto len = value.size();
      int numWorkersCompleted = std::stoi(std::string(buf, len));
      if (numWorkersCompleted >= numWorkers_.value_or(-1)) {
        break;
      }
      const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
          std::chrono::steady_clock::now() - start);
      if (timeout_ != kNoTimeout && elapsed > timeout_) {
        break;
      }
      /* sleep override */
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  }
}

4.2.5 TCPStoreWorkerDaemon

Этот демон-процесс используется только для обработки watchKey.

// Separate thread that is launched on all instances (including master)
// Right now only handles callbacks registered from watchKey()
class TCPStoreWorkerDaemon : public BackgroundThread {
 public:
  explicit TCPStoreWorkerDaemon(int listenSocket);
  // Set the callback to run key change
  void setCallback(std::string key, WatchKeyCallback cb);
  void waitForCallbackRegistration() {
    // Block until callback has been registered successfully
    std::unique_lock<std::mutex> callbackRegistrationLock(
        callbackRegistrationMutex_);
    callbackRegisteredCV_.wait(
        callbackRegistrationLock, [&] { return callbackRegisteredData_; });

    // Reset payload for next callback
    callbackRegisteredData_ = false;
  }
  void setCallbackRegistered() {
    callbackRegisteredData_ = true;
    callbackRegisteredCV_.notify_one();
  }

 private:
  void run();
  void callbackHandler(int socket);
  // List of callbacks map each watched key
  std::unordered_map<std::string, WatchKeyCallback> keyToCallbacks_;
  std::mutex keyToCallbacksMutex_;
  std::mutex callbackRegistrationMutex_;
  std::condition_variable callbackRegisteredCV_;
  bool callbackRegisteredData_ = false;
};



Его конструктор просто создает поток.

// TCPStoreListener class methods
TCPStoreWorkerDaemon::TCPStoreWorkerDaemon(int listenSocket)
    : BackgroundThread(listenSocket) {
  daemonThread_ = std::thread(&TCPStoreWorkerDaemon::run, this);
}
4.2.5.1 watchKey

Использование клиентского магазинаwatchKey(const std::string& key, WatchKeyCallback callback)Функция состоит в том, чтобы зарегистрировать контрольный ключ у мастера:

  • Работник запрашивает регистрацию. использоватьtcpStoreWorkerDaemon_->setCallback(regKey, callback)придти дляtcpStoreWorkerDaemon_изstd::unordered_map<std::string, WatchKeyCallback> keyToCallbacks_Добавьте обратный вызов сверху.
  • Работник отправляет запрос. Отправьте мастеру сообщение (ключ, WATCH_KEY) через listenSocket_, чтобы указать мастеру вызвать этот обратный вызов, если значение ключа изменится.
  • Затем используйте функцию waitForCallbackRegistration, чтобы дождаться завершения регистрации.
void TCPStore::watchKey(const std::string& key, WatchKeyCallback callback) {
  // Only allow one thread to perform watchKey() at a time
  const std::lock_guard<std::mutex> watchKeyLock(watchKeyMutex_);

  // Register callback with TCPStoreMasterDaemon to call TCPStoreWorkerDaemon on
  // key change
  std::string regKey = regularPrefix_ + key;
  tcpStoreWorkerDaemon_->setCallback(regKey, callback);
  tcputil::sendValue<QueryType>(listenSocket_, QueryType::WATCH_KEY);
  tcputil::sendString(listenSocket_, regKey);

  // Block until callback has been registered successfully
  tcpStoreWorkerDaemon_->waitForCallbackRegistration();
}
4.2.5.2 Эксплуатация

Его работа разделена на окна и другие системы, но главное - получить бизнес-ключ, а затем выполнить соответствующую бизнес-обработку.

  • Мастер выполняет регистрацию. После того, как Мастер получает сообщение WATCH_KEY, он вызывает watchHandler, использует WatchSockets_[key].push_back(socket) для настройки и говорит себе, что если ключ изменится, он отправит сообщение в этот сокет.
  • Мастер уведомляет работника. В TCPStoreMasterDaemon::setHandler, после установки нового значения, вызовите sendKeyUpdatesToClients, он будет проходить по WatchSockets_[key], если есть сокет, он отправит уведомление об изменении сообщения в сокет.
  • Рабочий выполняет обратный вызов. Поэтому, если ключ меняется, этот обратный вызов вызывается в tcpStoreWorkerDaemon_.
#ifdef _WIN32 
void TCPStoreWorkerDaemon::run() { // 这里是windows系统
  std::vector<struct pollfd> fds;
  tcputil::addPollfd(fds, storeListenSocket_, POLLIN);

  while (true) {
    // Check control and exit early if triggered
    int res;
    SYSCHECK_ERR_RETURN_NEG1(
        res = WSAPoll(fds.data(), fds.size(), checkTimeout_.count()))
    if (res == 0) {
      auto rvPoll = WaitForSingleObject(ghStopEvent_, 0);
      if (rvPoll != WAIT_TIMEOUT) {
        break;
      }
      continue;
    }

    // if connection is closed gracefully by master, peeked data will return 0
    char data;
    int ret = recv(fds[0].fd, &data, 1, MSG_PEEK);
    if (ret == 0) {
      auto rvData = WaitForSingleObject(ghStopEvent_, 0);
      if (rvData != WAIT_TIMEOUT) {
        break;
      }
      continue;
    }

    // valid request, perform callback logic
    callbackHandler(fds[0].fd); // 业务处理
  }
}
#else
void TCPStoreWorkerDaemon::run() {
  std::vector<struct pollfd> fds;
  tcputil::addPollfd(fds, controlPipeFd_[0], POLLHUP);
  tcputil::addPollfd(fds, storeListenSocket_, POLLIN);

  while (true) {
    SYSCHECK_ERR_RETURN_NEG1(::poll(fds.data(), fds.size(), -1));

    // Check control and exit early if triggered
    // The pipe receives an event which tells us to shutdown the listener thread
    if (fds[0].revents != 0) {
      // Will be POLLUP when the pipe is closed
      if (fds[0].revents ^ POLLHUP) {
        throw std::system_error(
            ECONNABORTED,
            std::system_category(),
            "Unexpected poll revent on the control pipe's reading fd: " +
                std::to_string(fds[0].revents));
      }
      break;
    }

    // if connection is closed gracefully by master, peeked data will return 0
    char data;
    int ret = recv(fds[1].fd, &data, 1, MSG_PEEK);
    if (ret == 0) {
      continue;
    }

    // valid request, perform callback logic
    callbackHandler(fds[1].fd); // 业务处理
  }
}
#endif


4.2.6 TCPStoreMasterDaemon

Здесь std::unordered_map<:string std::vector>> tcpStore_; — настоящий kv.

Поэтому TCPStoreMasterDaemon отвечает за операции kv, такие как доступ.

// Separate thread that is only launched on master
class TCPStoreMasterDaemon : public BackgroundThread {
 public:
  explicit TCPStoreMasterDaemon(int storeListenSocket);

 private:
  void run();
  void queryFds(std::vector<struct pollfd>& fds);
  void query(int socket);

  // The master runs on a single thread so only
  // one handler can be executed at a time
  void setHandler(int socket);
  void compareSetHandler(int socket);
  void addHandler(int socket);
  void getHandler(int socket) const;
  void checkHandler(int socket) const;
  void getNumKeysHandler(int socket) const;
  void deleteHandler(int socket);
  void waitHandler(int socket);
  void watchHandler(int socket);

  bool checkKeys(const std::vector<std::string>& keys) const;
  // Helper function to alerts waiting workers, used in setHandler, getHandler
  void wakeupWaitingClients(const std::string& key);
  // Helper function used when the key is changed
  // used in setHandler, addHandler, getHandler, deleteHandler
  void sendKeyUpdatesToClients(
      const std::string& key,
      const enum WatchResponseType& type,
      std::vector<uint8_t>& oldData,
      std::vector<uint8_t>& newData);
  std::unordered_map<std::string, std::vector<uint8_t>> tcpStore_;
  // From key -> the list of sockets waiting on the key
  std::unordered_map<std::string, std::vector<int>> waitingSockets_;
  // From socket -> number of keys awaited
  std::unordered_map<int, size_t> keysAwaited_;
  // From key -> the list of sockets watching the key
  std::unordered_map<std::string, std::vector<int>> watchedSockets_;
};
4.2.6.1 Эксплуатация

TCPStoreMasterDaemon ожидает сокета, то есть masterListenSocket_ прослушивает masterPort.

  • tcpStoreMasterDaemon_использоватьtcputil::addPollfd(fds, storeListenSocket_, POLLIN)контролироватьmasterListenSocket_.
  • tcpStoreMasterDaemon_ сам становится мастером, сервером, предоставляющим услуги для всего TCPStore.
  • Ключ-значение: std::unordered_map<:string std::vector>> tcpStore.
#ifdef _WIN32
void TCPStoreMasterDaemon::run() {
  std::vector<struct pollfd> fds;
  tcputil::addPollfd(fds, storeListenSocket_, POLLIN);

  // receive the queries
  bool finished = false;
  while (!finished) {
    for (size_t i = 0; i < sockets_.size(); i++) {
      fds[i].revents = 0;
    }

    int res;
    SYSCHECK_ERR_RETURN_NEG1(
        res = WSAPoll(fds.data(), fds.size(), checkTimeout_.count()))
    if (res == 0) {
      auto rv = WaitForSingleObject(ghStopEvent_, 0);
      if (rv != WAIT_TIMEOUT) {
        finished = true;
        break;
      }
      continue;
    }

    // TCPStore's listening socket has an event and it should now be able to
    // accept new connections.
    if (fds[0].revents != 0) { // 收到了消息
      if (!(fds[0].revents & POLLIN)) {
        throw std::system_error(
            ECONNABORTED,
            std::system_category(),
            "Unexpected poll revent on the master's listening socket: " +
                std::to_string(fds[0].revents));
      }
      int sockFd = std::get<0>(tcputil::accept(storeListenSocket_));
      sockets_.push_back(sockFd);
      tcputil::addPollfd(fds, sockFd, POLLIN);
    }
    queryFds(fds); // 业务处理
  }
}
#else

void TCPStoreMasterDaemon::run() {
  std::vector<struct pollfd> fds;
  tcputil::addPollfd(fds, storeListenSocket_, POLLIN);
  // Push the read end of the pipe to signal the stopping of the daemon run
  tcputil::addPollfd(fds, controlPipeFd_[0], POLLHUP);

  // receive the queries
  bool finished = false;
  while (!finished) {
    for (size_t i = 0; i < sockets_.size(); i++) {
      fds[i].revents = 0;
    }

    SYSCHECK_ERR_RETURN_NEG1(::poll(fds.data(), fds.size(), -1));

    // TCPStore's listening socket has an event and it should now be able to
    // accept new connections.
    if (fds[0].revents != 0) {
      if (fds[0].revents ^ POLLIN) {
        throw std::system_error(
            ECONNABORTED,
            std::system_category(),
            "Unexpected poll revent on the master's listening socket: " +
                std::to_string(fds[0].revents));
      }
      int sockFd = std::get<0>(tcputil::accept(storeListenSocket_));
      sockets_.push_back(sockFd);
      tcputil::addPollfd(fds, sockFd, POLLIN);
    }

    // The pipe receives an event which tells us to shutdown the daemon
    if (fds[1].revents != 0) { // 收到了消息
      // Will be POLLUP when the pipe is closed
      if (fds[1].revents ^ POLLHUP) {
        throw std::system_error(
            ECONNABORTED,
            std::system_category(),
            "Unexpected poll revent on the control pipe's reading fd: " +
                std::to_string(fds[1].revents));
      }
      finished = true;
      break;
    }
    queryFds(fds); // 业务处理
  }
}
#endif
4.2.6.2 Вызвать бизнес

queryFds будет вызывать разные службы в соответствии с результатом мониторинга сокета.

void TCPStoreMasterDaemon::queryFds(std::vector<struct pollfd>& fds) {
  // Skipping the fds[0] and fds[1],
  // fds[0] is master's listening socket
  // fds[1] is control pipe's reading fd, it is not for Windows platform
  for (size_t fdIdx = CONNECT_SOCKET_OFFSET; fdIdx < fds.size(); ++fdIdx) {
    if (fds[fdIdx].revents == 0) {
      continue;
    }

    // Now query the socket that has the event
    try {
      query(fds[fdIdx].fd); // 处理业务
    } catch (...) {
      tcputil::closeSocket(fds[fdIdx].fd);

      // Remove all the tracking state of the close FD
      for (auto it = waitingSockets_.begin(); it != waitingSockets_.end();) {
        for (auto vecIt = it->second.begin(); vecIt != it->second.end();) {
          if (*vecIt == fds[fdIdx].fd) {
            vecIt = it->second.erase(vecIt);
          } else {
            ++vecIt;
          }
        }
        if (it->second.size() == 0) {
          it = waitingSockets_.erase(it);
        } else {
          ++it;
        }
      }
      for (auto it = keysAwaited_.begin(); it != keysAwaited_.end();) {
        if (it->first == fds[fdIdx].fd) {
          it = keysAwaited_.erase(it);
        } else {
          ++it;
        }
      }
      fds.erase(fds.begin() + fdIdx);
      sockets_.erase(sockets_.begin() + fdIdx - CONNECT_SOCKET_OFFSET);
      --fdIdx;
      continue;
    }
  }
}


4.2.6.4 Процессинговый бизнес

Прочитайте сообщение из сокета и выполните соответствующую бизнес-обработку в соответствии с содержимым сообщения.

// query communicates with the worker. The format
// of the query is as follows:
// type of query | size of arg1 | arg1 | size of arg2 | arg2 | ...
// or, in the case of wait
// type of query | number of args | size of arg1 | arg1 | ...
void TCPStoreMasterDaemon::query(int socket) {
  QueryType qt;
  tcputil::recvBytes<QueryType>(socket, &qt, 1);
  if (qt == QueryType::SET) {
    setHandler(socket);

  } else if (qt == QueryType::COMPARE_SET) {
    compareSetHandler(socket);

  } else if (qt == QueryType::ADD) {
    addHandler(socket);

  } else if (qt == QueryType::GET) {
    getHandler(socket);

  } else if (qt == QueryType::CHECK) {
    checkHandler(socket);

  } else if (qt == QueryType::WAIT) {
    waitHandler(socket);

  } else if (qt == QueryType::GETNUMKEYS) {
    getNumKeysHandler(socket);

  } else if (qt == QueryType::DELETE_KEY) {
    deleteHandler(socket);

  } else if (qt == QueryType::WATCH_KEY) {
    watchHandler(socket);

  } else {
    throw std::runtime_error("Unexpected query type");
  }
}


Добавить к

Вот бизнес, который занимается добавлением стоимости.

void TCPStoreMasterDaemon::setHandler(int socket) {
  std::string key = tcputil::recvString(socket);
  std::vector<uint8_t> newData = tcputil::recvVector<uint8_t>(socket);
  std::vector<uint8_t> oldData;
  bool newKey = true;
  auto it = tcpStore_.find(key);
  if (it != tcpStore_.end()) {
    oldData = it->second;
    newKey = false;
  }
  tcpStore_[key] = newData;
  // On "set", wake up all clients that have been waiting
  wakeupWaitingClients(key);
  // Send key update to all watching clients
  newKey ? sendKeyUpdatesToClients(
               key, WatchResponseType::KEY_CREATED, oldData, newData)
         : sendKeyUpdatesToClients(
               key, WatchResponseType::KEY_UPDATED, oldData, newData);
}
Получать

Provenance занимается получением ценности.

void TCPStoreMasterDaemon::getHandler(int socket) const {
  std::string key = tcputil::recvString(socket);
  auto data = tcpStore_.at(key);
  tcputil::sendVector<uint8_t>(socket, data);
}
​
watchKey

Для WATCH_KEY к соответствующему ключу добавляется сокет как объект для отправки уведомлений в будущем.

Здесь добавляется ключ, который вы хотите отслеживать.

void TCPStoreMasterDaemon::watchHandler(int socket) {
  std::string key = tcputil::recvString(socket);

  // Record the socket to respond to when the key is updated
  watchedSockets_[key].push_back(socket);

  // Send update to TCPStoreWorkerDaemon on client
  tcputil::sendValue<WatchResponseType>(
      socket, WatchResponseType::KEY_CALLBACK_REGISTERED);
}
Уведомление

Сообщите клиенту, если ключ изменится.

void TCPStoreMasterDaemon::sendKeyUpdatesToClients(
    const std::string& key,
    const enum WatchResponseType& type,
    std::vector<uint8_t>& oldData,
    std::vector<uint8_t>& newData) {
  for (int socket : watchedSockets_[key]) {
    tcputil::sendValue<WatchResponseType>(socket, type);
    tcputil::sendString(socket, key, true);
    tcputil::sendVector<uint8_t>(socket, oldData);
    tcputil::sendVector<uint8_t>(socket, newData);
  }
}

4.2.7 Резюме

Резюмируем легенду следующим образом:

  • Мастер использует MasterPort для мониторинга запросов.

  • О доступе к значению.

    • Среди воркеров storeSocket_ используется для сохранения/получения значения, соответствующего цифре 1 на рисунке ниже.
    • Это соответствует tcpStore_ в Мастере.
  • О мониторинге.

    • Среди воркеров listenSocket_ используется для уведомления Мастера о том, что мне нужно прослушать этот ключ, что соответствует цифре 2 на рисунке ниже. При этом worker внутренне устанавливает callback для этого ключа, что соответствует цифре 3 на рисунке ниже.
    • Мониторинг соответствует МастеруwatchedSockets_[key] = socket_.
    • Среди Мастера, если при установке значения будет обнаружен отслеживаемый ключ, он уведомит об этом WatchSockets_[key], что соответствует цифре 4 на рисунке ниже.
    • Связанные деловые звонки будут совершаться в Worker.
                                                                          +
+----------------------------------------------------------------------+  |  +------------------------------------------------------------------------+
| TCPStore                                                      Master |  |  | TCPStore                                                        Worker |
|                                                                      |  |  |                                                                        |
|   storeSocket_                                                       |  |  |                                                                        |
|                                                                      |  |  |                                                                        |
|   +------------------------------------------------------------+     |  |  |                                                                        |
|   | TcpStoreMasterDaemon_                            MasterPort|     |  |  |  1   +---------------------------------+                               |
|   |                                                            | <--------------+ | set(key, value)                 |                               |
|   |   unordered_map<string, vector<uint8_t> > tcpStore_+---+   |     |  |  |      |                                 |                               |
|   |                                                        |   |     |  |  |      |    storeSocket_                 |                               |
|   |   TCPStore.masterListenSocket_                         |   |     |  |  |      |                                 |                               |
|   |                                                        |   |     |  |  |      +---------------------------------+                               |
|   |   +-----------------------------------------------+    |   |     |  |  |                                                                        |
|   |   |  run                                          |    |   |     |  |  |  2   +---------------------------------+                               |
|   |   |                                               |    |   | <--------------+ |                                 |                               |
|   |   |    queryFds     query                         |    |   |     |  |  |      | watchKey(key, callback) +-------------------------------+       |
|   |   |                                               |    |   |     |  |  |      |                                 |        3              |       |
|   |   |    setHandler   getHandler                    |    |   |     |  |  |      |    listenSocket_                |                       |       |
|   |   |                                               |    |   |     |  |  |      |                                 |                       |       |
|   |   +-----------------------------------------------+    |   |     |  |  |      |                                 |                       |       |
|   |                                                        |   |     |  |  |      +---------------------------------+                       |       |
|   +------------------------------------------------------------+     |  |  |                                                                |       |
|                                                            |         |  |  |                                                                |       |
|                                                            |         |  |  |                                                                |       |
|                                                            |         |  |  |   +----------------------------------------------------------------+   |
|                                                            |         |  |  |   | TCPStoreWorkerDaemon                                       |   |   |
|                                                            |         |  |  |   |                                                            |   |   |
|                                                            |         |  |  |   |   unordered_map<string, WatchKeyCallback> keyToCallbacks_  |   |   |
|                                                            |         |  |  |   |                                                            |   |   |
|                                                            |         |  |  |   |   TCPStore.listenSocket_                              +----+   |   |
|                                                            |         |  |  |   |                                                       |        |   |
|                                                            |         |  |  |   |  +----------------------------------------------------------+  |   |
|                                                            |         |  |  |   |  | run                                                |     |  |   |
|                                                            |     4   |  |  |   |  |                                                    |     |  |   |
|                                                            +--------------------->+                                                    v     |  |   |
|                                                                      |  |  |   |  |       callbackHandler +-----> keyToCallbacks_(callback)  |  |   |
|                                                                      |  |  |   |  |                                                          |  |   |
|                                                                      |  |  |   |  +----------------------------------------------------------+  |   |
|                                                                      |  |  |   +----------------------------------------------------------------+   |
+----------------------------------------------------------------------+  +  +------------------------------------------------------------------------+


Телефон такой:

Пока мы разобрались с понятиями метода инициализации и Store, в итоге именно понятие Store играет роль в процессе инициализации. Мы также знаем функции, которые должен иметь Store, посредством анализа TCPStore, такие как установка KV, мониторинг смены ключа и т. д. Именно эти функции позволяют нескольким процессам узнать о существовании друг друга.

В следующей статье мы познакомим вас с концепцией группы процессов, так что следите за обновлениями.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси