Это 8-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления
текст
1. Роль водяного знака
Водяной знак используется для обработки событий не по порядку, а правильная обработка событий не по порядку обычно реализуется путем объединения механизма водяного знака с окном. Мы знаем, что есть процесс и время в середине обработки потока от генерации события до прохождения через источник и затем до оператора. Хотя в большинстве случаев данные, поступающие к оператору, поступают во временной последовательности событий, это не исключает выхода из строя или запаздывания элемента из-за сети, противодавления и других причин. Но для позднего элемента мы не можем ждать бесконечно.Должен быть механизм, гарантирующий, что через определенное время окно должно быть запущено для выполнения вычисления. Этот специальный механизм является водяным знаком.
2. Водяной знак устраняет просроченные данные
В системе реального времени из-за задержек, вызванных различными причинами, время отправки некоторых сообщений на флинк задерживается дольше, чем время генерации события.Если окно построено на основе времени события, но для опаздывающего элемента мы не можем ждать бесконечно, должен быть механизм, гарантирующий, что через определенное время окно должно быть запущено для выполнения вычислений.. Этот специальный механизм является водяным знаком.
Водяные знаки — это механизм решения этой проблемы.
- Обратитесь к дизайну Google DataFlow.
- Это признак хода обработки времени события.
- Указывает, что поступили события раньше (старее), чем водяной знак (нет данных ниже водяного знака).
- На основе водяного знака выполняется оценка вычисления триггера окна.
Водяной знак упорядоченного потока данных:
В некоторых случаях потоки данных, основанные на времени события, являются непрерывными (относительно времени события). В упорядоченном потоке водяной знак — это простая периодическая метка.
Водяной знак неупорядоченного потока данных:
В других сценариях поток данных, основанный на времени события, является прерывистым (относительно времени события).
В неупорядоченном потоке водяной знак очень важен: он сообщает оператору, что произошло событие раньше (старше/меньше временной метки), чем водяной знак, и оператор может увеличить время внутреннего события до временной метки водяного знака (что может вызвать Расчет окна. )
Водяной знак в параллельном потоке:
Обычно водяной знак генерируется в исходной функции, но он также может быть на любом этапе после источника.Если вы укажете несколько водяных знаков, водяной знак, указанный позже, перезапишет предыдущее значение. Каждая подзадача источника генерирует водяной знак независимо. Когда водяной знак проходит через оператора, он сдвигает текущее время события у оператора, и оператор генерирует новый водяной знак для нисходящего потока. Текущее время события оператора с несколькими входами (union, keyBy, partition) — это минимальное значение времени события его входного потока. Примечание. В случае нескольких степеней параллелизма при выравнивании водяного знака будет использоваться наименьший водяной знак из всех каналов.
3. Как создать водяной знак
Обычно водяной знак следует генерировать сразу после получения данных из источника, однако также можно применить простую операцию сопоставления или фильтрации после источника, а затем сгенерировать водяной знак.
Существует две основные категории способов создания водяных знаков:
- With Periodic Watermarks
- With Punctuated Watermarks
Первый может определить максимально допустимое время простоя, которое в данном случае используется больше. Мы в основном фокусируемся на периодических водяных знаках, чтобы проиллюстрировать, ниже приведен метод создания периодического водяного знака:
4, обработка водяных знаков последовательных данных
Требование: определить окно в 10 секунд, и данные с задержкой в 10 секунд также могут быть правильно подсчитаны через время события данных в сочетании с водяным знаком. Мы проталкиваем вперед 10 секунд через eventTime данных, чтобы получить водяной знак данных, Код:
package com.shockang.study.bigdata.flink.watermark
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting
object FlinkWaterMark2 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//设置flink的数据处理时间为eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val tupleStream: DataStream[(String, Long)] = env.socketTextStream("node01", 9000).map(x => {
val strings: Array[String] = x.split(" ")
(strings(0), strings(1).toLong)
})
//注册我们的水印
val waterMarkStream: DataStream[(String, Long)] = tupleStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
var currentTimemillis: Long = 0L
var timeDiff: Long = 10000L
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/* //获取当前数据的waterMark
override def getNext: Watermark = {
}*/
override def getCurrentWatermark: Watermark = {
val watermark = new Watermark(currentTimemillis - timeDiff)
watermark
}
//抽取数据的eventTime
override def extractTimestamp(element: (String, Long), l: Long): Long = {
val enventTime = element._2
currentTimemillis = Math.max(enventTime, currentTimemillis)
val id = Thread.currentThread().getId
println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentTimemillis + "|" + sdf.format(currentTimemillis) + "],watermark:[" + this.getCurrentWatermark.getTimestamp + "|" + sdf.format(this.getCurrentWatermark.getTimestamp) + "]")
enventTime
}
})
waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new MyWindowFunction2).print()
env.execute()
}
}
class MyWindowFunction2 extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)],
out: Collector[String]): Unit = {
val keyStr = key.toString
val arrBuf = ArrayBuffer[Long]()
val ite = input.iterator
while (ite.hasNext) {
val tup2 = ite.next()
arrBuf.append(tup2._2)
}
val arr = arrBuf.toArray
Sorting.quickSort(arr) //对数据进行排序,按照eventTime进行排序
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
val result = "聚合数据的key为:" + keyStr + "," + "窗口当中数据的条数为:" + arr.length + "," + "窗口当中第一条数据为:" + sdf.format(arr.head) + "," + "窗口当中最后一条数据为:" + sdf.format(arr.last) + "," + "窗口起始时间为:" + sdf.format(window.getStart) + "," + "窗口结束时间为:" + sdf.format(window.getEnd) + "!!!!!看到这个结果,就证明窗口已经运行了"
out.collect(result)
}
}
Введите данные теста
Примечание. Если вам нужно инициировать вызов окна flink, должны быть выполнены два условия. 1: WaterMarkTime > eventTime 2: В окне есть данные
Викторина по вводу данных:
按照十秒钟统计一次,我们程序会将时间划分成为以下时间间隔段
2019-10-01 10:11:00 到 2019-10-01 10:11:10
2019-10-01 10:11:10 到 2019-10-01 10:11:20
2019-10-01 10:11:20 到 2019-10-01 10:11:30
2019-10-01 10:11:30 到 2019-10-01 10:11:40
2019-10-01 10:11:40 到 2019-10-01 10:11:50
2019-10-01 10:11:50 到 2019-10-01 10:12:00
顺序计算:
触发数据计算的条件依据为两个
第一个waterMark时间大于数据的eventTime时间,第二个窗口之内有数据
我们这里的waterMark直接使用eventTime的最大值减去10秒钟
0001 1569895882000 数据eventTime为:2019-10-01 10:11:22 数据waterMark为 2019-10-01 10:11:12
0001 1569895885000 数据eventTime为:2019-10-01 10:11:25 数据waterMark为 2019-10-01 10:11:15
0001 1569895888000 数据eventTime为:2019-10-01 10:11:28 数据waterMark为 2019-10-01 10:11:18
0001 1569895890000 数据eventTime为:2019-10-01 10:11:30 数据waterMark为 2019-10-01 10:11:20
0001 1569895891000 数据eventTime为:2019-10-01 10:11:31 数据waterMark为 2019-10-01 10:11:21
0001 1569895895000 数据eventTime为:2019-10-01 10:11:35 数据waterMark为 2019-10-01 10:11:25
0001 1569895898000 数据eventTime为:2019-10-01 10:11:38 数据waterMark为 2019-10-01 10:11:28
0001 1569895900000 数据eventTime为:2019-10-01 10:11:40 数据waterMark为 2019-10-01 10:11:30 触发第一条到第三条数据计算,数据包前不包后,不会计算2019-10-01 10:11:30 这条数据
0001 1569895911000 数据eventTime为:2019-10-01 10:11:51 数据waterMark为 2019-10-01 10:11:41 触发2019-10-01 10:11:20到2019-10-01 10:11:28时间段的额数据计算,数据包前不包后,不会触发2019-10-01 10:11:30这条数据的计算
5. Водяной знак обрабатывает неупорядоченные данные
Введите данные теста Затем продолжайте вводить следующие неверные данные, чтобы проверить, можно ли решить проблему неупорядоченных данных flink.
乱序数据
0001 1569895948000 数据eventTime为:2019-10-01 10:12:28 数据waterMark为 2019-10-01 10:12:18
0001 1569895945000 数据eventTime为:2019-10-01 10:12:25 数据waterMark为 2019-10-01 10:12:18
0001 1569895947000 数据eventTime为:2019-10-01 10:12:27 数据waterMark为 2019-10-01 10:12:18
0001 1569895950000 数据eventTime为:2019-10-01 10:12:30 数据waterMark为 2019-10-01 10:12:20
0001 1569895960000 数据eventTime为:2019-10-01 10:12:40 数据waterMark为 2019-10-01 10:12:30 触发计算 waterMark > eventTime 并且窗口内有数据,触发 2019-10-01 10:12:28到2019-10-01 10:12:27 这三条数据的计算,数据包前不包后,不会触发2019-10-01 10:12:30 这条数据的计算
0001 1569895949000 数据eventTime为:2019-10-01 10:12:29 数据waterMark为 2019-10-01 10:12:30 迟到太多的数据,flink直接丢弃,可以设置flink将这些迟到太多的数据保存起来,便于排查问题
6. Как решить данные позже водяного знака
Если мы установим водяной знак данных на определенное время после времени события каждых данных, например, время события данных — 2019-08-20 15:30:30, окно окна программы — 10 с, а затем ставим водяной знак на 2019-08 -20 15:30:40, Тогда что, если время события определенного фрагмента данных 2019-08-20 15:30:32, а время достижения программы flink 2019-08-20 15:30:45, эти данные даже позже, чем водяной знак время окна Что мне делать с часами 5S? Для этих данных, которые позже водяного знака, у flink есть три метода обработки.
1. Откажитесь напрямую
Мы вводим много неупорядоченных (фактически до тех пор, пока время события
输入:【输入两条内容】
late element
0001 1569895948000 数据eventTime为:2019-10-01 10:12:28 数据直接丢弃
0001 1569895945000 数据eventTime为:2019-10-01 10:12:25 数据直接丢弃
Примечание: в настоящее время окно не запускается. Поскольку окно, в котором находятся входные данные, уже выполнено, план обработки flink по умолчанию для этих поздних данных состоит в том, чтобы отбросить его.
2. allowLateness указывает время, допустимое для задержки данных.
В некоторых случаях мы хотели бы предоставить дополнительный льготный период для просроченных данных. Flink предоставляет метод allowLateness для установки времени задержки для просроченных данных, а данные, поступающие в течение указанного времени задержки, могут инициировать выполнение окна.
Измените код:
waterMarkStream
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2))//允许数据迟到2S
//function: (K, W, Iterable[T], Collector[R]) => Unit
.apply(new MyWindowFunction).print()
Проверьте задержку данных: Входные данные:
Перезапускаем нашу программу после смены кода, а затем повторно вводим прежние данные
0001 1569895882000
0001 1569895885000
0001 1569895888000
0001 1569895890000
0001 1569895891000
0001 1569895895000
0001 1569895898000
0001 1569895900000
0001 1569895911000
0001 1569895948000
0001 1569895945000
0001 1569895947000
0001 1569895950000
0001 1569895960000
0001 1569895949000
Проверьте задержку данных: определите, что данные задерживаются только на 2 с, чтобы повторно получить данные и пересчитать
0001 1569895948000 数据eventTime为:2019-10-01 10:12:28 触发数据计算 数据waterMark为 2019-10-01 10:12:30
0001 1569895945000 数据eventTime为:2019-10-01 10:12:25 触发数据计算 数据waterMark为 2019-10-01 10:12:30
0001 1569895958000 数据eventTime为:2019-10-01 10:12:38 不会触发数据计算 数据waterMark为 2019-10-01 10:12:30 waterMarkTime < eventTime,所以不会触发计算
Настройка водяного знака данных на 41 секунду может инициировать расчет вышеуказанных данных.
0001 1569895971000 数据eventTime为:2019-10-01 10:12:51 数据waterMark为 2019-10-01 10:12:41
Он будет продолжать вызывать расчет этих данных 0001 1569895958000
3. sideOutputLateData собирает просроченные данные
Благодаря sideOutputLateData просроченные данные могут собираться и храниться единообразно, что удобно для последующего устранения неполадок.
Сначала необходимо изменить код:
package com.shockang.study.bigdata.flink.watermark
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting
object FlinkWaterMark {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
//设置time类型为eventtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//暂时定义并行度为1
env.setParallelism(1)
val text = env.socketTextStream("node01", 9000)
val inputMap: DataStream[(String, Long)] = text.map(line => {
val arr = line.split(" ")
(arr(0), arr(1).toLong)
})
//给我们的数据注册waterMark
val waterMarkStream: DataStream[(String, Long)] = inputMap
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, Long)] {
var currentMaxTimestamp = 0L
//watermark基于eventTime向后推迟10秒钟,允许消息最大乱序时间为10s
val waterMarkDiff: Long = 10000L
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//获取下一个水印
override def checkAndGetNextWatermark(t: (String, Long), l: Long): Watermark = {
val watermark = new Watermark(currentMaxTimestamp - waterMarkDiff)
watermark
}
//抽取当前数据的时间作为eventTime
override def extractTimestamp(element: (String, Long), l: Long): Long = {
val eventTime = element._2
currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)
val id = Thread.currentThread().getId
println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "],watermark:[" + this.checkAndGetNextWatermark(element, l).getTimestamp + "|" + sdf.format(this.checkAndGetNextWatermark(element, l).getTimestamp) + "]")
eventTime
}
})
val outputTag: OutputTag[(String, Long)] = new OutputTag[(String, Long)]("late_data")
val outputWindow: DataStream[String] = waterMarkStream
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
// .allowedLateness(Time.seconds(2))//允许数据迟到2S
.sideOutputLateData(outputTag)
//function: (K, W, Iterable[T], Collector[R]) => Unit
.apply(new MyWindowFunction)
val sideOuptut: DataStream[(String, Long)] = outputWindow.getSideOutput(outputTag)
sideOuptut.print()
outputWindow.print()
//执行程序
env.execute()
}
}
class MyWindowFunction extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val keyStr = key.toString
val arrBuf = ArrayBuffer[Long]()
val ite = input.iterator
while (ite.hasNext) {
val tup2 = ite.next()
arrBuf.append(tup2._2)
}
val arr = arrBuf.toArray
Sorting.quickSort(arr)
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) + "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
out.collect(result)
}
}
Давайте введем некоторые данные, чтобы проверить ввод
0001 1569895882000
0001 1569895885000
0001 1569895888000
0001 1569895890000
0001 1569895891000
0001 1569895895000
0001 1569895898000
0001 1569895900000
0001 1569895911000
0001 1569895948000
0001 1569895945000
0001 1569895947000
0001 1569895950000
0001 1569895960000
0001 1569895949000
输入两条迟到的数据,会被收集起来
0001 1569895948000
0001 1569895945000
В настоящее время для этих поздних данных все они сохраняются в outputTag через sideOutputLateData.
7. Механизм водяных знаков с несколькими степенями параллелизма
Параллелизм установлен в 1 в предыдущем коде
env.setParallelism(1);
Если здесь не указано, код будет считывать количество локальных ЦП по умолчанию, чтобы установить параллелизм при работе. Закомментируйте код параллелизма кода
//env.setParallelism(1)
Затем добавьте идентификатор потока к выводу
Появятся следующие данные: Введите следующие строки:
вывод:
Вы обнаружите, что окно не запущено.
Потому что в это время все эти 7 фрагментов данных обрабатываются разными потоками. Каждая тема имеет водяной знак.
Потому что в случае нескольких степеней параллелизма выравнивание водяного знака возьмет наименьший водяной знак из всех каналов, а у нас сейчас по умолчанию степеней параллелизма 8. Эти 7 данных обрабатываются разными потоками, и наименьший водяной знак еще не получен. Таким образом, окно не может быть запущено для выполнения.Проверим это и настроим степень параллелизма в коде на 2.
env.setParallelism(2)
Введите следующее:
0001 1569895890000
0001 1569895903000
0001 1569895908000
вывод:На этом этапе вы обнаружите, что при вводе третьего фрагмента данных запускается окно [10:11:30,10:11:33).
После ввода первых двух данных минимальный водяной знак получается 10:11:20, и в это время в соответствующем окне данных нет.
После третьего ввода данных минимальный водяной знак получается 10:11:33, а соответствующее окно в это время — [10:11:30,10:11:33). Значит спровоцировано.