Практика обратного отслеживания образцов в режиме реального времени в крупномасштабном машинном обучении

Большие данные

задний план

Автор занимается разработкой сервера понимания контента, в основном обрабатывающего текст, видео и аудио, и производящего соответствующие функции с помощью технологий NLP и CV, которые предоставляются рекомендательным, поисковым и рекламным службам и используются для обучения модели для отзыва и сортировки в соответствующих сценариях.

срок

  • Функции в реальном времени: то есть функции, которые меняются с течением времени, такие как последовательность экспозиции пользователя, просматривающего короткое видео за последние пять минут, и текущие функции изображения прямой трансляции. Такие функции обычно обновляются в режиме реального времени. время с использованием фреймворков потоковых вычислений, таких как flink, и функций реального времени. Оно хранится непосредственно в хранилище kv и динамически обновляется в реальном времени через flink.
  • Автономные функции: то есть функции, которые не меняются с течением времени в режиме реального времени, такие как возраст пользователя, пол и встраивание короткой обложки видео. Такие функции обычно генерируются один раз или обновляются один раз в день. Демографические характеристики, такие как возраст и пол, как правило, периодически пересчитываются, а рассчитанные результаты сохраняются в hdfs, а затем записываются в kv через spark или mapreduce; и такие функции, как встраивание короткого изображения обложки видео, после создания после этого будут напрямую записывается в kv, а копия также будет храниться в автономном хранилище данных.
  • Образцы в реальном времени: чтобы соответствовать требованиям к производительности во время онлайн-обслуживания, офлайн-функции и функции в реальном времени будут храниться в kv (например, redis, hbase). В качестве RawSample для последующего обучения модели этот образец объединяется с внешние журналы воздействия, кликов и конверсий для создания соответствующей метки и, наконец, получения образца в реальном времени, необходимого для обучения модели.

необходимость

  • После того, как новые функции на стороне понимания контента будут доставлены в рекомендацию, поиск и рекламу, благодаря выборкам в реальном времени, необходимо выполнить накопление функций в Интернете, что обычно занимает более семи дней. довольно длинный, что влияет на итеративную скорость бизнеса по пониманию контента.Поэтому необходимо отследить исторические данные о функциях, а затем присоединиться к образцам в реальном времени, чтобы быстрее начать обучение автономной модели, проверить влияние функций в автономном режиме и улучшить общую производительность.экспериментальная скорость.

основная логика

структура данных

  • Офлайн-функция/Офлайн-функция
int64 featureId=1;
Feature feature=2;
  • Функция в реальном времени/Функция в реальном времени
int64 featureId=1;
Feature feature=2;
int64 timestamp=3;
  • Уведомление:

FeatureId может быть не типа int64, а типа string.В настоящее время MurmurHash64 используется для хеширования строки и преобразования ее в int64 для повышения эффективности объединения; конечно, такая же операция требуется при обработке реальных образцы времени.

  • Живой образец/RawSample
int64 timestamp=1;
repeated features=2;

Образец в реальном времени может получить значение функции, соответствующее featureId, соответствующему функции в реальном времени и функции в автономном режиме, благодаря удобству функций, которое является основой для объединения образца и функции в реальном времени.

логика соединения

Обработка функций в реальном времени может быть получена

int64 timestamp=1;
repeated features=2;
int64 joinFeatureId=3;
  • Если вам нужно внедрить офлайн-функции пробного присоединения в реальном времени, вы можете напрямую присоединиться через featureId.
  • Когда необходимо реализовать выборки в реальном времени для объединения функций в реальном времени, функции в реальном времени необходимо агрегировать в последовательность в соответствии с идентификатором функции в определенном интервале времени, а затем время должно быть выровнено по началу. или конец временного интервала (например, целое число или час), а затем Используйте это время в сочетании с featureId в качестве ключа. Он также выравнивает время выборки в реальном времени, используя комбинацию времени и joinFeatureId в качестве ключа.При присоединении сравнивает отметку времени выборки в реальном времени с отметкой времени в последовательности функций в реальном времени и выбирает функцию с самая большая временная метка, которая не существует, и записывает ее в режиме реального времени.

Проблемы и решения

огромный размер выборки

  • В рекомендуемом сценарии автономное хранилище hdfs, занимаемое выборками в реальном времени, составляет 450 ГБ в час, что близко к 350 Вт, а размер одной выборки превышает 100 КБ. перекос данных, задача spark, скорее всего, завершится сбоем. Массивы, хранящиеся в protobuf, кодируются в строки с использованием base64, а затем сохраняются в формате быстрого сжатия.Стоимость сериализации и десериализации также довольно высока.

первая яма

  • Преобразование rdd в PairRdd во время декодирования.В это время накладные расходы на перемешивание и сериализацию очень велики, исполнитель выглядит oom, а задача spark аварийно завершает работу.
val rawRdd = sc.textFile(rawSamplePath) //读取实时实时样本
val pairRdd = rawRdd.map(row=>{
    //base64解码成二进制数组
    //讲二进制数组反序列化为pb对象
    //从pb对象中获取用于join的featureId
    (featureId,pb对象)
}

Самая большая проблема в этом процессе заключается в том, что после чтения файла выборки в реальном времени на hdfs количество разделов rdd равно количеству файлов hdfs, которых на данный момент 80. При декодировании выполняется десериализация и генерация парыRdd в то же время 80 разделов очень велики, легко происходит перекос данных, что в конечном итоге приводит к сбою задачи spark, и в то же время исполнитель, применяемый spark, не может быть полностью использован, и общая скорость будет очень медленной. . Решение:

val rawRdd = sc.textFile(rawSamplePath).repartition(1000) //读取实时实时样本
val pairRdd = rawRdd.map(row=>{
    //base64解码成二进制数组
    //讲二进制数组反序列化为pb对象
    //从pb对象中获取用于join的featureId
    (featureId,pb对象)
}

После прочтения файла hdfs выполните переразбиение, степень параллелизма будет больше, пропорция перекоса данных будет намного меньше, а давление перетасовки, декодирования и десериализации одного исполнителя будет значительно уменьшено, и будет почти нет исполнителя om.

Загадка присоединения

После создания PairRdd, соответствующего функции реального времени, и функции, если соединение выполняется напрямую, данные будут искажены из-за большого масштаба функции в реальном времени и неравномерного распределения соответствующего идентификатора функции, что приведет к сбою. всей искровой задачи.

первое решение

  • Выборки в реальном времени агрегируются в соответствии с featureId, сортируются в соответствии с количеством выборок в реальном времени, соответствующих разным featureId, а функции в реальном времени, соответствующие первым N featureId головы, объединяются в широковещательном режиме. явление перекоса в среднем хвосте значительно меньше.

Проблемы с первым решением

  • Для разных featureIds их распределение в выборках в реальном времени очень разное, а у некоторых в лучшем случае меньше 100, но задача spark по-прежнему падает.В настоящее время трудно подтвердить, сколько featureIds в голове используется для широковещания присоединиться.

Второе решение

  • Поскольку объект выборки в реальном времени очень большой, при использовании featureId для объединения небольшая асимметрия данных приведет к сбою задачи spark, тогда мы назначим уникальный идентификатор каждому объекту выборки в реальном времени, сгенерируем uniqId и use PairRdd(featureId, uniqId) Соединение с функциями в реальном времени. После завершения соединения получается PairRdd(uniqId, Feature>. Соединение PairRdd(uniqId, rawSample) выполняется на образцах в реальном времени, поэтому перекос данных.

Яма, обнаруженная в процессе реализации

  • В процессе генерации uniqId сначала был выбран случайный метод uuid.При присоединении было обнаружено, что результат соединения парыRdd, последние два ключа которого являются uniqId, пуст. Это связано с тем, что сгенерированный rdd для выборок в реальном времени слишком большой для кэширования, он будет пересчитан, и, естественно, uniqId будет другим.
  • Затем попробуйте сохранить rdd.Поскольку rdd слишком большой, выберите StorageLevel.MEMORY_AND_DISK_SER, но искра все равно вылетает.Это должно быть потому, что rdd слишком велик.Во время записи на жесткий диск и процесса чтения, io, сериализации , а десериализация обходятся дорого. , что приводит к oom.
  • Наконец, из RawSample получается reqId, и MurmurHash64 используется в качестве reqId для создания 64-битного uiqId, представляющего уникальный идентификатор RawSample.

Суммировать

Текущее решение, ограниченное уровнем разработки искры автора, все еще очень грубое, особенно для двух частей хранения и сериализации, есть еще много мест для глубокого копания, а также необходимо следить за механизмом хранения и сериализации spark и После более глубокого изучения сериализации protobuf проводится соответствующая оптимизация.