предисловие
Последние два дня я думал о проектах искрового глубокого обучения и искрового обучения, но я чувствую, что они неудовлетворительны. При обучении необходимо транслировать данные на каждую ноду для параллельного обучения, что в принципе теряет свою практическую ценность (данные транзакций будут больше, чем память одной ноды), а комбинация spark-deep-learning и tf кластер еще не реализован. Поэтому на этот раз мы обратились к проекту Yahoo TensorFlowOnSpark, исходный код которого уже давно открыт. После краткого просмотра его исходного кода принцип в целом ясен, он записан здесь, и я надеюсь, что он может помочь читателям.
Анализ работы кода TensorFlowOnSpark
Откройте examples/mnist/spark/mnist_spark/mnist_dist.py из проекта,
Первым шагом является создание SparkContext через pyspark.Этот процесс фактически запускает кластер Spark.Что касается того, как запустить искру через python и общаться друг с другом, вы можете обратиться к моей статье за подробностями:Как PySpark настраивает рабочие команды Python.
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
Второй шаг — принять некоторые параметры командной строки, которые я не буду публиковать.
Третий шаг — использовать стандартный API pyspark для получения данных изображения из HDFS и формирования кадра данных/rdd:
dataRDD = images.map(lambda x: toNumpy(str(x[0])))
Тогда пора переходить к делу и запускать tf кластер:
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
sc в TFCluster.run - это sparkcontext, а функция mnist_dist.map_fun содержит ваш бизнес-код tensorflow. В этом примере это код модели minist. О конкретном коде кода модели мы поговорим позже. Давайте сначала посмотрим на метод TFCluster.run:
cluster_template = {}
cluster_template['ps'] = range(num_ps)
cluster_template['worker'] = range(num_ps, num_executors)
Вышеупомянутое предназначено для определения количества серверов параметров и воркеров.Эти два понятия связаны с tf. Затем будет запущен Сервер:
server = reservation.Server(num_executors)
server_addr = server.start()
Запустите сервер на стороне драйвера, в основном для мониторинга рабочего процесса tf, запущенного стороной исполнителя искры, для координации.
# start TF nodes on all executors
logging.info("Starting TensorFlow on executors")
cluster_meta = {
'id': random.getrandbits(64),
'cluster_template': cluster_template,
'num_executors': num_executors,
'default_fs': defaultFS,
'working_dir': working_dir,
'server_addr': server_addr
}
Приведенный выше код получает полную информацию, необходимую для запуска кластера tf. Предполагается, что вы можете пойти в Google, как вручную настроить кластер tf, и тогда вы сможете глубже понять, как TensorFlowOnSpark собирает какие параметры заранее.
nodeRDD = sc.parallelize(range(num_executors), num_executors)
# start TF on a background thread (on Spark driver) to allow for feeding job
def _start():
nodeRDD.foreachPartition(TFSparkNode.run(map_fun,
tf_args,
cluster_meta,
tensorboard,
queues,
background=(input_mode == InputMode.SPARK)))
t = threading.Thread(target=_start)
t.start()
# wait for executors to register and start TFNodes before continuing
logging.info("Waiting for TFSparkNodes to start")
cluster_info = server.await_reservations()
logging.info("All TFSparkNodes started")
Первый фрагмент кода выше на самом деле гарантирует, что задачи cluster_size запущены, каждая задача соответствует разделу, и каждый раздел фактически имеет только один элемент, который является номером рабочего процесса. Запустите соответствующий рабочий процесс tf (включая ps) с помощью foreach на разделе. Мы снова видим предпоследнюю строку кода, предыдущий сервер, он будет блокировать выполнение кода, пока не будут запущены все рабочие tf. Здесь мы также видим, что искровой исполнитель может запускать несколько рабочих процессов tf.
Теперь давайте перейдем к TFSparkNode.run, в котором содержится логика запуска tf worker, помните, что эти коды уже выполнялись в экзекьюторе.
def run(fn, tf_args, cluster_meta, tensorboard, queues, background):
"""
Wraps the TensorFlow main function in a Spark mapPartitions-compatible function.
"""
def _mapfn(iter):
Во-первых, определяется функция _mapfn.Ее параметром является iter.Этот iter бесполезен.Это предыдущий рабочий номер и имеет только один элемент. Основная функция этой функции — запустить tf worker (PS) и запустить код пользователя:
client = reservation.Client(cluster_meta['server_addr'])
cluster_info = client.get_reservations()
В процессе запуска будет запущен клиент, который подключится к серверу, о котором мы упоминали ранее, и сообщит об успешном запуске.
if job_name == 'ps' or background:
# invoke the TensorFlow main function in a background thread
logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on background process".format(job_name, task_index, worker_num))
p = multiprocessing.Process(target=fn, args=(tf_args, ctx))
p.start()
# for ps nodes only, wait indefinitely in foreground thread for a "control" event (None == "stop")
if job_name == 'ps':
queue = TFSparkNode.mgr.get_queue('control')
done = False
while not done:
msg = queue.get(block=True)
logging.info("Got msg: {0}".format(msg))
if msg == None:
logging.info("Terminating PS")
TFSparkNode.mgr.set('state', 'stopped')
done = True
queue.task_done()
else:
# otherwise, just run TF function in the main executor/worker thread
logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on foreground thread".format(job_name, task_index, worker_num))
fn(tf_args, ctx)
logging.info("Finished TensorFlow {0}:{1} on cluster node {2}".format(job_name, task_index, worker_num))
Это определит, является ли это ps или рабочим. Если он работает в фоновом режиме, он будет выполняться непосредственно через multiprocessing.Process, о котором мы упоминали в прошлом году.mnist_dist.map_fun
метод, в то время какmnist_dist.map_fun
По сути, он содержит логический код сеанса tf. Конечно, хотя модель и запускается в это время, она используется при сборе данных.queue.get(block=True)
На данный момент данные еще не поступили, поэтому он будет заблокирован. Стоит отметить, что код здесь будет отправлен на python из spark.
Выполнить в рабочем.
После получения объекта кластера мы можем вызвать метод train для реального обучения, которое, по сути, начинает подачу данных:
if args.mode == "train":
cluster.train(dataRDD, args.epochs)
Входитьcluster.train
Посмотрите, вы введете следующий код:
unionRDD.foreachPartition(TFSparkNode.train(self.cluster_info, self.cluster_meta, qname))
Здесь данные будут передаваться каждому воркеру TF по разделам (путем вызова метода train):
def _train(iter):
queue = mgr.get_queue(qname)
....
for item in iter:
count += 1
queue.put(item, block=True)
....
queue.join()
Здесь будет получена очередь tf, а затем пропущена в нее через iter (то есть тренировочные данные, содержащиеся в собственно spark rdd), если она заполнена, то будет заблокирована.
Пока не завершится общий процесс. Теперь вернемся назад и посмотрим на наш бизнес-код.mnist_dist.map_fun
, этот метод фактически выполняется на каждом воркере tf:
if job_name == "ps":
server.join()
elif job_name == "worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % task_index,
cluster=cluster)):
Просто сделайте вывод, если это так, остановитесь здесь, в противном случае выполните работу по построению модели. В tf.device.. пришло время определить модель, стандартный код tf:
# Variables of the hidden layer
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
tf.summary.histogram("hidden_weights", hid_w)
Разумеется, в примере кода TensorFlowOnSpark используется Supervisor:
if args.mode == "train":
sv = tf.train.Supervisor(is_chief=(task_index == 0),
logdir=logdir,
init_op=init_op,
summary_op=None,
saver=saver,
global_step=global_step,
stop_grace_secs=300,
save_model_secs=10)
with sv.managed_session(server.target) as sess:
step = 0
tf_feed = TFNode.DataFeed(ctx.mgr, args.mode == "train")
batch_xs, batch_ys = feed_dict(tf_feed.next_batch(batch_size))
TNFode.DataFeed предоставляет удобный способ получения больших объемов данных, поэтому вам не нужно беспокоиться об очередях. Обучив необходимое количество, вы можете прекратить обучение:
if sv.should_stop() or step >= args.steps:
tf_feed.terminate()
Теперь весь процесс должен быть понятнее.