0x00 сводка
Paracel — это среда распределенных вычислений, разработанная Douban, которая решает задачи машинного обучения на основе парадигмы сервера параметров: логистическая регрессия, SVD, матричная факторизация (BFGS, sgd, als, cg), LDA, Lasso... .
Paracel поддерживает параллелизм данных и моделей, предоставляет пользователям простой в использовании коммуникационный интерфейс и является более гибким, чем системы в стиле mapreduce. Paracel также поддерживает асинхронный режим обучения, что ускоряет сходимость итерационных задач. Кроме того, структура программы Paracel очень похожа на структуру последовательной программы, пользователи могут больше сосредоточиться на самом алгоритме и не должны уделять слишком много внимания распределенной логике.
Поскольку мы уже использовали ps-lite для ознакомления с основными функциями сервера параметров, в этой статье мы в основном сравниваем основные аспекты и некоторые ключевые технические моменты с ps-lite (у paracel нет механизма отказоустойчивости с открытым исходным кодом, он не маленький к сожалению), а не более подробный разбор как у ps-lite.
Для этой статьи основная логика ps-lite выглядит следующим образом:
Другие статьи из этой серии:
[Анализ исходного кода] Сервер параметров машинного обучения ps-lite (1) ----- PostOffice
[Анализ исходного кода] (3) сервера параметров машинного обучения ps-lite ----- Агент Заказчик
В этой статье во время синтаксического анализа будет удален некоторый код, не являющийся телом.
0x01 использовать
Давайте сначала посмотрим, как использовать его с помощью алгоритма LR, предоставленного исходным кодом.
1.1 Конфигурация и запуск
Из исходного кода, чтобы найти соответствующие части LR, ниже приведены некоторые необходимые конфигурации, в которых я сделал часть перевода, следует отметить, что несколько разных типов экземпляров могут быть запущены с помощью одной команды, и экземпляры запускаются исполняемая программа lr.
- Войдите в домашний каталог Paracel, чтобы войти в рабочий каталог Paracel.
cd paracel;
- Создание обучающего набора данных для классификации
python ./tool/datagen.py -m classification -o training.dat -n 2500 -k 100
- Настройка пути к библиотеке ссылок: Настройка пути к библиотеке ссылок
export LD_LIBRARY_PATH=your_paracel_install_path/lib
- Create a json file named
cfg.json
, see example in Parametersраздел ниже. Создайте файл конфигурации- Выполнить (4 рабочих процесса, локальный режим в следующем примере) Выполнить (4 рабочих процесса, 2 сервера параметров)
./prun.py -w 4 -p 2 -c cfg.json -m local your_paracel_install_path/bin/lr
Default parameters are set in a JSON format file. For example, we create a cfg.json as below(modify
your_paracel_install_path
):{
"training_input": "training.dat", тренировочный набор
"test_input": "training.dat", проверочный набор
«predict_input» : «training.dat», данные метки
"output" : "./lr_result/",
"update_file" : "your_paracel_install_path/lib/liblr_update.so",
"update_func": "lr_theta_update", функция обновления
"method" : "ipm",
"rounds" : 100,
"alpha" : 0.001,
"beta" : 0.01,
"debug" : false
}
1.2 Компиляция
Из make-файла видно, что lr_driver.cpp и lr.cpp компилируются вместе в исполняемый файл lr. Скомпилируйте update.cpp в библиотеку, которая будет загружена и вызвана сервером.
add_library(lr_update SHARED update.cpp) # 参数服务器如何更新
target_link_libraries(lr_update ${CMAKE_DL_LIBS})
install(TARGETS lr_update LIBRARY DESTINATION lib)
add_library(lr_method SHARED lr.cpp) # 算法代码
target_link_libraries(lr_method ${Boost_LIBRARIES} comm scheduler)
install(TARGETS lr_method LIBRARY DESTINATION lib)
add_executable(lr lr_driver.cpp) # 驱动代码
target_link_libraries(lr
${Boost_LIBRARIES}
comm scheduler lr_method)
install(TARGETS lr RUNTIME DESTINATION bin)
1.3 Градиентный спуск
Для LR есть четыре метода стохастического градиентного спуска для крупномасштабных глубоких нейронных сетей на выбор.
- dgd: distributed gradient descent learning
- ipm: iterative parameter mixtures learning
- downpour: asynchrounous gradient descent learning
- agd: slow asynchronous gradient descent learning
Выбираем алгоритм agd, чтобы научиться анализировать:Woohoo.ee CS.Berkeley.Amount/~Брехт/Боюсь…
1.4 Код водителя
Во-первых, давайте посмотрим на код драйвера lr_driver.cpp, логика такова:
- Настройте среду выполнения и связь.
- Чтение параметров анализа.
- Генерировать логистическую_регрессию, обучать, проверять, прогнозировать.
DEFINE_string(server_info,
"host1:7777PARACELhost2:8888",
"hosts name string of paracel-servers.\n");
DEFINE_string(cfg_file,
"",
"config json file with absolute path.\n");
int main(int argc, char *argv[])
{
// 配置运行环境和通信
paracel::main_env comm_main_env(argc, argv);
paracel::Comm comm(MPI_COMM_WORLD);
google::SetUsageMessage("[options]\n\t--server_info\n\t--cfg_file\n");
google::ParseCommandLineFlags(&argc, &argv, true);
// 读取分析参数
paracel::json_parser pt(FLAGS_cfg_file);
std::string training_input, test_input, predict_input, output, update_file, update_func, method;
try {
training_input = pt.check_parse<std::string>("training_input");
test_input = pt.check_parse<std::string>("test_input");
predict_input = pt.check_parse<std::string>("predict_input");
output = pt.parse<std::string>("output");
update_file = pt.check_parse<std::string>("update_file");
update_func = pt.parse<std::string>("update_func");
method = pt.parse<std::string>("method");
} catch (const std::invalid_argument & e) {
std::cerr << e.what();
return 1;
}
int rounds = pt.parse<int>("rounds");
double alpha = pt.parse<double>("alpha");
double beta = pt.parse<double>("beta");
bool debug = pt.parse<bool>("debug");
// 生成 logistic_regression,进行训练,验证,预测
paracel::alg::logistic_regression lr_solver(comm,
FLAGS_server_info,
training_input,
output,
update_file,
update_func,
method,
rounds,
alpha,
beta,
debug);
lr_solver.solve();
std::cout << "final loss: " << lr_solver.calc_loss() << std::endl;
lr_solver.test(test_input);
lr_solver.predict(predict_input);
lr_solver.dump_result();
return 0;
}
Из предыдущей конфигурации мы знаем, что часть обновления:
"update_file" : "your_paracel_install_path/lib/liblr_update.so",
"update_func" : "lr_theta_update",
Итак, мы получаем функцию обновления из alg/classification/logistic_regression/update.cpp следующим образом:
В частности, два параметра объединяются и возвращаются. Эта часть кода компилируется в библиотеку, загружается и запускается на сервере.
#include <vector>
#include "proxy.hpp"
#include "paracel_types.hpp"
using std::vector;
extern "C" {
extern paracel::update_result lr_theta_update;
}
vector<double> local_update(vector<double> a, vector<double> b) {
vector<double> r;
for(int i = 0; i < (int)a.size(); ++i) {
r.push_back(a[i] + b[i]);
}
return r;
}
paracel::update_result lr_theta_update = paracel::update_proxy(local_update);
1.5 Код алгоритма
1.5.1 Определение класса
logistic_regression — это определение класса, расположенное в файле lr.hpp. logistic_regression должен наследовать paracel::paralg, чтобы использовать его.
namespace paracel {
namespace alg {
class logistic_regression: public paracel::paralg {
public:
logistic_regression(paracel::Comm,
string,
string _input,
string output,
string update_file_name,
string update_func_name,
string = "ipm",
int _rounds = 1,
double _alpha = 0.002,
double _beta = 0.1,
bool _debug = false);
virtual ~logistic_regression();
double lr_hypothesis(const vector<double> &);
void dgd_learning(); // distributed gradient descent learning
void ipm_learning(); // by default: iterative parameter mixtures learning
void downpour_learning(); // asynchronous gradient descent learning
void agd_learning(); // slow asynchronous gradient descent learning
virtual void solve();
double calc_loss();
void dump_result();
void print(const vector<double> &);
void test(const std::string &);
void predict(const std::string &);
private:
void local_parser(const vector<string> &, const char);
void local_parser_pred(const vector<string> &, const char);
private:
string input;
string update_file, update_func;
std::string learning_method;
int worker_id;
int rounds;
double alpha, beta;
bool debug = false;
vector<vector<double> > samples, pred_samples;
vector<double> labels;
vector<double> theta;
vector<double> loss_error;
vector<std::pair<vector<double>, double> > predv;
int kdim; // not contain 1
};
} // namespace alg
} // namespace paracel
1.5.2 Основной код корпуса
solve является основным кодом, и для обучения выбираются различные методы стохастического градиентного спуска в соответствии с различными конфигурациями.
void logistic_regression::solve() {
auto lines = paracel_load(input);
local_parser(lines);
paracel_sync();
if(learning_method == "dgd") {
dgd_learning();
} else if(learning_method == "ipm") {
ipm_learning();
} else if(learning_method == "downpour") {
downpour_learning();
} else if(learning_method == "agd") {
agd_learning();
} else {
ERROR_ABORT("method do not support");
}
paracel_sync();
}
1.5.3 Алгоритм Agd
Сравнение алгоритмов узнаем в статье:
Следующий код в основном соответствует алгоритму работы один на один, а логика такова.
-
Сначала отправьте тета на сервер параметров;
-
Итеративное обучение:
- Прочитать последнюю тета с сервера параметров;
- тренировать;
- Отправьте результат расчета на сервер параметров;
-
получить последние результаты с сервера параметров;
void logistic_regression::agd_learning() {
int data_sz = samples.size();
int data_dim = samples[0].size();
theta = paracel::random_double_list(data_dim);
paracel_write("theta", theta); // first push // 首先把 theta 推送到参数服务器
vector<int> idx;
for(int i = 0; i < data_sz; ++i) {
idx.push_back(i);
}
paracel_register_bupdate(update_file, update_func);
double coff2 = 2. * beta * alpha;
vector<double> delta(data_dim);
unsigned time_seed = std::chrono::system_clock::now().time_since_epoch().count();
// train loop
for(int rd = 0; rd < rounds; ++rd) {
std::shuffle(idx.begin(), idx.end(), std::default_random_engine(time_seed));
theta = paracel_read<vector<double> >("theta"); // 从参数服务器读取最新的 theta
vector<double> theta_old(theta);
// traverse data
for(auto sample_id : idx) {
theta = paracel_read<vector<double> >("theta");
theta_old = theta;
double coff1 = alpha * (labels[sample_id] - lr_hypothesis(samples[sample_id]));
for(int i = 0; i < data_dim; ++i) {
double t = coff1 * samples[sample_id][i] - coff2 * theta[i];
theta[i] += t;
}
if(debug) {
loss_error.push_back(calc_loss());
}
for(int i = 0; i < data_dim; ++i) {
delta[i] = theta[i] - theta_old[i];
}
// 把计算结果推送到参数服务器
paracel_bupdate("theta", delta); // you could push a batch of delta into a queue to optimize
} // traverse
} // rounds
theta = paracel_read<vector<double> >("theta"); // last pull // 得到最终结果
}
Логическая схема lr выглядит следующим образом:
+------------+ +-------------------------------------------------+
| lr_driver | |logistic_regression |
| | | |
| +---------------------------------------> solve |
+------------+ lr_solver.solve() | + |
| | |
| | |
| | |
| +---------------------+-----------------------+ |
| | agd_learning | |
| | +-----------------------+ | |
| | | | | |
| | | v | |
| | | theta = paracel_read("theta") | |
| | | | | |
| | | | | |
| | | v | |
| | | | |
| | | delta[i] = theta[i] - theta_old[i] | |
| | | + | |
| | | | | |
| | | | | |
| | | v | |
| | | paracel_bupdate("theta", delta) | |
| | | + + | |
| | | | | | |
| | +-----------------------+ | | |
| +---------------------------------------------+ |
| | |
+-------------------------------------------------+
|
Worker |
+------------------------------------------------------------------------------------+
Server |
+---------------------+
| Server | |
| | |
| v |
| local_update |
| |
+---------------------+
1.6 Резюме
На данный момент мы знаем, как использовать Paracel. Реализация основана на драйвере в качестве ядра. Пользователям необходимо написать функции обновления и функции алгоритма. Но до глубокого понимания далеко.
В настоящее время у нас есть несколько вопросов, которые необходимо решить:
- Как Paracel запускает несколько работников для обучения?
- Как Paracel запустил сервер параметров?
- Как используется функция обновления?
Нам нужно продолжить исследование со стартовой частью.
0x02 начало
как уже упоминалось./prun.py -w 4 -p 2 -c cfg.json -m local your_paracel_install_path/bin/lr
это команда запуска, paracel входит в систему через prun.py, поэтому мы анализируем этот скрипт.
2.1 Python-скрипт prun.py
2.1.1 Основная функция корпуса
Ниже мы опускаем некоторый непредметный код, например параметры обработки, логика следующая:
- параметры обработки;
- Используйте init_starter, чтобы узнать, как запустить сервер, рабочий процесс и построить соответствующую строку;
- Используйте subprocess.Popen для запуска сервера, где исполнителем сервера является bin/start_server;
- Используйте os.system для запуска воркера;
if __name__ == '__main__':
optpar = OptionParser()
# 省略处理参数
(options, args) = optpar.parse_args()
nsrv = 1
nworker = 1
if options.parasrv_num:
nsrv = options.parasrv_num
if options.worker_num:
nworker = options.worker_num
if not options.method_server:
options.method_server = options.method
if not options.ppn_server:
options.ppn_server = options.ppn
if not options.mem_limit_server:
options.mem_limit_server = options.mem_limit
if not options.hostfile_server:
options.hostfile_server = options.hostfile
# 利用 init_starter 得到如何启动server,worker,构建出相应字符串
server_starter = init_starter(options.method_server,
str(options.mem_limit_server),
str(options.ppn_server),
options.hostfile_server,
options.server_group)
worker_starter = init_starter(options.method,
str(options.mem_limit),
str(options.ppn),
options.hostfile,
options.worker_group)
#initport = random.randint(30000, 65000)
#initport = get_free_port()
initport = 11777
start_parasrv_cmd_lst = [server_starter, str(nsrv), os.path.join(PARACEL_INSTALL_PREFIX, 'bin/start_server --start_host'), socket.gethostname(), ' --init_port', str(initport)]
start_parasrv_cmd = ' '.join(start_parasrv_cmd_lst)
# 利用 subprocess.Popen 启动server,其中server的执行程序是 bin/start_server
procs = subprocess.Popen(start_parasrv_cmd, shell=True, preexec_fn=os.setpgrp)
try:
serverinfo = paracelrun_cpp_proxy(nsrv, initport)
entry_cmd = ''
if args:
entry_cmd = ' '.join(args)
alg_cmd_lst = [worker_starter, str(nworker), entry_cmd, '--server_info', serverinfo, '--cfg_file', options.config]
alg_cmd = ' '.join(alg_cmd_lst)
# 利用 os.system 启动 worker
os.system(alg_cmd)
os.killpg(procs.pid, 9)
except Exception as e:
logger.exception(e)
os.killpg(procs.pid, 9)
2.1.2 функция стартера
Функция init_starter создает строку на основе конфигурации. Есть три способа запустить paracel:
The –m_сервер and -m options above refer to what type of cluster you use. Paracel support mesos clusters, mpi clusters and multiprocessers in a single machine.
Мы можем использовать знания из предыдущей статьи horovod, чтобы узнать, что mpirun может запускать несколько процессов.
В сочетании с предыдущей командной строкой./prun.py -w 4 -p 2 -c cfg.json -m local your_paracel_install_path/bin/lr
, вы можете знать, что локальный - это mpirun, поэтому paracel запускает 4 процесса lr через mpirun.
Конкретный код выглядит следующим образом:
def init_starter(method, mem_limit, ppn, hostfile, group):
'''Assemble commands for running paracel programs'''
starter = ''
if not hostfile:
hostfile = '~/.mpi/large.18'
if method == 'mesos':
if group:
starter = '%s/mrun -m %s -p %s -g %s -n ' % (PARACEL_INSTALL_PREFIX, mem_limit, ppn, group)
else:
starter = '%s/mrun -m %s -p %s -n ' % (PARACEL_INSTALL_PREFIX, mem_limit, ppn)
elif method == 'mpi':
starter = 'mpirun --hostfile %s -n ' % hostfile
elif method == 'local':
starter = 'mpirun -n '
else:
print 'method %s not supported.' % method
sys.exit(1)
return starter
2.2 Исполняемая программа start_server
Как упоминалось ранее, исполнитель сервера соответствует bin/start_server.
Давайте посмотрим на его сборку src/CMakeLists.txt, чтобы найти start_server.cpp.
add_library(comm SHARED comm.cpp) # 通信相关库
install(TARGETS comm LIBRARY DESTINATION lib)
add_library(scheduler SHARED scheduler.cpp # 调度
install(TARGETS scheduler LIBRARY DESTINATION lib)
add_library(default SHARED default.cpp) # 缺省库
install(TARGETS default LIBRARY DESTINATION lib)
# 这里可以看到start_server.cpp
add_executable(start_server start_server.cpp)
target_link_libraries(start_server ${Boost_LIBRARIES} ${CMAKE_DL_LIBS})
install(TARGETS start_server RUNTIME DESTINATION bin)
add_executable(paracelrun_cpp_proxy paracelrun_cpp_proxy.cpp)
target_link_libraries(paracelrun_cpp_proxy ${Boost_LIBRARIES} ${CMAKE_DL_LIBS})
install(TARGETS paracelrun_cpp_proxy RUNTIME DESTINATION bin)
2.3 Код сервера
src/start_server.cpp — это основной код сервера.
В сочетании с предыдущей командной строкой./prun.py -w 4 -p 2 -c cfg.json -m local your_paracel_install_path/bin/lr
, вы можете знать, что локальный — это mpirun, поэтому paracel запускает два процесса start_server через mpirun, то есть два сервера параметров.
#include <gflags/gflags.h>
#include "server.hpp"
DEFINE_string(start_host, "beater7", "host name of start node\n");
DEFINE_string(init_port, "7773", "init port");
int main(int argc, char *argv[])
{
google::SetUsageMessage("[options]\n\
--start_host\tdefault: balin\n\
--init_port\n");
google::ParseCommandLineFlags(&argc, &argv, true);
paracel::init_thrds(FLAGS_start_host, FLAGS_init_port); // join inside
return 0;
}
В файле include/server.hpp функция init_thrds запускает серию потоков, конкретная логика выглядит следующим образом.
- Соберите среду zmq;
- Сокет устанавливается для каждого потока;
- Создайте поток обработки сервера;
- Установить поток SSP;
- дождитесь окончания потока;
// init_host is the hostname of starter
void init_thrds(const paracel::str_type & init_host,
const paracel::str_type & init_port) {
// 构建 zmq 环境
zmq::context_t context(2);
zmq::socket_t sock(context, ZMQ_REQ);
paracel::str_type info = "tcp://" + init_host + ":" + init_port;
sock.connect(info.c_str());
char hostname[1024], freeport[1024];
size_t size = sizeof(freeport);
// hostname of servers
gethostname(hostname, sizeof(hostname));
paracel::str_type ports = hostname;
ports += ":";
// create sock in every thrd 为每个线程建立了socket
std::vector<zmq::socket_t *> sock_pt_lst;
for(int i = 0; i < paracel::threads_num; ++i) {
zmq::socket_t *tmp;
tmp = new zmq::socket_t(context, ZMQ_REP);
sock_pt_lst.push_back(tmp);
sock_pt_lst.back()->bind("tcp://*:*");
sock_pt_lst.back()->getsockopt(ZMQ_LAST_ENDPOINT, &freeport, &size);
if(i == paracel::threads_num - 1) {
ports += local_parse_port(paracel::str_type(freeport));
} else {
ports += local_parse_port(std::move(paracel::str_type(freeport))) + ",";
}
}
zmq::message_t request(ports.size());
std::memcpy((void *)request.data(), &ports[0], ports.size());
sock.send(request);
zmq::message_t reply;
sock.recv(&reply);
// 建立服务器处理线程 thrd_exec
paracel::list_type<std::thread> threads;
for(int i = 0; i < paracel::threads_num - 1; ++i) {
threads.push_back(std::thread(thrd_exec, std::ref(*sock_pt_lst[i])));
}
// 建立ssp线程 thrd_exec_ssp
threads.push_back(std::thread(thrd_exec_ssp, std::ref(*sock_pt_lst.back())));
// 等待线程结束
for(auto & thrd : threads) {
thrd.join();
}
for(int i = 0; i < paracel::threads_num; ++i) {
delete sock_pt_lst[i];
}
zmq_ctx_destroy(context);
} // init_thrds
./prun.py -w 4 -p 2 -c cfg.json -m local your_paracel_install_path/bin/lr
Соответствующая логическая схема запуска выглядит следующим образом:
prun.py
+
|
|
| +----------------+
| +--> | start_server |
v | +----------------+
server_starter = init_starter +--> mpirun -n 2 +----+
+ | +----------------+
| | | start_server |
| | | + |
| +--> | | |
v | | |
worker_starter = init_starter +--> mpirun -n 4 | | |
+ | v |
| | init_thrds |
| | + |
| | | |
+-------+----+--+-------+ | | |
| | | | | | |
| | | | | v |
v v v v | thrd_exec |
bin/lr bin/lr bin/lr bin/lr | + |
| | |
| | |
| | |
| v |
| thrd_exec_ssp |
+----------------+
2.4 Резюме
В настоящее время мы знаем, что и рабочий процесс, и сервер имеют несколько методов запуска, например, использование mpi для запуска нескольких процессов.
- Рабочая сторона запускает несколько процессов через driver.cpp в качестве основного тела.
- Сторона сервера использует start_server в качестве основного тела для запуска нескольких процессов, то есть несколько процессов (серверов параметров) образуют кластер.
Вышеописанное очень похоже на ps-lite.
Давайте погрузимся во внутренности этих двух ролей по отдельности.
0x03 Сервер в целом
Из предыдущего ps-lite мы знаем, что большинство серверов параметров используют хранилище KV для сохранения параметров, поэтому давайте сначала представим хранилище KV.
3,1 кВ хранения
Хранилище KV, используемое сервером, указано в include/kv_def.hpp.
#include "paracel_types.hpp"
#include "kv.hpp"
namespace paracel {
paracel::kvs<paracel::str_type, int> ssp_tbl; // 用来协助实现 SSP
paracel::kvs<paracel::str_type, paracel::str_type> tbl_store; // 主要的kv存储
}
Определение хранилища KV содержится в include/kv.hpp, ниже некоторый код опущен.
Можно видеть, что основная функция состоит в том, чтобы поддерживать таблицу памяти и предоставлять функции набора серий и функции получения серий.Когда необходимо вернуть значение и уникальность, для обработки используется хеш-функция.
template <class K, class V> struct kvs {
public:
bool contains(const K & k) {
return kvdct.count(k);
}
void set(const K & k, const V & v) {
kvdct[k] = v;
}
void set_multi(const paracel::dict_type<K, V> & kvdict) {
for(auto & kv : kvdict) {
set(kv.first, kv.second);
}
}
boost::optional<V> get(const K & k) {
auto fi = kvdct.find(k);
if(fi != kvdct.end()) {
return boost::optional<V>(fi->second);
} else return boost::none;
}
bool get(const K & k, V & v) {
auto fi = kvdct.find(k);
if(fi != kvdct.end()) {
v = fi->second;
return true;
} else {
return false;
}
}
paracel::list_type<V>
get_multi(const paracel::list_type<K> & keylst) {
paracel::list_type<V> valst;
for(auto & key : keylst) {
valst.push_back(kvdct.at(key));
}
return valst;
}
void get_multi(const paracel::list_type<K> & keylst,
paracel::list_type<V> & valst) {
for(auto & key : keylst) {
valst.push_back(kvdct.at(key));
}
}
void get_multi(const paracel::list_type<K> & keylst,
paracel::dict_type<K, V> & valdct) {
valdct.clear();
for(auto & key : keylst) {
auto it = kvdct.find(key);
if(it != kvdct.end()) {
valdct[key] = it->second;
}
}
}
// 这里使用了 hash 函数
// gets(key) -> value, unique
boost::optional<std::pair<V, paracel::hash_return_type> >
gets(const K & k) {
if(auto v = get(k)) {
std::pair<V, paracel::hash_return_type> ret(*v, hfunc(*v));
return boost::optional<
std::pair<V, paracel::hash_return_type>
>(ret);
} else {
return boost::none;
}
}
// compare-and-set, cas(key, value, unique) -> True/False
bool cas(const K & k, const V & v, const paracel::hash_return_type & uniq) {
if(auto r = gets(k)) {
if(uniq == (*r).second) {
set(k, v);
return true;
} else {
return false;
}
} else {
kvdct[k] = v;
}
return true;
}
paracel::dict_type<K, V> getall() {
return kvdct;
}
private:
//std::tr1::unordered_map<K, V> kvdct;
paracel::dict_type<K, V> kvdct;
paracel::hash_type<V> hfunc;
};
3.2 Логика обработки услуги
Поток thrd_exec реализует базовую логику обработки сервера параметров: он связан с различными командами, отправляемыми воркером (большинство из них обрабатывается для хранения KV), например:
- Если это команда «pull», используйте paracel::tbl_store.get(key, result), чтобы получить значение и вернуть его пользователю.
- Если это команда "push", используйте paracel::tbl_store.set(key, msg[2]) для вставки параметров в KV;
Следует отметить, что здесь используется определяемая пользователем функция обновления, а именно:
- dlopen_update_lambda используется для генерации функции обновления, установленной пользователем, и присвоения значения update_f.
- При обработке запросов типа «обновление» или «bupdate» используйте пользовательскую функцию обновления для обработки kv.
Некоторый код, не являющийся телом, был удален ниже.
Некоторый код, не являющийся телом, был удален ниже.
// thread entry
void thrd_exec(zmq::socket_t & sock) {
paracel::packer<> pk;
update_result update_f;
filter_result pullall_special_f;
filter_result remove_special_f;
// 这里使用了dlopen_update_lambda来对用户设置的update函数进行生成,赋值为 update_f
auto dlopen_update_lambda = [&](const paracel::str_type & fn, const paracel::str_type & fcn) {
void *handler = dlopen(fn.c_str(), RTLD_NOW | RTLD_LOCAL | RTLD_NODELETE);
auto local = dlsym(handler, fcn.c_str());
update_f = *(std::function<paracel::str_type(paracel::str_type, paracel::str_type)>*) local;
dlclose(handler);
};
// 主体逻辑
while(1) {
zmq::message_t s;
sock.recv(&s);
auto scrip = paracel::str_type(static_cast<const char *>(s.data()), s.size());
auto msg = paracel::str_split_by_word(scrip, paracel::seperator);
auto indicator = pk.unpack(msg[0]);
if(indicator == "pull") { // 如果是从参数服务器读取参数,则直接返回
auto key = pk.unpack(msg[1]);
paracel::str_type result;
auto exist = paracel::tbl_store.get(key, result); // 读取kv
if(!exist) {
paracel::str_type tmp = "nokey";
rep_send(sock, tmp);
} else {
rep_send(sock, result); // 返回
}
}
if(indicator == "pull_multi") { // 读取多个参数
paracel::packer<paracel::list_type<paracel::str_type> > pk_l;
auto key_lst = pk_l.unpack(msg[1]);
auto result = paracel::tbl_store.get_multi(key_lst);
rep_pack_send(sock, result);
}
if(indicator == "pullall") { // 读取所有参数
auto dct = paracel::tbl_store.getall();
rep_pack_send(sock, dct);
}
mutex.lock();
if(indicator == "push") { // 插入参数
auto key = pk.unpack(msg[1]);
paracel::tbl_store.set(key, msg[2]);
bool result = true;
rep_pack_send(sock, result);
}
if(indicator == "push_multi") { // 插入多个参数
paracel::packer<paracel::list_type<paracel::str_type> > pk_l;
paracel::dict_type<paracel::str_type, paracel::str_type> kv_pairs;
auto key_lst = pk_l.unpack(msg[1]);
auto val_lst = pk_l.unpack(msg[2]);
assert(key_lst.size() == val_lst.size());
for(int i = 0; i < (int)key_lst.size(); ++i) {
kv_pairs[key_lst[i]] = val_lst[i];
}
paracel::tbl_store.set_multi(kv_pairs); //插入kv
bool result = true;
rep_pack_send(sock, result);
}
if(indicator == "update" || indicator == "bupdate") { // 更新参数
if(msg.size() > 3) {
if(msg.size() != 5) {
ERROR_ABORT("invalid invoke in server end");
}
// open request func
auto file_name = pk.unpack(msg[3]);
auto func_name = pk.unpack(msg[4]);
dlopen_update_lambda(file_name, func_name);
} else {
if(!update_f) {
dlopen_update_lambda("../local/build/lib/default.so",
"default_incr_i");
}
}
auto key = pk.unpack(msg[1]);
// 这里使用用户的update函数来对kv进行处理
std::string result = kv_update(key, msg[2], update_f);
rep_send(sock, result);
}
if(indicator == "remove") { // 删除参数
auto key = pk.unpack(msg[1]);
auto result = paracel::tbl_store.del(key);
rep_pack_send(sock, result);
}
mutex.unlock();
} // while
} // thrd_exec
Упрощено, как показано:
+--------------------------------------------------------------------------------------+
| thrd_exec |
| |
| +---------------------------------> while(1) |
| | + |
| | | |
| | | |
| | +----------+----------+--------+--+------+----------+---------+---------+ |
| | | | | | | | | | |
| | | | | | | | | | |
| | | | | | | | | | |
| | | | | | | | | | |
| | | | | | | | | | |
| | v v v v v v v v |
| | |
| | pull pull_multi pullall push push_multi update bupdate remove |
| | + + + + + + + + |
| | | | | | | | | | |
| | | | | | | | | | |
| | | | | | | | | | |
| | | | | | | | | | |
| | v v v v v v v v |
| | +----------+----------+--------+----+----+----------+---------+---------+ |
| | | |
| | | |
| | | |
| | | |
| +-----------------------------------------+ |
| |
+--------------------------------------------------------------------------------------+
3.3 Резюме
На данный момент мы видим, что Paracel и ps-lite также очень похожи, сервер поддерживает хранилище, и сервер также может обрабатывать запросы клиентов.
0x04 Рабочий в целом
Рабочий процесс — это процесс, используемый для обучения алгоритма. Мы заранее знаем, что алгоритм должен наследовать paracel::paralg, чтобы использовать функцию сервера параметров.
namespace paracel {
namespace alg {
class logistic_regression: public paracel::paralg { .....
paracel::paralg можно рассматривать как API сервера параметров или прокси, мы рассмотрим ниже.
4.1 Базовый функциональный класс Paralg
Paralg — это базовый класс, предоставляющий основные функции Paracel, который можно понимать как класс API алгоритма или класс API внешней функции.
Мы даем только его переменные-члены и временно опускаем реализацию его функции. Наиболее важные из них:
- int stale_cache, clock, total_iters; требуется для синхронизации
- paracel::Comm worker_comm; Класс связи, такой как связь MPI
- int nworker = 1, количество рабочих
- bool ssp_switch = false; включать ли режим SSP
- parasrv *ps_obj; // Можно понимать как формальный параметр класса сервера.
class paralg {
private:
class parasrv { // 可以理解为是参数服务器类
using l_type = paracel::list_type<paracel::kvclt>;
using dl_type = paracel::list_type<paracel::dict_type<paracel::str_type, paracel::str_type> >;
public:
parasrv(paracel::str_type hosts_dct_str) {
// init dct_lst
dct_lst = paracel::get_hostnames_dict(hosts_dct_str);
// init srv_sz
srv_sz = dct_lst.size();
// init kvm
for(auto & srv : dct_lst) {
paracel::kvclt kvc(srv["host"], srv["ports"]);
kvm.push_back(std::move(kvc));
}
// init servers
for(auto i = 0; i < srv_sz; ++i) {
servers.push_back(i);
}
// init hashring
p_ring = new paracel::ring<int>(servers);
}
virtual ~parasrv() {
delete p_ring;
}
public:
dl_type dct_lst;
int srv_sz = 1;
l_type kvm;
paracel::list_type<int> servers; // 具体服务器列表
paracel::ring<int> *p_ring; // hash ring
}; // nested class parasrv
private:
int stale_cache, clock, total_iters; // 同步需要
int clock_server = 0;
paracel::Comm worker_comm; //通信类,比如 MPI 通信
paracel::str_type output;
int nworker = 1;
int rounds = 1;
int limit_s = 0;
bool ssp_switch = false;
parasrv *ps_obj; // 可以理解为是正式的参数服务器类。
paracel::dict_type<paracel::default_id_type, paracel::default_id_type> rm;
paracel::dict_type<paracel::default_id_type, paracel::default_id_type> cm;
paracel::dict_type<paracel::default_id_type, paracel::default_id_type> dm;
paracel::dict_type<paracel::default_id_type, paracel::default_id_type> col_dm;
paracel::dict_type<paracel::str_type, paracel::str_type> keymap;
paracel::dict_type<paracel::str_type, boost::any> cached_para;
paracel::update_result update_f;
int npx = 1, npy = 1;
}
4.2 Вывод
Написание программы Paracel требует создания подкласса базового класса paralg и переопределения метода виртуального решения. Некоторые из них являются параллельными интерфейсами SPMD iterfaces.
Из предыдущей реализации LR видно, что paracel::paralg нужно наследовать.
class logistic_regression: public paracel::paralg
То есть пользовательская функция решения может напрямую вызывать функцию Paralg для выполнения основной функции.
Возьмем в качестве примера paracel::paracel_read, мы видим, что используется функция parasrv.kvm, и мы продолжим вводить parasrv в будущем.
template <class V>
V paracel_read(const paracel::str_type & key,
int replica_id = -1) {
if(ssp_switch) { // 如果应用ssp,应该如何处理。我们下文就将具体介绍ssp如何处理
V val;
if(clock == 0 || clock == total_iters) {
cached_para[key] = boost::any_cast<V>(ps_obj->
kvm[ps_obj->p_ring->get_server(key)].
pull<V>(key));
val = boost::any_cast<V>(cached_para[key]);
} else if(stale_cache + limit_s > clock) {
val = boost::any_cast<V>(cached_para[key]);
} else {
while(stale_cache + limit_s < clock) {
stale_cache = ps_obj->
kvm[clock_server].pull_int(paracel::str_type("server_clock"));
}
cached_para[key] = boost::any_cast<V>(ps_obj->
kvm[ps_obj->p_ring->get_server(key)].
pull<V>(key));
val = boost::any_cast<V>(cached_para[key]);
}
return val;
}
// 否则直接返回
return ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull<V>(key);
}
Логика работы следующая:
+---------------------------------------------------------------------------+
| Algorithm |
| ^ +------------------------------v |
| | | |
| | | |
| | v |
| | +----------------------------+------------------------------+ |
| | | paracel_read | |
| | | | |
| | | ps_obj+>kvm[ps_obj+>p_ring+>get_server(key)].pull<V>(key) | |
| | | | |
| | +----------------------------+------------------------------+ |
| | | |
| | | |
| | | |
| | v |
| | Compute |
| | + |
| | | |
| | | |
| | v |
| | +---------------------------+-------------------------------+ |
| | | paracel_bupdate | |
| | | ps_obj->kvm[indx].bupdate | |
| | | | |
| | +---------------------------+-------------------------------+ |
| | | |
| | | |
| | | |
| | | |
| +-----<--------------------------+ |
| |
+---------------------------------------------------------------------------+
4.3 Резюме
Механизм рабочей стороны также похож на ps-lite, который делает запросы к серверу посредством таких операций, как чтение и извлечение.
0x05 Ring Hash
В статье Мушена кольцевой хеш связан с согласованностью данных, отказоустойчивостью, масштабируемостью и другими механизмами, такими как:
Сервер параметров использует традиционную согласованность данныхСогласованное хешированиеАлгоритм, ключ параметра и идентификатор узла сервера вставляются в хэш-кольцо.
Но, к сожалению, ps-lite не предоставляет эту часть кода.Хотя у paracel есть кольцевой хеш, он не полный.Douban не обладает отказоустойчивостью и согласованностью с открытым исходным кодом. Мы можем учиться и анализировать только на основе существующего кода.
5.1 Принцип
Это всего лишь общее объяснение, студенты, которым оно нужно, могут отправиться в Интернет для поиска подробных статей.
Чтобы объяснить с помощью технических терминов, ключевым техническим моментом согласованного хеширования является хеширование соответствующего ключа в пространство с 2^32 сегментами в соответствии с обычно используемым алгоритмом хеширования, то есть 0 ~ (2^32)-1 для числовое пространство. Мы можем соединить эти числа «голова к хвосту» и представить себе замкнутый цикл.
Чтобы понять в просторечии, ключевой момент: при развертывании сервера пространство серийных номеров сервера было настроено как фиксированное и очень большое число 1.2^32 (больше не нужно менять). Серверы могут быть назначены как 1Любой порядковый номер в 2^32. Таким образом, кластер серверов может исправить большинство правил алгоритма (поскольку пространство порядковых номеров является важным параметром алгоритма), поэтому перед лицом изменений, таких как увеличение емкости, должны быть в порядке только «правила распределения». -настроен в соответствии с фактической емкостью системы. Таким образом, влияние на общую систему невелико.
5.2 Определения
Ring — это класс реализации хеш-кольца, основная функция которого — добавить сервер в хеш-кольцо и вывести сервер из кольца.
// T rep type of server name
template <class T>
class ring {
public:
ring(paracel::list_type<T> names) {
for(auto & name : names) {
add_server(name);
}
}
ring(paracel::list_type<T> names, int cp) : replicas(cp) {
for(auto & name : names) {
add_server(name);
}
}
void add_server(const T & name) {
//std::hash<paracel::str_type> hfunc;
paracel::hash_type<paracel::str_type> hfunc;
std::ostringstream tmp;
tmp << name;
auto name_str = tmp.str();
for(int i = 0; i < replicas; ++i) { //对每一个副本进行处理
std::ostringstream cvt;
cvt << i;
auto n = name_str + ":" + cvt.str();
auto key = hfunc(n); // 依据name生成一个key
srv_hashring_dct[key] = name; //添加value
srv_hashring.push_back(key); //往list添加内容
}
// sort srv_hashring
std::sort(srv_hashring.begin(), srv_hashring.end());
}
void remove_server(const T & name) {
//std::hash<paracel::str_type> hfunc;
paracel::hash_type<paracel::str_type> hfunc;
std::ostringstream tmp;
tmp << name;
auto name_str = tmp.str();
for(int i = 0; i < replicas; ++i) { // 对每个副本进行处理
std::ostringstream cvt;
cvt << i;
auto n = name_str + ":" + cvt.str();
auto key = hfunc(n);// 依据name生成一个key
srv_hashring_dct.erase(key);// 删除value
auto iter = std::find(srv_hashring.begin(), srv_hashring.end(), key);
if(iter != srv_hashring.end()) {
srv_hashring.erase(iter); // 删除list中的内容
}
}
}
// TODO: relief load of srv_hashring_dct[srv_hashring[0]]
template <class P>
T get_server(const P & skey) {
//std::hash<P> hfunc;
paracel::hash_type<P> hfunc;
auto key = hfunc(skey);// 依据name生成一个key
auto server = srv_hashring[paracel::ring_bsearch(srv_hashring, key)];//获取server
return srv_hashring_dct[server];
}
private:
int replicas = 32;
// 分别用list和dict存储
paracel::list_type<paracel::hash_return_type> srv_hashring;
paracel::dict_type<paracel::hash_return_type, T> srv_hashring_dct;
};
5.3 Использование
Мы используем paracel_read, чтобы увидеть, что вызывающая последовательность
- Сначала используйте ps_obj->p_ring->get_server(key) для получения сервера параметров, соответствующего этому ключу (то есть извлеките сервер параметров из кольцевого хэша);
- Затем получить значение, соответствующее этому ключу, с этого сервера;
V paracel_read(const paracel::str_type & key,
int replica_id = -1) {
......
ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull<V>(key);
}
5.4 Резюме
В этом отличие от ps-lite, которое заключается в использовании кольцевого хеширования для обеспечения согласованности данных, отказоустойчивости и т. д., например, для добавления серверов в хеш-кольцо и удаления серверов из кольца.
0x06 параметр интерфейса сервера parasrv
Давайте разберем текущую логику и взглянем комплексно.
6.1 Конструкция parasrv интерфейса сервера параметров
Как использовать кольцевой хеш, нужно начать с parasrv.
Мы знаем, что paralg — это базовый класс API, который имеет следующие определения в paralg и создает ps_obj, который является экземпляром типа parasrv.
Примечание. На рабочей стороне используются следующие типы.
// paralg 内代码
parasrv *ps_obj; // 成员变量定义,参数服务器接口
paralg(paracel::str_type hosts_dct_str,
paracel::Comm comm,
paracel::str_type _output = "",
int _rounds = 1,
int _limit_s = 0,
bool _ssp_switch = false) : worker_comm(comm),
output(_output),
nworker(comm.get_size()),
rounds(_rounds),
limit_s(_limit_s),
ssp_switch(_ssp_switch) {
ps_obj = new parasrv(hosts_dct_str); // 构建参数服务器,一个parasrv的实例
init_output(_output);
clock = 0;
stale_cache = 0;
clock_server = 0;
total_iters = rounds;
if(worker_comm.get_rank() == 0) {
paracel::str_type key = "worker_sz";
(ps_obj->kvm[clock_server]).
push_int(key, worker_comm.get_size()); // 初始化时钟服务器
}
paracel_sync(); // mpi barrier同步一下
}
6.2 Определение параметра интерфейса сервера parasrv
Определение parasrv выглядит следующим образом, где p_ring — экземпляр кольца, используйтеp_ring = new paracel::ring<int>(servers)
чтобы завершить сборку.
Где p_ring — хэш кольца, а kvm — конкретный список хранения kv.
class parasrv {
using l_type = paracel::list_type<paracel::kvclt>;
using dl_type = paracel::list_type<paracel::dict_type<paracel::str_type, paracel::str_type> >;
public:
parasrv(paracel::str_type hosts_dct_str) {
// 初始化host信息,srv大小,kvm,servers,ring hash
// init dct_lst
dct_lst = paracel::get_hostnames_dict(hosts_dct_str);
// init srv_sz
srv_sz = dct_lst.size();
// init kvm
for(auto & srv : dct_lst) {
paracel::kvclt kvc(srv["host"], srv["ports"]);
kvm.push_back(std::move(kvc));
}
// init servers
for(auto i = 0; i < srv_sz; ++i) {
servers.push_back(i);
}
// init hashring
p_ring = new paracel::ring<int>(servers); // 构建
}
virtual ~parasrv() {
delete p_ring;
}
public:
dl_type dct_lst;
int srv_sz = 1;
l_type kvm; // 具体KV存储接口
paracel::list_type<int> servers;
paracel::ring<int> *p_ring; // ring hash
}; // nested class parasrv
kvm инициализируется следующим образом:
// init kvm
for(auto & srv : dct_lst) {
paracel::kvclt kvc(srv["host"], srv["ports"]);
kvm.push_back(std::move(kvc));
}
6.3 Интерфейс управления хранилищем кВ
kvclt — это абстракция управления kv.
Извлечение только части кода означает поиск соответствующего сервера для взаимодействия.
namespace paracel {
struct kvclt {
public:
kvclt(paracel::str_type hostname,
paracel::str_type ports) : host(hostname), context(1) {
ports_lst = paracel::str_split(ports, ',');
conn_prefix = "tcp://" + host + ":";
}
template <class V, class K>
bool pull(const K & key, V & val) { // 从参数服务器拉取
if(p_pull_sock == nullptr) {
p_pull_sock.reset(create_req_sock(ports_lst[0]));
}
auto scrip = paste(paracel::str_type("pull"), key); // paracel::str_type
return req_send_recv(*p_pull_sock, scrip, val);
}
template <class K, class V>
bool push(const K & key, const V & val) { // 往参数服务器推送
if(p_push_sock == nullptr) {
p_push_sock.reset(create_req_sock(ports_lst[1]));
}
auto scrip = paste(paracel::str_type("push"), key, val);
bool stat;
auto r = req_send_recv(*p_push_sock, scrip, stat);
return r && stat;
}
template <class V>
bool req_send_recv(zmq::socket_t & sock,
const paracel::str_type & scrip,
V & val) {
zmq::message_t req_msg(scrip.size());
std::memcpy((void *)req_msg.data(), &scrip[0], scrip.size());
sock.send(req_msg);
zmq::message_t rep_msg;
sock.recv(&rep_msg);
paracel::packer<V> pk;
if(!rep_msg.size()) {
ERROR_ABORT("paracel internal error!");
} else {
std::string data = paracel::str_type(
static_cast<char*>(rep_msg.data()),
rep_msg.size());
if(data == "nokey") return false;
val = pk.unpack(data);
}
return true;
}
private:
paracel::str_type host;
paracel::list_type<paracel::str_type> ports_lst;
paracel::str_type conn_prefix;
zmq::context_t context;
std::unique_ptr<zmq::socket_t> p_contains_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_pull_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_pull_multi_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_pullall_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_push_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_push_multi_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_update_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_bupdate_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_bupdate_multi_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_remove_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_clear_sock = nullptr;
std::unique_ptr<zmq::socket_t> p_ssp_sock = nullptr;
}; // struct kvclt
} // namespace paracel
Таким образом, текущая общая логика выглядит следующим образом:
Таким образом, текущая общая логика выглядит следующим образом:
+------------------+ worker + server
| paralg | |
| | |
| | |
| parasrv *ps_obj | |
| + | | +------------------+
| | | | | start_server |
+------------------+ | | |
| | | |
| | | |
v | | |
+------------+-----+ +------------------+ +---------+ | | thrd_exec |
| parasrv | |kvclt | | kvclt | | | |
| | | | | | | | |
| | | host | | | | | thrd_exec_ssp |
| servers | | | | | | | |
| | | ports_lst | | | | | |
| kvm +-----------> | |.....| | | | ssp_tbl |
| | | context | | | | | |
| p_ring | | | | | | | |
| + | | conn_prefix | | | | | tbl_store |
| | | | | | | | | |
+------------------+ | p_pull_sock+---+ | | | | |
| | | | | | | | |
| | p_push_sock | | | | | | |
| | + | | | | | | |
v | | | | | | | | |
+------------+------+ +------------------+ | +---------+ | | |
| ring | | | | +---+---+----------+
| | | | | ^ ^
| | | | | | |
| srv_hashring | | +-----------------------+ |
| | +------------------------------------+
| srv_hashring_dct | |
| | |
+-------------------+ +
Телефон такой:
0xEE Личная информация
★★★★★★Думая о жизни и технологиях★★★★★★
Публичный аккаунт WeChat:мысли Росси
ссылка 0xFF
PARACEL: упрощение распределенного машинного обучения
Сервер параметров — новый убийца распределенного машинного обучения