Первый опыт серии Spark Structured Streaming (1)

Spark

Обзор

Structured Streaming — это механизм обработки распределенных потоков, основанный на Spark SQL и обладающий следующими характеристиками:

  • Вы можете напрямую использовать DF/DS API для написания логики обработки потоков, например агрегации потоков, окон времени событий, объединений потоков и пакетов и т. д.
  • Используйте контрольную точку и WAL (журнал с упреждающей записью), чтобы обеспечить сквозное потребление сразу после семантики.
  • Сквозная обработка с низкой задержкой (100 мс)
  • Высокая масштабируемость, высокая отказоустойчивость

основная концепция

Структурированная потоковая передача рассматривает непрерывный поток данных как таблицу, которую можно вставлять бесконечно. Когда есть операция запроса, она создает «таблицу результатов» в соответствии с определенным кодом. Каждый раз, когда часть данных поступает в течение интервала, «таблица результатов» будет обновлена. Конечно, структурированная потоковая передача на самом деле не реализует всю таблицу. На самом деле она просто считывает последние данные для обработки обновленных результатов и отбрасывает исходные данные, а для обновления результатов поддерживает только наименьшие данные промежуточного состояния. состояние позднее.
Ниже приведена концептуальная схема, представленная на официальном сайте для простоты понимания:

structured-streaming-stream-as-a-table.png

structured-streaming-model.png

демонстрация кода

Приведенного выше описания так много, что лучше сразу перейти к коду.На этот раз пример — классический WordCount, используемый язык — Scala, версия — 2.11.8. Если у вас есть друг, который раньше не сталкивался со Scala, вы можете взглянуть на мою серию статей о Scala.
Первый — изменить pom.xml и добавить следующие зависимости:

<properties>
  <scala.version>2.11.8</scala.version>
  <scala.binary.version>2.11</scala.binary.version>
</properties>    

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_${scala.binary.version}</artifactId>
  <version>2.4.4</version>
</dependency>

Затем создайте SparkSession в качестве точки входа. Это также показывает, что структурированная потоковая передача основана на механизме Spark SQL. Вы можете использовать DS/DF для потоковой обработки.

val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()

Последняя наша знакомая трилогия, источник->преобразование->приемник, мы читаем данные из сокета и выполняем операцию wc и выводим их на консоль.

import spark.implicits._
val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

query.awaitTermination()

Поскольку среда разработки Mac, непосредственно nc -lk 9999, введите серию тестовых данных, консоль выведет результаты, результаты следующие. Если вы не являетесь партнером Mac, также возможен облачный хостинг, вам просто нужно изменить хост!

$ nc -lk 9999
hadoop hadoop flink
spark spark
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| flink|    1|
|hadoop|    2|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
| spark|    2|
| flink|    1|
|hadoop|    2|
+------+-----+

Исследуйте исходный код

Друзья, которые подвергались воздействию Spark SQL, должны быть такими же, как и я. Это также запись SparkSession, и код очень похож. В чем разница в реализации исходного кода для загрузки внешних источников данных?
Давайте сначала посмотрим, как Spark Structured Streaming загружает источник данных, щелкнем по исходному коду spark.readStream и обнаружим, что это в основном новый DataStreamReader(self), затем мы в основном исследуем этот класс.
Первое, что нужно прочитать исходный код, это прочитать его комментарии.Исходный код выглядит следующим образом:

/**
 * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
 * key-value stores, etc). Use `SparkSession.readStream` to access this.
 *
 * @since 2.0.0
 */
@InterfaceStability.Evolving
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {

Из комментариев видно, что этот класс загружает внешние источники данных, такие как набор данных, но его определение определяет, что его можно использовать только в пакете org.apache.spark.sql и нельзя использовать извне.
Второе — найти запись.Вообще говоря, это основная функция, а в этом классе нет.Из предыдущего примера мы знаем, что последний вызов — это метод загрузки, поэтому начинаем чтение с этого метода.

/**
   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
   * (e.g. external key-value stores).
   *
   * @since 2.0.0
   */
  def load(): DataFrame = {
    //source是类属性,表示外部数据源格式,由format方法外部指定,默认parquet
    //不能将source直接指定为hive,否则会报错
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()

Метод lookupDataSource является более важным методом, в основном для загрузки встроенных и внешних пользовательских источников данных, он будет пытаться получить загрузчик контекста, если нет, то это будет загрузчик Spark, а затем сначала искать указанный извне источник в реализация класса интерфейса DataSourceRegister, если нет, загрузить его напрямую.Основная логика следующая:

def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource"
    //尝试得到ContextClassLoader,若不行则是SparkClassLoader
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
      //实现DataSourceRegister接口类中寻找外部指定的source
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
        // the provider format did not match any given registered aliases
        case Nil =>
          try {
            // 找不到尝试直接加载
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
              case Success(dataSource) =>
                // Found the data source using fully qualified path
                dataSource
              case Failure(error) =>
                ....
            }
          } catch {
            .....
          }
        case head :: Nil =>
          // there is exactly one registered alias
          head.getClass
        case sources =>
         ....
      }
    } catch {
      ....
    }
  }

Вернитесь к реализации функции загрузки, продолжите чтение и обнаружите, что исходный код считает, что V2 совместим с прерывистым микропакетом V1, поэтому отношение V1 передается в качестве параметра, и вы можете переключиться на V1. если запрос не является непрерывным.

// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
    // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
    // writer or whether the query is continuous.
    val v1DataSource = DataSource(
      sparkSession,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap)
    val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }
    ds match {
      case s: MicroBatchReadSupport =>
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = s, conf = sparkSession.sessionState.conf)
        val options = sessionOptions ++ extraOptions
        val dataSourceOptions = new DataSourceOptions(options.asJava)
        var tempReader: MicroBatchReader = null
        val schema = try {
          tempReader = s.createMicroBatchReader(
            Optional.ofNullable(userSpecifiedSchema.orNull),
            //用于故障恢复
            Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
            dataSourceOptions)
          tempReader.readSchema()
        } finally {
          // Stop tempReader to avoid side-effect thing
          if (tempReader != null) {
            tempReader.stop()
            tempReader = null
          }
        }
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, options,
            schema.toAttributes, v1Relation)(sparkSession))
      case s: ContinuousReadSupport =>
        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
          ds = s, conf = sparkSession.sessionState.conf)
        val options = sessionOptions ++ extraOptions
        val dataSourceOptions = new DataSourceOptions(options.asJava)
        val tempReader = s.createContinuousReader(
          Optional.ofNullable(userSpecifiedSchema.orNull),
          //用于故障恢复
          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
          dataSourceOptions)
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, options,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
      case _ =>
        // Code path for data source v1.
        Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
    }
  }

Прочитав приведенный выше код, я задаю своим друзьям следующие вопросы:

  • В чем разница между DataSourceV1 и V2?
  • Что представляют подтаблицы MicroBatchReadSupport и ContinuousReadSupport и в чем различия?
  • Что делают StreamingRelation и StreamingRelationV2 соответственно?

Вы можете обсудить это вместе в области комментариев. Если есть проблемы с вышеперечисленным или у вас есть лучшие предложения, пожалуйста, оставьте сообщение в области комментариев вместе. Я объясню вышеуказанные проблемы всем в начале следующей статьи. .