Введение
SparkВ настоящее время это самая популярная платформа распределенной пакетной обработки больших данных.Используя Spark, вы можете легко реализовать операции SQL с сотнями гигабайт или даже с данными уровня T, например, вычисление признаков одной строки или объединение нескольких таблиц.
OpenMLDBЭто проект базы данных с открытым исходным кодом, оптимизированный для сценариев ИИ, в котором реализован вычислительный движок для автономных сценариев MPP и онлайн-сценариев OLTP, совместимых с данными и вычислениями. На самом деле движок MPP можно реализовать на основе Spark, а производительность можно повысить в несколько раз за счет расширения исходного кода Spark.
Сам Spark также очень эффективен.Он реализует стандартный ANSI SQL лексический анализ и синтаксический анализ на основе Antlr, а также реализует большое количество статических оптимизаций SQL в модуле Catalyst, а затем преобразует их в распределенные вычисления RDD.Основная структура данных использует Java. Unsafe.API для настройки UnsafeRow распределения памяти, а также полагаться на JIT-компилятор Janino для динамической генерации оптимизированного байт-кода JVM для методов расчета. Тем не менее, все еще есть возможности для улучшения масштабируемости, особенно для сценариев машинного обучения, хотя требования могут быть выполнены, но неэффективны.В этой статье LastJoin используется в качестве примера, чтобы показать, как OpenMLDB может достичь производительности в несколько раз или даже в десятки раз. улучшение за счет расширения исходного кода Spark.
Сценарий машинного обучения LastJoin
LastJoin — это особый тип написания таблиц, представленный в сценариях с ИИ. Это вариант LeftJoin. При соблюдении условий соединения каждая строка в левой таблице принимает только последнюю строку в правой таблице, которая соответствует отправке. Семантические характеристики LastJoin могут гарантировать, что количество строк в выходном результате после правописания будет соответствовать левой таблице ввода. В сценарии машинного обучения количество входных выборочных таблиц остается прежним, а конечное количество выборок не будет увеличиваться или уменьшаться из-за операций с данными, таких как проверка орфографии таблиц.Этот метод более удобен для онлайн-поддержки службы и больше в в соответствии с потребностями ученых в моделировании.
С точки зрения технической защиты разработка и реализация LastJoin являются патентами компании Fourth Paradigm (Beijing) Technology Co., Ltd., номер публикации 111611245A, дата публикации 01.09.2020. Код проекта OpenMLDB, включая функцию LastJoin, находится в Github с открытым исходным кодом по протоколу Apache 2.0, и все пользователи могут с уверенностью его использовать.
Реализация LastJoin на основе Spark
Поскольку тип LastJoin не является стандартом в ANSI SQL, он не реализован на основных вычислительных платформах, таких как SparkSQL.Для достижения аналогичных функций пользователи могут реализовать их только с помощью операторов более низкого уровня, таких как DataFrame или RDD. Идея реализации LastJoin на основе оператора Spark состоит в том, чтобы сначала добавить столбец индекса в левую таблицу, затем использовать стандартный LeftOuterJoin, и, наконец, уменьшить результат склейки и удалить строку индекса, хотя семантику LastJoin можно реализовать , по-прежнему существует большое узкое место в производительности.
По сравнению с совместимостью функций и синтаксиса SQL, еще одна особенность Spark заключается в том, что пользователи могут реализовывать логику числовых вычислений, которая не поддерживается стандартным SQL, с помощью таких интерфейсов, как отображение, уменьшение, группировка и пользовательские пользовательские функции. Однако пользователи функции Join не могут расширить реализацию с помощью DataFrame или RDD API, поскольку реализация таблицы соединений реализована в физическом узле Spark Catalyst, что включает в себя объединение нескольких внутренних строк после перемешивания и генерацию Java. исходные строки для JIT. Кроме того, в зависимости от объема данных различных входных таблиц, Spark своевременно выберет BrocastHashJoin, SortMergeJoin или ShuffleHashJoin для их реализации.Обычные пользователи не могут использовать RDD API для расширения алгоритмов реализации этих таблиц.
Вы можете просмотреть полную реализацию Spark LastJoin в проекте OpenMLDB, кодовый адресGitHub.com/4paradigm/О….
Первым шагом является расширение столбца индекса входной левой таблицы.Существует много способов реализации расширения.Пока каждая строка добавленного столбца индекса имеет уникальный идентификатор, ниже приведен код реализации первого шага.
// Add the index column for Spark DataFrame
def addIndexColumn(spark: SparkSession, df: DataFrame, indexColName: String, method: String): DataFrame = {
logger.info("Add the indexColName(%s) to Spark DataFrame(%s)".format(indexColName, df.toString()))
method.toLowerCase() match {
case "zipwithuniqueid" | "zip_withunique_id" => addColumnByZipWithUniqueId(spark, df, indexColName)
case "zipwithindex" | "zip_with_index" => addColumnByZipWithIndex(spark, df, indexColName)
case "monotonicallyincreasingid" | "monotonically_increasing_id" =>
addColumnByMonotonicallyIncreasingId(spark, df, indexColName)
case _ => throw new HybridSeException("Unsupported add index column method: " + method)
}
}
def addColumnByZipWithUniqueId(spark: SparkSession, df: DataFrame, indexColName: String = null): DataFrame = {
logger.info("Use zipWithUniqueId to generate index column")
val indexedRDD = df.rdd.zipWithUniqueId().map {
case (row, id) => Row.fromSeq(row.toSeq :+ id)
}
spark.createDataFrame(indexedRDD, df.schema.add(indexColName, LongType))
}
def addColumnByZipWithIndex(spark: SparkSession, df: DataFrame, indexColName: String = null): DataFrame = {
logger.info("Use zipWithIndex to generate index column")
val indexedRDD = df.rdd.zipWithIndex().map {
case (row, id) => Row.fromSeq(row.toSeq :+ id)
}
spark.createDataFrame(indexedRDD, df.schema.add(indexColName, LongType))
}
def addColumnByMonotonicallyIncreasingId(spark: SparkSession,
df: DataFrame, indexColName: String = null): DataFrame = {
logger.info("Use monotonicallyIncreasingId to generate index column")
df.withColumn(indexColName, monotonically_increasing_id())
}
Вторым шагом является выполнение стандартного LeftOuterJoin.Поскольку нижний слой OpenMLDB реализован на основе C++, выражения нескольких условий соединения должны быть преобразованы в выражения Spark (инкапсулированные в объекты Spark Column), а затем функция соединения Spark DataFrame можно вызвать.Используйте "left" или "left_outer" для типа соединения.
val joined = leftDf.join(rightDf, joinConditions.reduce(_ && _), "left")
Третий шаг — уменьшить сплайсированную таблицу, потому что можно расширить входные данные через LeftOuterJoin, то есть преобразование 1:N, и все новые строки имеют уникальный идентификатор первого шага для расширения столбца индекса , Поэтому достаточно уменьшить уникальный идентификатор.Здесь используются интерфейсы groupByKey и mapGroups Spark DataFrame (обратите внимание, что этот API не поддерживается в Spark 2.0).При этом, если есть дополнительное поле сортировки, можно получить максимальное или минимальное значение каждой группы.
val distinct = joined
.groupByKey {
row => row.getLong(indexColIdx)
}
.mapGroups {
case (_, iter) =>
val timeExtractor = SparkRowUtil.createOrderKeyExtractor(
timeIdxInJoined, timeColType, nullable=false)
if (isAsc) {
iter.maxBy(row => {
if (row.isNullAt(timeIdxInJoined)) {
Long.MinValue
} else {
timeExtractor.apply(row)
}
})
} else {
iter.minBy(row => {
if (row.isNullAt(timeIdxInJoined)) {
Long.MaxValue
} else {
timeExtractor.apply(row)
}
})
}
}(RowEncoder(joined.schema))
Последний шаг — просто удалить столбец индекса, что может быть достигнуто с помощью предварительно заданного имени столбца индекса.
distinct.drop(indexName)
Кратко опишите решение LastJoin на основе оператора Spark, которое в настоящее время является наиболее эффективной реализацией на основе интерфейса программирования Spark.Для младших версий, таких как Spark 1.6, необходимо использовать такие интерфейсы, как mapPartition, для реализации функций, аналогичных mapGroups. Поскольку она реализована на основе LeftOuterJoin, эта реализация LastJoin хуже, чем LeftOuterJoin, и фактический объем выходных данных меньше.Для левой таблицы и правой таблицы, когда может быть выполнено большое количество условий сплайсинга, общее потребление памяти все тот же очень огромный. Поэтому ниже представлен собственный LastJoin, реализованный на основе модификации исходного кода Spark, который позволяет избежать вышеуказанных проблем.
Расширить реализацию LastJoin исходного кода Spark
Собственная реализация LastJoin относится к функции LastJoin, реализованной непосредственно в исходном коде Spark, а не на основе Spark DataFrame и LeftOuterJoin, которые значительно оптимизированы по производительности и потреблению памяти. OpenMLDB использует настраиваемый оптимизированный дистрибутив Spark, а зависимый исходный код Spark также является открытым исходным кодом в Github.GitHub - 4paradigm/spark at v3.0.0-openmldb.
Для поддержки родного LastJoin последний синтаксис необходимо добавить в JoinType.Поскольку синтаксический анализ SQL, реализованный Spark на основе Antlr, также будет напрямую преобразовывать тип соединения SQL в JoinType, вам нужно только изменить файл JoinType.scala.
object JoinType {
def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
case "inner" => Inner
case "outer" | "full" | "fullouter" => FullOuter
case "leftouter" | "left" => LeftOuter
// Add by 4Paradigm
case "last" => LastJoinType
case "rightouter" | "right" => RightOuter
case "leftsemi" | "semi" => LeftSemi
case "leftanti" | "anti" => LeftAnti
case "cross" => Cross
case _ =>
val supported = Seq(
"inner",
"outer", "full", "fullouter", "full_outer",
"last", "leftouter", "left", "left_outer",
"rightouter", "right", "right_outer",
"leftsemi", "left_semi", "semi",
"leftanti", "left_anti", "anti",
"cross")
throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
}
}
Реализация типа LastJoinType выглядит следующим образом.
// Add by 4Paradigm
case object LastJoinType extends JoinType {
override def sql: String = "LAST"
}
В исходном коде Spark также есть несколько классов проверки синтаксиса и классов оптимизатора, которые проверяют поддерживаемые внутри типы соединений, поэтому в файлах Analyzer.scala, Optimizer.scala, basicLogicalOperators.scala, SparkStrategies.scala все должны иметь простую модификацию. , случай переключения scala поддерживает добавление поддержки нового типа соединения к типу перечисления, что не будет повторяться здесь, если можно добавить синтаксический анализ и отсутствие поддержки нового типа перечисления во время выполнения.
// the output list looks like: join keys, columns from left, columns from right
val projectList = joinType match {
case LeftOuter =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
// Add by 4Paradigm
case LastJoinType =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
case LeftExistence(_) =>
leftKeys ++ lUniqueOutput
case RightOuter =>
rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
case FullOuter =>
// in full outer join, joinCols should be non-null if there is.
val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true))
case _ : InnerLike =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput
case _ =>
sys.error("Unsupported natural join type " + joinType)
}
После того, как анализ синтаксиса и структура данных поддержат новый тип соединения, основное внимание уделяется изменению кодов реализации трех физических операторов соединения Spark. Во-первых, когда правильная таблица относительно мала, Spark автоматически оптимизирует ее для BrocastHashJoin. В это время правильная таблица копируется в память всех исполнителей посредством широковещательной рассылки. Обходя правильную таблицу, можно найти все строки, соответствующие условию соединения. . Если правая таблица не соответствует условиям, внутренняя строка левой таблицы сохраняется, а значение поля правой таблицы равно нулю. Если одна или несколько строк соответствуют условиям, две внутренние строки объединяются в выходную внутреннюю строку. , Код реализован в BroadcastHashJoinExec.scala. Поскольку добавлен тип перечисления типа соединения, мы модифицируем эти два метода для поддержки этого типа соединения и отличаем его от предыдущей реализации типа соединения параметрами.
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
joinType match {
case _: InnerLike => codegenInner(ctx, input)
case LeftOuter | RightOuter => codegenOuter(ctx, input)
// Add by 4Paradigm
case LastJoinType => codegenOuter(ctx, input, true)
case LeftSemi => codegenSemi(ctx, input)
case LeftAnti => codegenAnti(ctx, input)
case j: ExistenceJoin => codegenExistence(ctx, input)
case x =>
throw new IllegalArgumentException(
s"BroadcastHashJoin should not take $x as the JoinType")
}
}
Основной код реализации BrocastHashJoin также реализован с использованием JIT, поэтому нам нужно изменить логику codegen в строку кода Java.В функции codegenOuter сохраните исходную реализацию LeftOuterJoin и используйте предыдущие параметры, чтобы определить, следует ли использовать новую реализация типа соединения. Измененная здесь логика также очень проста, потому что новый тип соединения должен только убедиться, что в правильной таблице есть строка данных, а затем вернуться, поэтому нет необходимости проходить набор кандидатов правильной таблицы через while.
// Add by 4Paradigm
if (isLastJoin) {
s"""
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashRelation
|$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value});
|boolean $found = false;
|// the last iteration of this loop is to emit an empty row if there is no matched rows.
|if ($matches != null && $matches.hasNext() || !$found) {
| UnsafeRow $matched = $matches != null && $matches.hasNext() ?
| (UnsafeRow) $matches.next() : null;
| ${checkCondition.trim}
| if ($conditionPassed) {
| $found = true;
| $numOutput.add(1);
| ${consume(ctx, resultVars)}
| }
|}
""".stripMargin
}
Затем необходимо изменить реализацию SortMergeJoin для поддержки нового типа объединения.Если правая таблица слишком велика и не может быть передана напрямую, скорее всего, это будет реализовано с помощью SortMergeJoin.Принцип реализации аналогичен предыдущей модификации.Различие заключается в том, что он не реализован через JIT, поэтому напрямую измените логику таблицы правописания, чтобы гарантировать, что, пока есть строка, соответствующая условиям, ее можно соединить и вернуть.
private def bufferMatchingRows(): Unit = {
assert(streamedRowKey != null)
assert(!streamedRowKey.anyNull)
assert(bufferedRowKey != null)
assert(!bufferedRowKey.anyNull)
assert(keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
// This join key may have been produced by a mutable projection, so we need to make a copy:
matchJoinKey = streamedRowKey.copy()
bufferedMatches.clear()
// Add by 4Paradigm
if (isLastJoin) {
bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
advancedBufferedToRowWithNullFreeJoinKey()
} else {
do {
bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
advancedBufferedToRowWithNullFreeJoinKey()
} while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
}
}
Последняя реализация ShuffleHashJoin. Соответствующая реализация находится в подклассе HashJoin.scala. Принцип аналогичен предыдущему. При вызове функции externalJoin для обхода таблицы потока измените основную логику обхода, чтобы обеспечить сохраняется, и добавляется ноль, когда он не может быть написан по буквам. , вы можете вернуться сразу после написания строки.
private def outerJoin(
streamedIter: Iterator[InternalRow],
hashedRelation: HashedRelation,
isLastJoin: Boolean = false): Iterator[InternalRow] = {
val joinedRow = new JoinedRow()
val keyGenerator = streamSideKeyGenerator()
val nullRow = new GenericInternalRow(buildPlan.output.length)
streamedIter.flatMap { currentRow =>
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
val buildIter = hashedRelation.get(rowKey)
new RowIterator {
private var found = false
override def advanceNext(): Boolean = {
// Add by 4Paradigm to support last join
if (isLastJoin && found) {
return false
}
// Add by 4Paradigm to support last join
if (isLastJoin) {
if (buildIter != null && buildIter.hasNext) {
val nextBuildRow = buildIter.next()
if (boundCondition(joinedRow.withRight(nextBuildRow))) {
found = true
return true
}
}
} else {
while (buildIter != null && buildIter.hasNext) {
val nextBuildRow = buildIter.next()
if (boundCondition(joinedRow.withRight(nextBuildRow))) {
found = true
return true
}
}
}
if (!found) {
joinedRow.withRight(nullRow)
found = true
return true
}
false
}
override def getRow: InternalRow = joinedRow
}.toScala
}
}
Изменив предыдущий тип соединения и три физических узла соединения, пользователь может использовать интерфейс SQL или DataFrame для выполнения новой логики составления таблиц, как и для других встроенных типов соединений.После составления таблиц убедитесь, что количество выходных строк соответствует левая таблица, а результат такой же, как и в предыдущей схеме на основе LeftOuterJoin + dropDuplicated.
Сравнение производительности реализации LastJoin
Теперь, когда реализован новый алгоритм соединения, давайте сравним производительность двух предыдущих решений.Предыдущее решение напрямую основано на последней версии Spark 3.0 с открытым исходным кодом.Без модификации оптимизатора Spark широковещательное соединение будет использоваться для оптимизации производительности для небольшие данные.Если пользователь напрямую использует скомпилированную версию модифицированного исходного кода Spark, Spark также будет оптимизирован для реализации широковещательного соединения с небольшими данными.
Во-первых, проверить случай, когда условие соединения может объединять несколько строк.Для LeftOuterJoin, поскольку он может объединять несколько строк, таблица, выводимая LeftOuterJoin на первом этапе, будет намного больше, а dropDupplication на втором этапе также займет больше времени. Возвращается после объединения в одну строку, поэтому производительность не снижается из-за объединения нескольких строк.
Из результатов также очевидна разница в производительности. Поскольку объем данных в правой таблице относительно невелик, Spark оптимизирует реализацию широковещательного соединения для этих трех наборов данных. Поскольку LeftOuterJoin будет объединять несколько строк, производительность намного медленнее, чем новый LastJoin. Когда объем данных увеличивается, объем данных в таблице результатов, склеенной LeftOuterJoin, становится еще более взрывоопасным, а производительность падает экспоненциально, что в десятки и сотни раз отличается от LastJoin. В конце концов, он может дать сбой из-за OOM, в то время как LastJoin не выйдет из строя из-за объема данных.
Несколько несправедливо, что решение LeftOuterJoin + dropDupilicated может соединять несколько строк в правой таблице, поэтому мы добавили новый тестовый сценарий, при сращивании мы гарантируем, что левая таблица может быть успешно склеена только со строкой в правой таблице, чтобы результаты операций LeftOuterJoin и LastJoin были точно такими же. , сравнение производительности в этом сценарии более значимо.
Судя по результатам, разница в производительности не так очевидна, но LastJoin по-прежнему почти в два раза быстрее, чем предыдущее решение.Первые две группы правых таблиц имеют относительно небольшой объем данных и оптимизированы Spark для реализации широковещательных соединений. последняя группа не оптимизирована и будет использовать реализацию sorge merge.join. Как видно из окончательного кода, сгенерированного BroadcastHashJoin и SortMergeJoin, если только одна строка в правой таблице успешно объединена, логика реализации LeftOuterJoin и LastJoin в основном одинакова, поэтому разница в производительности в основном связана с тем, что первая схема должна выполнить вычисление dropDuplicated.Хотя этот этап вычисляетСложность не высока, но доля затрат времени в небольшом масштабе данных по-прежнему относительно велика.Независимо от того, какая тестовая схема используется для изменения исходного кода Spark в этом специальном сценарии написания таблицы , это по-прежнему схема реализации с наилучшей производительностью.
Техническое резюме
Наконец, если кратко подытожить, то благодаря пониманию и изменению исходного кода Spark проект OpenMLDB может реализовать новую логику алгоритма проверки правописания таблиц в соответствии с бизнес-сценариями.С точки зрения производительности, производительность может быть значительно улучшена по сравнению с использованием собственного интерфейса Spark. Исходный код Spark включает анализ синтаксиса SQL, оптимизацию логического плана Catalyst, динамическую компиляцию кода JIT и т. д. Благодаря этим основам функции и производительность Spark могут быть расширены на более низком уровне. OpenMLDB продолжит делиться техническими подробностями, связанными с оптимизацией Spark, в будущем. .Добро пожаловать Всем продолжайте общаться.
Другие разработчики также могут обратить внимание и принять участие в проекте с открытым исходным кодом OpenMLDB.