Введение
OpenMLDBЭто проект базы данных с открытым исходным кодом, оптимизированный для сценариев ИИ, в котором реализован вычислительный движок для автономных сценариев MPP и онлайн-сценариев OLTP, совместимых с данными и вычислениями. Движок MPP может быть реализован на основе Spark, а производительность может быть улучшена в несколько раз за счет расширения исходного кода Spark. В этой статье в основном объясняется, как OpenMLDB решает проблему перекоса данных окна на основе Spark.
задний план
Перекос данных — это распространенное явление в сценариях обработки больших данных, вызванное чрезмерным объемом данных в определенном разделе. Перекос данных приведет к огромному разрыву между временем вычислений перекошенного раздела и других разделов.Другими словами, вычислительные задачи перекошенного раздела данных серьезно не соответствуют его ресурсам процессора. В итоге будет ситуация ожидания на один больше одного.После завершения расчета нескольких разделов с малым объемом данных ждут наклонного раздела с большим объемом данных.Только после расчета наклонного раздел завершен, результат можно вывести. Это огромная катастрофа для эффективности.
В расчете функций машинного обучения задействовано множество оконных вычислений. При расчете окна, если объем данных одного ключа слишком велик, это также приведет к слишком большому количеству данных в определенном разделе, что приведет к проблеме перекоса данных. Однако традиционные решения по оптимизации секционирования данных, такие как добавление префиксов к данным и последующее секционирование, не подходят для сценариев оконных вычислений. Это приведет к тому, что окончательный результат расчета будет неправильным в сценарии расчета окна. Поэтому OpenMLDB предлагает схему оптимизации на основе Spark для асимметричного разделения данных окна.После расширения данных окна асимметричные данные повторно секционируются в соответствии с ключом раздела и временным интервалом.
Введение в перекос данных
SELECT sum(Amount) OVER w AS sum,
FROM input
WINDOW w as (PARTITION By Gender Order By Time ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
В данных на приведенном выше рисунке из-за того, что первичный ключ «Пол» имеет только два значения, автономное вычисление может в лучшем случае разделить данные только на два раздела, то есть степень параллелизма равна только 2. В это время одни и те же ресурсы раздела имеют большую разницу в объеме данных вычислительных задач. При последующем расчете время расчета раздела, в котором находится «мужчина», должно быть больше, чем время расчета раздела, в котором находится «женщина». Когда объем данных в наклонном разделе становится больше, этот временной разрыв будет продолжать увеличиваться. А поскольку в базовом исполнении spark есть только один поток на партицию, за весь цикл stage работают только два потока, а многие другие потоки всегда простаивают, что также приведет к серьезным потерям производительности.
Традиционное решение для искажения данных
Для оптимизации искаженных данных решение основной проблемы заключается в повторном разбиении искаженных данных и распределении огромных блоков данных в одном искаженном разделе на несколько небольших разделов данных. Таким образом достигается цель разделения больших данных для повышения эффективности вычислений.
Среди распространенных стратегий перераспределения данных есть стратегии перераспределения путем добавления различных префиксов к ключу секции, а также стратегии перераспределения путем добавления еще нескольких столбцов в качестве ключа секции. Однако эти простые схемы перераспределения вызовут ошибки расчета окна.
Если принята простая схема оптимизации разделов с префиксом данных и последующим разделением, данные, изначально находящиеся в одном разделе, будут разделены на разные разделы. Оконный расчет предполагает скользящие значения между данными, поэтому, если данные в разделе просто разделены, оконный расчет не сможет получить исходные соседние данные, что приведет к ошибкам в конечном результате расчета.
Схема оптимизации наклона окна OpenMLDB
вся идея
Общая идея нашего решения заключается в дальнейшем обеспечении правильности блоков данных каждого переразбиения при расчете окна на основе вышеупомянутого переразбиения искаженных данных. Метод, принятый в схеме, заключается в выполнении определенного расширения данных окна в каждом перераспределенном блоке данных в соответствии с количеством столбцов данных, которые окно должно скользить.
При оптимизации для разделения данных обычно используется стратегия переразбиения + добавление окна. Идея состоит в том, чтобы использовать стратегию "пространство во времени". Преимущество заключается в том, что время расчета мало, а производительность высока. Недостатком является то, что дополнительные данные окна вызовут определенную избыточность данных, что приведет к большему использованию памяти. .
Технические детали этого решения подробно описаны ниже Конкретная реализация решения по оптимизации наклона в основном разделена на пять шагов, на примере следующего SQL.
SELECT SUM(Amount) OVER W1 AS sum
FROM InputTable
WINDOW W1 AS (PARTITION BY Gender ORDER BY Time ROWS PRECEDING 2 AND CURRENT ROW)
Шаг 1: Оценка данных — распределение данных ключей разделов статистического окна
На этом этапе требуется оценка общих данных и подсчитываются некоторые соответствующие показатели, такие как разделительная линия разделения данных и количество данных в разделе. Параметры вводятся следующим образом.
имя параметра | объяснять |
---|---|
Quantile | Разделение данных определяется входящим параметром «Квантиль», и мы используем механизм n-деления, квантиль = 4 представляет собой квартиль (не обязательно гарантирующий строгое квартирование). В соответствии с параметром «Квантиль» мы можем разделить разделительную линию «процентиль_i» на разные значения, а разные блоки данных можно разделить в соответствии со значением данных относительно разделительной линии. |
PRECENTILE | По колонке ("Время") (значение Order By в окне SQL) делится разделительная линия блока данных, PERCENTILE_i - i-я разделительная линия, а данные в колонке ("Time") соответствуют до (PERCENTILE_i, PERCENTILE_i+1] — i-й блок данных. Особый случай: первый блок данных — (0, PERCENTILE_1], последний блок — (PERCENTILE_n, бесконечность) |
Вообще говоря, первым шагом оценки данных является подсчет и расчет различных показателей данных, а после статистики - оценка и обработка данных, но, поскольку это включает в себя обход всего объема данных, наступит время -потребление. У нас также есть дополнительная оптимизация для этого, мы поддерживаем пропуск статистической части на первом шаге путем чтения предварительно обработанной таблицы распределения. Таким образом, статистические задачи можно выполнять рано утром или когда бизнес-обработка не требуется, а результаты данных можно подсчитывать, чтобы пользователи не ждали слишком долго на первом этапе, когда им нужно выполнить логику обработки.
// Use skew config
val distributionDf = ctx.getSparkSession.read.parquet(ctx.getConf.windowSkewOptConfig)
logger.info("Load distribution dataframe")
Шаг 2: Маркировка данных - Отметьте номер переразбиения
На этом этапе данные разделяются в соответствии со статистическими результатами данных в таблице распределения, и («PART_ID») и («EXPANDED_ROW») помечаются на разделенных данных как метки разделов после повторного разделения различных блоков данных и являются ли они маркерами для дополненных данных.
В первоначальном Присоединении мы использовали Широковещательное Присоединение, чтобы повысить эффективность Присоединения. Широковещательное соединение — это тип соединения в Spark, который позволяет избежать перетасовки. Как правило, широковещательное соединение можно использовать при объединении большой и малой таблиц. Оно передает данные маленькой таблицы на каждый вычислительный узел Executor, а затем агрегирует их. через карту, чтобы избежать перетасовки данных. В нашей таблице таблица распределения намного меньше таблицы ввода, поэтому мы можем просто использовать широковещательное соединение.
После соединения можно получить разделительную линию данных, а когда PERCENTILE_i является i-й разделительной линией, данные, соответствующие (PERCENTILE_i, PERCENTILE_i+1], являются i-м блоком данных, а результат делится по фиксированной стратегии Разделите результат и сгенерируйте новую метку раздела - "PART_ID". Данные таблицы вводятся следующим образом.
имя столбца | объяснять |
---|---|
PART_ID | Представляет идентификатор перераспределения. В AddColumnTable строки с одинаковым идентификатором PART_ID и ключом раздела принадлежат новому разделу. Например, две строки с идентификатором = 1 и идентификатором = 3 принадлежат одному разделу. |
EXPANDED_ROW | Указывает, является ли текущая строка данными расширенного окна, значение по умолчанию — false. На следующих шагах значение этого столбца вновь развернутых данных окна равно true. |
Третий шаг: расширение данных - расширить окно данных для данных разных блоков
Расширение данных окна является основной частью оптимизации наклона окна OpenMLDB. Из-за большого количества данных для облегчения понимания ниже показана только «мужская» часть данных.
В конкретной реализации мы расширяем все данные окна для каждого блока данных, который необходимо расширить, то есть путем обхода каждый переразбиваемый блок данных, который необходимо расширить, расширяется до первых данных. Схема процесса выглядит следующим образом, темный цвет представляет собой проходимую в данный момент секцию, а светлый цвет представляет данные окна, которые необходимо дополнить текущим разделом.
1. Отфильтруйте данные, которые необходимо расширить
Для времени 1 и 3 первый перераспределенный блок данных с «PART_ID» = 1 будет пропущен, поскольку это первый блок данных без дополнительных данных.
Для второго перераспределенного блока данных, время которого равно 5 и "PART_ID" = 2, будут удалены все данные перед текущим блоком данных, то есть блок данных с "PART_ID" = 1.
Для третьего перераспределенного блока данных, время которого равно 7, "PART_ID" = 3 то же самое, и все данные перед текущим блоком данных вынимаются, то есть первый и второй блоки данных берутся как расширенные. данные.
Последующий четвертый блок данных перераспределения также аналогичен предыдущему, и все необходимые данные удаляются, поэтому здесь он не повторяется.
2. Меняем ID отфильтрованных данных и делаем Union
После извлечения данных нам также нужно изменить ("EXPANDED_ROW") на true, что представляет данные расширенного окна. После модификации («EXPANDED_ROW») нам нужно только постоянно объединяться с исходной таблицей AddColumn, и мы завершили расширение данных окна блока данных. Взяв в качестве примера второй блок данных, в приведенной ниже таблице объединения разные цвета представляют разные перераспределенные блоки данных.Можно видеть, что после фильтрации и объединения второй блок данных был расширен данными.
Для других окон блока данных метод расширения такой же, как и для второго метода блока данных.После фильтрации и расширения выполните Unoin с предыдущей таблицей Union.
Ниже показана окончательная таблица объединения, полученная после того, как четвертый блок данных расширит данные в окне.
Шаг 4. Разбиение данных — перераспределение на основе ключа перераспределения
Хотя раньше мы использовали разные цветовые блоки для обозначения разных перераспределенных данных, на самом деле на четвертом этапе мы действительно перераспределили данные.На нижнем уровне мы полагаемся на функцию перераспределения в Spark для перераспределения данных. После третьего шага мы можем получить окончательную Union Table.На данный момент нам нужно только переразбить по ключу раздела («Gender») и («PART_ID»), а данные можно разбить по разным исполнителям.
Шаг 5: Расчет данных — расчет секционированных данных
На третьем шаге мы знаем, что эти столбцы данных с «EXPANDED_ROW» = false являются недавно добавленными данными окна, и в фактическом вычислении им не нужно участвовать в вычислении. Следовательно, необходимо только выполнить расчет окна по данным "EXPANDED_ROW" = true, и, наконец, можно получить результат расчета.
Стоит отметить, что, поскольку базовый механизм обработки OpenMLDB разработан и разработан независимо, внутренняя логика расчета окна также реализуется OpenMLDB. Соответствующий код размещен ниже для объяснения.
repartitionDf.rdd.mapPartitionsWithIndex {
case (partitionIndex, iter) =>
val computer = WindowAggPlanUtil.createComputer(partitionIndex, hadoopConf, sparkFeConfig, windowAggConfig)
windowAggIter(computer, iter, sparkFeConfig, windowAggConfig)
}
Для repartitionDf, сгенерированного на четвертом шаге, мы вызываем метод Spark mapPartitionsWithIndex во внешнем слое. После этого для каждого раздела OpenMLDB строит компьютерный вычислительный блок для обработки следующего расчета окна. После этого официально выполняется расчет окна и вызывается метод windowAggIter.
InputIter.flatMap(row => {
if (lastRow != null) {
computer.checkPartition(row, lastRow)
}
lastRow = row
val orderKey = computer.extractKey(row)
val expandedFlag = row.getBoolean(config.expandedFlagIdx)
if (!isValidOrder(orderKey)) {
None
} else if (!expandedFlag) {
Some(computer.compute(row, orderKey, config.keepIndexColumn, config.unionFlagIdx))
} else {
computer.bufferRowOnly(row, orderKey)
None
}
})
В методе windowAggIter мы выполняем операцию flatMap над входящим итератором InputIter, а затем проверяем, неправильно ли классифицированы данные в партиции.Если есть неправильно классифицированная строка, окно будет сброшено. Затем, после проверки того, что с orderKey нет проблем, он оценит расширенный флаг ("EXPANDED_ROW") на приведенном выше рисунке. Если это правда, это доказывает, что текущая строка является расширенными данными, поэтому компьютерный вычислительный блок выполняет только операция bufferRowOnly и кэширует расширенные данные.Данные окна хранятся в памяти, которая используется для данных, которые фактически необходимо вычислить позже. Если оно ложно, расширенный флаг в это время также имеет значение ложно, и компьютерный вычислительный блок выполнит реальный расчет.В методе вычисления ранее кэшированные данные будут прочитаны и рассчитаны, а затем будет возвращена обработанная строка. Метод вычисления реализован внутри c, и заинтересованные студенты могут проверить его.OpenMLDBсоответствующий исходный код.
Сравнительный тест производительности
Тест производительности Benchmark использует общедоступный набор данных Kaggle, который является набором данных конкурса «Продолжительность поездки на такси в Нью-Йорке».
SELECT
sum(vendor_id) over w as w_sum_vendor_id,
max(vendor_id) over w as w_max_vendor_id,
min(vendor_id) over w as w_min_vendor_id,
avg(vendor_id) over w as w_avg_vendor_id,
sum(pickup_longitude) over w as w_sum_pickup_longitude,
max(pickup_longitude) over w as w_max_pickup_longitude,
min(pickup_longitude) over w as w_min_pickup_longitude,
avg(pickup_longitude) over w as w_avg_pickup_longitude
FROM taxi_skew_all
WINDOW w as (partition by vendor_id order by pickup_datetime ROWS BETWEEN 10000 PRECEDING AND CURRENT ROW)
При сравнении версии с открытым исходным кодом SparkSQL и версии с открытым исходным кодом OpenMLDB для тестирования результаты тестирования следующие.
вычислительная машина | занимает много времени, чтобы вычислить |
---|---|
SparkSQL(Spark 3.0.0) | 950.98s |
OpenMLDB, оптимизация наклона не включена | 224.76s |
OpenMLDB, включить оптимизацию перекоса, количество перекосов разделов 2 | 140.74s |
OpenMLDB, включить оптимизацию перекоса, количество перекосов разделов 4 | 94.44s |
Можно видеть, что движок OpenMLDB по-прежнему имеет повышение производительности более чем в 4 раза по сравнению с движком Spark при различных коэффициентах наклона, даже если оптимизация наклона не включена.Это улучшение производительности в основном достигается за счет эффективного движка OpenMLDB. гарантировать. После того, как OpenMLDB включил оптимизацию наклона окна, отрегулировав количество повторных разбиений, по сравнению с OpenMLDB без оптимизации наклона, производительность может быть повышена примерно на 60–140 %.
Суммировать
OpenMLDB реализует оптимизацию перекоса данных при оконных вычислениях за счет расширения оконных данных и стратегии перераспределения данных. Стратегия обычно принимает идею пространства-в-времени, то есть наклонные данные, изначально сосредоточенные в одном разделе, расширяют данные окна в пространстве для хранения, а затем распределяют данные по нескольким разделам для параллельных вычислений, тем самым увеличивая параллелизм вычислений, в обмен на более короткое время вычислений, и в конечном итоге добиться значительного повышения эффективности. Кроме того, в ходе проверки данных мы обнаружили, что чем более асимметрично распределение, тем выше производительность OpenMLDB. В целом, для сценариев перекоса данных при оконных вычислениях оптимизация перекоса данных, реализованная OpenMLDB, дает хороший эффект.
В этом документе представлена распространенная проблема перекоса данных в скользящем окне, анализируется схема реализации OpenMLDB для решения перекоса данных и показаны окончательные результаты оптимизации производительности. Если вы заинтересованы в оптимизации Spark, крупномасштабных вычислениях функций, базе данных OpenMLDB и т. д., мы будем делиться другими похожими техническими статьями, пожалуйста, продолжайте обращать вниманиеСтолбец OpenMLDB.