Обзор
Structured Streaming — это механизм обработки распределенных потоков, основанный на Spark SQL и обладающий следующими характеристиками:
- Вы можете напрямую использовать DF/DS API для написания логики обработки потоков, например агрегации потоков, окон времени событий, объединений потоков и пакетов и т. д.
- Используйте контрольную точку и WAL (журнал с упреждающей записью), чтобы обеспечить сквозное потребление сразу после семантики.
- Сквозная обработка с низкой задержкой (100 мс)
- Высокая масштабируемость, высокая отказоустойчивость
основная концепция
Структурированная потоковая передача рассматривает непрерывный поток данных как таблицу, которую можно вставлять бесконечно. Когда есть операция запроса, она создает «таблицу результатов» в соответствии с определенным кодом. Каждый раз, когда часть данных поступает в течение интервала, «таблица результатов» будет обновлена. Конечно, структурированная потоковая передача на самом деле не реализует всю таблицу. На самом деле она просто считывает последние данные для обработки обновленных результатов и отбрасывает исходные данные, а для обновления результатов поддерживает только наименьшие данные промежуточного состояния. состояние позднее.
Ниже приведена концептуальная схема, представленная на официальном сайте для простоты понимания:
демонстрация кода
Приведенного выше описания так много, что лучше сразу перейти к коду.На этот раз пример — классический WordCount, используемый язык — Scala, версия — 2.11.8. Если у вас есть друг, который раньше не сталкивался со Scala, вы можете взглянуть на мою серию статей о Scala.
Первый — изменить pom.xml и добавить следующие зависимости:
<properties>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>2.4.4</version>
</dependency>
Затем создайте SparkSession в качестве точки входа. Это также показывает, что структурированная потоковая передача основана на механизме Spark SQL. Вы можете использовать DS/DF для потоковой обработки.
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[2]")
.getOrCreate()
Последняя наша знакомая трилогия, источник->преобразование->приемник, мы читаем данные из сокета и выполняем операцию wc и выводим их на консоль.
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
Поскольку среда разработки Mac, непосредственно nc -lk 9999, введите серию тестовых данных, консоль выведет результаты, результаты следующие. Если вы не являетесь партнером Mac, также возможен облачный хостинг, вам просто нужно изменить хост!
$ nc -lk 9999
hadoop hadoop flink
spark spark
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| flink| 1|
|hadoop| 2|
+------+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| spark| 2|
| flink| 1|
|hadoop| 2|
+------+-----+
Исследуйте исходный код
Друзья, которые подвергались воздействию Spark SQL, должны быть такими же, как и я. Это также запись SparkSession, и код очень похож. В чем разница в реализации исходного кода для загрузки внешних источников данных?
Давайте сначала посмотрим, как Spark Structured Streaming загружает источник данных, щелкнем по исходному коду spark.readStream и обнаружим, что это в основном новый DataStreamReader(self), затем мы в основном исследуем этот класс.
Первое, что нужно прочитать исходный код, это прочитать его комментарии.Исходный код выглядит следующим образом:
/**
* Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
* key-value stores, etc). Use `SparkSession.readStream` to access this.
*
* @since 2.0.0
*/
@InterfaceStability.Evolving
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
Из комментариев видно, что этот класс загружает внешние источники данных, такие как набор данных, но его определение определяет, что его можно использовать только в пакете org.apache.spark.sql и нельзя использовать извне.
Второе — найти запись.Вообще говоря, это основная функция, а в этом классе нет.Из предыдущего примера мы знаем, что последний вызов — это метод загрузки, поэтому начинаем чтение с этого метода.
/**
* Loads input data stream in as a `DataFrame`, for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
def load(): DataFrame = {
//source是类属性,表示外部数据源格式,由format方法外部指定,默认parquet
//不能将source直接指定为hive,否则会报错
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
Метод lookupDataSource является более важным методом, в основном для загрузки встроенных и внешних пользовательских источников данных, он будет пытаться получить загрузчик контекста, если нет, то это будет загрузчик Spark, а затем сначала искать указанный извне источник в реализация класса интерфейса DataSourceRegister, если нет, загрузить его напрямую.Основная логика следующая:
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcFileFormat].getCanonicalName
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
"org.apache.spark.sql.hive.orc.OrcFileFormat"
case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
"org.apache.spark.sql.avro.AvroFileFormat"
case name => name
}
val provider2 = s"$provider1.DefaultSource"
//尝试得到ContextClassLoader,若不行则是SparkClassLoader
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
try {
//实现DataSourceRegister接口类中寻找外部指定的source
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
// the provider format did not match any given registered aliases
case Nil =>
try {
// 找不到尝试直接加载
Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) =>
// Found the data source using fully qualified path
dataSource
case Failure(error) =>
....
}
} catch {
.....
}
case head :: Nil =>
// there is exactly one registered alias
head.getClass
case sources =>
....
}
} catch {
....
}
}
Вернитесь к реализации функции загрузки, продолжите чтение и обнаружите, что исходный код считает, что V2 совместим с прерывистым микропакетом V1, поэтому отношение V1 передается в качестве параметра, и вы можете переключиться на V1. если запрос не является непрерывным.
// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
// We can't be sure at this point whether we'll actually want to use V2, since we don't know the
// writer or whether the query is continuous.
val v1DataSource = DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
val v1Relation = ds match {
case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
case s: MicroBatchReadSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = s, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val dataSourceOptions = new DataSourceOptions(options.asJava)
var tempReader: MicroBatchReader = null
val schema = try {
tempReader = s.createMicroBatchReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
//用于故障恢复
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
dataSourceOptions)
tempReader.readSchema()
} finally {
// Stop tempReader to avoid side-effect thing
if (tempReader != null) {
tempReader.stop()
tempReader = null
}
}
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
s, source, options,
schema.toAttributes, v1Relation)(sparkSession))
case s: ContinuousReadSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = s, conf = sparkSession.sessionState.conf)
val options = sessionOptions ++ extraOptions
val dataSourceOptions = new DataSourceOptions(options.asJava)
val tempReader = s.createContinuousReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
//用于故障恢复
Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
dataSourceOptions)
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
s, source, options,
tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
case _ =>
// Code path for data source v1.
Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
}
}
Прочитав приведенный выше код, я задаю своим друзьям следующие вопросы:
- В чем разница между DataSourceV1 и V2?
- Что представляют подтаблицы MicroBatchReadSupport и ContinuousReadSupport и в чем различия?
- Что делают StreamingRelation и StreamingRelationV2 соответственно?
Вы можете обсудить это вместе в области комментариев. Если есть проблемы с вышеперечисленным или у вас есть лучшие предложения, пожалуйста, оставьте сообщение в области комментариев вместе. Я объясню вышеуказанные проблемы всем в начале следующей статьи. .