[Анализ исходного кода] Сервер параметров машинного обучения ps-lite(2) ----- Коммуникационный модуль Van

машинное обучение глубокое обучение

0x00 сводка

Эта статья является второй частью серии серверов параметров, посвященной коммуникационному модулю Van для PS-lite.

Другие статьи из этой серии:

[Анализ исходного кода] Сервер параметров машинного обучения ps-lite (1) ----- PostOffice

0x01 Обзор функций

Когда в почтовом отделении есть адресная книга, для вывоза и доставки объектов требуется грузовик.Фургон является коммуникационным модулем всего сервера параметров, и его характеристики следующие.

  • При создании экземпляра класса PostOffice экземпляр класса Van создается как переменная-член. Экземпляр Van имеет тот же жизненный цикл, что и экземпляр PostOffice (есть только один такой объект на узел);
  • Ван отвечает за конкретную связь между узлами. В частности, он отвечает за установление взаимосвязи между узлами (например, соединение между Worker и Scheduler) и открытие локального принимающего потока для отслеживания полученного сообщения.

В настоящее время VAN имеет две реализации:

  • ZMQVan — это реализация Van на основе zeromq, то есть низкоуровневые детали подключения реализованы с помощью библиотеки zmq (библиотека zmq — это библиотека с открытым исходным кодом, которая обеспечивает отличную инкапсуляцию сокетов, что упрощает программирование сокетов, лаконичнее и эффективнее).
  • IBVerbsVan — это реализация ByteDance, которая не была подробно изучена.

0x02 Определение

2.1 UML-диаграммы

Диаграмма UML приведена первой.

2.2 Основное описание

Ниже мы даем описание только ключевых переменных и функций-членов объекта Van.

Его основные переменные следующие:

  • Node scheduler_ : параметры узла планировщика, каждый узел будет записывать информацию об узле планировщика;
  • Узел my_node_ : Параметры этого узла. Если этот узел является планировщиком, my_node_ будет указывать на указанный выше scheduler_ ;
  • bool is_scheduler_ : является ли этот узел планировщиком;
  • std::unique_ptr Receiver_thread_ : указатель на поток, который получает сообщение;
  • std::unique_ptr heartbeat_thread_ : указатель на поток пульса отправки;
  • std::vector барьер_счет_ : Счетчик барьеров используется для записи количества зарегистрированных узлов.Только после того, как все узлы зарегистрированы, система переходит в состояние готовности, и планировщик отправляет сообщение о готовности всем узлам, и система официально запущена. .
  • Resender *resender_ = nullptr : повторно отправить указатель сообщения;
  • std::atomic timestamp_{0} : идентификатор автоинкремента сообщения, атомарная переменная;
  • std::unordered_map<:string int>connected_nodes_ : записывает, к каким узлам в данный момент подключено;

Его основные функции заключаются в следующем:

  • start : установить инициализацию связи;

  • Получение: функция обработки потока сообщений получения;

  • Heartbeat : обработчик для отправки потока сердцебиения;

  • ProcessAddNodeCommandAtScheduler : функция обработки сообщений планировщика AddNode;

    • ProcessHearbeat: функция обработки пакетов Heartbeat;
    • ProcessDataMsg : функция обработки сообщений данных (push & pull);
    • ProcessAddNodeCommand : функция обработки сообщений AddNode воркера и сервера;
    • ProcessBarrierCommand : функция обработки барьерных сообщений;

2.3 Управление потоками

Три роли, определенные PS Lite, работают в многопоточном механизме, и каждый поток берет на себя определенные обязанности и создается при запуске экземпляра Van, которому он принадлежит.

Конкретное описание выглядит следующим образом:

  • Все экземпляры Van планировщика, рабочего и сервера содержат поток, который принимает данные.
  • Экземпляры Van Worker и Server также содержат поток, который периодически отправляет тактовые импульсы планировщику.
  • Scheduler, Worker и Server также запускают поток мониторинга, если определена переменная среды PS_RESEND со значением, отличным от 0.

2.4 Определение класса

Подробный код (резюме) выглядит следующим образом:

class Van {
 public:
  static Van *Create(const std::string &type);
  virtual void Start(int customer_id);
  int Send(const Message &msg);
  virtual void Stop();
  inline int GetTimestamp() { return timestamp_++; }
  inline bool IsReady() { return ready_; }
​
 protected:
  //连结节点
  virtual void Connect(const Node &node) = 0;
  //绑定到自己节点之上  
  virtual int Bind(const Node &node, int max_retry) = 0;
  //接收消息,用阻塞方式  
  virtual int RecvMsg(Message *msg) = 0;
 //发送消息
  virtual int SendMsg(const Message &msg) = 0;
​
  /**
   * \brief pack meta into a string
   */
  void PackMeta(const Meta &meta, char **meta_buf, int *buf_size);
  /**
   * \brief pack meta into protobuf
   */
  void PackMetaPB(const Meta &meta, PBMeta *pb);
  /**
   * \brief unpack meta from a string
   */
  void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta);
​
  Node scheduler_;
  Node my_node_;
  bool is_scheduler_;
  std::mutex start_mu_;
​
 private:
  /** thread function for receving */
  void Receiving();
​
  /** thread function for heartbeat */
  void Heartbeat();
​
  // node's address string (i.e. ip:port) -> node id
  // this map is updated when ip:port is received for the first time
  std::unordered_map<std::string, int> connected_nodes_;
  // maps the id of node which is added later to the id of node
  // which is with the same ip:port and added first
  std::unordered_map<int, int> shared_node_mapping_;
​
  /** whether it is ready for sending */
  std::atomic<bool> ready_{false};
  std::atomic<size_t> send_bytes_{0};
  size_t recv_bytes_ = 0;
  int num_servers_ = 0;
  int num_workers_ = 0;
  /** the thread for receiving messages */
  std::unique_ptr<std::thread> receiver_thread_;
  /** the thread for sending heartbeat */
  std::unique_ptr<std::thread> heartbeat_thread_;
  std::vector<int> barrier_count_;
  /** msg resender */
  Resender *resender_ = nullptr;
  int drop_rate_ = 0;
  std::atomic<int> timestamp_{0};
  int init_stage = 0;
​
 //以下是处理各种类型消息
  void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
                                        Meta *recovery_nodes);
  void ProcessTerminateCommand();
  void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);
  void ProcessBarrierCommand(Message *msg);
  void ProcessHearbeat(Message *msg);
  void ProcessDataMsg(Message *msg);
​
  //更新本地NodeID
  void UpdateLocalID(Message *msg, std::unordered_set<int> *deadnodes_set,
                     Meta *nodes, Meta *recovery_nodes);
​
  const char *heartbeat_timeout_val =
      Environment::Get()->find("PS_HEARTBEAT_TIMEOUT");
  int heartbeat_timeout_ =
      heartbeat_timeout_val ? atoi(heartbeat_timeout_val) : 0;
​
  DISALLOW_COPY_AND_ASSIGN(Van);
};

0x03 Инициализировать

Функция функции инициализации объекта Van заключается в выполнении различных настроек в соответствии с различными типами локальных узлов, чтобы запустить порт, установить соединение с планировщиком, запустить поток приема сообщений, поток пульса и т. д. , так что связь может быть выполнена. детали следующим образом:

  1. Сначала получите соответствующую информацию из переменных среды, таких как «ip, порт» планировщика (эти два предустановлены), роль этого узла (Рабочий/Сервер/Планировщик) и т. д., а затем инициализируйте переменную-член scheduler_;

  2. Если этот узел является планировщиком, назначьте scheduler_ для my_node_;

  3. Если этот узел не является планировщиком, то:

    1. Получите информацию об ip этого узла из системы;
    2. Используйте GetAvailablePort, чтобы получить порт;
  4. Используйте Bind для привязки порта;

  5. Вызовите Connect, чтобы установить соединение с планировщиком (планировщик также подключается к своему предустановленному фиксированному порту);

  6. Запустите поток сообщений получения локального узла.receiver_thread_,воплощать в жизньVan::Receiving;

  7. Если этот узел не является планировщиком, отправьте планировщику сообщение ADD_NODE, чтобы информацию о локальном узле можно было уведомить планировщику, то есть зарегистрировать в планировщике;

  8. Затем войдите в состояние ожидания и подождите, пока планировщик уведомит о готовности (планировщик будет ждать, пока все узлы завершат регистрацию, а затем отправит сообщение о готовности); обратите внимание, что здесь узел планировщика также будет ждать, но это не влияет на приемный поток узла планировщика для приема и обработки сообщений;

  9. После готовности запустите поток пульса и установите соединение пульса с планировщиком;

Дальнейшее объяснение пунктов 7 и 8:

  • Когда рабочий и серверный узлы связывают ip и порт, они отправляют на узел планировщикаADD_NODEсообщение.
  • Когда планировщик получает все рабочие и серверныеADD_NODEПосле сообщения отвечайте по очередиADD_NODEсообщение,
  • В этом процессе каждый узел ожидает завершения вышеуказанного процесса через атомарную переменную ready_.

Конкретный код выглядит следующим образом:

void Van::Start(int customer_id) {
  // get scheduler info
  start_mu_.lock();
​
  if (init_stage == 0) {
    // 初始化scheduler_这个成员变量
    scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    scheduler_.role = Node::SCHEDULER;
    scheduler_.id = kScheduler;
    // 确认本节点是scheduler节点
    is_scheduler_ = Postoffice::Get()->is_scheduler();
​
    // get my node info
    if (is_scheduler_) {
      // 初始化本节点,因为是scheduler,所以直接赋值
      my_node_ = scheduler_;
    } else {
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }
​
    // bind.
    //绑定接口,把本节点绑定到ip:port这个socket上,理论来说这个函数就是初始化了receiver_
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);
​
    // connect to the scheduler
    // 连接上scheduler_,由于本节点就是scheduler_,其实就是初始化senders_,由于发送的节点很多,所以这里是一个map<int,void*>
      // 在这里就是senders_[1] = socket_1, socket_1中的body设置一点字符“ps1***”, 注意链接不是sendMsg
    Connect(scheduler_);
​
    // for debug use
    if (Environment::Get()->find("PS_DROP_MSG")) {
      drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
    }
    // start receiver
    // 开启一个接收消息的线程,这里就是处理消息
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    init_stage++;
  }
  start_mu_.unlock();
  
  if (!is_scheduler_) {
    // let the scheduler know myself
    // worker和server节点会通过 ADD_NODE 消息把本地节点的信息告诉scheduler,比如角色,ip,port...
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
​
  // wait until ready
  // 等待 ready_ 从false变成true,当是scheduler的时候,必须要有等worker和server节点过来,不然一直都是阻塞在这,如果是 worker/server,则是等待 scheduler 发送系统allready消息。
  while (!ready_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
​
  start_mu_.lock();
  if (init_stage == 1) {
    // resender
    if (Environment::Get()->find("PS_RESEND") &&
        atoi(Environment::Get()->find("PS_RESEND")) != 0) {
      int timeout = 1000;
      if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
        timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
      }
      // 如果设置了超时重传,就初始化resender_这个变量
      resender_ = new Resender(timeout, 10, this);
    }
​
    if (!is_scheduler_) {
      // start heartbeat thread
      // 初始化心跳线程
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }
    init_stage++;
  }
  start_mu_.unlock();
}

0x04 Принять сообщение

Сначала мы расскажем, как работают фоновые потоки, а затем подробно проанализируем, как обрабатывать различные сообщения.

4.1 Поток сообщений фоновой обработки

ps-lite запускает фоновый поток Receiver_thread_ для получения/обработки сообщений.

// start receiver
receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));

4.2 Функции обработки

Receiver_thread_ использует функцию получения для обработки сообщений.

4.2.1 Информация управления

В дополнение к сообщениям данных, которые передают параметры, управляющая информация между каждым узлом включает:

  • ADD_NODE: рабочие и серверные узлы регистрируются с помощью планировщика;
  • БАРЬЕР: синхронные блокирующие сообщения между узлами;
  • HEARTBEAT: сигнал сердцебиения между узлами;
  • TERMINATE: сигнал выхода узла;
  • ACK: сообщение подтверждения,Тип ACK отображается только в том случае, если включен класс Resender..
  • ПУСТОЙ: толкать или тянуть;

Поэтому в Receive будут вызываться разные функции обработки для обработки разных типов сообщений:

  • ProcessTerminateCommand: обрабатывать TERMINATE;
  • ProcessAddNodeCommand: обработать ADD_NODE;
  • ProcessBarrierCommand: Process BARRIER (уже проанализировано выше);
  • ProcessHearbeat : Процесс HEARTBEAT;

4.2.2 Глобальные переменные в потоке

В потоке есть две переменные, потому что он находится за пределами во время (True) Loop, поэтому он принадлежит к глобальной переменной в потоке, что следует обратить внимание при чтении кода.

  • nodes : только планировщик будет использовать его при обработке ADD_NODE и сохранит все узлы, которыми в данный момент владеет планировщик;
  • recovery_nodes : используется планировщиком только при обработке ADD_NODE и хранит все узлы восстановления (восстановления и перезапущенные узлы), которые в данный момент принадлежат планировщику;

4.2.3 Конкретная реализация

Логика приема следующая:

  • Вызовите RecvMsg (производный класс будет реализован), чтобы получить последние новости;
  • Если установлена ​​выборка, сбросьте ее;
  • Если установлен механизм повторной передачи, он определяет, повторяется ли сообщение, и использует resender_->AddIncomming(msg) для обработки повторяющегося сообщения;
  • Сообщения управления процессом или сообщения данных;

Конкретный код выглядит следующим образом

void Van::Receiving() {
  Meta nodes;
  // 以下两个可以认为是全局变量
  Meta recovery_nodes;  // store recovery nodes 储存康复重启的节点
  recovery_nodes.control.cmd = Control::ADD_NODE; // 康复重启节点的control.cmd 都设置为 ADD_NODE
​
  while (true) {
    Message msg;
    int recv_bytes = RecvMsg(&msg); //利用receiver_ 变量拿到消息
    // For debug, drop received message
    if (ready_.load() && drop_rate_ > 0) {
      unsigned seed = time(NULL) + my_node_.id;
      if (rand_r(&seed) % 100 < drop_rate_) {
        LOG(WARNING) << "Drop message " << msg.DebugString();
        continue;
      }
    }
​
    CHECK_NE(recv_bytes, -1);
    recv_bytes_ += recv_bytes; //收到的字节数累加
    if (Postoffice::Get()->verbose() >= 2) {
      PS_VLOG(2) << msg.DebugString();
    }
    // duplicated message
    if (resender_ && resender_->AddIncomming(msg)) continue; //重传确认机制
​
    if (!msg.meta.control.empty()) { //如果是控制类型的消息
      // control msg
      auto& ctrl = msg.meta.control;
      if (ctrl.cmd == Control::TERMINATE) {
        ProcessTerminateCommand();
        break;
      } else if (ctrl.cmd == Control::ADD_NODE) {
        ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes); //当执行到这个位置的时候继续跳转
      } else if (ctrl.cmd == Control::BARRIER) {
        ProcessBarrierCommand(&msg);
      } else if (ctrl.cmd == Control::HEARTBEAT) {
        ProcessHearbeat(&msg); // 发回Heartbeat的ACK
      } else {
        LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
      }
    } else { //非控制类型的消息处理方式
      ProcessDataMsg(&msg);
    }
  }
}

4.3 Обработка сообщений ADD_NODE

ADD_NODE — это управляющее сообщение, используемое рабочим/сервером для регистрации в планировщике.

4.3.1 Логика регистрации

Сначала вспомним основную идею регистрации.

  • Когда рабочий и серверный узлы связывают ip и порт, они отправляют на узел планировщикаADD_NODEсообщение.
  • Когда планировщик получает все рабочие и серверныеADD_NODEПосле сообщения отвечайте по очередиADD_NODEсообщение, обратите внимание, что ответ также имеет тот же типADD_NODEИнформация.
  • Каждый узел (планировщик, рабочий, сервер) ожидает завершения вышеуказанного процесса через атомарную переменную ready_ в этом процессе.

4.3.2 ProcessAddNodeCommand

Логика ProcessAddNodeCommand следующая.

  • Узнайте идентификатор тайм-аута пакета пульса и передайте его в dead_set.

  • Получить управляющую информацию в полученном сообщении.

  • Вызовите UpdateLocalID, где:

    • Если это новый узел, планировщик записывает этот новый узел.
    • Если узел будет перезапущен, информация о старом узле будет обновлена.
  • Если планировщик, то:

    • Вызовите ProcessAddNodeCommandAtScheduler, чтобы получить сообщения ADD_NODE всех рабочих процессов и серверов, а затем назначить идентификаторы узлов и ответить, то есть установить последний ранг всех узлов и отправить их всем рабочим процессам и серверам.
  • Если это не планировщик, это означает, что работа и сервер получили сообщение ADD_NODE, на которое ответил планировщик, тогда:

    • Если это существующий узел, новый узел не будет найден в linked_nodes_, и первый узел вызовет Connect, чтобы установить соединение с новым узлом.
    • Если он является новым узлом, все существующие узлы (не того же типа) соединяются.
    • Обновите информацию о глобальном узле в connect_nodes_, включая глобальный ранг (глобальный ранг локального узла и другая информация получаются здесь с помощью Receiver_thread_);
    • Наконец установите ready_=true, то есть нода тоже может работать, потому что на ней будет блокироваться основной поток.

Конкретный код выглядит следующим образом:

void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
                                Meta* recovery_nodes) {
  auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);//查出心跳包超时的id
  std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());//转存到dead_set之中
  auto& ctrl = msg->meta.control; //拿到收到消息里面的control信息
​
  UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);
​
  if (is_scheduler_) { // Scheduler 节点
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } else { // Worker & Server 节点
    for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        // 现有节点会在自己连接之中查找这个新节点,发现现有连接中没有这个新节点
        // 如果是新节点,则会连接现有节点(非同类型)
        Connect(node); // 与新节点进行连接        
        connected_nodes_[addr_str] = node.id; // 加入已经连接的节点
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    ready_ = true;
  }
}

4.3.3 UpdateLocalID

Функция этой функции заключается в обновлении информации об идентификаторе узла внутри узла.Она также делится на два случая.Логика функции следующая:

  • Если msg->meta.sender — это Meta::kEmpty, то есть он не установлен, то планировщик, обрабатывающий это сообщение, должен быть Планировщиком, который войдет в ветвь if.

    • Если количество control.nodes текущих узлов меньше, чем «количество настроенных серверов + количество настроенных воркеров», это означает, что это фаза запуска системы, и информация об узле текущего сообщения добавляется в контроль.узел.
    • В противном случае это означает, что система работает, и ее следует снова подключить после того, как некоторые узлы умирают и перезапускаются. Затем найдите идентификатор мертвого узла из узла control.node, а роль узла совпадает с текущим сообщением (того же типа), и назначьте этот идентификатор узла перезапущенному узлу. И обновить узлы->control.node и recovery_nodes.
  • Ниже приведена логика обычной обработки узлов:

    • То есть просмотрите всю информацию об узле, возвращенную планировщиком, цель состоит в том, чтобы найти узел, соответствующий его собственному IP-адресу и порту.
    • Если он найден, обновите информацию о локальном узле (поскольку информация node_id не устанавливается при запуске узла, это должно быть установлено планировщиком единообразно. Судя по комментариям, цель состоит в том, чтобы сделать возможной повторную регистрацию). Включите информацию о глобальном рейтинге.

Конкретный код выглядит следующим образом:

void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                        Meta* nodes, Meta* recovery_nodes) {
  auto& ctrl = msg->meta.control;
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // assign an id
  if (msg->meta.sender == Meta::kEmpty) { //如果sender未设定,则处理此message的一定是Scheduler
    CHECK(is_scheduler_);
    CHECK_EQ(ctrl.node.size(), 1); //msg中的control命令中的节点集合就是worker自己,所以就是1个节点
    if (nodes->control.node.size() < num_nodes) { //没有到齐
      nodes->control.node.push_back(ctrl.node[0]);
    } else { //如果所有work和server到齐了,就进入else
      // some node dies and restarts
      CHECK(ready_.load());
      for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
        const auto& node = nodes->control.node[i];
        if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
            node.role == ctrl.node[0].role) {
          auto& recovery_node = ctrl.node[0];
          // assign previous node id
          recovery_node.id = node.id;
          recovery_node.is_recovery = true;
          nodes->control.node[i] = recovery_node;
          recovery_nodes->control.node.push_back(recovery_node);
          break;
        }
      }
    }
  }
​
  // update my id / 对普通的node,更新其rank,scheduler 节点不会起作用(因为找不到)。
  // schedule发给此work节点的消息,如果发现本地的ip和port和消息中的某个一点重合,那么就把本地节点的ID(初始化时候没有ID,只是等于Empty)改为schedule发过来的 node id。
  for (size_t i = 0; i < ctrl.node.size(); ++i) {
    const auto& node = ctrl.node[i];
    if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
      if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
        my_node_ = node;
        std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
        _putenv_s("DMLC_RANK", rank.c_str());
#else
        setenv("DMLC_RANK", rank.c_str(), true);
#endif
      }
    }
  }
}

4.3.4 ProcessAddNodeCommandAtScheduler

ProcessAddNodeCommandAtScheduler запускается в планировщике и обрабатывает сообщения управляющего типа.

Для узла планировщика после того, как планировщик получает сообщения ADD_NODE всех рабочих процессов и серверов, он назначает идентификаторы узлов и отвечает, то есть необходимо установить последний глобальный ранг всех узлов и отправить его всем рабочим процессам и серверам.

  • После получения всех сообщений о регистрации рабочих и серверов (nodes->control.node.size() == num_nodes):

    • Сортировка узлов по комбинации IP + порт.
    • Планировщик устанавливает соединения со всеми зарегистрированными узлами, обновляет временную метку пульса и присваивает глобальный ранг всем подключенным узлам планировщика.
    • Отправьте сообщение ADD_NODE (содержащее всю информацию об узле в планировщике) всем рабочим процессам и серверам.
    • положитready_ = true; То есть планировщик находится в состоянии готовности, независимо от того, подтверждают ли воркер и сервер получение сообщения ADD_NODE.
    • На принимающей стороне (рабочий процесс и сервер) такая информация, как глобальный ранг каждого локального узла, получается принимающей стороной Receiver_thread_ (другие функции), которая представляет собой информацию об узлах, возвращаемую планировщиком.
  • если!recovery_nodes->control.node.empty(), который указывает поведение регистрации для обработки некоторых перезапущенных узлов:

    • Узнайте идентификатор тайм-аута пакета пульса и передайте его в dead_set.

    • Установите соединение с узлом перезапуска (поскольку получен ADD_NODE), поэтому устанавливайте соединение только с этим новым узлом перезапуска (в коде естьCHECK_EQ(recovery_nodes->control.node.size(), 1)чтобы подтвердить, что узел перезапуска — 1).

    • Обновите пульс перезапущенных узлов.

    • Поскольку добавляется новый узел перезапуска, отправка используется для достижения двух целей:

      • Отправьте сообщение ADD_NODE (содержащее всю информацию о текущем узле в планировщике) всем рабочим процессам и серверам восстановления.
      • Отправьте информацию об узле восстановления на работающий узел.
      • Таким образом, узел, получивший сообщение, соответственно установит соединение с новым узлом;

Конкретный код выглядит следующим образом:

void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                           Meta* recovery_nodes) {
  recovery_nodes->control.cmd = Control::ADD_NODE;
  time_t t = time(NULL);
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // scheduler收到所有worker和server的ADD_NODE的消息后进行节点id分配并应答
  if (nodes->control.node.size() == num_nodes) { // 节点收集完全
    // sort the nodes according their ip and port, 根据IP和port给worker,server排个序
    std::sort(nodes->control.node.begin(), nodes->control.node.end(),
              [](const Node& a, const Node& b) {
                return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
              });
    // assign node rank
    for (auto& node : nodes->control.node) {
      // 建立连接、更新心跳时间戳,给 scheduler所有连接的节点分配全局 rank。
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) { //如果ip:port不存在van_中的话
        CHECK_EQ(node.id, Node::kEmpty); //判断是不是初始化节点
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_); //如果是sever的话,就id产生一个id号,num_servers_初始化为0
        node.id = id; //将这个新节点的id赋值为id
        Connect(node); //连接这个新节点, 即建立一个socket, 然后senders_[id] = sender; 就是将目标id的socket存放起来后面使用
        Postoffice::Get()->UpdateHeartbeat(node.id, t);//更新心跳包
        connected_nodes_[node_host_ip] = id; //既然 worker, server 已经发message来了,scheduler要把这个节点作为已经链接的节点
      } else {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;//更新rank
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_); //把本节点放到里面
    nodes->control.cmd = Control::ADD_NODE;
    Message back;
    back.meta = *nodes;
    // 向所有的worker和server发送ADD_NODE消息
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }
​
    ready_ = true; //scheduler已经准备好了
  } else if (!recovery_nodes->control.node.empty()) { // 节点没有收集完全
    auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);//查出心跳包超时的id
    std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());//转存到dead_set
    // send back the recovery node
    CHECK_EQ(recovery_nodes->control.node.size(), 1);
    Connect(recovery_nodes->control.node[0]);
    Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
    Message back;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      if (r != recovery_nodes->control.node[0].id &&
          dead_set.find(r) != dead_set.end()) {
        // do not try to send anything to dead node
        continue;
      }
      // only send recovery_node to nodes already exist
      // but send all nodes to the recovery_node
      back.meta =
          (r == recovery_nodes->control.node[0].id) ? *nodes : *recovery_nodes;
      back.meta.recver = r;
      back.meta.timestamp = timestamp_++;
      Send(back);
    }
  }
}

Логика этой части процесса следующая:

                                                              +
    Scheduler                                                 |      Worker
                                                              |
        +                                                     |        +
        |                                                     |        |
        |                                                     |        |
        v                                                     |        |
Postoffice::Start +---->  Van::Start                          |        |
                             +                                |        |
                             |                                |        |
                             |                                |        |
                             v                                |        |
                          Connect--do nothing                 |        |
                             +                                |        v
                             |                                |
                             |                                |  Postoffice::Start +----->  Van::Start
                             |                                |                                +
                             v                                |                                |
                         receiver_thread_ +---+               |                                |
                             +                |               |                                v
                             |                |               |                             Connect--to scheduler
                             |                |               |                                +
                             |                |               |                                |
                             |                |               |                                |
                             |                |               |                                |
                             |                |               |                                v
                             |                |               |                          receiver_thread_  +----->+
                             |                |               |                                +                  |
                             |                |               |                                |                  |
                             |                |               |                                |                  |
                             |                |               |                                v                  |
                             |                |   <---------------------------------------+   Send                |
                             |                |               |   ADD_NODE                     +                  |
                             |                v               |                                |                  |
                             |                                |                                |                  |
                             |       ProcessAddNodeCommand    |                                |                  |
                             |                +               |                                |                  |
                             |                |               |                                |                  |
                             |                | All nodes OK  |                                |                  |
                             |                |               |                                |                  |
                             v                |               |                                |                  |
                                              | set rank      |                                |                  |
                      wait until ready        |               |                                |                  |
                             +                |               |                                |                  |
                             |                +---------------------------------------------------------------->  |
                             |                |               |  ADD_NODE response(nodes info) |                  |
                             |                |               |                                |         ProcessAddNodeCommand
                             |                |               |                                v                  |
                             |                |               |                                                   |
                             | <--------------+               |                         wait until ready          |
                             |    ready_ = true               |                                +                  |
                             |                                |                                |  <---------------+
       +-------------------+ v                                |                                |
       |                                                      |         +--------------------+ v
       |                                                      |         |
       v                                                      |         |
                                                              |         v
  Postoffice::Barrier                                         |
                                                              |   Postoffice::Barrier
                                                              +

Мобильный телефон выглядит следующим образом: планировщик слева и рабочий справа:

4.3.5 Последовательность вновь добавленных узлов

Процесс подключения можно разделить на 3 этапа:

Шаг 1: Когда рабочий/серверный узел инициализируется, он отправляет информацию о подключении к запланированному узлу, предполагая, что это узел 2;

if (!is_scheduler_) {
    // let the scheduler know myself
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg); //发送给schedular, 建立链接信息。
  }

Шаг 2: После того, как узел планировщика получит информацию, в ProcessAddNodeCommandAtScheduler он сначала установит соединение с узлом 2. Эта «информация о присоединении к узлу» будет передана всем рабочим узлам/серверным узлам, которые установили соединения с расписанием, а информация о запросе узла 2 на соединение будет помещена в метаинформацию.

    // assign node rank
    for (auto& node : nodes->control.node) {
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        node.id = id;
        Connect(node); // 连接这个新节点, 即建立一个socket, 然后senders_[id] = sender; 就是将目标id的socket存放起来后面使用
        Postoffice::Get()->UpdateHeartbeat(node.id, t);
        connected_nodes_[node_host_ip] = id;
      } else {
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_);
    nodes->control.cmd = Control::ADD_NODE;
​
    Message back;
    back.meta = *nodes;
    // 向所有已经和schedular建立连接的worker节点/server节点 广播此 "节点的加入信息“,并把 节点 2 请求连接的信息放入meta信息中。
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }
​

Шаг 3: После того, как существующий рабочий/серверный узел получит эту команду, он сформирует соединение с узлом 2 в ProcessAddNodeCommand.

   for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) { // 现有连接中没有这个新节点
        Connect(node); // 与新节点进行连接
        connected_nodes_[addr_str] = node.id;
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
​

На этом этапе описывается весь процесс. После добавления каждого нового узла уже присоединенный узел установит соединение с новым узлом через запланированный узел.

4.4 Обработка сообщений HEARTBEAT

Далее мы анализируем механизм сердцебиения.

4.4.1 Механизм сердцебиения

Чтобы зафиксировать доступность сети, PS Lite разработала механизм сердцебиения. особенно:

  • Структура MAP поддерживается в единственном элементе PostOffice каждого узла, в котором хранится активная информация узлов, связанных с пульсом. Ключ — это номер узла, а значение — отметка времени последнего получения его сообщения HEARTBEAT.
  • Worker/Server записывает пульс только планировщика, а планировщик записывает пульс всех узлов. На основе метки времени и тайм-аута пульса можно вывести все мертвые узлы.
  • Каждый узел Worker/Server создаст новый поток пульса и отправит сообщение HEARTBEAT планировщику каждые PS_HEARTBEAT_INTERVAL секунд;
  • Когда узел планировщика получает его, он отвечает сообщением HEARTBEAT.
  • Планировщик отвечает и определяет, активен он или нет, основываясь на разнице между текущим временем и временем получения контрольного пакета.
  • Планировщик определит мертвый узел на основе метки времени пульсирующего узла. Если идентификатор вновь добавленного узла находится в контейнере dead_node, это означает, что узел восстановлен, и вновь добавленный узел соединен с существующим узлом через запланированный транзит.

детали следующим образом:

4.4.2 Структура данных

std::unordered_map heartbeats_ предназначен для хранения активной информации узла, связанного с пульсом. Ключ — это номер узла, а значение — отметка времени последнего получения его сообщения HEARTBEAT.

UpdateHeartbeat периодически обновляет пульс.

  void UpdateHeartbeat(int node_id, time_t t) {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    heartbeats_[node_id] = t;
  }
  
  std::unordered_map<int, time_t> heartbeats_;  

4.4.3 Рабочий/Сервер отправляет пульс

В этих двух узлах запускается поток, и каждый рабочий/серверный узел отправляет планировщику сообщение HEARTBEAT каждые PS_HEARTBEAT_INTERVAL секунд:

    if (!is_scheduler_) {
      // start heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }

Специфическая функция сердцебиения:

void Van::Heartbeat() {
  const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
  const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
  while (interval > 0 && ready_.load()) {
    std::this_thread::sleep_for(std::chrono::seconds(interval));
    Message msg;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::HEARTBEAT;
    msg.meta.control.node.push_back(my_node_);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
}
​

4.4.4 Пульс обработки узла планировщика

После того, как узел планировщика получает сообщение post-HEARTBEAT, он отвечает сообщением HEARTBEAT. UpdateHeartbeat периодически обновляет пульс.

void Van::ProcessHearbeat(Message* msg) {
  auto& ctrl = msg->meta.control;
  time_t t = time(NULL);
  for (auto& node : ctrl.node) {
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    if (is_scheduler_) {
      Message heartbeat_ack;
      heartbeat_ack.meta.recver = node.id;
      heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
      heartbeat_ack.meta.control.node.push_back(my_node_);
      heartbeat_ack.meta.timestamp = timestamp_++;
      // send back heartbeat
      Send(heartbeat_ack);
    }
  }
}

4.4.5 Мертвые узлы

Когда планировщик обрабатывает сообщение ADD_NODE, он проверяет, есть ли мертвый узел, и определяет, активен ли он, основываясь на разнице между текущей меткой времени и полученной меткой времени пакета пульса.

std::vector<int> Postoffice::GetDeadNodes(int t) {
  std::vector<int> dead_nodes;
  if (!van_->IsReady() || t == 0) return dead_nodes;
​
  time_t curr_time = time(NULL);
  const auto& nodes = is_scheduler_
    ? GetNodeIDs(kWorkerGroup + kServerGroup)
    : GetNodeIDs(kScheduler);
  {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    for (int r : nodes) {
      auto it = heartbeats_.find(r);
      if ((it == heartbeats_.end() || it->second + t < curr_time)
            && start_time_ + t < curr_time) {
        dead_nodes.push_back(r);
      }
    }
  }
  return dead_nodes;
}
​

Логика следующая:

+----------------------------------------------------+
| Scheduler                                          |
|                                                    |
|                                                    |
|                                                    |
| heartbeats_                                        |
|                                                    |
| receiver_thread_+-------->  ProcessHearbeat        |
|                            ^      +    ^  +        |
|                            |      |    |  |        |
|                            |      |    |  |        |
|                            |      |    |  |        |
+----------------------------------------------------+
                             |      |    |  |
                             |      |    |  |                        RESPONSE
                             |      |    |  +-------------------------------------+
                             |      |    |                                        |
                             |      |    +-------------------------------+        |
                             |      |                                    |        |
                  HEARTBEAT  |      | RESPONSE                HEARTBEAT  |        |
                             |      |                                    |        |
+-----------------------------------------+  +-----------------------------------------+
| Worker                     |      |     |  | Server                    |        |    |
|                            |      |     |  |                           |        |    |
|                            |      |     |  |                           |        |    |
|                            |      |     |  |                           |        |    |
| heartbeats_                |      |     |  |  heartbeats_              |        |    |
|                            +      |     |  |                           +        |    |
| heartbeat_thread_+----> Heartbeat |     |  |  heartbeat_thread_+-->  Heartbeat  |    |
|                                   |     |  |                                    |    |
|                                   v     |  |                                    v    |
| receiver_thread_ +--->  ProcessHearbeat |  | receiver_thread_  +-->  ProcessHearbeat |
|                                         |  |                                         |
|                                         |  |                                         |
|                                         |  |                                         |
+-----------------------------------------+  +-----------------------------------------+

4.5 Обработка сообщений TERMINATE

ProcessTerminateCommand обработает сообщение об окончании, установив для ready_ значение false.

Это указывает на то, что Ван находится в неправильном состоянии и не может продолжить обработку.

void Van::ProcessTerminateCommand() {
  PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
  ready_ = false;
}
​
inline bool IsReady() { return ready_; }

4.6 Обработка сообщений ACK

4.6.1 Механизм подтверждения

В распределенной системе связь также ненадежна, а потеря и задержка пакетов — это все сценарии, которые необходимо учитывать. PS Lite разработала класс Resender для повышения надежности связи, в котором был представлен механизм ACK. который:

  • Каждый узел ДОЛЖЕН ответить сообщением ACK на полученное сообщение без ACK/TERMINATE.
  • Каждый узел для каждого отправленного сообщения, отличного от ACK/TERMINATE, ДОЛЖЕН кэшировать его локально. Сохраненная структура данных представляет собой MAP, которая создает уникальные ключи на основе содержимого сообщения.
  • Каждый узел для полученного сообщения ACK должен удалить соответствующее сообщение из локального кеша в соответствии с ключом обратной связи.
  • Каждый узел запускает поток мониторинга, который проверяет локальный кеш каждые PS_RESEND_TIMEOUT миллисекунды. В соответствии с меткой времени отправки и текущим временем каждого сообщения найдите сообщения с истекшим временем ожидания для повторной передачи и накопите количество повторных попыток.

4.6.2 Класс отправителя

Определение выглядит следующим образом, где send_buff_ — это буфер отправки, который используется для хранения списка отправленных сообщений. acked_ — это сообщение, которое было подтверждено.

class Resender {
  std::thread* monitor_;
  std::unordered_set<uint64_t> acked_;
  std::atomic<bool> exit_{false};
  std::mutex mu_;
  int timeout_;
  int max_num_retry_;
  Van* van_;
  using Time = std::chrono::milliseconds;
  // the buffer entry
  struct Entry {
    Message msg;
    Time send;
    int num_retry = 0;
  };
  std::unordered_map<uint64_t, Entry> send_buff_;  
};
​

4.6.3 Мониторинг потоков

Поток и функция мониторинга следующие: когда он просыпается, находит отметку времени отправки и текущее время каждого сообщения из send_buff_ (локальный кеш), находит сообщение с истекшим временем ожидания для повторной отправки и накапливает количество повторных попыток. :

  monitor_ = new std::thread(&Resender::Monitoring, this);
​
  void Monitoring() {
    while (!exit_) {
      std::this_thread::sleep_for(Time(timeout_));
      std::vector<Message> resend;
      Time now = Now();
      mu_.lock();
      for (auto& it : send_buff_) {
        if (it.second.send + Time(timeout_) * (1+it.second.num_retry) < now) {
          resend.push_back(it.second.msg);
          ++it.second.num_retry;
          CHECK_LT(it.second.num_retry, max_num_retry_);
        }
      }
      mu_.unlock();
​
      for (const auto& msg : resend) van_->Send(msg);
    }
  }
​

4.6.4 Буферизация при отправке

Когда Ван отправляет сообщение, если настроена повторная передача, вызывается функция AddOutgoing для добавления сообщения в буфер отправки.

int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}
​

В буфер отправки добавляется следующая функция.

/**
   * \brief add an outgoining message
   *
   */
  void AddOutgoing(const Message& msg) {
    if (msg.meta.control.cmd == Control::ACK) return;
    CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
    auto key = GetKey(msg);
    std::lock_guard<std::mutex> lk(mu_);
    // already buffered, which often due to call Send by the monitor thread
    if (send_buff_.find(key) != send_buff_.end()) return;
​
    auto& ent = send_buff_[key];
    ent.msg = msg;
    ent.send = Now();
    ent.num_retry = 0;
  }
​

4.6.5 Очистить кеш

Следующая функция делает две вещи:

  • Проверьте, не дублируется ли это сообщение, полученное подтверждающее сообщение;
  • Если это сообщение подтверждения, оно удаляется из буфера отправки.
/**
   * \brief add an incomming message
   * \brief return true if msg has been added before or a ACK message
   */
  bool AddIncomming(const Message& msg) {
    // a message can be received by multiple times
    if (msg.meta.control.cmd == Control::TERMINATE) {
      return false;
    } else if (msg.meta.control.cmd == Control::ACK) {
      mu_.lock();
      auto key = msg.meta.control.msg_sig;
      auto it = send_buff_.find(key);
      if (it != send_buff_.end()) send_buff_.erase(it);
      mu_.unlock();
      return true;
    } else {
      mu_.lock();
      auto key = GetKey(msg);
      auto it = acked_.find(key);
      bool duplicated = it != acked_.end();
      if (!duplicated) acked_.insert(key);
      mu_.unlock();
      // send back ack message (even if it is duplicated)
      Message ack;
      ack.meta.recver = msg.meta.sender;
      ack.meta.sender = msg.meta.recver;
      ack.meta.control.cmd = Control::ACK;
      ack.meta.control.msg_sig = key;
      van_->Send(ack);
      // warning
      if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString();
      return duplicated;
    }
  }
​
​

4.7 Обработка сообщений данных

ProcessDataMsg используется для обработки сообщения с данными, отправленного воркером (то есть, воркер обновляет градиент на сервер).В частности, после получения соответствующего клиента, вызовите метод клиента для обработки и непосредственно преобразуйтеmsgпоставить в очередь на обработку.

Мы представим его в клиенте.

void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg); // 这里给 Customer 添加消息
}

0x05 ZMQVan

ZMQVan — это реализация Van на основе zeromq, то есть низкоуровневые детали соединения реализованы с помощью библиотеки zmq (библиотека zmq — это библиотека с открытым исходным кодом, которая обеспечивает отличную инкапсуляцию сокетов, что делает программирование сокетов более простым, более лаконичнее и эффективнее).

5.1 Определения

ZMQVan определяется следующим образом:

ZMQVan наследуется от Van и добавляет к этому классу две переменные-члены:

  • unordered_map senders_ : senders_ — это коллекция, то есть коллекция сокетов, отправленных этим узлом, то есть сопоставление между идентификатором узла и сокетом. Например, если узел 8 хочет отправить сообщение узлу 9, просто найдите комбинацию (9, socket_9), а затем вызовите socket_9.send(message),
  • void *receiver_ = nullptr : это соединение сокета, полученное функцией Bind.Поскольку это приемник, существует только один сокет.

детали следующим образом:

class ZMQVan : public Van {
  void *context_ = nullptr;
  /**
   * \brief node_id to the socket for sending data to this node
   */
  std::unordered_map<int, void*> senders_;
  std::mutex mu_;
  void *receiver_ = nullptr;
};

5.2 Функционал фургона

Класс Van имеет следующие функции, которые будут вызываться ZMQVan или ZMQVan.

5.2.1 Отправка сообщения

Функция Send вызывает функцию SendMsg ZMQVan для отправки сообщения.После отправки, если установлен механизм ACK, будет вызван resender_->AddOutgoing.

int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}

5.2.2 Метакласс

Мета инкапсулирует метаданные, отправителя, получателя, отметку времени, запрос или ответ и т. д.

/**
 * \brief meta info of a message
 */
struct Meta {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief an int head */
  int head;
  /** \brief the unique id of the application of messsage is for*/
  int app_id;
  /** \brief customer id*/
  int customer_id;
  /** \brief the timestamp of this message */
  int timestamp;
  /** \brief the node id of the sender of this message */
  int sender;
  /** \brief the node id of the receiver of this message */
  int recver;
  /** \brief whether or not this is a request message*/
  bool request;
  /** \brief whether or not a push message */
  bool push;
  /** \brief whether or not a pull message */
  bool pull;
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
  /** \brief an string body */
  std::string body;
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  /** \brief system control message */
  Control control;
  /** \brief the byte size */
  int data_size = 0;
  /** \brief message priority */
  int priority = 0;
};

Чтобы уменьшить давление связи, ps-lite использует Protobuf для сжатия метаданных.

5.2.3 Сжатие метаданных

Это сжатие данных согласно protobuf.

void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) {
  // convert into protobuf
  PBMeta pb;
  pb.set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb.set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb.set_timestamp(meta.timestamp);
  if (meta.body.size()) pb.set_body(meta.body);
  pb.set_push(meta.push);
  pb.set_pull(meta.pull);
  pb.set_request(meta.request);
  pb.set_simple_app(meta.simple_app);
  pb.set_priority(meta.priority);
  pb.set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb.add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb.mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }
​
  // to string
  *buf_size = pb.ByteSize();
  *meta_buf = new char[*buf_size + 1];
  CHECK(pb.SerializeToArray(*meta_buf, *buf_size))
      << "failed to serialize protobuf";
}
​

5.2.3 Распаковка UnpackMeta

Разархивируйте его в соответствии с предварительно сгенерированным форматом PBMeta protobuf.

void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) {
  // to protobuf
  PBMeta pb;
  CHECK(pb.ParseFromArray(meta_buf, buf_size))
      << "failed to parse string into protobuf";
​
  // to meta
  meta->head = pb.head();
  meta->app_id = pb.has_app_id() ? pb.app_id() : Meta::kEmpty;
  meta->timestamp = pb.has_timestamp() ? pb.timestamp() : Meta::kEmpty;
  meta->request = pb.request();
  meta->push = pb.push();
  meta->pull = pb.pull();
  meta->simple_app = pb.simple_app();
  meta->priority = pb.priority();
  meta->body = pb.body();
  meta->customer_id = pb.customer_id();
  meta->data_type.resize(pb.data_type_size());
  for (int i = 0; i < pb.data_type_size(); ++i) {
    meta->data_type[i] = static_cast<DataType>(pb.data_type(i));
  }
  if (pb.has_control()) {
    const auto& ctrl = pb.control();
    meta->control.cmd = static_cast<Control::Command>(ctrl.cmd());
    meta->control.barrier_group = ctrl.barrier_group();
    meta->control.msg_sig = ctrl.msg_sig();
    for (int i = 0; i < ctrl.node_size(); ++i) {
      const auto& p = ctrl.node(i);
      Node n;
      n.role = static_cast<Node::Role>(p.role());
      n.port = p.port();
      n.hostname = p.hostname();
      n.id = p.has_id() ? p.id() : Node::kEmpty;
      n.is_recovery = p.is_recovery();
      n.customer_id = p.customer_id();
      meta->control.node.push_back(n);
    }
  } else {
    meta->control.cmd = Control::EMPTY;
  }
}

5.2.4 PackMetaPB

Судя по комментариям, PackMetaPB был представлен ByteDance и в основном используется для ibverbs_van.h, поэтому мы не будем проводить глубокое исследование.

void Van::PackMetaPB(const Meta& meta, PBMeta* pb) {
  pb->set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb->set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb->set_timestamp(meta.timestamp);
  if (meta.body.size()) pb->set_body(meta.body);
  pb->set_push(meta.push);
  pb->set_request(meta.request);
  pb->set_simple_app(meta.simple_app);
  pb->set_priority(meta.priority);
  pb->set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb->add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb->mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }
  pb->set_data_size(meta.data_size);
}

5.3 Производные функции ZMQVan

ZMQVan имеет следующие важные производные функции.

5.3.1 Bind

Логика привязки следующая:

  • Используйте zmq_bind(), чтобы связать сокет с узлом локальной сети (конечной точкой) и начать получать сообщения, отправленные на этот узел.
  • Информация об адресе узла представляет собой строку, состоящую из протокола ://, за которым следует адрес.
  • Функция Bind решит, следует ли включить режим ipc или режим tcp в соответствии с настроенной переменной «DMLC_LOCAL», чтобы настроить информацию об адресе узла.
  • Если он вызывается узлом расписания, порт указывать не нужно, но для работы и сервера нужно найти локально доступный порт самостоятельно.
  • При поиске порта устанавливается максимальное количество попыток.
  int Bind(const Node& node, int max_retry) override {
    receiver_ = zmq_socket(context_, ZMQ_ROUTER);
    int local = GetEnv("DMLC_LOCAL", 0);
    std::string hostname = node.hostname.empty() ? "*" : node.hostname;
    int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0);
    if (use_kubernetes > 0 && node.role == Node::SCHEDULER) {
      hostname = "0.0.0.0";
    }
    std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":";
    int port = node.port;
    unsigned seed = static_cast<unsigned>(time(NULL) + port);
    for (int i = 0; i < max_retry + 1; ++i) {
      auto address = addr + std::to_string(port);
      if (zmq_bind(receiver_, address.c_str()) == 0) break;
      if (i == max_retry) {
        port = -1;
      } else {
        port = 10000 + rand_r(&seed) % 40000;
      }
    }
    return port;
  }

5.3.2 Connect

Главное инициализировать Sender_, логика следующая:

  • Если соответствующий сокет найден, сокет закрывается.
  • Если обнаружится, что воркер отправляет в такой же вид, или сервер отправляет в такой же вид, а не сам себе (Scheduler может отправить сам себе), то он вернется.
  • Создайте сокет ZMQ и назначьте только что созданный сокет отправителю в виде непрозрачного указателя.
  • Если это планировщик, настройте сокет и привяжите его собственный идентификатор к сокету.
  • Подключите сокет отправителя к целевому адресу.
  • Сохраните сокет целевого id, то есть добавьте сокет в Sender_.

детали следующим образом:

void Connect(const Node& node) override {
    int id = node.id;
    auto it = senders_.find(id);
    if (it != senders_.end()) {
      zmq_close(it->second); // 如果找到了对应socket就关闭socket
    }
    // worker doesn't need to connect to the other workers. same for server
    if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
      return;
    }
    void *sender = zmq_socket(context_, ZMQ_DEALER); //建立一个socket
​
    //如果本身是scheduler,则一开始就是知道自己的id = 1,所以这个if条件就是说把自己的id绑定到socket上
    if (my_node_.id != Node::kEmpty) {
      std::string my_id = "ps" + std::to_string(my_node_.id);
      zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size());
      const char* watermark = Environment::Get()->find("DMLC_PS_WATER_MARK");
      if (watermark) {
        const int hwm = atoi(watermark);
        zmq_setsockopt(sender, ZMQ_SNDHWM, &hwm, sizeof(hwm));
      }
    }
    // connect
    std::string addr = "tcp://" + node.hostname + ":" + std::to_string(node.port);
    if (GetEnv("DMLC_LOCAL", 0)) {
      addr = "ipc:///tmp/" + std::to_string(node.port);
    }
    if (zmq_connect(sender, addr.c_str()) != 0) { //将sender这个socket和目标地址连接
      LOG(FATAL) <<  "connect to " + addr + " failed: " + zmq_strerror(errno);
    }
    senders_[id] = sender; //将目标id的socket存放起来后面使用
}

5.3.3 SendMsg

Логика следующая:

  • Найдите ранее зарезервированный сокет из сохраненного sender_;
  • сжать мета;
  • отправить мета;
  • Отправлять данные сегментами в цикле;
  int SendMsg(const Message& msg) override {
    std::lock_guard<std::mutex> lk(mu_);
    // find the socket
    int id = msg.meta.recver;
    CHECK_NE(id, Meta::kEmpty);
    auto it = senders_.find(id);
    if (it == senders_.end()) {
      LOG(WARNING) << "there is no socket to node " << id;
      return -1;
    }
    void *socket = it->second;
​
    // send meta
    int meta_size; char* meta_buf;
    PackMeta(msg.meta, &meta_buf, &meta_size);
    int tag = ZMQ_SNDMORE;
    int n = msg.data.size();
    if (n == 0) tag = 0;
    zmq_msg_t meta_msg;
    zmq_msg_init_data(&meta_msg, meta_buf, meta_size, FreeData, NULL);
    while (true) {
      if (zmq_msg_send(&meta_msg, socket, tag) == meta_size) break;
      if (errno == EINTR) continue;
      return -1;
    }
    // zmq_msg_close(&meta_msg);
    int send_bytes = meta_size;
    // send data
    for (int i = 0; i < n; ++i) {
      zmq_msg_t data_msg;
      SArray<char>* data = new SArray<char>(msg.data[i]);
      int data_size = data->size();
      zmq_msg_init_data(&data_msg, data->data(), data->size(), FreeData, data);
      if (i == n - 1) tag = 0;
      while (true) {
        if (zmq_msg_send(&data_msg, socket, tag) == data_size) break;
        if (errno == EINTR) continue;
        return -1;
      }
      // zmq_msg_close(&data_msg);
      send_bytes += data_size;
    }
    return send_bytes;
  }
​

5.3.4 RecvMsg

RecvMsg должен принимать сообщения на привязанном порту.

При получении сообщения он будет оценивать количество сообщений, а затем выполнять другую обработку.

  int RecvMsg(Message* msg) override {
    msg->data.clear();
    size_t recv_bytes = 0;
    for (int i = 0; ; ++i) {
      zmq_msg_t* zmsg = new zmq_msg_t;
      CHECK(zmq_msg_init(zmsg) == 0) << zmq_strerror(errno);
      while (true) {
        if (zmq_msg_recv(zmsg, receiver_, 0) != -1) break;
        if (errno == EINTR) {
          std::cout << "interrupted";
          continue;
        }
        return -1;
      }
      char* buf = CHECK_NOTNULL((char *)zmq_msg_data(zmsg));
      size_t size = zmq_msg_size(zmsg);
      recv_bytes += size;
​
      if (i == 0) {
        // identify
        msg->meta.sender = GetNodeID(buf, size);
        msg->meta.recver = my_node_.id;
        CHECK(zmq_msg_more(zmsg));
        zmq_msg_close(zmsg);
        delete zmsg;
      } else if (i == 1) {
        // task
        UnpackMeta(buf, size, &(msg->meta));
        zmq_msg_close(zmsg);
        bool more = zmq_msg_more(zmsg);
        delete zmsg;
        if (!more) break;
      } else {
        // zero-copy
        SArray<char> data;
        data.reset(buf, size, [zmsg, size](char* buf) {
          zmq_msg_close(zmsg);
          delete zmsg;
        });
        msg->data.push_back(data);
        if (!zmq_msg_more(zmsg)) {
          break;
        }
      }
    }
    return recv_bytes;
  }
​

Функция GetNodeID

  /**
   * return the node id given the received identity
   * \return -1 if not find
   */
  int GetNodeID(const char* buf, size_t size) {
    if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
      int id = 0;
      size_t i = 2;
      for (; i < size; ++i) {
        if (buf[i] >= '0' && buf[i] <= '9') {
          id = id * 10 + buf[i] - '0';
        } else {
          break;
        }
      }
      if (i == size) return id;
    }
    return Meta::kEmpty;
  }

0x06 Сводка

Завершаем подведение итогов:

Когда в почтовом отделении есть адресная книга, для вывоза и доставки объектов требуется грузовик.Фургон является коммуникационным модулем всего сервера параметров, и его характеристики следующие.

  • При создании экземпляра класса PostOffice экземпляр класса Van создается как переменная-член. Экземпляр Van имеет тот же жизненный цикл, что и экземпляр PostOffice (есть только один такой объект на узел);

  • Ван отвечает за конкретную связь между узлами. В частности, он отвечает за установление взаимосвязи между узлами (например, соединение между Worker и Scheduler) и открытие локального принимающего потока для отслеживания полученного сообщения.

  • Функция функции инициализации объекта Van состоит в выполнении различных настроек в соответствии с типом локального узла, чтобы запустить порт, установить соединение между локальным узлом и планировщиком, запустить поток приема сообщений, пульс поток и т. д., так что связь может быть выполнена.

  • Сервер параметров принимает/обрабатывает сообщения в фоновом потоке Receiver_thread_. В дополнение к сообщениям данных, которые передают параметры, управляющая информация между каждым узлом включает:

    • ADD_NODE: рабочие и серверные узлы регистрируются с помощью планировщика;
    • БАРЬЕР: синхронные блокирующие сообщения между узлами;
    • HEARTBEAT: сигнал сердцебиения между узлами;
    • TERMINATE: сигнал выхода узла;
    • ACK: сообщение подтверждения,Тип ACK отображается только в том случае, если включен класс Resender..
    • ПУСТОЙ: толкать или тянуть;

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

Если вы хотите получать своевременные новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.

ссылка 0xFF

Начало работы с распределенным машинным обучением — принцип логистической регрессии на основе сервера параметров

[Распределенный] Анализ экземпляров распределенных вычислений на основе ps-lite

анализ исходного кода ps-lite

Официальная краткая инструкция по использованию

Анализ исходного кода PS-Lite — KangRoger

Анализ исходного кода ps-lite - zybuluo

примечания к коду ps-lite-willzhang