Вопрос 1: Как обеспечить точное попадание данных в один и тот же раздел в соответствии со временем события;
- Используйте водяной знак для настройки правил группирования, перейдите по ссылке: решение разделения данных flink fall hdfs в соответствии с событиямиdeveloper.aliyun.com/article/719…
/**
* @Author:wenwei
* @Date : 2020/9/8 22:15
* 自定义分桶的规则
* 1:按照什么格式定义文件名,默认为yyyy-MM-dd-HH
*/
@PublicEvolving
public class CustomBucketAssigner<IN> implements BucketAssigner<IN, String> {
private static final long serialVersionUID = 1L;
private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";
private final String formatString;
private final ZoneId zoneId;
private transient DateTimeFormatter dateTimeFormatter;
/**
* Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.
*/
public CustomBucketAssigner() {
this(DEFAULT_FORMAT_STRING);
}
/**
* Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.
*
* @param formatString The format string that will be given to {@code SimpleDateFormat} to determine
* the bucket id.
*/
public CustomBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
/**
* Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
*
* @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
*/
public CustomBucketAssigner(ZoneId zoneId) {
this(DEFAULT_FORMAT_STRING, zoneId);
}
/**
* Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.
*
* @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
* the bucket path.
* @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
*/
public CustomBucketAssigner(String formatString, ZoneId zoneId) {
this.formatString = Preconditions.checkNotNull(formatString);
this.zoneId = Preconditions.checkNotNull(zoneId);
}
//将分桶的规则写成按照事件时间;
@Override
public String getBucketId(IN element, BucketAssigner.Context context) {
if (dateTimeFormatter == null) {
dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
}
//固定格式命名文件夹名称
return "p_data_day="+dateTimeFormatter.format(Instant.ofEpochMilli(context.currentWatermark()));
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
@Override
public String toString() {
return "DateTimeBucketAssigner{" +
"formatString='" + formatString + '\'' +
", zoneId=" + zoneId +
'}';
}
}
Вопрос 2: Как flink точно разделяет окна?
Как правильно определить время окна окна, чтобы данные были точно разделены по событию, а данные предыдущего дня не попали в следующий временной раздел, вы можете обратиться к исходному коду в окнах, который определяет время начала, на которое стоит обратить внимание
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start. 事件发生的时间
* @param offset The offset which window start would be shifted by. 定义TumblingEventTimeWindows 设置云讯的offset的值,默认都为零
* @param windowSize The size of the generated windows. 窗口大小
* @return window start
对应的数据应windows
例如 windows Size = 5s ,offset = 0 ; 例如当前的 timestamp = 2s ; 7s
2 - (2-0+5) % 5 = 0 ,
7 - (7 - 0 + 5) % 5 = 5 ,
例如 windows Size = 7s ,offset = 0 ; 例如当前的 timestamp = 2s ; 7s
2 - (2-0+7) % 7 = 0;
7 - (7-0+7)%7= 7
例如 windows Size = 5s ,offset = 1s ; 例如当前的 timestamp = 2s ; 7s
2 - (2-1+5) % 5 = 1 ,
7 - (7 - 0 + 5) % 5 = 6 ,
例如 windows Size = 7s ,offset = 0 ; 例如当前的 timestamp = 2s ; 7s
2 - (2-1+7) % 7 = 1;
7 - (7-1+7)%7= 8
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
Проблема 3: Из-за постоянного увеличения объема данных при разборе IP-адреса слишком много файловых дескрипторов;
- Превратить класс, разбирающий ip, в одноэлементный класс, который нужно оптимизировать
public class Ip2regionSingleton {
private static Logger logger = LoggerFactory.getLogger(Ip2regionSingleton.class);
private static Ip2regionSingleton instance = new Ip2regionSingleton();
private static DbConfig config;
private static DbSearcher searcher;
public DbSearcher getSearcher() {
return searcher;
}
// 私有化构造方法
private Ip2regionSingleton() {
String path = Ip2regionSingleton.class.getResource("/").getPath();
String dbPath = path + "plugins/ip2region.db";
File file = new File(dbPath);
logger.info("singleton count:{}","-------------------------------------------------------");
if ( file.exists() ) {
try{
config = new DbConfig();
searcher = new DbSearcher(config, dbPath);
}catch (Exception e){
logger.error("Ip2regionSingleton:{}",e.getMessage());
e.printStackTrace();
}
}
}
public static Ip2regionSingleton getInstance() {
return instance;
}
}
Вопрос 4: Как решить проблему зависимостей пакетов в файле flink pom;
- maven helper, найти соответствующий конфликтующий класс jar;
- Удалите конфликтующие классы jar с помощью исключения
Вопрос 5: Как обеспечить согласованность и порядок сквозных данных в flink;
-
Гарантировать последовательность данных в kafka; (глобальную последовательность данных добиться в принципе невозможно, но можно добиться согласованности данных по одному разделу)
-
В кака у каждой машины есть брокер, в брокере несколько разделов, и разделы реплицируются в режиме master-slave, что обеспечивает согласованность данных;
-
Установите семантику ровно один раз в flink;
env.enableCheckpointing(parameter.getLong("checkpoint.cycle",300*1000L),CheckpointingMode.EXACTLY_ONCE);
Вопрос 6: Как сделать так, чтобы значение водяного знака обновлялось при отсутствии обновления данных о событии, а затем срабатывал расчет окна
-
При обработке некоторых данных временной интервал обновления потока данных больше размера окна.Если используется PunctuatedWatermarks, то водяной знак не будет обновляться постоянно, его можно поменять на периодически обновляемый водяной знак AssignerWithPeriodicWatermarks
private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<PageActivityDO> { private static final long serialVersionUID = 1L; private Long currentTime = 0L; //允许2分钟的延迟 private Long allowDelayTime = 120L; @Override public Watermark checkAndGetNextWatermark(PageActivityDO topic, long l) { return new Watermark(currentTime - allowDelayTime); } @Override public long extractTimestamp(PageActivityDO topic, long l) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); if(StringUtils.isNullOrWhitespaceOnly(topic.getPoint_time())){ return currentTime; } LocalDateTime localDateTime = LocalDateTime.parse(topic.getPoint_time(), formatter); currentTime = Math.max(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(), currentTime); return currentTime; } }
private static class CustomWatermarksPeriodc<T> implements AssignerWithPeriodicWatermarks<ActivityInfoDO> { private static final long serialVersionUID = 1L; //允许30s的延迟 private Long allowDelayTime = 30000L; @Override public long extractTimestamp(ActivityInfoDO topic, long l) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); if(StringUtils.isNullOrWhitespaceOnly(topic.getPush_time())){ return System.currentTimeMillis(); } LocalDateTime localDateTime = LocalDateTime.parse(topic.getPush_time(), formatter); logger.info("extractTimestamp,currentWatermark:{}",localDateTime ); return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli(); } @Nullable @Override public Watermark getCurrentWatermark() { logger.info("getCurrentWatermark, currentWatermark:{}",System.currentTimeMillis() - allowDelayTime); return new Watermark(System.currentTimeMillis() - allowDelayTime); } }
-
Обратите особое внимание на выбор Periodic WatermarkGenerator, вам нужно установить механизм автоматического обновления водяных знаков, setAutoWatermarkInterval(1000)
Вопрос 7: Как обеспечить реализацию двухфазной фиксации и обеспечить возможность записи данных идемпотентно и транзакционно
- Убедитесь, что данные источника данных воспроизводимы
- Приемник данных поддерживает обработку транзакций (предварительная фиксация, откат, фиксация)
- Или через приемник, поддержите уникальную дедупликацию
Вопрос 8: При стоке на mysql часто сообщается об ошибке
- Тип ошибки: последний пакет, успешно полученный с сервера, был 1 203 500 миллисекунд назад.
- Возможно, появится версия jdbc, и лучше всего использовать пул соединений mysql
Правильное использование valueState
flink может выбрать fsStateBackend для управления промежуточными состояниями, которые не являются крупномасштабными StateBackend fsStateBackend = new FsStateBackend(parameter.get("flink.state.path"));
- К ним относятся продолжительность сохранения состояния, тип обновления, видимость или невидимость.
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(ttlDays))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// .cleanupInRocksdbCompactFilter(1000L)
.build();
Ссылка на ссылку:
2:Исследование границы скользящего окна timeWindow и задержки данных во Flink
3:Обзор Kafka: глубокое понимание архитектуры
5:Двухфазный коммит (2PC) и его применение во Flink ровно один раз
Напоминаем, что используется версия flink 1.9.