Сначала поймите два понятия
Прежде всего, мы должны сначала узнать, что такое семантика времени и водяной знак.
семантика времени
В потоке обработки данных flink есть три важных концепции времени, как показано на рисунке выше:
- Время события: время, когда было создано событие (то есть время, когда были сгенерированы данные)
- Ingestion Time: время ввода данных Flink.
- Время обработки: время для выполнения вычислений с данными, которое зависит от машины для локального системного времени.
В flink существует три вида временной семантики. По умолчанию flink выводит данные в соответствии со временем обработки. Однако из-за распределения, параллелизма и перегрузки сети потоковые данные передаются оператору flink. вывод выполняется в соответствии со временем обработки, когда порядок не соответствует порядку, что изменяет порядок времени события, когда данные фактически генерируются.С этой точки зрения нас больше волнует время события.. При обработке данных временного потока вывод конвейера оператору часто выводится не в порядке времени, что вызывает очень серьезную проблему.временное окноПри расчете следующегоЕсли данные более позднего временного порядка попадают в окно раньше, чем данные более раннего временного порядка, это окно закрыто или нет? Очевидно, что он должен быть закрыт, но в этом случае этоНа расчет данных окна повлияет потеря некоторых данных, что делает расчет окна неточным.Как избежать неправильного расчета, вызванного неправильными данными?Представьте нижеВодяной знакконцепция
уровень воды
- Водяной знак — это механизм измерения хода времени события, обычно используемый для отложенного срабатывания временных окон.
- Когда окно встречает отметку времени, которая достигает времени закрытия окна, оно не должно запускать расчет окна немедленно, а должно ждать. Подождите некоторое время (Водяной знак), дождитесь поступления поздних данных и затем закройте окно.
- Водяной знак можно понимать как механизм срабатывания задержки.Установив время задержки t Водяного знака, система будет проверять максимальное значение maxEventTime в поступивших данных каждый раз, а затем определять, что все данные, чье eventTime меньше maxEventTime-t, имеют Время остановки равно maxEventTime-t, затем это окно запускается на выполнение.
Особенности уровня воды
- водяной знак — это специальная запись данных
- Водяной знак должен увеличиваться монотонно, чтобы гарантировать, что часы времени события задачи продвигаются без отступает
- Водяной знак связан с отметкой времени данных
- водяной знак позволяет программе сбалансировать задержку и правильность результата (половина значения должна быть проверена повторно)
Например
Введение временной семантики и водяных знаков
Познакомить с семантикой времени
В потоковой обработке Flink семантика времени, используемая большей частью бизнеса, — это eventTime, которая представлена в коде следующим образом.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 参数 TimeCharacteristic 有三种类型:
* ProcessingTime,
* IngestionTime,
* EventTime;
*/
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Ввести водяной знак
Введение водяного знака фактически устанавливает время задержки, а также извлекает и устанавливает eventTime. Наиболее часто используемый метод заключается в вызове метода assignTimestampAndWatermarks, передаче времени задержки и перезаписи метода extractTimestamp для установки eventTime.
обращать внимание: при использовании eventTime необходимо указать метку времени в источнике данных, иначе, даже если будет введена семантика времени eventTime, программа все равно будет использовать Processing Time Возьмем датчик в качестве примера
SingleOutputStreamOperator<SensorReading> watermarkDataStream = sensorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2L)) {
@Override
public long extractTimestamp(SensorReading sensorReading) {
// 提取 eventTime 需要注意的是 该 eventTime 是毫秒单位,如果 sensorReading 的时间戳是以秒单位需要 乘以 1000
return sensorReading.getTimestamp();
}
});
обновление водяного знака
Придумайте вопрос о водяном знаке Когда будет обновлен водяной знак? Обновляются ли данные по мере поступления или периодически? Фактически, Flink предоставляет нам интерфейс TimestampAssigner для реализации, чтобы мы могли настраивать такие вещи, как Как извлекать временные метки и создавать водяные знаки из данных о событиях
dataStream.assignTimestampsAndWatermarks(new MyAssigner());
В интерфейсе TimestampAssigner есть два метода. AssignerWithPeriodicWatermarks AssignerWithPunctuatedWatermarks В соответствии с периодическим обновлением водяного знака и прерывистым обновлением водяного знака
Назначатель с периодическими водяными знаками
Периодически генерировать водяные знаки: Система будет периодически вставлять водяные знаки в поток (водяные знаки — это тоже особое событие!). Период по умолчанию составляет 200 мс. можно использовать Метод ExecutionConfig.setAutoWatermarkInterval() для установки.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置 EventTime 特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 设置 Watermark 周期性更新
env.getConfig().setAutoWatermarkInterval(500L);
Конечно, вы также можете переписать настройку, но делать это не рекомендуется, и есть много деталей, которые трудно понять (поскольку во Flink есть метод getCurrentWatermark() на месте, который поможет вам инкапсулировать все детали). , нужно только изменить время настройки, почему бы этим не воспользоваться? )
Логика генерации водяного знака: каждые 5 секунд Flink будет звонить Метод getCurrentWatermark() класса AssignerWithPeriodicWatermarks. Если метод возвращает отметку времени с отметкой времени, превышающей предыдущий водяной знак, в поток вставляется новый водяной знак. Эта проверка гарантирует, что водяной знак монотонно увеличивается. Если временная метка, возвращаемая методом, меньше или равна временной метке предыдущего уровня воды, новый водяной знак не будет создан.
Назначатель с пунктирными водяными знаками
Периодически создавать водяные знаки. В отличие от метода периодической генерации, этот метод не имеет фиксированного времени, но может фильтровать и обрабатывать данные по мере необходимости.
Этот метод обновляется в соответствии с потребностями и может быть настроен только
Ниже приведен пользовательский код, который учитывает только содержимое реализации.
кейс
По данным, id датчика которых равен sensor_1, извлекается соответствующий водяной знак и вставляется в поток данных.
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
public class Watermark_CustomPunctuatedWatermark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 不再设置周期行性获取watermark
// env.getConfig().setAutoWatermarkInterval(500L);
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.assignTimestampsAndWatermarks(new CustomPunctuatedWatermark());
resultDataStream.print();
env.execute();
}
public static class CustomPunctuatedWatermark implements AssignerWithPunctuatedWatermarks<SensorReading> {
// 延迟 2s
private long bound = 2 * 1000L;
/**
* @param lastElement -> 上一条数据
* @param extractTimestamp -> 当前数据的时间戳 根据 extractTimestamp 方法获取
* @return
*/
@Nullable
@Override
public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractTimestamp) {
// 如果上一条数据的id 等于 sensor_1 则更新时间戳 否则返回 null
if ("sensor_1".equals(lastElement.getId())) {
return new Watermark(extractTimestamp - bound);
} else {
return null;
}
}
/**
* @param sensorReading -> 当前数据
* @param previousElementTimestamp -> 上一条数据的事件事件戳
* @return
*/
@Override
public long extractTimestamp(SensorReading sensorReading, long previousElementTimestamp) {
return sensorReading.getTimestamp();
}
}
}
настройки водяного знака
-
Во Flink водяные знаки генерируются разработчиками приложений, что обычно требует иметь определенное представление о предмете
-
Если задержка настройки водяного знака слишком длинная, скорость получения результатов может быть очень медленной. Метод должен выводить приблизительный результат, прежде чем водяной знак достигнут
-
И если водяной знак приходит слишком рано, вы можете получить неправильный результат, но Flink обрабатывает его с опозданием. Механизм данных может решить эту проблему