[Анализ исходного кода] (3) сервера параметров машинного обучения ps-lite ----- Агент Заказчик

машинное обучение глубокое обучение

0x00 сводка

Эта статья является третьей частью сервера параметров, которая знакомит с модулем «Клиент» для ps-lite.

В настоящее время с почтовым отделением (PostOffice) и тележкой модуля связи (Van) следующим шагом является рассмотрение клиента почтового отделения.

Клиент — агент SimpleApp в почтовом отделении. Поскольку воркерам и серверам необходимо сконцентрироваться на алгоритмах, функции отправки и получения сообщений, логически связанные с сетью воркеров и серверов, суммируются/передаются клиентам.

Другие статьи из этой серии:

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite (1) ----- PostOffice

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite(2) ----- Коммуникационный модуль Van

0x01 источник

1.1 Текущий общий

Подведем итоги текущего общего состояния:

  • PostOffice: глобальный класс управления в одиночном режиме, узел имеет PostOffice в своем жизненном цикле и полагается на членов своего класса для управления узлом;

  • Van: Коммуникационный модуль, отвечающий за сетевое взаимодействие с другими узлами и фактическую отправку и получение сообщений. PostOffice имеет члена Van;

  • SimpleApp: Родительский класс KVServer и KVWorker, предоставляет простые функции Request, Wait, Response, Process, KVServer и KVWorker переписывают эти функции в соответствии со своими задачами;

  • Node: Информационный класс, в котором хранится соответствующая информация об этом узле.Каждый узел может быть однозначно идентифицирован по имени хоста + порту.

  • Customer: каждый объект SimpleApp содержит член класса Customer, который должен быть зарегистрирован в PostOffice. Этот класс в основном отвечает за:

    • Как отправитель, отслеживать ответы на сообщения, отправленные SimpleApp;
    • В качестве получателя поддерживать очередь сообщений узла для получения сообщений для узла;

Знание контекста класса позволяет нам лучше понять класс, поэтому сначала нам нужно увидеть, где используется Customer.До сих пор мы проанализировали два класса, и давайте посмотрим, как Customer используется в этих двух классах.

1.2 Postoffice

В PostOffice есть следующие переменные-члены:

// app_id -> (customer_id -> customer pointer) 
std::unordered_map<int, std::unordered_map<int, Customer*>> customers_;

И следующая функция-член, которая должна зарегистрировать клиента в customers_:

void Postoffice::AddCustomer(Customer* customer) {
  std::lock_guard<std::mutex> lk(mu_);
  int app_id = CHECK_NOTNULL(customer)->app_id();
  // check if the customer id has existed
  int customer_id = CHECK_NOTNULL(customer)->customer_id();
  customers_[app_id].insert(std::make_pair(customer_id, customer));
  std::unique_lock<std::mutex> ulk(barrier_mu_);
  barrier_done_[app_id].insert(std::make_pair(customer_id, false));
}
​
Customer* Postoffice::GetCustomer(int app_id, int customer_id, int timeout) const {
  Customer* obj = nullptr;
  for (int i = 0; i < timeout * 1000 + 1; ++i) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      const auto it = customers_.find(app_id);
      if (it != customers_.end()) {
        std::unordered_map<int, Customer*> customers_in_app = it->second;
        obj = customers_in_app[customer_id];
        break;
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return obj;
}

Таким образом, мы можем видеть несколько моментов:

  • Экземпляр приложения может соответствовать нескольким клиентам;
  • Клиент должен быть зарегистрирован в почтовом отделении;

1.3 Van

В Ване мы видим, что при обработке сообщений данных мы:

  • Получить customer_id из почтового отделения по app_id в сообщении;
  • Получить клиента из почтового отделения на основе customer_id;
  • Вызовите метод Accept клиента для обработки сообщения;
void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg);
}

Итак, мы знаем:

  • Экземпляр приложения может соответствовать нескольким клиентам;
  • Клиент должен быть зарегистрирован в почтовом отделении;
  • При получении сообщения он найдет клиента по идентификатору приложения в сообщении, а затем вызовет метод Accept клиента для обработки конкретного сообщения данных;

1.4 Customer

В клиенте мы видим, что роль Accept заключается в том, чтобы вставлять сообщения в очередь клиента.

ThreadsafePQueue recv_queue_;
 
inline void Accept(const Message& recved) {
   recv_queue_.Push(recved);
}

Сам объект Customer также запускает принимающий поток.recv_thread_, используя Customer::Receive(), которая вызывает зарегистрированныйrecv_handle_Функция обрабатывает сообщение.

std::unique_ptr<std::thread> recv_thread_;
​
recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
​
void Customer::Receiving() {
  while (true) {
    Message recv;
    recv_queue_.WaitAndPop(&recv);
    if (!recv.meta.control.empty() &&
        recv.meta.control.cmd == Control::TERMINATE) {
      break;
    }
    recv_handle_(recv);
    if (!recv.meta.request) {
      std::lock_guard<std::mutex> lk(tracker_mu_);
      tracker_[recv.meta.timestamp].second++;
      tracker_cond_.notify_all();
    }
  }
}

1.5 Текущая логика

Следовательно, можно сделать вывод, что текущая логика (логика приема сообщений) такова:

  • Рабочий узел или серверный узел будут выполняться в начале программы.Postoffice::start().

  • Postoffice::start()инициализирует информацию об узле и вызоветVan::start().

  • Van::start()Чтобы запустить собственный поток, используйтеVan::Receiving()непрерывно контролировать полученное сообщение.

  • Van::Receiving()Получив пост-сообщение, выполняйте разные действия по разным командам. Для сообщений данных, если требуется дальнейшая обработка, он вызоветProcessDataMsg:

    • Найти по идентификатору приложения в сообщенииCustomer.
    • передать сообщениеCustomer::Acceptфункция.
  • Customer::Accept()функция для добавления сообщения в очередьrecv_queue_;

  • CustomerСам объект также запускает принимающий потокrecv_thread_, используя Customer::Receive()

    • отrecv_queue_Очередь для получения сообщений.
    • вызов зарегистрированrecv_handle_Функция обрабатывает сообщение.

Логика краткой версии такова: поток данных осуществляется по порядку номеров на рисунке, также видно, что три класса Ван, Почта и Клиент несколько сверхсвязаны друг с другом. Может быть, лучше отсортировать их:

                +--------------------------+
                | Van                      |
                |                          |
DataMessage +----------->  Receiving       |
                |  1           +           |             +---------------------------+
                |              |           |             | Postoffice                |
                |              | 2         |             |                           |
                |              v           | GetCustomer |                           |
                |        ProcessDataMsg <------------------> unordered_map customers_|
                |              +           |      3      |                           |
                |              |           |             +---------------------------+
                +--------------------------+
                               |
                               |
                               | 4
                               |
                +-------------------------+
                | Customer     |          |
                |              |          |
                |              v          |
                |           Accept        |
                |              +          |
                |              |          |
                |              | 5        |
                |              v          |
                |         recv_queue_     |
                |              +          |
                |              | 6        |
                |              |          |
                |              v          |
                |          Receiving      |
                |              +          |
                |              | 7        |
                |              |          |
                |              v          |
                |         recv_handle_    |
                |                         |
                +-------------------------+

Ниже мы подробно разберем конкретную логику.

0x02 базовый класс

Сначала мы вводим некоторые основные классы.

2.1 SArray

SAArray имеет следующие характеристики:

  • SArrays — это интеллектуальные массивы общих данных, обеспечивающие функциональность, подобную std::vector.
  • SArray может быть создан из std::vector.
  • SArray можно копировать и назначать как указатель C. Когда ссылка на SArray равна 0, память SArray автоматически освобождается.
  • Его можно понимать как вектор с нулевой копией, который совместим с векторной структурой данных.

2.2 KVPairs

В ps-lite у каждого сервера есть непрерывный ключ и соответствующее значение этих ключей. Ключ и значение хранятся отдельно, и каждый ключ может соответствовать нескольким значениям, поэтому необходимо записывать длину каждого ключа, поэтому существует KVPairs.

Особенности KVPairs заключаются в следующем:

  • KVPairs инкапсулирует структуру ключ-значение, а также включает параметр длины с тремя массивами ключей, значений и линз.
  • KVPairs содержит классы шаблонов для ключей SArray, значений SArray и объектива SArray. Key на самом деле является псевдонимом для int64, а Val — это переменная шаблона.
  • Линза и клавиши имеют одинаковую длину, что указывает на количество значений, соответствующих каждой клавише.
  • линза может быть пустой, в этом случае значения делятся поровну.

Например:

  • Если ключи=[1,5], объектив=[2,3], то данные, соответствующие ключам[0], будут: значения[0] и значения[1], а данные, соответствующие ключам[1], будут значениями[ 2] , значения[3], значения[5].
  • И если len пусто, values.size() должно быть кратно keys.size() (здесь 2), а key[0] и key[1] соответствуют половине значений.

Определяется следующим образом:

struct KVPairs {
  // /** \brief empty constructor */
  // KVPairs() {}
  /** \brief the list of keys */
  SArray<Key> keys;
  /** \brief the according values */
  SArray<Val> vals;
  /** \brief the according value lengths (could be empty) */
  SArray<int> lens; // key对应value的长度vector
  /** \brief priority */
  int priority = 0;
};

2.3 Node

Узел инкапсулирует информацию об узле, такую ​​как роль, IP-адрес, порт и является ли он узлом восстановления.

struct Node {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief default constructor */
  Node() : id(kEmpty), port(kEmpty), is_recovery(false) {}
  /** \brief node roles */
  enum Role { SERVER, WORKER, SCHEDULER };
​
  /** \brief the role of this node */
  Role role;
  /** \brief node id */
  int id;
  /** \brief customer id */
  int customer_id;
  /** \brief hostname or ip */
  std::string hostname;
  /** \brief the port this node is binding */
  int port;
  /** \brief whether this node is created by failover */
  bool is_recovery;
};

2.4 Control

Управление: инкапсулирует метаинформацию управляющего сообщения, группа_барьеров (используется для определения, какие узлы необходимо синхронизировать, используется, когда команда=БАРЬЕР), узел (класс узла, используется для определения, для каких узлов используется управляющая команда) и т. д. , сигнатура метода.

Как видите, Control содержит описанный выше тип Node.

struct Control {
  /** \brief empty constructor */
  Control() : cmd(EMPTY) { }
  /** \brief return true is empty */
  inline bool empty() const { return cmd == EMPTY; }
​
  /** \brief all commands */
  enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
  /** \brief the command */
  Command cmd;
  /** \brief node infos */
  std::vector<Node> node;
  /** \brief the node group for a barrier, such as kWorkerGroup */
  int barrier_group;
  /** message signature */
  uint64_t msg_sig;
};

2.5 Meta

Мета: часть метаданных сообщения, включая метку времени, идентификатор отправителя, идентификатор получателя, управляющую информацию, тип сообщения и т. д.;

struct Meta {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief default constructor */
  Meta() : head(kEmpty), app_id(kEmpty), customer_id(kEmpty),
           timestamp(kEmpty), sender(kEmpty), recver(kEmpty),
           request(false), push(false), pull(false), simple_app(false) {}
  /** \brief an int head */
  int head;
  /** \brief the unique id of the application of messsage is for*/
  int app_id;
  /** \brief customer id*/
  int customer_id;
  /** \brief the timestamp of this message */
  int timestamp;
  /** \brief the node id of the sender of this message */
  int sender;
  /** \brief the node id of the receiver of this message */
  int recver;
  /** \brief whether or not this is a request message*/
  bool request;
  /** \brief whether or not a push message */
  bool push;
  /** \brief whether or not a pull message */
  bool pull;
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
  /** \brief an string body */
  std::string body;
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  /** \brief system control message */
  Control control;
  /** \brief the byte size */
  int data_size = 0;
  /** \brief message priority */
  int priority = 0;
};

2.6 Message

2.6.1 Структура

Сообщение — это сообщение для отправки, как показано ниже:

  • Мета заголовка сообщения: это метаданные (с использованием Protobuf для сжатия данных), в том числе:

    • Управляющая информация (Control) указывает значение этого сообщения (например, завершение, подтверждение ACK, синхронизация и т. д.), в том числе:

      • тип команды;

      • Список узлов (вектор), узлы включают:

        • Роль узла
        • ip, port
        • id
        • Это узел восстановления
      • Идентификатор группы указывает, кому выполняется команда управления;

      • сигнатура метода;

    • отправитель;

    • получатель;

    • отметка времени;

    • ...

  • Тело сообщения: это данные, отправленные с использованием пользовательского SArray для обмена данными, чтобы уменьшить копирование данных;

2.6.2 Логические отношения

Логическая связь между несколькими классами следующая:

img

Некоторые функции в Message должны полагаться на Meta для завершения и так далее.

2.6.3 тип сообщения

сообщение включает в себя следующие типы:

  • ADD_NODE: Worker и сервер добавляют узлы в планировщик
  • БАРЬЕР: Синхронные блокирующие сообщения между узлами
  • HEARTBEAT: сигнал сердцебиения между узлами, проверьте живость
  • TERMINATE: сигнал выхода узла
  • ПУСТОЙ: обычные сообщения, такие как push или pull

2.6.4 Определения

Конкретные определения следующие:

struct Message {
  /** \brief the meta info of this message */
  Meta meta;
  /** \brief the large chunk of data of this message */
  std::vector<SArray<char> > data;
  /**
   * \brief push array into data, and add the data type
   */
  template <typename V>
  void AddData(const SArray<V>& val) {
    CHECK_EQ(data.size(), meta.data_type.size());
    meta.data_type.push_back(GetDataType<V>());
    SArray<char> bytes(val);
    meta.data_size += bytes.size();
    data.push_back(bytes);
  }
};
​

Каждый раз, когда сообщение отправляется, сообщение инкапсулируется в этом формате, и член класса (класс Customer), ответственный за отправку сообщения, доставляет сообщение к двери в соответствии с информацией в мета.

0x03 Customer

3.1 Обзор

Фактически у клиента есть две функции:

  • Как отправитель, он используется для отслеживания ответа, соответствующего каждому запросу, отправленному SimpleApp;
  • В качестве получателя, поскольку у него есть собственный принимающий поток и очередь получающих сообщений, Заказчик фактически существует как принимающий механизм обработки сообщений (или часть механизма);

Особенности заключаются в следующем:

  • Каждый объект SimpleApp содержит член класса Customer, а Customer должен быть зарегистрирован в PostOffice.
  • Поскольку клиент также обрабатывает сообщение, но не берет на себя управление сетью, фактический ответ и сообщение должны быть переданы внешним вызывающим абонентом, поэтому функции и обязанности немного разделены.
  • Каждое соединение соответствует экземпляру Customer, и каждый Customer привязан к идентификатору узла, представляющему текущий узел для отправки на соответствующий узел идентификатора узла. Идентификатор подключенного партнера совпадает с идентификатором экземпляра Customer.
  • При создании нового запроса будет возвращена временная метка. Эта временная метка будет использоваться в качестве идентификатора этого запроса. Каждый запрос будет увеличиваться на 1, и соответствующий res также будет увеличиваться на 1. При вызове ожидания это гарантирует что последующие вызовы, такие как ожидание, используют этот идентификатор в качестве идентификации.

3.2 Определения

3.2.1 Переменные-члены

Давайте сначала посмотрим на переменные-члены Customer.

Следует отметить, что для понимания функции переменной здесь мы можем посмотреть на поток сообщений, то есть, если есть одно принимающее сообщение, поток данных этого процесса выглядит следующим образом, поэтому мы также разбираем переменные-члены Заказчика в таком порядке:

Van::ProcessDataMsg ---> Customer::Accept ---> Customer::recv_queue_ ---> Customer::recv_thread_ ---> Customer::recv_handle_ 

Основные переменные-члены следующие:

  • ThreadsafePQueue recv_queue_ : потокобезопасная очередь сообщений;

  • std::unique_ptr recv_thread_ : продолжить чтение сообщений из recv_queue и вызвать recv_handle_;

  • RecvHandle recv_handle_ : Функция обработки сообщений работника или сервера.

    • Привязать функцию обработки (SimpleApp::Process) после того, как Заказчик получит запрос;
    • Клиент запустит новый поток, чтобы использовать recv_handle_ для обработки принятого запроса в течение жизненного цикла клиента.Здесь используется потокобезопасная очередь, а Accept() используется для постоянной отправки сообщений в очередь.
    • Полученное сообщение поступает из принимающего потока Van, то есть после того, как объект Van каждого узла получит сообщение, оно будет передано различным объектам клиента в соответствии с разными сообщениями.
    • Для Worker, такого как KVWorker, recv_handle_ сохраняет данные в извлеченном сообщении,
    • Для сервера вам необходимо использовать set_request_handle для установки соответствующей функции обработки, такой как KVServerDefaultHandle,
  • std::vector<:pair int>> tracker_ : переменная синхронизации для запроса и ответа.

    • tracker_ — это карта, используемая в Customer для записи статуса запроса и ответа. Записывает, сколько узлов может быть отправлено каждым запросом (с использованием идентификатора запроса), и количество ответов, возвращенных от скольких узлов,
    • Нижний индекс tracker_ — это метка времени каждого запроса, то есть номер запроса.
    • tracker_[i] , сначала указывает, скольким узлам был отправлен запрос, то есть количество ответов, которые должен получить этот узел.
    • tracker_[i] .second указывает количество ответов, фактически полученных на данный момент.

3.2.2 Специальные определения

Конкретные определения следующие:

class Customer {
 public:
  /**
   * \brief the handle for a received message
   * \param recved the received message
   */
  using RecvHandle = std::function<void(const Message& recved)>;
​
  /**
   * \brief constructor
   * \param app_id the globally unique id indicating the application the postoffice
   *               serving for
   * \param customer_id the locally unique id indicating the customer of a postoffice
   * \param recv_handle the functino for processing a received message
   */
  Customer(int app_id, int customer_id, const RecvHandle& recv_handle);
​
  /**
   * \brief desconstructor
   */
  ~Customer();
​
  /**
   * \brief return the globally unique application id
   */
  inline int app_id() { return app_id_; }
​
  /**
   * \brief return the locally unique customer id
   */
  inline int customer_id() { return customer_id_; }
​
  /**
   * \brief get a timestamp for a new request. threadsafe
   * \param recver the receive node id of this request
   * \return the timestamp of this request
   */
  int NewRequest(int recver);
​
  /**
   * \brief wait until the request is finished. threadsafe
   * \param timestamp the timestamp of the request
   */
  void WaitRequest(int timestamp);
​
  /**
   * \brief return the number of responses received for the request. threadsafe
   * \param timestamp the timestamp of the request
   */
  int NumResponse(int timestamp);
​
  /**
   * \brief add a number of responses to timestamp
   */
  void AddResponse(int timestamp, int num = 1);
​
  /**
   * \brief accept a received message from \ref Van. threadsafe
   * \param recved the received the message
   */
  inline void Accept(const Message& recved) {
    recv_queue_.Push(recved);
  }
​
 private:
  /**
   * \brief the thread function
   */
  void Receiving();
​
  int app_id_;
  int customer_id_;
  RecvHandle recv_handle_;
  ThreadsafePQueue recv_queue_;
  std::unique_ptr<std::thread> recv_thread_;
  std::mutex tracker_mu_;
  std::condition_variable tracker_cond_;
  std::vector<std::pair<int, int>> tracker_;
  DISALLOW_COPY_AND_ASSIGN(Customer);
};
​

3.3 Принять нить

В функции сборки создается принимающий поток.

recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
​

Функция обработки потока выглядит следующим образом, конкретная логика такова:

  • Подождите в очереди сообщений и выньте, если есть сообщение;
  • Используйте recv_handle_ для обработки сообщений;
  • Если meta.request имеет значение false, что указывает на то, что это ответ, увеличьте соответствующий счетчик в трекере.
void Customer::Receiving() {
  while (true) {
    Message recv;
    recv_queue_.WaitAndPop(&recv);
    if (!recv.meta.control.empty() &&
        recv.meta.control.cmd == Control::TERMINATE) {
      break;
    }
    recv_handle_(recv);
    if (!recv.meta.request) {
      std::lock_guard<std::mutex> lk(tracker_mu_);
      tracker_[recv.meta.timestamp].second++;
      tracker_cond_.notify_all();
    }
  }
}
​

Поскольку recv_handle_ используется для конкретной бизнес-логики, давайте посмотрим, как настроен recv_handle_, и именно так клиент создает и использует его.

3.4 Как построить

Нам нужно заранее использовать некоторые классы, которые будут проанализированы ниже, потому что они являются потребителями Customer и слишком сильно связаны.

3.4.1 In SimpleApp

Сначала давайте посмотрим на SimpleApp, который является базовым классом для конкретных узлов логических функций.

Каждый объект SimpleApp содержит член класса Customer, а Customer должен быть зарегистрирован в PostOffice.

Здесь нужно создать новый пользовательский объект для инициализации члена obj_.

inline SimpleApp::SimpleApp(int app_id, int customer_id) : SimpleApp() {
  using namespace std::placeholders;
  obj_ = new Customer(app_id, customer_id, std::bind(&SimpleApp::Process, this, _1));
}
​

Давайте еще раз посмотрим на два подкласса SimpleApp.

3.4.2 KVServer(app_id)

Класс KVServer в основном используется для сохранения данных пар «ключ-значение» и выполнения некоторых бизнес-операций, таких как обновление градиента. Основные методы: Process() и Response().

В своем конструкторе он будет:

  • Создайте новый объект Customer для инициализации члена obj_;
  • Передача KVServer::Process в конструктор Customer фактически дает метод Process дляCustomer:: recv_handle_;
  • Для сервера app_id = custom_id = идентификатор сервера;

Конструктор выглядит следующим образом:

  /**
   * \brief constructor
   * \param app_id the app id, should match with \ref KVWorker's id
   */
  explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }
​

3.4.3 KVWorker(app_id, custom_id)

Класс KVWorker в основном используется для отправки/получения собственных данных «ключ-значение» на сервер. Включая следующие методы: Push(), Pull(), Wait().

В своем конструкторе он будет:

  • Свяжите член slicer_ с KVWorker::DefaultSlicer по умолчанию;
  • Создайте новый объект Customer для инициализации члена obj_, используйте KVWorker::Process для передачи в конструкторе Customer, фактически назначьте метод Process для Customer:: recv_handle_;
  /**
   * \brief constructor
   *
   * \param app_id the app id, should match with \ref KVServer's id
   * \param customer_id the customer id which is unique locally
   */
  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }
​

3.4.4 Customer

Логика функции сборки следующая:

  • Инициализировать с параметрами, переданными конструктору соответственноapp_id_, custom_id_ , recv_handleчлен

  • Вызовите PostOffice::AddCustomer, чтобы зарегистрировать текущего клиента в PostOffice;

    • Customer_member PostOffice: добавьте custom_id в соответствующий элемент app_id;
    • Член Barrier_done_ PostOffice устанавливает статус синхронизации этого custom_id в false.
  • Начать новый поток получения recv_thread_;

Конкретная строительная функция заключается в следующем:

Customer::Customer(int app_id, int customer_id, const Customer::RecvHandle& recv_handle)
    : app_id_(app_id), customer_id_(customer_id), recv_handle_(recv_handle) {
  Postoffice::Get()->AddCustomer(this);
  recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
}
​

3.4.5 Кардочесание

3.4.5.1 Пример кода

У вас могут возникнуть вопросы о app_id и customer_id, например:

В функции сборки KVWorker есть:

  • app_id the app id, should match with KVServer's id
  • customer_id the customer id which is unique locally

В функции сборки KVServer есть:

  • app_id the app id, should match with KVWorker's id

Мы используем тесты/test_kv_app_multi_workers.cc, который поставляется с исходным кодом, чтобы разобраться в логической связи между app_id и customer_id.

Далее спойлер: работники используют customer_id для идентификации. Идентификатор клиента используется в коде рабочего процесса для определения диапазона ключа, соответствующего этому рабочему процессу.

Как видно из скрипта, для проверки используйте следующее:

find test_* -type f -executable -exec ./repeat.sh 4 ./local.sh 2 2 ./{} ;

Файл запускает сервер и двух рабочих.

  • И app_id, и customer_id сервера равны 0;

  • app_id работника равен 0, а customer_id — 0 и 1 соответственно;

  • Используйте std::thread для выполнения рабочих узлов, то есть запускайте два рабочих узла в одном процессе. Это объяснило бы «идентификатор клиента, который уникален локально» в комментарии в функции сборки KVWorker.

  • Таким образом, переменные-члены Postoffice std::unordered_map> customers_ имеют следующий вид:

    • [0, [ 0, Customer_0] ], первый 0 — идентификатор приложения, второй 0 — идентификатор клиента
    • [0, [ 1, Customer_1] ], первый 0 — идентификатор приложения, второй 1 — идентификатор клиента

Таким образом, мы можем выяснить:

  • идентификатор приложения используется для идентификации приложения.
  • идентификатор клиента используется для идентификации локального работника или сервера в этом приложении (идентификатор приложения).
  • Поэтому в функции построения KVServer сказано, что идентификатор приложения должен соответствовать идентификатору KVWorker, фактически это означает, что идентификатор приложения должен быть непротиворечивым для всех.
  • Идентификатор клиента используется в следующем рабочем коде для определения диапазона ключа, соответствующего этому рабочему процессу.

Конкретный код выглядит следующим образом:

#include <cmath>
#include "ps/ps.h"
using namespace ps;
​
void StartServer() { // 启动服务
  if (!IsServer()) return;
  auto server = new KVServer<float>(0);
  server->set_request_handle(KVServerDefaultHandle<float>());
  RegisterExitCallback([server](){ delete server; });
}
​
void RunWorker(int customer_id) { // 启动worker
  Start(customer_id);
  if (!IsWorker()) {
    return;
  }
  KVWorker<float> kv(0, customer_id);
  // init
  int num = 10000;
  std::vector<Key> keys(num);
  std::vector<float> vals(num);
​
  int rank = MyRank();
  srand(rank + 7);
  for (int i = 0; i < num; ++i) {
    keys[i] = kMaxKey / num * i + customer_id;
    vals[i] = (rand() % 1000);
  }
  // push
  int repeat = 50;
  std::vector<int> ts;
  for (int i = 0; i < repeat; ++i) {
    ts.push_back(kv.Push(keys, vals));
​
    // to avoid too frequency push, which leads huge memory usage
    if (i > 10) kv.Wait(ts[ts.size()-10]);
  }
  for (int t : ts) kv.Wait(t);
​
  // pull
  std::vector<float> rets;
  kv.Wait(kv.Pull(keys, &rets));
​
  // pushpull
  std::vector<float> outs;
  for (int i = 0; i < repeat; ++i) {
    kv.Wait(kv.PushPull(keys, vals, &outs));
  }
​
  float res = 0;
  float res2 = 0;
  for (int i = 0; i < num; ++i) {
    res += fabs(rets[i] - vals[i] * repeat);
    res += fabs(outs[i] - vals[i] * 2 * repeat);
  }
  CHECK_LT(res / repeat, 1e-5);
  CHECK_LT(res2 / (2 * repeat), 1e-5);
  LL << "error: " << res / repeat << ", " << res2 / (2 * repeat);
  // stop system
  Finalize(customer_id, true);
}
​
int main(int argc, char *argv[]) {
  // start system
  bool isWorker = (strcmp(argv[1], "worker") == 0);
  if (!isWorker) {
    Start(0);
    // setup server nodes,启动server节点
    StartServer();
    Finalize(0, true);
    return 0;
  }
  // run worker nodes,启动两个worker节点
  std::thread t0(RunWorker, 0);
  std::thread t1(RunWorker, 1);
​
  t0.join();
  t1.join();
  return 0;
}
​
3.4.5.2 Определение личности

Давайте снова вспомним инициализацию Postoffice, и мы увидим, что при запуске воркеры используют customer_id для определения своей личности. Поэтому идентификатор клиента используется в коде рабочего процесса для определения диапазона ключа, соответствующего этому рабочему процессу.

void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
​
    // init node info.
    // 对于所有的worker,进行node设置
    for (int i = 0; i < num_workers_; ++i) {
      int id = WorkerRankToID(i);
      for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                    kWorkerGroup + kScheduler,
                    kWorkerGroup + kServerGroup + kScheduler}) {
        node_ids_[g].push_back(id);
      }
    }
    // 对于所有的server,进行node设置
    for (int i = 0; i < num_servers_; ++i) {
      int id = ServerRankToID(i);
      for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                    kServerGroup + kScheduler,
                    kWorkerGroup + kServerGroup + kScheduler}) {
        node_ids_[g].push_back(id);
      }
    }
    // 设置scheduler的node
    for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                  kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
      node_ids_[g].push_back(kScheduler);
    }
    init_stage_++;
  }
​
  // start van
  van_->Start(customer_id); // 这里有 customer_id
​
  ......
    
  // do a barrier here,这里有 customer_id
  if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
}

Глядя на инициализацию Van, он также использует customer_id для определения своей собственной личности.

void Van::Start(int customer_id) {
  if (init_stage == 0) {
    // get my node info
    if (is_scheduler_) {
      my_node_ = scheduler_;
    } else {
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id; // 这里有 customer_id
    }
  }
  
  if (!is_scheduler_) {
    // let the scheduler know myself
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id; // 这里有 customer_id
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
  
  ......
}
​

Таким образом, это также может объяснить, почему app_id и customer_id используются, когда KVWorker отправляет сообщения.

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  .....
  for (size_t i = 0; i < sliced.size(); ++i) {
    Message msg;
    msg.meta.app_id = obj_->app_id(); // 注意这里
    msg.meta.customer_id = obj_->customer_id();// 注意这里
    msg.meta.request     = true;
    ......
    Postoffice::Get()->van()->Send(msg);
  }
}

В KVServer также необходимо использовать app_id и customer_id при ответе на сообщения.

template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
  Message msg;
  msg.meta.app_id = obj_->app_id();// 注意这里
  msg.meta.customer_id = req.customer_id;// 注意这里
  msg.meta.request     = false;
  msg.meta.push        = req.push;
  msg.meta.pull        = req.pull;
  msg.meta.head        = req.cmd;
  msg.meta.timestamp   = req.timestamp;
  msg.meta.recver      = req.sender;
  ......
  Postoffice::Get()->van()->Send(msg);
}
3.4.5.3 Проблемы

Итак, вопрос в том, почему app_id равен customer_id на стороне сервера?

Поскольку исходного кода для ps в настоящее время нет, предположение таково:

В коде ps также есть несколько клиентов на стороне сервера, но в целях упрощения эта часть функции была удалена в ps-lite, поэтому в ps-lite app_id равен customer_id.

3.5 Текущая логика

Итак, мы снова разбираем процесс (логика получения сообщения) следующим образом:

  • Рабочий узел или серверный узел будут выполняться в начале программы.Postoffice::start().

  • Postoffice::start()инициализирует информацию об узле и вызоветVan::start().

  • Van::start()Чтобы запустить собственный поток, используйтеVan::Receiving()непрерывно контролировать полученное сообщение.

  • Van::Receiving()Получив пост-сообщение, выполняйте разные действия по разным командам. Для сообщений данных, если требуется дальнейшая обработка, будет вызываться ProcessDataMsg:

    • В соответствии с идентификатором приложения в сообщении для поиска клиента сообщение будет отправлено в поток получения разных клиентов в соответствии с идентификатором клиента.
    • передать сообщениеCustomer::Acceptфункция.
  • Функция Customer::Accept() добавляет сообщение в очередьrecv_queue_;

  • Сам объект Customer также запускает принимающий поток.recv_thread_, используя Customer::Receive()

    • отrecv_queue_Очередь для получения сообщений.
    • Если (!recv.meta.request) означает ответ, тоtracker_[req.timestamp].second++
    • вызов зарегистрированrecv_handle_Функция обрабатывает сообщение.
  • Для рабочих зарегистрированrecv_handle_даKVWorker::Process()функция. Поскольку сообщения, полученные рабочим потоком recv, в основном представляют собой пары KV, извлеченные с сервера, этоProcess()В основном для получения пары KV в сообщении;

  • Для Сервера он зарегистрированrecv_handle_даKVServer::Process()функция. Поскольку сервер принимает пару KV, проталкиваемую рабочими процессами, ее необходимо обработать, поэтомуProcess()Пользователь в функции, вызываемой черезKVServer::set_request_handle()Переданный объект функции.

Текущая логика показана ниже: На шаге 8 recv_handle_ указывает на KVServer::Process или KVWorker::Process.

                +--------------------------+
                | Van                      |
                |                          |
DataMessage +----------->  Receiving       |
                |  1           +           |             +---------------------------+
                |              |           |             | Postoffice                |
                |              | 2         |             |                           |
                |              v           | GetCustomer |                           |
                |        ProcessDataMsg <------------------> unordered_map customers_|
                |              +           |      3      |                           |
                |              |           |             +---------------------------+
                +--------------------------+
                               |
                               |
                               | 4
                               |
                +-------------------------+
                | Customer     |          |
                |              |          |
                |              v          |
                |           Accept        |
                |              +          |
                |              |          |
                |              | 5        |
                |              v          |
                |         recv_queue_     |                +-----------------+
                |              +          |                |KVWorker         |
                |              | 6        |     +--------> |                 |
                |              |          |     |    8     |         Process |
                |              v          |     |          +-----------------+
                |          Receiving      |     |
                |              +          |     |
                |              | 7        |     |
                |              |          |     |          +-----------------+
                |              v          |     |          |KVServer         |
                |         recv_handle_+---------+--------> |                 |
                |                         |          8     |         Process |
                +-------------------------+                +-----------------+
​
​

0x04 Функциональная функция

Следующие клиентские функции вызываются другими модулями.

4.1 Customer::NewRequest

4.1.1 Реализация

Назначение этой функции: при отправке запроса добавляется счетчик этого запроса. Итак, когда нам нужно посчитать Запрос, используйте эту функцию.

Особенности следующие:

  • Перед отправкой каждого сообщения измените количество ответов, которое должно быть получено этим сообщением.
  • recver представляет node_id получателя, потому что целое число в ps-lite может соответствовать нескольким node_id, поэтому используйте декодирование Postoffice, чтобы получить количество всех реальных node_id.
  • Например, если вы отправляете сообщение в kServerGroup, в kServerGroup 3 сервера, то num равно 3, то есть вы должны получить 3 ответа. Элемент, соответствующий tracker_, равен [3,0], что означает, что должно быть получено 3 элемента, а в настоящее время получено 0 элементов.
  • Возвращаемое значение функции можно рассматривать как метку времени, и эта метка времени будет использоваться как идентификатор этого запроса.При вызове ожидания будет гарантировать, что последующие ожидания будут идентифицированы этим идентификатором.
int Customer::NewRequest(int recver) {
  std::lock_guard<std::mutex> lk(tracker_mu_);
  int num = Postoffice::Get()->GetNodeIDs(recver).size();  // recver 可能会代表一个group。
  tracker_.push_back(std::make_pair(num, 0));
  return tracker_.size() - 1;  // 代表此次请求的时间戳timestamp,后续customer使用这个值代表这个request
}

4.1.2 Вызов

Конкретный пример вызова — когда рабочий отправляет на сервер.

  int ZPush(const SArray<Key>& keys,
            const SArray<Val>& vals,
            const SArray<int>& lens = {},
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = obj_->NewRequest(kServerGroup); // 这里会调用
    AddCallback(ts, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.lens = lens;
    kvs.priority = priority;
    Send(ts, true, false, cmd, kvs);
    return ts;
  }
​

4.2 Customer::AddResponse

4.2.1 Реализация

Функция состоит в том, чтобы подсчитать ответ, который был возвращен на запрос.

Особенности следующие:

  • Когда внешний вызывающий объект получает ответ, вызовите AddResponse, чтобы сообщить об этом объекту Customer.
  • Активно увеличивать количество фактически полученных ответов на запрос.В основном используется при отправке запроса клиентом.Иногда связь с некоторыми серверами может быть пропущена (ключи этой связи на этих серверах не распределяются), и клиент может Прямо считается, что Ответ получен.
  • Кроме того, вCustomer::Receiving, при обработке запроса, не являющегося запросом, количество ответов на соответствующий запрос также будет увеличено.tracker_[recv.meta.timestamp].second++;
  • В этом классе есть недостаток: нет операции удаления информации запроса, которая не будет использоваться снова после истечения срока действия. А жизненный цикл одного объекта этого класса почти равен жизненному циклу процесса. Поэтому, если программа ps-lite будет работать долго, она будет в основном OOM.
void Customer::AddResponse(int timestamp, int num) {
  std::lock_guard<std::mutex> lk(tracker_mu_);
  tracker_[timestamp].second += num;
}
​

4.2.2 Вызов

Будет вызван метод Send у KVWorker, т.к. в некоторых случаях (ключи этого общения не раздаются на этих серверах) клиент может сразу подумать, что Response получен, поэтому его нужно пропустить.

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  // slice the message
  SlicedKVs sliced;
  slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);
​
  // need to add response first, since it will not always trigger the callback
  int skipped = 0;
  for (size_t i = 0; i < sliced.size(); ++i) {
    if (!sliced[i].first) ++skipped;
  }
  
  obj_->AddResponse(timestamp, skipped); // 这里调用
  
  if ((size_t)skipped == sliced.size()) {
    RunCallback(timestamp);
  }
​
  for (size_t i = 0; i < sliced.size(); ++i) {
    const auto& s = sliced[i];
    if (!s.first) continue;
    Message msg;
    msg.meta.app_id = obj_->app_id();
    msg.meta.customer_id = obj_->customer_id();
    msg.meta.request     = true;
    msg.meta.push        = push;
    msg.meta.pull        = pull;
    msg.meta.head        = cmd;
    msg.meta.timestamp   = timestamp;
    msg.meta.recver      = Postoffice::Get()->ServerRankToID(i);
    msg.meta.priority    = kvs.priority;
    const auto& kvs = s.second;
    if (kvs.keys.size()) {
      msg.AddData(kvs.keys);
      msg.AddData(kvs.vals);
      if (kvs.lens.size()) {
        msg.AddData(kvs.lens);
      }
    }
    Postoffice::Get()->van()->Send(msg);
  }
}

4.3 Customer::WaitRequest

4.3.1 Реализация

Функция такова: когда нам нужно дождаться получения всех ответов, соответствующих отправленному запросу, использование этой функции заблокирует и будет ждать, пока количество ответов, которые должны быть получены, не сравняется с количеством фактически полученных ответов.

Процесс операции ожидания заключается в том, что tracker_cond_ блокируется и ждет, пока отправленное число не сравняется с возвращенным числом.

void Customer::WaitRequest(int timestamp) {
  std::unique_lock<std::mutex> lk(tracker_mu_);
  tracker_cond_.wait(lk, [this, timestamp]{
      return tracker_[timestamp].first == tracker_[timestamp].second;
    });
}

4.3.2 Вызов

Функция ожидания должна использовать функцию WaitRequest, чтобы убедиться, что операция завершена.

  /**
   * \brief Waits until a push or pull has been finished
   *
   * Sample usage:
   * \code
   *   int ts = w.Pull(keys, &vals);
   *   Wait(ts);
   *   // now vals is ready for use
   * \endcode
   *
   * \param timestamp the timestamp returned by the push or pull
   */
  void Wait(int timestamp) { obj_->WaitRequest(timestamp); }
​

Однако пользователь сам решает, как его вызывать, например:

  for (int i = 0; i < repeat; ++i) {
    kv.Wait(kv.Push(keys, vals));
  }

Так что это подходит к вопросу о стратегии синхронизации.

0x05 Политика синхронизации

Когда разные воркеры работают параллельно в одно и то же время, прогресс разных воркеров может отличаться из-за внешних причин, таких как конфигурация сети и машины.Как управлять механизмом синхронизации воркеров — относительно важная тема.

5.1 Протокол синхронизации

Вообще говоря, существует три уровня протоколов асинхронного управления: BSP (Bulk Synchronous Parallel), SSP (Stalness Synchronous Parallel) и ASP (Asynchronous Parallel), и ограничения их синхронизации, в свою очередь, ослабляются. В погоне за более высокой скоростью вычислений алгоритм может выбрать более свободный протокол синхронизации.

Чтобы решить проблему производительности, индустрия начала исследовать здесь модель согласованности.Первой версией была модель ASP.После ASP был предложен другой относительно экстремальный протокол синхронизации, BSP.Позднее некоторые люди предлагали найти компромисс между ASP и BSP., является SSP.

Эти три соглашения заключаются в следующем:

  • ASP: Нет необходимости ждать друг друга между задачами, а порядок воркеров полностью игнорируется.Каждый воркер ходит в своем темпе и обновляется после итерации.Задача, выполненная первой, переходит к следующему раунду обучения.

    • Плюсы: устраняет время ожидания для медленных задач, сокращает время простоя графического процессора и, следовательно, повышает эффективность оборудования по сравнению с BSP. Скорость вычислений высокая, что позволяет максимально использовать вычислительную мощность кластера, а машины, на которых находятся все рабочие, не должны ждать.

    • недостаток:

      • Этот процесс может привести к вычислению градиентов с устаревшими весами, что снижает статистическую эффективность.
      • Плохая применимость, сходимость не гарантируется в некоторых случаях

      img

  • BSP: это протокол синхронизации, используемый в общих распределенных вычислениях.В каждом раунде итерации ему необходимо дождаться завершения всех вычислений задачи. Каждый рабочий процесс должен выполняться в одной итерации. Только когда все рабочие процессы итеративной задачи будут завершены, будет выполнена синхронизация и обновление сегментов между рабочим процессом и сервером.

    • Режим BSP и одномашинный последовательный режим абсолютно одинаковы с точки зрения сходимости моделей, поскольку разница заключается только в размере пакета. В то же время, поскольку каждый рабочий процесс может выполнять параллельные вычисления за один цикл, он обладает определенной способностью к параллельной работе. Это то, что использует искра.

    • Преимущества: Широкий спектр приложений, высокое качество сходимости на каждой итерации.

    • Недостатки: В каждой итерации BSP требует, чтобы каждый воркер ждал или приостанавливал градиент от других воркеров, так что ему нужно ждать самую медленную задачу, что значительно снижает эффективность оборудования и приводит к длительному времени вычислений для общей задачи. Производительность всей рабочей группы определяется самым медленным рабочим среди них, такого рабочего обычно называют отставшим.

      bsp

  • SSP: Допускает определенную степень несогласованности хода выполнения задачи, но у этой несогласованности есть верхний предел, называемый значением устаревания, то есть самая быстрая задача опережает самую медленную задачу устаревающей не более чем на одну итерацию.

    • Это компромисс между ASP и BSP. Поскольку ASP позволяет интервалу итераций между разными рабочими процессами быть сколь угодно большим, а BSP допускает только 0, я беру константу s. С помощью SSP BSP можно получить, указав s=0. ASP также может быть достигнута формулировкой s=∞.

    • Преимущества: Время ожидания между задачами в определенной степени сокращается, а скорость вычислений выше.

    • Недостатки: Качество сходимости каждого раунда итераций не такое хорошее, как у BSP.Для достижения такого же эффекта сходимости может потребоваться больше раундов итераций, а применимость не такая хорошая, как у BSP.Некоторые алгоритмы не применимый.

      ssp

5.2 Диссертация

Мушен упомянул в документе, что сервер параметров предоставляет пользователям различные зависимости задач:

img

  • Sequential: на самом деле это синхронная задача, между задачами существует порядок, и следующая задача может быть запущена только после завершения предыдущей задачи;

  • Eventual: В отличие от последовательного, между всеми задачами нет порядка, и каждая выполняет свою задачу самостоятельно.

  • Bounded Delay: это компромисс между последовательным и возможным, вы можете установитьт\tauкак максимальное время задержки. То есть только>т>\tauВсе предыдущие задачи были завершены до того, как можно будет начать новую задачу; крайние случаи:

    • т\tau= 0, ситуация Последовательная;
    • т\tau= ∞, ситуация Eventual;

5.3 ps-lite

В ps-lite есть несколько мест, требующих ожидания синхронизации:

  • Worker pull — это асинхронная операция. Если вам нужно дождаться завершения pull, вы можете вызвать Wait, чтобы убедиться, что запрос и ответ в клиенте совпадают, то есть убедиться, что pull завершен до того, как будут выполнены другие операции. ;
  • В воркере может быть несколько клиентов.Когда первый отправляет барьер, планировщик получает запрос запроса, а затем оценивает запрос в соответствии с msg, а затем отправляет его всем узлам в группе барьеров.После того, как узел получает it, Postoffice::Get ()->Manage(*msg) Установите для логического значения, соответствующего customer_id в барьере_done_, значение true, чтобы завершить операцию синхронизации.
  • При построении узлового соединения также может быть выполнен барьер;

Более сложные, такие как Asp, bsp, ssp, можно сделать, добавив соответствующую команду.

0x06 Распределенная оптимизация

6.1 Определение проблемы

Предположим, мы хотим решить следующую задачу

в(yi, xi)— выборочная пара, а w — вес модели.

Мы рассматриваем мини-пакетный стохастический градиентный спуск (SGD) с размером пакета b для решения вышеуказанной проблемы. На шаге t алгоритм сначала случайным образом выбирает b отсчетов, а затем обновляет вес w по следующей формуле

Мы используем два примера, чтобы показать, как реализовать алгоритм распределенной оптимизации в ps-lite.

6.2 Asynchronous SGD

В первом примере мы расширяем SGD до асинхронного SGD. Сервер будет поддерживать вес модели w, где сервер k получит k-й этап веса w, определяемый формулойwkВыражать. Как только сервер получает градиенты от рабочего процесса, сервер k обновляет веса, которые он поддерживает.

t = 0;
while (Received(&grad)) {
  w_k -= eta(t) * grad;
  t++;
}

Для работника каждый шаг делает четыре вещи

Read(&X, &Y);  // 读取一个 minibatch 数据
Pull(&w);      // 从服务器拉去最新的权重
ComputeGrad(X, Y, w, &grad);  // 计算梯度
Push(grad);    // 把权重推送给服务器

ps-lite предоставит функции push и pull, а worker будет связываться с сервером с правильными частичными данными.

Обратите внимание: асинхронный SGD отличается от автономной версии алгоритмическим режимом. Поскольку связь между рабочими процессами отсутствует, возможно, пока один рабочий процесс вычисляет градиенты, другие рабочие процессы обновляют веса на сервере. То есть каждый работник может использовать вес задержки.

6.3 Synchronized SGD

В отличие от асинхронной версии синхронная версия семантически идентична автономному алгоритму. То есть каждая итерация требует, чтобы все рабочие вычисляли градиент и синхронизировали его с сервером.

Мы используем планировщик для управления синхронизацией данных.

for (t = 0, t < num_iteration; ++t) {
  for (i = 0; i < num_worker; ++i) {
     IssueComputeGrad(i, t);
  }
  for (i = 0; i < num_server; ++i) {
     IssueUpdateWeight(i, t);
  }
  WaitAllFinished();
}

IssueComputeGradиIssueUpdateWeightотправит команды воркерам и серверам, затем вызовет планировщикWaitAllFinishedДождитесь окончания всех отправленных команд.

Чтобы рабочий получил команду, он сделает следующее:

ExecComputeGrad(i, t) {
   Read(&X, &Y);  // 读取数据 minibatch = batch / num_workers 个样本
   Pull(&w);      // 从服务器拉取最新权重
   ComputeGrad(X, Y, w, &grad);  // 计算梯度
   Push(grad);    // 把权重推送给服务器
}

Этот алгоритм почти такой же, как и ASGD, за исключением того, что на каждом шаге толькоb/num_workersпробы были обработаны.

На серверном узле по сравнению с АСГД на один шаг агрегации больше. Это итерация со скоростью обучения после накопления градиентов всех рабочих.

ExecUpdateWeight(i, t) {
   for (j = 0; j < num_workers; ++j) {
      Receive(&grad);
      aggregated_grad += grad;
   }
   w_i -= eta(t) * aggregated_grad;
}
​

0x07 Сводка

  • PostOffice: глобальный класс управления в одноэлементном режиме, каждый узел (каждый узел может быть однозначно идентифицирован по имени хоста + порту) имеет PostOffice в течение своего существования.Это может быть известно из буквального значения, что PostOffice является почтовым отделением;

  • Van: Коммуникационный модуль, отвечающий за сетевое взаимодействие с другими узлами и фактическую отправку и получение сообщений. PostOffice имеет члена Van, и из буквального значения можно узнать, что Van — это небольшая тележка, которая используется для обеспечения функции отправки писем;

  • SimpleApp: Родительский класс KVServer и KVWorker, предоставляет простые функции Request, Wait, Response, Process, KVServer и KVWorker переписывают эти функции в соответствии со своими задачами;

  • Customer: каждый объект SimpleApp содержит член класса Customer, который должен быть зарегистрирован в PostOffice. Этот класс в основном отвечает за:

    • Как отправитель, отслеживать ответы на сообщения, отправленные SimpleApp;
    • В качестве получателя поддерживать очередь сообщений узла для получения сообщений для этого узла;

    Клиента можно узнать по имени, это клиент почтового отделения и агент SimpleApp в почтовом отделении. Поскольку воркерам и серверам необходимо сконцентрироваться на алгоритмах, функции отправки и получения сообщений, логически связанные с сетью воркеров и серверов, суммируются/передаются клиентам.

Логическая схема приведена ниже.

                +--------------------------+
                | Van                      |
                |                          |
DataMessage +----------->  Receiving       |
                |  1           +           |             +---------------------------+
                |              |           |             | Postoffice                |
                |              | 2         |             |                           |
                |              v           | GetCustomer |                           |
                |        ProcessDataMsg <------------------> unordered_map customers_|
                |              +           |      3      |                           |
                |              |           |             +---------------------------+
                +--------------------------+
                               |
                               |
                               | 4
                               |
                +-------------------------+
                | Customer     |          |
                |              |          |
                |              v          |
                |           Accept        |
                |              +          |
                |              |          |
                |              | 5        |
                |              v          |
                |         recv_queue_     |                +-----------------+
                |              +          |                |KVWorker         |
                |              | 6        |     +--------> |                 |
                |              |          |     |    8     |         Process |
                |              v          |     |          +-----------------+
                |          Receiving      |     |
                |              +          |     |
                |              | 7        |     |
                |              |          |     |          +-----------------+
                |              v          |     |          |KVServer         |
                |         recv_handle_+---------+--------> |                 |
                |                         |          8     |         Process |
                +-------------------------+                +-----------------+
​
​

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

Если вы хотите вовремя получать новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.

ссылка 0xFF

Woohoo. В это время. Grass. Quota/~Oyster/file/…

анализ кода ps-lite

Использование PS-Lite

изучение исходного кода ps-lite

примечания к коду ps-lite

Примечания к PS Lite

ps-lite интерпретация исходного кода

ps-lite углубленная интерпретация исходного кода

Введение в распределенный TensorFlow

Проектирование архитектуры распределенной платформы машинного обучения

Четверная область крупномасштабных платформ машинного обучения

sona: Введение в Spark на крупномасштабной платформе распределенного машинного обучения Angel

ps-lite углубленная интерпретация исходного кода

Масштабируемая распределенная архитектура машинного обучения на основе сервера параметров

Mu Li. Scaling Distributed Machine Learning with the Parameter Server.

CMU. parameterserver.org/

Joseph E.Gonzalez. Emerging Systems For Large-scale Machine Learning.

[Распределенные вычисления] Альтернатива MapReduce — сервер параметров

схема parameter_server

Адам: крупномасштабная распределенная среда машинного обучения

Parameter Server for Distributed Machine Learning PS-Lite Documents анализ исходного кода ps-lite

Анализ исходного кода PS-Lite

blog.CSDN.net/Настольные кроссворды/… blog.CSDN.net/Differentiation_24/Ariti… Вуууу. blog.CSDN.net/К Энг Роджер/Ах…

блог woo woo woo.cn на.com/heguanyou/afraid…

Принцип работы ps-lite и параметры сервера MXNet

создание среды, связанной с ps-lite

Одна из серий обучения ps-lite ----- mac install ps-lite

заметки ps-lite (анализ dist-lr)

[Tech1] Краткий сервер параметров: анализ ps-lite

Начало работы с распределенным машинным обучением — принцип логистической регрессии на основе сервера параметров

анализ исходного кода ps-lite

Ууху. Call.com/topic/20175…

Крупномасштабное машинное обучение — инженерная перспектива — Содержание

Параллельная логистическая регрессия

Распространяется word2vec на основе ps-lite

ps-lite: изучите серию 3 --- Введение в ps-lite (1. Обзор)

примечания к коду ps-lite

заметки ps-lite (анализ dist-lr)

Ууху. Call.com/topic/20175…

blog.CSDN.net/тяжелый вкус вас/art IC…

заметки ps-lite (анализ dist-lr)

Ууху. Call.com/topic/20175…

blog.CSDN.net/тяжелый вкус вас/art IC…