задний план
Автор занимается разработкой сервера понимания контента, в основном обрабатывающего текст, видео и аудио, и производящего соответствующие функции с помощью технологий NLP и CV, которые предоставляются рекомендательным, поисковым и рекламным службам и используются для обучения модели для отзыва и сортировки в соответствующих сценариях.
срок
- Функции в реальном времени: то есть функции, которые меняются с течением времени, такие как последовательность экспозиции пользователя, просматривающего короткое видео за последние пять минут, и текущие функции изображения прямой трансляции. Такие функции обычно обновляются в режиме реального времени. время с использованием фреймворков потоковых вычислений, таких как flink, и функций реального времени. Оно хранится непосредственно в хранилище kv и динамически обновляется в реальном времени через flink.
- Автономные функции: то есть функции, которые не меняются с течением времени в режиме реального времени, такие как возраст пользователя, пол и встраивание короткой обложки видео. Такие функции обычно генерируются один раз или обновляются один раз в день. Демографические характеристики, такие как возраст и пол, как правило, периодически пересчитываются, а рассчитанные результаты сохраняются в hdfs, а затем записываются в kv через spark или mapreduce; и такие функции, как встраивание короткого изображения обложки видео, после создания после этого будут напрямую записывается в kv, а копия также будет храниться в автономном хранилище данных.
- Образцы в реальном времени: чтобы соответствовать требованиям к производительности во время онлайн-обслуживания, офлайн-функции и функции в реальном времени будут храниться в kv (например, redis, hbase). В качестве RawSample для последующего обучения модели этот образец объединяется с внешние журналы воздействия, кликов и конверсий для создания соответствующей метки и, наконец, получения образца в реальном времени, необходимого для обучения модели.
необходимость
- После того, как новые функции на стороне понимания контента будут доставлены в рекомендацию, поиск и рекламу, благодаря выборкам в реальном времени, необходимо выполнить накопление функций в Интернете, что обычно занимает более семи дней. довольно длинный, что влияет на итеративную скорость бизнеса по пониманию контента.Поэтому необходимо отследить исторические данные о функциях, а затем присоединиться к образцам в реальном времени, чтобы быстрее начать обучение автономной модели, проверить влияние функций в автономном режиме и улучшить общую производительность.экспериментальная скорость.
основная логика
структура данных
- Офлайн-функция/Офлайн-функция
int64 featureId=1;
Feature feature=2;
- Функция в реальном времени/Функция в реальном времени
int64 featureId=1;
Feature feature=2;
int64 timestamp=3;
- Уведомление:
FeatureId может быть не типа int64, а типа string.В настоящее время MurmurHash64 используется для хеширования строки и преобразования ее в int64 для повышения эффективности объединения; конечно, такая же операция требуется при обработке реальных образцы времени.
- Живой образец/RawSample
int64 timestamp=1;
repeated features=2;
Образец в реальном времени может получить значение функции, соответствующее featureId, соответствующему функции в реальном времени и функции в автономном режиме, благодаря удобству функций, которое является основой для объединения образца и функции в реальном времени.
логика соединения
Обработка функций в реальном времени может быть получена
int64 timestamp=1;
repeated features=2;
int64 joinFeatureId=3;
- Если вам нужно внедрить офлайн-функции пробного присоединения в реальном времени, вы можете напрямую присоединиться через featureId.
- Когда необходимо реализовать выборки в реальном времени для объединения функций в реальном времени, функции в реальном времени необходимо агрегировать в последовательность в соответствии с идентификатором функции в определенном интервале времени, а затем время должно быть выровнено по началу. или конец временного интервала (например, целое число или час), а затем Используйте это время в сочетании с featureId в качестве ключа. Он также выравнивает время выборки в реальном времени, используя комбинацию времени и joinFeatureId в качестве ключа.При присоединении сравнивает отметку времени выборки в реальном времени с отметкой времени в последовательности функций в реальном времени и выбирает функцию с самая большая временная метка, которая не существует, и записывает ее в режиме реального времени.
Проблемы и решения
огромный размер выборки
- В рекомендуемом сценарии автономное хранилище hdfs, занимаемое выборками в реальном времени, составляет 450 ГБ в час, что близко к 350 Вт, а размер одной выборки превышает 100 КБ. перекос данных, задача spark, скорее всего, завершится сбоем. Массивы, хранящиеся в protobuf, кодируются в строки с использованием base64, а затем сохраняются в формате быстрого сжатия.Стоимость сериализации и десериализации также довольно высока.
первая яма
- Преобразование rdd в PairRdd во время декодирования.В это время накладные расходы на перемешивание и сериализацию очень велики, исполнитель выглядит oom, а задача spark аварийно завершает работу.
val rawRdd = sc.textFile(rawSamplePath) //读取实时实时样本
val pairRdd = rawRdd.map(row=>{
//base64解码成二进制数组
//讲二进制数组反序列化为pb对象
//从pb对象中获取用于join的featureId
(featureId,pb对象)
}
Самая большая проблема в этом процессе заключается в том, что после чтения файла выборки в реальном времени на hdfs количество разделов rdd равно количеству файлов hdfs, которых на данный момент 80. При декодировании выполняется десериализация и генерация парыRdd в то же время 80 разделов очень велики, легко происходит перекос данных, что в конечном итоге приводит к сбою задачи spark, и в то же время исполнитель, применяемый spark, не может быть полностью использован, и общая скорость будет очень медленной. . Решение:
val rawRdd = sc.textFile(rawSamplePath).repartition(1000) //读取实时实时样本
val pairRdd = rawRdd.map(row=>{
//base64解码成二进制数组
//讲二进制数组反序列化为pb对象
//从pb对象中获取用于join的featureId
(featureId,pb对象)
}
После прочтения файла hdfs выполните переразбиение, степень параллелизма будет больше, пропорция перекоса данных будет намного меньше, а давление перетасовки, декодирования и десериализации одного исполнителя будет значительно уменьшено, и будет почти нет исполнителя om.
Загадка присоединения
После создания PairRdd, соответствующего функции реального времени, и функции, если соединение выполняется напрямую, данные будут искажены из-за большого масштаба функции в реальном времени и неравномерного распределения соответствующего идентификатора функции, что приведет к сбою. всей искровой задачи.
первое решение
- Выборки в реальном времени агрегируются в соответствии с featureId, сортируются в соответствии с количеством выборок в реальном времени, соответствующих разным featureId, а функции в реальном времени, соответствующие первым N featureId головы, объединяются в широковещательном режиме. явление перекоса в среднем хвосте значительно меньше.
Проблемы с первым решением
- Для разных featureIds их распределение в выборках в реальном времени очень разное, а у некоторых в лучшем случае меньше 100, но задача spark по-прежнему падает.В настоящее время трудно подтвердить, сколько featureIds в голове используется для широковещания присоединиться.
Второе решение
- Поскольку объект выборки в реальном времени очень большой, при использовании featureId для объединения небольшая асимметрия данных приведет к сбою задачи spark, тогда мы назначим уникальный идентификатор каждому объекту выборки в реальном времени, сгенерируем uniqId и use PairRdd(featureId, uniqId) Соединение с функциями в реальном времени. После завершения соединения получается PairRdd(uniqId, Feature>. Соединение PairRdd(uniqId, rawSample) выполняется на образцах в реальном времени, поэтому перекос данных.
Яма, обнаруженная в процессе реализации
- В процессе генерации uniqId сначала был выбран случайный метод uuid.При присоединении было обнаружено, что результат соединения парыRdd, последние два ключа которого являются uniqId, пуст. Это связано с тем, что сгенерированный rdd для выборок в реальном времени слишком большой для кэширования, он будет пересчитан, и, естественно, uniqId будет другим.
- Затем попробуйте сохранить rdd.Поскольку rdd слишком большой, выберите StorageLevel.MEMORY_AND_DISK_SER, но искра все равно вылетает.Это должно быть потому, что rdd слишком велик.Во время записи на жесткий диск и процесса чтения, io, сериализации , а десериализация обходятся дорого. , что приводит к oom.
- Наконец, из RawSample получается reqId, и MurmurHash64 используется в качестве reqId для создания 64-битного uiqId, представляющего уникальный идентификатор RawSample.
Суммировать
Текущее решение, ограниченное уровнем разработки искры автора, все еще очень грубое, особенно для двух частей хранения и сериализации, есть еще много мест для глубокого копания, а также необходимо следить за механизмом хранения и сериализации spark и После более глубокого изучения сериализации protobuf проводится соответствующая оптимизация.