Примечания к исследованию Flink (4) - глубокое понимание водяного знака

Большие данные

записывать

 В предыдущей статье мы в основном анализировали рабочий процесс FLink. В этой статье будет подробно описано понимание водяного знака.

причина

Когда поток Flink обрабатывает данные, происходит процесс от генерации события к источнику потока и к оператору, и в большинстве случаев данные, поступающие к оператору, идут в порядке событий, сгенерированных событием, но не исключаются. генерация беспорядка данных из-за сетевой задержки и других причин. Особенно при потреблении какфа данные сообщения нескольких разделов выходят из строя.В это время, чтобы обеспечить упорядоченность данных и своевременность расчета, родился механизм водяных знаков.

что

 Проще говоря, водяной знак — это специальная метка времени, которая представляет собой механизм, установленный Flink для обработки расчетов окна EventTime. Системное событие, созданное источником или настраиваемым генератором водяных знаков в соответствии с требованиями, которое передается соответствующему оператору, как обычное событие потока данных, и оператор, который получает событие водяного знака, постоянно настраивает свои собственные часы управления EventTime.Flink гарантирует, что водяной знак монотонно возрастает, а оператор При получении Водяного знака вы будете знать, где количество раз поток данных был обработан, то есть измерение времени.

как генерировать

  Для создания водяных знаков в Flink версии 1.10 и выше вам больше не нужно самостоятельно реализовывать интерфейс.Flink API предоставляет WatermarkStrategy, включающую в себя как TimestampAssigner, так и WatermarkGenerator. Класс инструментов WatermarkStrategy предоставляет множество часто используемых стратегий водяных знаков.Конечно, мы также можем создавать собственные стратегии водяных знаков в некоторых сценариях.

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根据策略实例化一个 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

Как показано выше. Обычно нам не нужно реализовывать этот интерфейс, но мы можем использовать стратегию водяных знаков, переданную в классе инструментов WatermarkStrategy, или использовать этот класс инструментов для привязки пользовательских TimestampAssigner и WatermarkGenerator. Например, когда нам нужно использовать генератор водяных знаков с ограниченным нарушением порядка и лямбда-выражение в качестве средства назначения метки времени, мы можем:

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);

  Параметр TimestampAssigner является необязательным и обычно не требует указания. Например, при использовании kafka временную метку можно получить непосредственно из источника данных.

использовать стратегию

 WatermarkStrategy используется во Flink двумя способами: один — использовать его непосредственно в источнике данных, а другой — использовать его непосредственно после операции с источником, не являющимся источником данных.

  Первый метод рекомендуется, поскольку источник данных может использовать информацию об осколке/разделе в логике создания водяного знака. Таким образом, водяной знак можно отследить более точно, и общая генерация водяного знака будет более точной.Указание стратегии водяного знака непосредственно в источнике данных должно использовать определенный интерфейс источника данных, такой как связь с kafka, с использованием kafka Connector, только когда напрямую использовать источник данных невозможно. При настройке политики используйте только второй метод.

//第一种 直接在kafka上使用
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.
                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream<MyType> stream = env.addSource(kafkaSource);
//第二种
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);

Когда источник данных используется напрямую, если определенный раздел/сегмент в источнике данных не отправлял данные о событии в течение определенного периода времени, это означает, что WatermarkStrategy не получит никаких данных для создания водяных знаков. или раздел помечен как свободный.

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));//当时间超过1分钟则设置为空闲状态 

Пользовательский генератор водяных знаков

 В дополнение к некоторым реализованным стратегиям водяных знаков, которые поставляются с Flink API, мы также можем настраивать и реализовывать наши собственные стратегии водяных знаков через интерфейс WatermarkGenerator.

/**
 * {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。
 *
 * <p><b>注意:</b>  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} 
 * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性的调用,也许会生成新的 watermark,也许不会。
     *
     * <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。
     */
    void onPeriodicEmit(WatermarkOutput output);
}