[анализ исходного кода] сервер параметров машинного обучения ps-lite(4) ----- реализация узла приложения

машинное обучение глубокое обучение задняя часть

0x00 сводка

Эта статья является четвертой частью сервера параметров, знакомящей с KVWorker, KVServer.

KVWorker и KVServer — это абстракции узлов Server/Worker соответственно, которые запускаются Van ---> Customer ---> recv_handle_ как часть движка.

В этой статье сначала будут представлены некоторые базовые классы поддержки, затем представлен базовый класс SimpleApp сервера/рабочего и, наконец, представлена ​​конкретная реализация сервера/рабочего.

Общая блок-схема заранее испорчена следующим образом:

0x01 базовый класс

Сначала нам нужно ввести некоторые базовые классы.

1.1 Range

Роль класса Range состоит в том, чтобы определить, на каком сервере находится извлекаемый параметр, на основе этого диапазона и диапазона ключа, соответствующего серверу.

Класс Range предоставляет следующие функции:

  • Позиции двух uint64 для begin() и end();
  • size() получает размер этого диапазона, т.е. end_ - begin_;
class Range {
 public:
  Range() : Range(0, 0) {}
  Range(uint64_t begin, uint64_t end) : begin_(begin), end_(end) { }
​
  uint64_t begin() const { return begin_; }
  uint64_t end() const { return end_; }
  uint64_t size() const { return end_ - begin_; }
 private:
  uint64_t begin_;
  uint64_t end_;
};

1.2 TreadsafeQueue

TreadsafeQueue — это очередь, которую могут читать несколько потоков. Она обеспечивает безопасность потоков благодаря взаимодействию блокировок и условных величин и используется в качестве очереди сообщений.

/**
 * \brief thread-safe queue allowing push and waited pop
 */
class ThreadsafePQueue {
 public:
  ThreadsafePQueue() { }
  ~ThreadsafePQueue() { }
​
  /**
   * \brief push an value into the end. threadsafe.
   * \param new_value the value
   */
  void Push(Message new_value) {
    mu_.lock();
    queue_.push(std::move(new_value));
    mu_.unlock();
    cond_.notify_all();
  }
​
  /**
   * \brief wait until pop an element from the beginning, threadsafe
   * \param value the poped value
   */
  void WaitAndPop(Message* value) { // 等待队列不为空,按照优先级pop message
    std::unique_lock<std::mutex> lk(mu_);
    cond_.wait(lk, [this]{return !queue_.empty();});
    *value = std::move(queue_.top());
    queue_.pop();
  }
​
 private:
  class Compare {
   public:
    bool operator()(const Message &l, const Message &r) {
      return l.meta.priority <= r.meta.priority;
    }
  };
  mutable std::mutex mu_; //数据同步互斥变量
  std::priority_queue<Message, std::vector<Message>, Compare> queue_; // message优先队列
  std::condition_variable cond_; //队列不为空条件变量
};

0x02 SimpleApp

2.1 Обзор

SimpleApp — это базовый класс, который создает унифицированную абстракцию функций узла приложения.

  • Предоставляет базовые функции отправки и простые функции обработки сообщений (запрос, ожидание, ответ).
  • Типы сообщений: int head и string body.
  • Он имеет 2 производных класса. KVServer и KVWorker.

2.2 Определения

2.2.1 Класс поддержки

SimpleData определяет базовый формат запроса и ответа.

struct SimpleData {
  /** \brief the int head */
  int head;
  /** \brief the string body */
  std::string body;
  /** \brief sender's node id */
  int sender;
  /** \brief the associated timestamp */
  int timestamp;
  /** \brief sender's customer id */
  int customer_id;
};

2.2.2 Переменные-члены

SimpleApp в основном имеет следующие переменные-члены:

  • Customer* obj_ : Клиент этого Приложения, который управляет соединением запроса;
  • Handle request_handle_ : функция обработки запроса;
  • Handle response_handle_ : функция обработки ответа;
  • set_request_handle, set_response_handle: набор членовrequest_handle_, response_handle_. Когда клиент вызывает SimpleApp::Process, он определяет, является ли это запросом или ответом, в соответствии с индикаторной переменной в message.meta и вызывает соответствующий дескриптор для обработки;
class SimpleApp {
 public:
  /**
   * \brief constructor
   * @param app_id the app id, should match with the remote node app with which this app
   * @param customer_id the customer_id, should be node-locally unique
   * is communicated
   */
  explicit SimpleApp(int app_id, int customer_id);
​
  /** \brief deconstructor */
  virtual ~SimpleApp() { delete obj_; obj_ = nullptr; }
​
  /**
   * \brief send a request to a remote node
   *
   * \param req_head request head
   * \param req_body request body
   * \param recv_id remote node id
   *
   * @return the timestamp of this request
   */
  virtual inline int Request(int req_head, const std::string& req_body, int recv_id);
​
  /**
   * \brief wait until a request is finished
   *
   * \param timestamp
   */
  virtual inline void Wait(int timestamp) { obj_->WaitRequest(timestamp); }
​
​
  /**
   * \brief send back a response for a request
   * \param recv_req the received request
   * \param the response body
   */
  virtual inline void Response(const SimpleData& recv_req, const std::string& res_body = "");
​
  /**
   * \brief the handle to proces a received request/respoonse
   *
   * \param recved the received request or response
   * \param app this pointer
   */
  using Handle = std::function<void(const SimpleData& recved, SimpleApp* app)>;
​
  /**
   * \brief set the request handle
   * \param request_handle the request handle
   */
  virtual inline void set_request_handle(const Handle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }
​
  /**
   * \brief set the response handle
   * \param response_handle the response handle
   */
  virtual inline void set_response_handle(const Handle& response_handle) {
    CHECK(response_handle) << "invalid response handle";
    response_handle_ = response_handle;
  }
​
  /**
   * \brief returns the customer
   */
  virtual inline Customer* get_customer() { return obj_; }
​
 protected:
  /** \brief empty construct */
  inline SimpleApp() : obj_(nullptr) {
    request_handle_ = [](const SimpleData& recved, SimpleApp* app) {
      app->Response(recved);
    };
    response_handle_ = [](const SimpleData& recved, SimpleApp* app) { };
  }
​
  /** \brief process a received message */
  virtual inline void Process(const Message& msg);
​
  /** \brief ps internal object */
  Customer* obj_;
​
 private:
  /** \brief request handle */
  Handle request_handle_;
  /** \brief request handle */
  Handle response_handle_;
};

2.3 Функциональная функция

Вот три простые функции:

Просьба позвонить Ване, чтобы отправить сообщение.

inline int SimpleApp::Request(int req_head, const std::string& req_body, int recv_id) {
  // setup message
  Message msg;
  msg.meta.head = req_head;
  if (req_body.size()) msg.meta.body = req_body;
  int ts = obj_->NewRequest(recv_id);
  msg.meta.timestamp = ts;
  msg.meta.request = true;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = obj_->customer_id();
​
  // send
  for (int r : Postoffice::Get()->GetNodeIDs(recv_id)) {
    msg.meta.recver = r;
    Postoffice::Get()->van()->Send(msg);
  }
  return ts;
}

Ответ — это призыв к Вану ответить на сообщение.

inline void SimpleApp::Response(const SimpleData& req, const std::string& res_body) {
  // setup message
  Message msg;
  msg.meta.head = req.head;
  if (res_body.size()) msg.meta.body = res_body;
  msg.meta.timestamp = req.timestamp;
  msg.meta.request = false;
  msg.meta.simple_app = true;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.recver = req.sender;
​
  // send
  Postoffice::Get()->van()->Send(msg);
}

Функция Process определяет, является ли это запросом или ответом, в соответствии с индикаторной переменной в message.meta и вызывает соответствующий дескриптор для обработки.

inline void SimpleApp::Process(const Message& msg) {
  SimpleData recv;
  recv.sender    = msg.meta.sender;
  recv.head      = msg.meta.head;
  recv.body      = msg.meta.body;
  recv.timestamp = msg.meta.timestamp;
  recv.customer_id = msg.meta.customer_id;
  if (msg.meta.request) { // 判断是request还是response,调用相应handle处理
    CHECK(request_handle_);
    request_handle_(recv, this);
  } else {
    CHECK(response_handle_);
    response_handle_(recv, this);
  }
}

0x03 KVServer

KVServer — это абстракция узла Сервер, его рольББ,обрабатывать информацию,вернуть результатТри шага, основные функции:

  • Поддерживать данные о парах ключ-значение;

  • Обрабатывать и отвечать на push- и pull-запросы клиентов;

    • функцияrequest_handle_Обработать запрос:

      • При вызове KVServer::Process будет вызыватьсяrequest_handle_.
      • request_handle_По умолчаниюKVServerDefaultHandle.
    • функцияResponseдля возврата данных;

3.1 Определения

request_handle_ — это функция обработки запросов, которую необходимо настроить.

  • В этой функции обратного вызова пользователю необходимо реализовать различные функции оптимизатора.Алгоритм обновления градиента веса модели и операция возврата веса модели.
  • Вы можете напрямую обратиться к версии KVServerDefaultHandle по умолчанию, реализованной ps-lite.
/**
 * \brief A server node for maintaining key-value pairs
 */
template <typename Val>
class KVServer : public SimpleApp {
 public:
  /**
   * \brief constructor
   * \param app_id the app id, should match with \ref KVWorker's id
   */
  explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }
​
  /** \brief deconstructor */
  virtual ~KVServer() { delete obj_; obj_ = nullptr; }
​
  /**
   * \brief the handle to process a push/pull request from a worker
   * \param req_meta meta-info of this request
   * \param req_data kv pairs of this request
   * \param server this pointer
   */
  using ReqHandle = std::function<void(const KVMeta& req_meta,
                                       const KVPairs<Val>& req_data,
                                       KVServer* server)>;
  void set_request_handle(const ReqHandle& request_handle) {
    CHECK(request_handle) << "invalid request handle";
    request_handle_ = request_handle;
  }
​
  /**
   * \brief response to the push/pull request
   * \param req the meta-info of the request
   * \param res the kv pairs that will send back to the worker
   */
  void Response(const KVMeta& req, const KVPairs<Val>& res = KVPairs<Val>());
​
 private:
  /** \brief internal receive handle */
  void Process(const Message& msg);
  /** \brief request handle */
  ReqHandle request_handle_; // 需要用户自己实现
};
​

3.2 Функциональная функция

3.2.1 Response

Response()Он предназначен для отправки ответной информации вызываемому работнику. По сравнению с SimpleApp обнаружено, что KVServer имеет новую обработку для головы и тела.

Следует отметить, что функция Response должна определяться пользователем.request_handle_позвонить, т.е.request_handle_Обработайте полученное сообщение, а затем вызовите Response, чтобы ответить рабочему процессу.

template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
  Message msg;
  msg.meta.app_id = obj_->app_id();
  msg.meta.customer_id = req.customer_id;
  msg.meta.request     = false;
  msg.meta.push        = req.push;
  msg.meta.pull        = req.pull;
  msg.meta.head        = req.cmd;
  msg.meta.timestamp   = req.timestamp;
  msg.meta.recver      = req.sender;
  if (res.keys.size()) {
    msg.AddData(res.keys);
    msg.AddData(res.vals);
    if (res.lens.size()) {
      msg.AddData(res.lens);
    }
  }
  Postoffice::Get()->van()->Send(msg);
}

3.2.2 Process

Process()Зарегистрировано в объекте Customer, когда принимающий поток объекта Customer получает сообщение, он вызываетProcess()Обработайте данные.

Process()Внутренняя логика такова:

  • Извлеките метаданные сообщения и создайте KVMeta.
  • Видно, что в Process нет обслуживания данных KV.
  • Процесс вызывает request_handle_ (объект функции std::function), реализованный пользователем для обработки данных.
  • В callback-функции request_handle_ пользователю нужно реализовать различные оптимизаторыАлгоритм обновления градиента веса модели и операция возврата веса модели.
template <typename Val>
void KVServer<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg); return;
  }
  KVMeta meta;
  meta.cmd       = msg.meta.head;
  meta.push      = msg.meta.push;
  meta.pull      = msg.meta.pull;
  meta.sender    = msg.meta.sender;
  meta.timestamp = msg.meta.timestamp;
  meta.customer_id = msg.meta.customer_id;
  KVPairs<Val> data;
  int n = msg.data.size();
  if (n) {
    CHECK_GE(n, 2);
    data.keys = msg.data[0];
    data.vals = msg.data[1];
    if (n > 2) {
      CHECK_EQ(n, 3);
      data.lens = msg.data[2];
      CHECK_EQ(data.lens.size(), data.keys.size());
    }
  }
  CHECK(request_handle_);
  request_handle_(meta, data, this);
}

3.2.3 Примеры функций

KVServerDefaultHandle — это пример, предоставленный ps-lite для демонстрации того, как поддерживать KV, обрабатывать сообщения и возвращать запросы.

Здесь поддерживается хэш-таблица unordered_map, в которой записываются ключ и значение, и она отвечает на запросы push и pull.

Используйте хранилище std::unordered_map для сохранения параметров сервера, обновления параметров хранилища при отправке запроса и извлечении параметров при получении запроса;

/**
 * \brief an example handle adding pushed kv into store
 */
template <typename Val>
struct KVServerDefaultHandle {
  void operator()(
      const KVMeta& req_meta, const KVPairs<Val>& req_data, KVServer<Val>* server) {
    size_t n = req_data.keys.size();
    KVPairs<Val> res;
    if (!req_meta.pull) {
      CHECK_EQ(n, req_data.vals.size());
    } else {
      res.keys = req_data.keys; res.vals.resize(n);
    }
    for (size_t i = 0; i < n; ++i) {
      Key key = req_data.keys[i];
      if (req_meta.push) {
        store[key] += req_data.vals[i];
      }
      if (req_meta.pull) {
        res.vals[i] = store[key];
      }
    }
    server->Response(req_meta, res);
  }
  std::unordered_map<Key, Val> store;
};
​

3.2.4 Процесс

Затем мы продолжаем разбираться с процессом уточнения, описанным выше.

  • Рабочий узел или серверный узел будут выполняться в начале программы.Postoffice::start().

  • Postoffice::start()инициализирует информацию об узле и вызоветVan::start().

  • Каждый узел прослушивает локальный порт; подключенный узел уже подключен при запуске.

  • Van::start()Запустите локальный поток специально для получения информации о сокете, используйтеVan::Receiving()непрерывно контролировать полученное сообщение.

    • receiver_thread_ = std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
      
  • Van::Receiving()Получив пост-сообщение, выполняйте разные действия по разным командам. Для сообщений данных, если требуется дальнейшая обработка, будет вызываться ProcessDataMsg:

    • Найдите клиента по идентификатору приложения в сообщении (каждая задача приложения будет привязана к пользовательскому классу), то есть сообщение будет отправлено в поток recv разных клиентов в соответствии с идентификатором клиента.
    • передать сообщениеCustomer::Acceptфункция.
  • Функция Customer::Accept() добавляет сообщение в очередьrecv_queue_;

  • Сам объект Customer также запускает принимающий поток.recv_thread_, используя Customer::Receive() :

    • непрерывно отrecv_queue_Очередь для получения сообщений.
    • Если (!recv.meta.request) означает ответ, тоtracker_[req.timestamp].second++
    • Позвоните зарегистрированному пользователюrecv_handle_Функция обрабатывает сообщение.
  • Для рабочих зарегистрированrecv_handle_даKVWorker::Process()функция. Поскольку сообщения, полученные рабочим потоком recv, в основном представляют собой пары KV, извлеченные с сервера, этоProcess()В основном для получения пары KV в сообщении;

  • Для Сервера он зарегистрированrecv_handle_даKVServer::Process()функция.

  • Потому что здесь мы KVServer, а сервер принимает KV-пару, проталкиваемую воркерами, которую нужно обработать, поэтомуProcess()Пользователь в функции, вызываемой черезKVServer::set_request_handle()Переданный объект функции.

  • В определяемой пользователем функции request_handle_, если вам нужно отправить ответ рабочему процессу, вызовите KVServer::Response.

Текущая логика показана ниже: на шаге 8 recv_handle_ указывает на KVServer::Process или KVWorker::Process (эта секция server, поэтому она соответствует KVServer::Process). На шаге 10 верните ответ работнику.

            +--------------------------+
            | Van                      |
            |                          |
Request +----------->  Receiving       |
            |  1           +           |             +---------------------------+
            |              |           |             | Postoffice                |
            |              | 2         |             |                           |
            |              v           | GetCustomer |                           |
            |        ProcessDataMsg <------------------> unordered_map customers_|
            |              +           |      3      |                           |
            |              |           |             +---------------------------+
            +--------------------------+
                           |
                           | 4
                           |
 +------------------------------------+
 | Customer                |          |
 |                         |          |
 |                         v          |
 |                      Accept        |
 |                         +          |
 |                         |          |
 |                         | 5        |
 |                         v          |
 |                    recv_queue_     |                +------------------+
 |                         +          |                |KVWorker          |
 |                         | 6        |     +--------> |                  |
 |                         |          |     |    8     |         Process  |
 |                         v          |     |          +------------------+
 |  recv_thread_ +---> Receiving      |     |
 |                         +          |     |
 |                         | 7        |     |
 |                         |          |     |          +------------------+
 |                         v          |     |          |KVServer          |
 |                    recv_handle_+---------+--------> |                  |
 |                                    |          8     |         Process  |
 +------------------------------------+                |           +      |
                                                       +------------------+
                                                                   |
                                                                   |  9
                                                                   v
                                                       +-----------+-------+
                                                       | request_handle_   |
                                    10                 |                   |
Response <----------------------------------------------------+ Response   |
                                                       |                   |
                                                       +-------------------+
​

0x04 KVWorker

4.1 Обзор

KVWorker используется для передачи и извлечения пар ключ-значение на серверные узлы, которые представляют собой различные параметры, которые необходимо обрабатывать параллельно в процессе алгоритма.

  • Операции push и pull в Worker асинхронно возвращают идентификатор, а затем используют идентификатор для блокировки и ожидания, то есть синхронной операции.
  • Или передайте обратный вызов для последующих операций при асинхронном вызове.

4.2 Определения

Основные переменные KVWorker:

  • std::unordered_map> recv_kvs : Полученный результат извлечения: значение kv ;
  • std::unordered_map callbacks: функция обратного вызова, выполняемая после получения всех ответов на запрос;
  • Slicer slicer_ : переменная функции среза по умолчанию, эта функция нарезает KVPairs в соответствии с диапазоном каждого сервера при вызове функции отправки;

Основная функция:

  • Функция ZPush с нулевым копированием
  • ZPull функция вытягивания с нулевым копированием
  • Функция реорганизации ключей AddPullCB
  • Обработчик сообщений процесса
  • Обработчик среза по умолчанию DefaultSlicer
  • set_slicer: устанавливает элемент slicer_, который нарезает KVPairs в соответствии с диапазоном каждого сервера при вызове функции отправки;
/**
 * \brief A worker node that can \ref Push (\ref Pull) key-value pairs to (from) server
 * nodes
 *
 * \tparam Val the type of value, which should be primitive types such as
 * int32_t and float
 */
template<typename Val>
class KVWorker : public SimpleApp {
 public:
  /** avoid too many this-> */
  using SimpleApp::obj_; // Customer 对象
  /**
   * \brief callback function for \ref Push and \ref Pull
   *
   * It is called by the data receiving thread of this instance when the push or
   * pull is actually finished. Namely the kv pairs have already written into
   * servers' data structure or the kv pairs have already pulled back.
   */
  using Callback = std::function<void()>;
​
  /**
   * \brief constructor
   *
   * \param app_id the app id, should match with \ref KVServer's id
   * \param customer_id the customer id which is unique locally
   */
  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }
​
  /** \brief deconstructor */
  virtual ~KVWorker() { delete obj_; obj_ = nullptr; }
​
  using SlicedKVs = std::vector<std::pair<bool, KVPairs<Val>>>;
  /**
   * \brief a slicer partitions a key-value list according to the key ranges
   * \param send the kv list for partitioning
   * \param ranges the key ranges, ranges[i] is the key range of server i
   * \param sliced the sliced lists. slices[i] should only contains keys in
   * ranges[i] and the according values
   */
  using Slicer = std::function<void(
      const KVPairs<Val>& send, const std::vector<Range>& ranges,
      SlicedKVs* sliced)>;
​
  /**
   * \brief set a user-defined slicer
   */
  void set_slicer(const Slicer& slicer) {
    CHECK(slicer); slicer_ = slicer;
  }
​
 private:
  /**
   * \brief add a callback for a request. threadsafe.
   * @param cb callback
   * @param timestamp the timestamp of the request
   */
  void AddCallback(int timestamp, const Callback& cb) {
    if (!cb) return;
    std::lock_guard<std::mutex> lk(mu_);
    callbacks_[timestamp] = cb;
  }
​
  /** \brief data buffer for received kvs for each timestamp */
  std::unordered_map<int, std::vector<KVPairs<Val>>> recv_kvs_; // 收到的 kv value 
  /** \brief callbacks for each timestamp */
  std::unordered_map<int, Callback> callbacks_; // 收到 request 的所有 response 之后执行的回调函数
  /** \brief lock */
  std::mutex mu_;
  /** \brief kv list slicer */
  Slicer slicer_; // 默认 slice 函数变量
};

4.3 Функциональная функция

4.3.1 Push & ZPush

Поскольку Push вызывает ZPush, давайте представим их вместе.

Метод Push в основном:

  • Отправьте данные (список KV) на соответствующий серверный узел;

  • Список KV отправляется по разделам в соответствии с диапазоном ключей, поддерживаемым каждым сервером;

  • Push — это прямой асинхронный возврат. Если вы хотите узнать, какой результат возвращается, вы можете:

    • Используйте Wait для ожидания, то есть используйтеtracker_Чтобы записать количество отправленных запросов и соответствующее количество ответных запросов, когда отправленная сумма равна полученной сумме, это означает, что каждый запрос был успешно отправлен, чтобы достичьСинхронныйЦель;
    • Используйте обратный вызов, чтобы вы могли перезвонить, когда закончите.

Метод ZPush:

  • Используйте метод NewRequest объекта obj_ (тип Customer) для записи количества отправленных запросов и соответствующего количества запросов ответа, а также возврата метки времени;
  • Установите обратный вызов, соответствующий метке времени;
  • Создайте объект KVPair, используя переданные параметры, и вызовите Send, чтобы отправить объект;
  int Push(const std::vector<Key>& keys,
           const std::vector<Val>& vals,
           const std::vector<int>& lens = {},
           int cmd = 0,
           const Callback& cb = nullptr,
           int priority = 0) {
    return ZPush(
        SArray<Key>(keys), SArray<Val>(vals), SArray<int>(lens), cmd, cb,
        priority);
  }
  
  int ZPush(const SArray<Key>& keys,
            const SArray<Val>& vals,
            const SArray<int>& lens = {},
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = obj_->NewRequest(kServerGroup);
    AddCallback(ts, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.lens = lens;
    kvs.priority = priority;
    Send(ts, true, false, cmd, kvs);
    return ts;
  }  
​

Как позвонить можно посмотреть в его комментарии:

   * Sample usage: the following codes push two KV pairs `{1, (1.1, 1.2)}` and `{3,
   * (3.1,3.2)}` to server nodes, where the value is a length-2 float vector
   * \code
   *   KVWorker<float> w;
   *   std::vector<Key> keys = {1, 3};
   *   std::vector<float> vals = {1.1, 1.2, 3.1, 3.2};
   *   w.Push(keys, vals);
   * \endcode

4.3.2 Pull

Метод pull примерно аналогичен логике push:

  • Привяжите функцию обратного вызова, чтобы скопировать данные и получить метку времени.
  • Вытяните val_vector с сервера в соответствии с key_vector,
  • Наконец, верните временную метку,
  • Эта функция не блокируется, вы можете подождать с помощью worker.Wait(timestamp);
  int Pull(const std::vector<Key>& keys,
           std::vector<Val>* vals,
           std::vector<int>* lens = nullptr,
           int cmd = 0,
           const Callback& cb = nullptr,
           int priority = 0) {
    SArray<Key> skeys(keys);
    int ts = AddPullCB(skeys, vals, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = skeys;
    kvs.priority = priority;
    Send(ts, false, true, cmd, kvs);
    return ts;
  }
​

4.3.3 ZPull

Логика та же, что и у Pull, но процесс копирования в систему опущен. Поэтому необходимо убедиться, что вызывающая сторона не изменит key_vector до завершения ZPull;

  int ZPull(const SArray<Key>& keys,
            SArray<Val>* vals,
            SArray<int>* lens = nullptr,
            int cmd = 0,
            const Callback& cb = nullptr,
            int priority = 0) {
    int ts = AddPullCB(keys, vals, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.priority = priority;
    Send(ts, false, true, cmd, kvs);
    return ts;
  }
​

4.3.4 Send

Push()иPull()в конце концов позвонитSend()функция,Send()Нарезка KVPairs, потому что каждый сервер сохраняет только часть параметров, поэтому нарезанные SlicedKVpairs будут отправлены на разные серверы.

Если пропустить, обратный вызов будет вызван напрямую.

В противном случае траверс отправляется.

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  // slice the message
  SlicedKVs sliced;
  slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);
​
  // need to add response first, since it will not always trigger the callback
  int skipped = 0;
  for (size_t i = 0; i < sliced.size(); ++i) {
    if (!sliced[i].first) ++skipped;
  }
  obj_->AddResponse(timestamp, skipped);
  if ((size_t)skipped == sliced.size()) {
    RunCallback(timestamp);
  }
​
  for (size_t i = 0; i < sliced.size(); ++i) {
    const auto& s = sliced[i];
    if (!s.first) continue;
    Message msg;
    msg.meta.app_id = obj_->app_id();
    msg.meta.customer_id = obj_->customer_id();
    msg.meta.request     = true;
    msg.meta.push        = push;
    msg.meta.pull        = pull;
    msg.meta.head        = cmd;
    msg.meta.timestamp   = timestamp;
    msg.meta.recver      = Postoffice::Get()->ServerRankToID(i);
    msg.meta.priority    = kvs.priority;
    const auto& kvs = s.second;
    if (kvs.keys.size()) {
      msg.AddData(kvs.keys);
      msg.AddData(kvs.vals);
      if (kvs.lens.size()) {
        msg.AddData(kvs.lens);
      }
    }
    Postoffice::Get()->van()->Send(msg);
  }
}
​

4.3.5 DefaultSlicer

Функция сегментации может быть переписана пользователем, по умолчаниюDefaultSlicer, каждый SlicedKVPairs упаковывается в объект Message, а затем используетсяvan::send()Отправить.

в соответствии сstd::vector& rangesИнформация о диапазоне фрагментации, которая будет фрагментировать отправляемые данные. Текущее значение по умолчанию — использоватьPostoffice::GetServerKeyRangesдля разделения диапазона осколков.

template <typename Val>
void KVWorker<Val>::DefaultSlicer(
    const KVPairs<Val>& send, const std::vector<Range>& ranges,
    typename KVWorker<Val>::SlicedKVs* sliced) {
  sliced->resize(ranges.size());
​
  // find the positions in msg.key
  size_t n = ranges.size();
  std::vector<size_t> pos(n+1);
  const Key* begin = send.keys.begin();
  const Key* end = send.keys.end();
  for (size_t i = 0; i < n; ++i) {
    if (i == 0) {
      pos[0] = std::lower_bound(begin, end, ranges[0].begin()) - begin;
      begin += pos[0];
    } else {
      CHECK_EQ(ranges[i-1].end(), ranges[i].begin());
    }
    size_t len = std::lower_bound(begin, end, ranges[i].end()) - begin;
    begin += len;
    pos[i+1] = pos[i] + len;
​
    // don't send it to servers for empty kv
    sliced->at(i).first = (len != 0);
  }
  CHECK_EQ(pos[n], send.keys.size());
  if (send.keys.empty()) return;
​
  // the length of value
  size_t k = 0, val_begin = 0, val_end = 0;
  if (send.lens.empty()) {
    k = send.vals.size() / send.keys.size();
    CHECK_EQ(k * send.keys.size(), send.vals.size());
  } else {
    CHECK_EQ(send.keys.size(), send.lens.size());
  }
​
  // slice
  for (size_t i = 0; i < n; ++i) {
    if (pos[i+1] == pos[i]) {
      sliced->at(i).first = false;
      continue;
    }
    sliced->at(i).first = true;
    auto& kv = sliced->at(i).second;
    kv.keys = send.keys.segment(pos[i], pos[i+1]);
    if (send.lens.size()) {
      kv.lens = send.lens.segment(pos[i], pos[i+1]);
      for (int l : kv.lens) val_end += l;
      kv.vals = send.vals.segment(val_begin, val_end);
      val_begin = val_end;
    } else {
      kv.vals = send.vals.segment(pos[i]*k, pos[i+1]*k);
    }
  }
}
​
​

4.3.6 PushPull & ZPushPull

Это совокупность толкать и тянуть вместе.

  int PushPull(const std::vector<Key>& keys,
               const std::vector<Val>& vals,
               std::vector<Val>* outs,
               std::vector<int>* lens = nullptr,
               int cmd = 0,
               const Callback& cb = nullptr,
               int priority = 0) {
    CHECK_NOTNULL(outs);
    if (outs->empty())
      outs->resize(vals.size());
    else
      CHECK_EQ(vals.size(), outs->size());
​
    SArray<Key> skeys(keys);
    SArray<Val> svals(vals);
    auto souts = new SArray<Val>(outs->data(), outs->size());
    SArray<int>* slens = lens ?
        new SArray<int>(lens->data(), lens->size()) : nullptr;
    int ts = ZPushPull(skeys, svals, souts, slens, cmd,
        [this, cb, souts, slens]() {
          delete souts;
          delete slens;
          if (cb) cb();
        }, priority);
    return ts;
  }
​
  int ZPushPull(const SArray<Key>& keys,
                const SArray<Val>& vals,
                SArray<Val>* outs,
                SArray<int>* lens = nullptr,
                int cmd = 0,
                const Callback& cb = nullptr,
                int priority = 0) {
    int ts = AddPullCB(keys, outs, lens, cmd, cb);
    KVPairs<Val> kvs;
    kvs.keys = keys;
    kvs.vals = vals;
    kvs.priority = priority;
    if (lens)
      kvs.lens = *lens;
    Send(ts, true, true, cmd, kvs);
    re
​

4.3.7 Обратный вызов

Настройки некоторых callback-функций упоминались ранее, давайте посмотрим, как их использовать.

4.3.7.1 Настройки

Мы видим, что для каждой метки времени установлена ​​функция обратного вызова, которая, в свою очередь, представляет собой список функций обратного вызова.

После отправки каждого запроса в этом списке регистрируется callback-функция.

  using Callback = std::function<void()>;
  
  /** \brief callbacks for each timestamp */
  std::unordered_map<int, Callback> callbacks_;  // 回调函数列表
  
  void AddCallback(int timestamp, const Callback& cb) {
    if (!cb) return;
    std::lock_guard<std::mutex> lk(mu_);
    callbacks_[timestamp] = cb; // 添加回调函数
  }
​
4.3.7.2 AddPullCB

Это функция обратного вызова, которая получает ответ после извлечения, который используется для копирования возвращенных данных.

Однако что делать, если несколько серверов должны вернуться? Будь то push или pull, только после получения всех ответов значение, полученное с каждого сервера, будет заполнено в локальномvalsвнутри.

template <typename Val>
template <typename C, typename D>
int KVWorker<Val>::AddPullCB(
    const SArray<Key>& keys, C* vals, D* lens, int cmd,
    const Callback& cb) {
  int ts = obj_->NewRequest(kServerGroup);
  AddCallback(ts, [this, ts, keys, vals, lens, cb]() mutable {
      mu_.lock();
      auto& kvs = recv_kvs_[ts];
      mu_.unlock();
​
      // do check
      size_t total_key = 0, total_val = 0;
      for (const auto& s : kvs) { // 进行有效性验证
        Range range = FindRange(keys, s.keys.front(), s.keys.back()+1);
        CHECK_EQ(range.size(), s.keys.size())
            << "unmatched keys size from one server";
        if (lens) CHECK_EQ(s.lens.size(), s.keys.size());
        total_key += s.keys.size();
        total_val += s.vals.size();
      }
      CHECK_EQ(total_key, keys.size()) << "lost some servers?";
​
      // fill vals and lens
      std::sort(kvs.begin(), kvs.end(), [](
          const KVPairs<Val>& a, const KVPairs<Val>& b) {
                  return a.keys.front() < b.keys.front();
        });
      CHECK_NOTNULL(vals);
      if (vals->empty()) {
        vals->resize(total_val);
      } else {
        CHECK_EQ(vals->size(), total_val);
      }
      Val* p_vals = vals->data();
      int *p_lens = nullptr;
      if (lens) {
        if (lens->empty()) {
          lens->resize(keys.size());
        } else {
          CHECK_EQ(lens->size(), keys.size());
        }
        p_lens = lens->data();
      }
      for (const auto& s : kvs) { // 拷贝返回的数据
        memcpy(p_vals, s.vals.data(), s.vals.size() * sizeof(Val));
        p_vals += s.vals.size();
        if (p_lens) {
          memcpy(p_lens, s.lens.data(), s.lens.size() * sizeof(int));
          p_lens += s.lens.size();
        }
      }
​
      mu_.lock();
      recv_kvs_.erase(ts);
      mu_.unlock();
      if (cb) cb();
    });
​
  return ts;
}
​
4.3.7.3 Эксплуатация

Это найти функцию обратного вызова на основе метки времени, запустить ее и удалить.

Когда звонить, он будет вызываться в процессе, скоро его представим.

template <typename Val>
void KVWorker<Val>::RunCallback(int timestamp) {
  mu_.lock();
  auto it = callbacks_.find(timestamp);
  if (it != callbacks_.end()) {
    mu_.unlock();
​
    CHECK(it->second);
    it->second();
​
    mu_.lock();
    callbacks_.erase(it);
  }
  mu_.unlock();
}
​

4.3.8 Process

Если это ответ Pull, значения, возвращаемые каждым полученным ответом, будут сохранены в первую очередь.recv_kvs_внутри,recv_kvs_[ts].push_back(kvs);

Будь то push или pull, только после получения всех ответов значение, полученное с каждого сервера, будет заполнено в локальномvalsвнутри.

template <typename Val>
void KVWorker<Val>::Process(const Message& msg) {
  if (msg.meta.simple_app) {
    SimpleApp::Process(msg); return;
  }
  // store the data for pulling
  int ts = msg.meta.timestamp;
  if (msg.meta.pull) {
    CHECK_GE(msg.data.size(), (size_t)2);
    KVPairs<Val> kvs;
    kvs.keys = msg.data[0];
    kvs.vals = msg.data[1];
    if (msg.data.size() > (size_t)2) {
      kvs.lens = msg.data[2];
    }
    mu_.lock();
    recv_kvs_[ts].push_back(kvs);
    mu_.unlock();
  }
​
  // finished, run callbacks,只有在收到了所有的Response之后
  if (obj_->NumResponse(ts) == Postoffice::Get()->num_servers() - 1)  {
    RunCallback(ts); // 在这里调用了 RunCallback。
  }
}
​

0x05 Сводка

Наконец, давайте завершим это процессом обмена сообщениями и посмотрим, как в нем используются различные части. Общая блок-схема выглядит следующим образом:

  1. У самородков проблема с парсингом упорядоченного списка, нет времени на настройку, временно добавить 0 последовательность

  2. Рабочий узел хочет отправить сообщение, поэтому вызывается метод Send.

  3. Метод Send вызовет NewRequest клиента для создания нового запроса.

  4. Postoffice::start()инициализирует информацию об узле и вызоветVan::start().

  5. Метод Send вызывает метод отправки Van для сетевого взаимодействия.

  6. После прохождения по сети процесс поступает на сервер, для сервера это Запрос, который вызывает Прием Вана.Van::Receiving()Получив пост-сообщение, выполняйте разные действия по разным командам. Для сообщений данных, если требуется дальнейшая обработка, будет вызываться ProcessDataMsg.

  7. Продолжите вызов ProcessDataMsg Вана, затем вызовите GetCustomer.

  8. GetCustomer вызовет Postoffice для соответствующей обработки customers_.

  9. Клиент будет использовать Accept для обработки сообщения.

  10. Функция Customer::Accept() добавляет сообщение в очередьrecv_queue_.

  11. Сам объект Customer также запускает принимающий поток.recv_thread_, используя Customer::Receive() :

    1. непрерывно отrecv_queue_Очередь для получения сообщений.
    2. Если (!recv.meta.request) означает ответ, тоtracker_[req.timestamp].second++
    3. Позвоните зарегистрированному пользователюrecv_handle_Функция обрабатывает сообщение.
  12. Van::Receiving()Позвоните зарегистрированному пользователюrecv_handle_Функция обрабатывает сообщение.

  13. Для сервера он зарегистрированrecv_handle_даKVServer::Process()функция.

  14. Функция Process вызывает request_handle_ для продолжения обработки, генерирует Response и возвращает его Worker.

  15. Ответ передается рабочему процессу по сети.

  16. Бег возвращается к рабочему, к рабочему фургону. Для работника это Запрос, который вызывает Прием Вана. (Следующая последовательность операций аналогична Серверу)

  17. Van::Receiving()Получив пост-сообщение, выполняйте разные действия по разным командам. Для сообщений данных, если требуется дальнейшая обработка, будет вызываться ProcessDataMsg.

  18. Клиент будет использовать Accept для обработки сообщения.

  19. Функция Customer::Accept() добавляет сообщение в очередьrecv_queue_.

  20. Здесь есть развязка, новой нитьюrecv_thread_иметь дело с.

  21. Сам объект Customer запустил новый потокrecv_thread_, используя Customer::Receive() .

  22. Для Worker зарегистрированrecv_handle_даKVWorker::Process()функция.

  23. позвонитьKVWorker::Process()Функция обрабатывает ответное сообщение Response.

+---------------------+       +------------------------+   Worker   +  Server            +--------------------------+
| KVWorker            |  1    |  Van                   |      3     |                    | Van                      |
|          Send  +--------+--------------->  send +-----------------+----->  Request +----------->  Receiving       |
|                     |   |   |                        |                                 |              +           |
|                     |   |   |       Receiving   <---------+       |           4        |              |           |             +---------------------------+
|                     |   |   |           +            |    |       |                    |              |           |             | Postoffice                |
|    Process          |   |   |           | 16         |    |       |                    |              | 5         |             |                           |
|      ^              |   |   |           v            |    | 15    |                    |              v           | GetCustomer |                           |
|      |              |   |   |     ProcessDataMsg     |    |       |                    |        ProcessDataMsg <------------------> unordered_map customers_|
|      |              |   |   |           +            |    |       |                    |              +           |      6      |                           |
|      |              |   |   |           |            |    |       |                    |              |           |             +---------------------------+
+---------------------+   |   +------------------------+    |       |                    +--------------------------+
       |                  |               |                 |       |                                   |
       |                  |2              |  17             |       |                                   | 7
       |                  |               |                 |       |                                   |
       |     +---------------------------------------+      |       |         +------------------------------------+
       |     | Customer   |               |          |      |       |         | Customer                |          |
       |     |            |               v          |      |       |         |                         |          |
       |     |            v                          |      |       |         |                         v          |
       |     |        NewRequest        Accept       |      |       |         |                      Accept        |
       |     |                            +          |      |       |         |                         +          |
       |     |                            |  18      |      |       |         |                         |          |
       |     |                            |          |      |       |         |                         | 8        |
       |     |                            v          |      |       |         |                         v          |
       |     |                      revc_queue_      |      |       |         |                    recv_queue_     |
       |     |                            +          |      |       |         |                         +          |
    22 |     |                            |  19      |      |       |         |                         | 9        |
       |     |                            |          |      |       |         |                         |          |
       |     |                  20        v          |      |       |         |                10       v          |
       |     | recv_thread_ +-------> Receving       |      |       |         |  recv_thread_ +---> Receiving      |
       |     |                            |          |      |       |         |                         +          |
       |     |                            |  21      |      |       |         |                         | 11       |
       |     |                            |          |      |       |         |                         |          |                +------------------+
       |     |                            v          |      |       |         |                         v          |                |KVServer          |
       +---------------------------+ recv_handle     |      |       |         |                    recv_handle_+------------------> |                  |
             |                                       |      |       |         |                                    |          12    |         Process  |
             +---------------------------------------+      |       |         +------------------------------------+                |           +      |
                                                            |       |                                                               +------------------+
                                                            |       |                                                                           |
                                                            |       |                                                                           |  13
                                                            |       |                                                                           v
                                                            |       |                                                               +-----------+-------+
                                                            |       |                                                               | request_handle_   |
                                                            |       |                                            14                 |                   |
                                                            +<-----------+   Response <----------------------------------------------------+ Response   |
                                                                    |                                                               |                   |
                                                                    |                                                               +-------------------+
                                                                    +
​

Телефон такой:

На данный момент введение ps-lite завершено, и далее будет представлен сервер параметров Douban Paracel, так что следите за обновлениями.

0xEE Личная информация

★★★★★★Думая о жизни и технологиях★★★★★★

Публичный аккаунт WeChat:мысли Росси

Если вы хотите вовремя получать новости о статьях, написанных отдельными лицами, или хотите видеть технические материалы, рекомендованные отдельными лицами, обратите внимание.

ссылка 0xFF

Самое полное понимание PS-lite в истории

Внедрение инфраструктуры сервера параметров машинного обучения с нуля (2)