записывать
В предыдущей статье мы в основном анализировали рабочий процесс FLink. В этой статье будет подробно описано понимание водяного знака.
причина
Когда поток Flink обрабатывает данные, происходит процесс от генерации события к источнику потока и к оператору, и в большинстве случаев данные, поступающие к оператору, идут в порядке событий, сгенерированных событием, но не исключаются. генерация беспорядка данных из-за сетевой задержки и других причин. Особенно при потреблении какфа данные сообщения нескольких разделов выходят из строя.В это время, чтобы обеспечить упорядоченность данных и своевременность расчета, родился механизм водяных знаков.
что
Проще говоря, водяной знак — это специальная метка времени, которая представляет собой механизм, установленный Flink для обработки расчетов окна EventTime. Системное событие, созданное источником или настраиваемым генератором водяных знаков в соответствии с требованиями, которое передается соответствующему оператору, как обычное событие потока данных, и оператор, который получает событие водяного знака, постоянно настраивает свои собственные часы управления EventTime.Flink гарантирует, что водяной знак монотонно возрастает, а оператор При получении Водяного знака вы будете знать, где количество раз поток данных был обработан, то есть измерение времени.
как генерировать
Для создания водяных знаков в Flink версии 1.10 и выше вам больше не нужно самостоятельно реализовывать интерфейс.Flink API предоставляет WatermarkStrategy, включающую в себя как TimestampAssigner, так и WatermarkGenerator. Класс инструментов WatermarkStrategy предоставляет множество часто используемых стратегий водяных знаков.Конечно, мы также можем создавать собственные стратегии водяных знаков в некоторых сценариях.
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根据策略实例化一个 watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
Как показано выше. Обычно нам не нужно реализовывать этот интерфейс, но мы можем использовать стратегию водяных знаков, переданную в классе инструментов WatermarkStrategy, или использовать этот класс инструментов для привязки пользовательских TimestampAssigner и WatermarkGenerator. Например, когда нам нужно использовать генератор водяных знаков с ограниченным нарушением порядка и лямбда-выражение в качестве средства назначения метки времени, мы можем:
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.f0);
Параметр TimestampAssigner является необязательным и обычно не требует указания. Например, при использовании kafka временную метку можно получить непосредственно из источника данных.
использовать стратегию
WatermarkStrategy используется во Flink двумя способами: один — использовать его непосредственно в источнике данных, а другой — использовать его непосредственно после операции с источником, не являющимся источником данных.
Первый метод рекомендуется, поскольку источник данных может использовать информацию об осколке/разделе в логике создания водяного знака. Таким образом, водяной знак можно отследить более точно, и общая генерация водяного знака будет более точной.Указание стратегии водяного знака непосредственно в источнике данных должно использовать определенный интерфейс источника данных, такой как связь с kafka, с использованием kafka Connector, только когда напрямую использовать источник данных невозможно. При настройке политики используйте только второй метод.
//第一种 直接在kafka上使用
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
WatermarkStrategy.
.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<MyType> stream = env.addSource(kafkaSource);
//第二种
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>);
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
Когда источник данных используется напрямую, если определенный раздел/сегмент в источнике данных не отправлял данные о событии в течение определенного периода времени, это означает, что WatermarkStrategy не получит никаких данных для создания водяных знаков. или раздел помечен как свободный.
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));//当时间超过1分钟则设置为空闲状态
Пользовательский генератор водяных знаков
В дополнение к некоторым реализованным стратегиям водяных знаков, которые поставляются с Flink API, мы также можем настраивать и реализовывать наши собственные стратегии водяных знаков через интерфейс WatermarkGenerator.
/**
* {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
*
* <p><b>注意:</b> WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks}
* 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
*/
@Public
public interface WatermarkGenerator<T> {
/**
* 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* 周期性的调用,也许会生成新的 watermark,也许不会。
*
* <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
*/
void onPeriodicEmit(WatermarkOutput output);
}