Как написать собственную модель оценки для Cloud TPU

Google искусственный интеллект TensorFlow GitHub

Автор: Лак Лакшманан, технический руководитель Google Cloud Platform (@lak_gcp)

Источник | Публичный аккаунт TensorFlow

Блоки тензорной обработки (TPU) ускоряют различные рабочие нагрузки машинного обучения в Google и доступны клиентам Google Cloud. Вы можете найти версии лучших моделей изображений с поддержкой TPU, таких как ResNet и AmoebaNet, в магазине эталонных моделей Cloud TPU; вы также можете использовать мощную библиотеку Tensor2Tensor для выполнения задач суммирования текста и ответов на вопросы на TPU. В этих руководствах вы узнаете, как использовать многие из самых популярных эталонных моделей Cloud TPU.

Примечание. Ссылки на корзиныGitHub.com/tensorflow/…

Ссылка на учебникcloud.Google.com/topology/docs/graph…

Но что, если у вас есть собственная модель TensorFlow? В этой статье я расскажу вам о процессе написания пользовательского оценщика для работы в Cloud TPU. Попутно я укажу на предостережения и предложу лучшие практики. Вы можете найти полный код этого решения на GitHub; в этой статье перечислены только соответствующие фрагменты кода.

Примечание: ссылка на полный код для решенияGitHub.com/Googlecloud…

Пользовательские оценки TensorFlow содержат оценки базового класса, передаваемые как функции модели:

1    def train_and_evaluate(output_dir, nsteps):    
2        estimator = tf.estimator.Estimator(    
3                            model_fn = model_fn,    
4                            model_dir = output_dir) 

Функция модели принимает функции, метки и шаблоны и возвращает EstimatorSpec. Например, модельная функция для задачи классификации изображений может содержать

1    def model_fn(features, labels, mode):
2        # write the model to compute predictions, loss, etc. from the model
3
4        return tf.estimator.EstimatorSpec(
5                    mode=mode,    
6                    predictions={"probabilities": probabilities, 
7                                         "classid": class_int, "class": class_str},
8                    loss=loss,
9                    train_op=train_op, 
10                  eval_metric_ops=evalmetrics,
11                  export_outputs={'classes': tf.estimator.export.PredictOutput(
12                            {"probabilities": probabilities, "classid": class_int, 
13                             "class": class_str})}
14            )

Пакет tf.contrib.tpu в TensorFlow предоставляет классы-оболочки, помогающие писать код таким образом, чтобы запускать его на процессорах, графических процессорах и облачных TPU. Давайте посмотрим, как написать пользовательскую оценку таким образом, не зависящим от акселератора.

1. Преобразование входных данных в записи TF

Облачные TPU настолько быстры, что, если вы не будете осторожны, в вашем обучении может преобладать чтение и запись («ввод» и «выдача») данных и контрольные точки хранилища. Заставлять TPU ждать ввода/вывода расточительно, поэтому мы сделаем несколько вещей, чтобы максимально использовать время, которое TPU тратит на вычисления.

Во-первых, следует избегать синтаксического анализа и обработки данных во входной функции оценщика, а вместо этого заранее преобразовывать данные в записи TF. Пакетная обработка записей TF проще, чем один файл изображения, поскольку сами записи содержат теги, что уменьшает количество небольших файлов, которые система должна считывать. Я использую Apache Beam для этого преобразования. Вы можете найти скрипты для чтения JPEG и записи TF-записей в официальном магазине TPU. Вы можете запускать программы Apache Beam в масштабе Cloud Dataflow, но если ваш источник данных в настоящее время не находится в Google Cloud, вы можете выполнять эту программу только локально на большой виртуальной машине (обязательно установите apache-beam с pip).

Примечание: JPEG и запись ссылки на записи TFGitHub.com/tensorflow/…

Записи TF являются словарями. Для классификации изображений важны две записи, записанные описанным выше конвейером: «изображение/класс/метка» (в int64) и «изображение/закодированное» (состоящее из содержимого файла JPEG).

2. Напишите функцию ввода для чтения записей TF.

Как и в случае с любым оценщиком, вам нужно написать входные функции для чтения этих записей TF. Использование Dataset API значительно упрощает эту задачу, но есть несколько предостережений. В ходе презентации я укажу на эти вопросы.

Вот моя функция ввода:

1    def make_input_fn(pattern, mode, num_cores=8, transpose_input=False):
2        def _set_shapes(batch_size, images, labels):
3            """Statically set the batch_size dimension."""
4                if transpose_input:    
5                    images.set_shape(images.get_shape().merge_with(
6                        tf.TensorShape([None, None, None, batch_size])))
7                    labels.set_shape(labels.get_shape().merge_with(
6                        tf.TensorShape([batch_size])))
9                else:
10                    images.set_shape(images.get_shape().merge_with(
11                        tf.TensorShape([batch_size, None, None, None])))
12                    labels.set_shape(labels.get_shape().merge_with(
13                        tf.TensorShape([batch_size])))
14                return images, labels
15
16        def _input_fn(params):
17            batch_size = params['batch_size']
18            is_training = (mode == tf.estimator.ModeKeys.TRAIN)
19
20            # read the dataset
21            dataset = tf.data.Dataset.list_files(pattern, shuffle=is_training)
22            if is_training:
23                dataset = dataset.repeat()
24            def fetch_dataset(filename):
25                buffer_size = 8 * 1024 * 1024 # 8 MiB per file
26                dataset = tf.data.TFRecordDataset(filename, buffer_size=buffer_size)
27                return dataset
28            dataset = dataset.apply(
29                tf.contrib.data.parallel_interleave(
30                    fetch_dataset, cycle_length=64, sloppy=True))
31            dataset = dataset.shuffle(1024)
32
33            # augment and batch
34            dataset = dataset.apply(
35                tf.contrib.data.map_and_batch(
36                    read_and_preprocess, batch_size=batch_size,
37                    num_parallel_batches=num_cores, drop_remainder=True
38                ))
39
40           if transpose_input:
41               dataset = dataset.map(
42                   lambda images, labels: (tf.transpose(images, [1, 2, 3, 0]), labels),
43                   num_parallel_calls=num_cores)
44
45            # assign static shape
46            dataset = dataset.map(
47                functools.partial(_set_shapes, batch_size)
48            )
49
50            # prefetch data while training
51            dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE) 
52            return dataset
53
54        return _input_fn 

Обратите внимание, что входная функция принимает параметр params. По сути, это будет аргумент командной строки, переданный тренеру, чтобы мы могли извлечь подробности о наборе данных, такие как время обучения и оценочные изображения.

batch_size является особенным, поскольку TPU имеет несколько ядер, а batch_size устанавливается оценщиком TPU и является эффективным размером пакета. Вы должны возвращать записи batch_size полными, а не частично заполненными партиями. Поскольку вы циклически повторяете обучающие данные бесконечно, эта проблема не возникает во время обучения. Но это означает, что проще всего округлить оценочный набор данных до числа, кратного количеству ядер. Если число ядер равно 8, а в наборе данных оценки 1026 изображений, для оценки можно использовать только первые 1024 изображения. Оставшиеся 2 изображения отбрасываются. (У нас также есть способ обработки последних оставшихся частичных пакетов в Cloud TPU, поэтому я не буду здесь вдаваться в подробности.)

Как и при любом распределенном обучении, вы должны убедиться, что каждый рабочий процесс видит свое подмножество данных, которое обрабатывается путем параллельного чередования всех файлов и переупорядочения записей внутри самого буфера.

Общим требованием для задач классификации изображений является дополнение исходных данных путем добавления таких методов, как случайное кадрирование и переворачивание. Я делаю это с помощью функции read_and_preprocess. Обратите внимание, что я применил эту функцию к отдельным записям TF и ​​создал 8 параллельных пакетов, отбрасывая все оставшиеся записи (опять же, это не окажет никакого влияния во время обучения, поскольку вы будете повторять обучение бесконечно).

Далее выполняется транспонирование данных. Оказывается, перенос данных в TPU для сохранения размера пакета может значительно повысить производительность. Таким образом, мы можем сделать это по мере необходимости. Если мы запускаем программу на GPU или CPU, флаг transpose_input становится ложным.

Для TPU требуются тензоры статических размеров. Хотя мы позаботились о том, чтобы сохранить это (отбрасывая оставшиеся пакеты), более распространенной практикой является написание API набора данных для ядра TensorFlow. Поэтому мы вызываем функцию, чтобы изменить размер пакета в наборе данных с None на размер пакета.

Последняя операция оптимизации имеет решающее значение. Нам нужно выполнить предварительную выборку данных. Другими словами, когда TPU обрабатывает пакет записей, мы ищем и получаем следующий пакет через поток ввода-вывода. Таким образом, мы можем максимально использовать TPU (или GPU) без какого-либо влияния на CPU.

3. Обработка записей перехода

Функция ввода (см. выше) устанавливает способ обработки ввода, но фактический анализ также выполняется с помощью метода read_and_preprocess(). Этот метод выглядит так:

1    def read_and_preprocess(example_data):
2            parsed = tf.parse_single_example(example_data, {
3                'image/encoded': tf.FixedLenFeature((), tf.string, ''),
4                'image/class/label': tf.FixedLenFeature([], tf.int64, 1), 
5            }) 
6            image_bytes = tf.reshape(parsed['image/encoded'], shape=[])
7            label = tf.cast(
8                tf.reshape(parsed['image/class/label'], shape=[]), dtype=tf.int32) - 1
9
10            # end up with pixel values that are in the -1, 1 range
11            image = tf.image.decode_jpeg(image_bytes, channels=NUM_CHANNELS)
12            image = tf.image.convert_image_dtype(image, dtype=tf.float32) # 0-1
13            image = tf.expand_dims(image, 0) # resize_bilinear needs batches
14
15            image = tf.image.resize_bilinear(
16                image, [HEIGHT + 10, WIDTH + 10], align_corners=False)
17            image = tf.squeeze(image)  # remove batch dimension
18            image = tf.random_crop(image, [HEIGHT, WIDTH, NUM_CHANNELS])
19            image = tf.image.random_flip_left_right(image)
20            image = tf.image.random_brightness(image, max_delta=63.0 / 255.0)
21            image = tf.image.random_contrast(image, lower=0.2, upper=1.8)
22
23
24            #pixel values are in range [0,1], convert to [-1,1]
25            image = tf.subtract(image, 0.5)
26            image = tf.multiply(image, 2.0)
27            return image, label 

Здесь есть две важные оговорки. Первый — использовать parse_single_example, так как мы вызываем эту функцию из map(), она будет вызываться для одной записи TF. Мы извлекаем из записей соответствующую информацию (закодированные изображения и метки), которые затем используем для построения необходимых тензоров. Второе замечание: данные должны быть числовыми. Например, я не могу вернуть строку метки, потому что TPU может обрабатывать только числовые данные. Нам нужно вычислить индекс метки в конвейере предварительной обработки, на этом этапе метки будут только целыми числами.

4. Обеспечить функцию ввода

После обучения модели вам необходимо развернуть эту модель и обслуживать ее через TF Serving. Следующий код такой же, как и для любого другого оценщика:

1    def serving_input_fn():    
2            # Note: only handles one image at a time     
3            feature_placeholders = {'image_bytes':    
4                                                    tf.placeholder(tf.string, shape=())}    
5            image, _ = read_and_preprocess(    
6                    tf.squeeze(feature_placeholders['image_bytes']))    
7            features = {    
8                'image': tf.expand_dims(image, 0)    
9            }    
10          return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

TPU оптимизирован для пакетного вывода; если ваш вариант использования требует онлайн-прогнозирования, в настоящее время его лучше всего обслуживать через ЦП или ГП (в зависимости от размера пакета и сложности модели). При написании функции ввода я предполагал, что передаю только одно изображение, поэтому на самом деле имел в виду CPU/GPU.

5. Функция модели

Функции модели должны создавать и возвращать TPUEstimatorSpec. Реализация выглядит следующим образом:

1    def image_classifier(features, labels, mode, params):
2        image = features
3        if isinstance(features, dict):
4            image = features['image']
5
6        ylogits, nclasses = cnn_model(image, mode, params)
7
8        probabilities = tf.nn.softmax(ylogits)
9        class_int = tf.cast(tf.argmax(probabilities, 1), tf.int32)
10      class_str = tf.gather(LIST_OF_LABELS, class_int)
11
12      if mode == tf.estimator.ModeKeys.TRAIN or mode == tf.estimator.ModeKeys.EVAL:
13            loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
14                logits=ylogits, labels=tf.one_hot(labels, nclasses)))
15
16            def metric_fn(class_int, labels):
17                return {'accuracy': tf.metrics.accuracy(class_int, labels)}
18            evalmetrics = (metric_fn, [class_int, labels])
19
20            if mode == tf.estimator.ModeKeys.TRAIN:
21                # this is needed for batch normalization, but has no effect otherwise
22                update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
23                optimizer = tf.train.AdamOptimizer(learning_rate=params['learning_rate'])
24                if params['use_tpu']:
25                    optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer) # TPU change 1
26                with tf.control_dependencies(update_ops):
27                    train_op = optimizer.minimize(loss, tf.train.get_global_step())
28            else:
29                train_op = None
30        else:
31            loss = None
32            train_op = None
33            evalmetrics = None
34
35        return tf.contrib.tpu.TPUEstimatorSpec(  # TPU change 2
36                    mode=mode,
37                    predictions={"probabilities": probabilities,
38                                        "classid": class_int, "class": class_str},
39                    loss=loss,
40                    train_op=train_op,
41                    eval_metrics=evalmetrics,
42                    export_outputs={'classes': tf.estimator.export.PredictOutput(
43                            {"probabilities": probabilities, "classid": class_int,
44                              "class": class_str})}
45        ) 

Входящие функции могут быть изображениями (мои входные функции для обучения и оценки) или словарями (мои входные функции). Я проверяю и извлекаю изображение из функций.

Затем я вызываю фактическую математическую функцию модели на изображении. Это должен быть обычный код TensorFlow с использованием tf.layers. Пожалуйста, просмотрите полный исходный код для его формы.

Поскольку это проблема классификации, я использую softmax и вычисляю целочисленные и строковые метки на основе логита каждой категории, затем использую argmax и собираю. Я также рассчитал кросс-энтропийную потерю, которая аналогична любой другой оценке.

Одно из отличий состоит в том, что общему оценщику необходимо использовать метрики оценки в качестве словаря, тогда как TPUEstimator нужна функция, которую можно вызвать на управляющем ЦП или ТПУ. Поэтому способ указать метрику eval немного отличается.

Если вы используете TPU, используемый оптимизатор должен быть обернут в CrossShardOptimizer. Это распределяет задачи оптимизации между разными ядрами.

Операция обучения состоит в том, чтобы свести к минимуму эти потери оптимизации между осколками. Пожалуйста, используйте оптимизатор.минимизировать() вместо Layers.optimize_loss().

Объединяя все вышеперечисленные операции, система возвращает спецификацию оценщика TPU.

6. Цикл обучения и оценки

Возможно, вы знакомы с циклом train_and_evaluate для оценщиков. Жаль, что этот цикл (пока) не работает эффективно с TPU. К счастью, создать собственный цикл не так уж сложно, что дает вам больше контроля над тем, как часто и что проверять (вспомните сценарий, в котором вы хотите свести к минимуму переключение контекста и накладные расходы ввода-вывода, вызванные слишком частыми проверками).

1    def train_and_evaluate(output_dir, hparams):
2        STEPS_PER_EVAL = 1000    
3        max_steps = hparams['train_steps'] 
4        eval_batch_size = min(1024, hparams['num_eval_images']) 
5        eval_batch_size = eval_batch_size - eval_batch_size % 8  # divisible by num_cores 
6        tf.logging.info('train_batch_size=%d  eval_batch_size=%d  max_steps=%d', 
7                                hparams['train_batch_size'], 
8                                eval_batch_size, 
9                                max_steps)
10
11        # TPU change 3
12        if hparams['use_tpu']:    
13            tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
14                hparams['tpu'],
15                zone=hparams['tpu_zone'],
16                project=hparams['project'])
17            config = tf.contrib.tpu.RunConfig(
18                cluster=tpu_cluster_resolver,
19                model_dir=output_dir,    
20                save_checkpoints_steps=STEPS_PER_EVAL,
21                tpu_config=tf.contrib.tpu.TPUConfig(
22                    iterations_per_loop=STEPS_PER_EVAL,
23                    per_host_input_for_training=True))
24        else:
25            config = tf.contrib.tpu.RunConfig()
26
27        estimator = tf.contrib.tpu.TPUEstimator(  # TPU change 4
28            model_fn=image_classifier,
29            config=config,
30            params=hparams,
31            model_dir=output_dir,
32            train_batch_size=hparams['train_batch_size'],
33            eval_batch_size=eval_batch_size,
34            use_tpu=hparams['use_tpu']
35        )

Во-первых, извлеките некоторые аргументы командной строки и используйте их, чтобы указать максимальное количество шагов и размеры пакетов для обучения и оценки.

Далее находим ТПУ. Если вы создали свой собственный облачный TPU в Google Compute Engine, он может быть назван. Предположим, это имя («tpu») передается в качестве аргумента командной строки. Если вы используете Cloud ML Engine, такие вещи, как имена и регионы TPU, выводятся автоматически. Обязательно делайте это, только если установлен флаг use_tpu. Если пользователь запускает программу на ЦП или ГП, просто создайте пустой файл RunConfig.

Затем создайте TPUEstimator с функцией модели, конфигурацией, параметрами и размером пакета. Как только оценщик создан, мы можем войти в реальный цикл обучения и оценки:

1        # load last checkpoint and start from there
2        current_step = load_global_step_from_checkpoint_dir(output_dir)
3        steps_per_epoch = hparams['num_train_images'] // hparams['train_batch_size']
4        tf.logging.info('Training for %d steps (%.2f epochs in total). Current'
5                               ' step %d.',
6                               max_steps,
7                               max_steps / steps_per_epoch,
8                               current_step)
9
10        start_timestamp = time.time()  # This time will include compilation time 
11
12        while current_step < hparams['train_steps']:
13            # Train for up to steps_per_eval number of steps.
14            # At the end of training, a checkpoint will be written to --model_dir.
15            next_checkpoint = min(current_step + STEPS_PER_EVAL, max_steps)
16            estimator.train(input_fn=train_input_fn, max_steps=next_checkpoint)
17            current_step = next_checkpoint
18            tf.logging.info('Finished training up to step %d. Elapsed seconds %d.',
19                                    next_checkpoint, int(time.time() - start_timestamp))
20
21            # Evaluate the model on the most recent model in --model_dir.
22            # Since evaluation happens in batches of --eval_batch_size, some images
23            # may be excluded modulo the batch size. As long as the batch size is
24            # consistent, the evaluated images are also consistent.
25            tf.logging.info('Starting to evaluate at step %d', next_checkpoint)
26            eval_results = estimator.evaluate(
27                input_fn=eval_input_fn,
28                steps=hparams['num_eval_images'] // eval_batch_size)
29            tf.logging.info('Eval results at step %d: %s', next_checkpoint, eval_results)
30
31        elapsed_time = int(time.time() - start_timestamp)
32        tf.logging.info('Finished training up to step %d. Elapsed seconds %d.',
33                                max_steps, elapsed_time) 

Оценщик TensorFlow работает, выполняя теплый перезапуск из ранее существующей контрольной точки. Мы можем загрузить контрольную точку, предоставленную в выходном каталоге для репликации. Затем мы пошагово выполняем шаги обучающих данных train_batch_size за один раз, пока не будет достигнуто указанное максимальное количество шагов.

В примерах в этой статье я оценил каждую контрольную точку в полном наборе данных для оценки, но, очевидно, вы можете уменьшить вычислительные затраты на это обучение.

7. Экспортируйте модель для использования

Наконец, закончив обучение, я экспортирую сохраненную модель. Вы можете использовать TF Serving или Cloud ML Engine, чтобы развернуть сохраненные модели и делать прогнозы.

1        # export similar to Cloud ML Engine / TF Serving convention
2        tf.logging.info('Starting to export model.')
3        estimator.export_savedmodel(
4            export_dir_base=os.path.join(output_dir, 'export/exporter'),
5            serving_input_receiver_fn=serving_input_fn)

На данный момент у нас есть пользовательская модель оценщика, которую можно обучить в Cloud TPU. Модели записываются таким образом (например, с использованием флага use_tpu и предоставлением транспонирования в качестве опции), и один и тот же код поддерживает множество различных аппаратных средств, включая процессоры и графические процессоры. Таким образом, нашу модель оценки можно использовать для всех трех типов оборудования.

Следующие шаги:

Загрузите код, сопровождающий эту статью, с GitHub и попробуйте его. Запустите лабораторию кода, чтобы узнать, как запустить ResNet на TPU для обучения ваших собственных данных (без написания кода).

Примечание: ссылка на кодGitHub.com/Googlecloud…

Запуск ссылок ResNet на TPUкод labs.developers.google.com/code labs/vote…

Пройдите курс специализации «Машинное обучение с TensorFlow» на Coursera; этот курс представляет собой пошаговое введение в концепции TensorFlow и обучение, настройку и развертывание моделей машинного обучения в масштабе Google Cloud.

Примечание. Машинное обучение со ссылкой на TensorFlowwoohoo.course RA.org/специальный чат…