Добро пожаловать, чтобы следовать за мнойличный блогвыучить больше
transform
Функция: преобразование данных Soure (исходных данных) в требуемые данные.
Общие функции
map
Оператор карты похож на карту в питоне, в питоне данные преобразуются влямбда-выражениеДанные в , а карта в flink более обширна, черезновая функция карты, пользовательский метод map() определяет процесс преобразования, преобразуя один тип данных (входные) в другой тип данных (выходные). Формат следующий
dataStream.map(new Mapfunction<input,output>(){
@Override
map(input){xxx};
})
Лучше понять, рисуя
Прямоугольник стал эллипсом, но цвет не изменился (логика не изменилась)
flatMap
Оператор выравнивания flatMap: преобразует входной тип ввода в выходной тип.В отличие от карты, плоская карта выводит несколько выходных типов привести пример Строка "привет, слово" Вывод в виде Tuple2("hello", 1), Tuple2("word", 1)
inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = input.split(",");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
});
filter
Оператор filter отфильтровывает входные данные входного типа. true, чтобы остаться, false, чтобы отфильтровать Схема выглядит следующим образом
keyby (группировка)
Поток данных → KeyedStream: Логически разбивает поток на непересекающиеся разделы, каждый из которых содержит элементы с одним и тем же ключом, реализованным внутри в виде хэша.
использовать dataStream.keyby (параметр)
param: нижний индекс поля данных начинается с 0 по умолчанию Вы также можете ввести поле идентификатора, и оно будет разделено в соответствии с идентификатором
Скользящее агрегирование
- сумма : суммирование
- max : выберите максимальное значение для каждого потока
- min : выберите минимальное значение для каждого потока
- minby : выберите минимальное значение для данных поля в keyedStream
- maxby : выберите максимальное значение для данных поля в keyedStream
уменьшить (сложная агрегация)
KeyedStream → DataStream: объединить текущий элемент и результат последней агрегации для получения нового значения,Возвращаемый поток содержит результаты каждой агрегации, а не только окончательный результат последней агрегации.. Случай: Сравните температуру последней метки времени по данным с идентификатора датчика и выберите данные метки времени с максимальной температурой.
import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Transform_keyed_Reduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 数据源为自定义产生的传感器数据
DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
//根据传感器id传来的数据比较上一次时间戳的温度,选择最大温度的时间戳数据
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
.reduce(new CustomReduceFunction());
resultDataStream.print();
env.execute();
}
public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
@Override
public SensorReading reduce(SensorReading sensorReading, SensorReading input) throws Exception {
String id = sensorReading.getId();
Long timestamp = input.getTimestamp();
//按照时间戳选温度最大的值
double temperature = Math.max(sensorReading.getTemperature(), input.getTemperature());
return new SensorReading(id, timestamp, temperature);
}
}
}
разделить и выбрать
split
Поток данных → Разделенный поток: разделить поток данных на два или более потоков данных в соответствии с определенными характеристиками. Схема выглядит следующим образом
select
Разделенный поток→Поток данных: получить один или несколько потоков данных из SplitStream.
Схема выглядит следующим образом
кейс
В зависимости от температуры датчика примите 60 градусов за стандарт, если она больше или равна 60 градусам, это высокий поток, а остальные - низкий поток.
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;
public class Transform_Split_Select {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDatStream = env.addSource(new SourceFromCustom.CustomSource());
// 按照温度60标准进行分流 split里面其实是实现的select(选择)操作
SplitStream<SensorReading> splitStream = inputDatStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading sensorReading) {
Double temperature = sensorReading.getTemperature();
if (temperature >= 60) {
return Lists.newArrayList("high");
} else {
return Lists.newArrayList("low");
}
}
});
//SplitStream→DataStream操作 select变量可以为多个 上面两种split 可以select成3种DataStream流
DataStream<SensorReading> high = splitStream.select("high");
DataStream<SensorReading> low = splitStream.select("low");
DataStream<SensorReading> all = splitStream.select("high", "low");
high.print("high").setParallelism(1);
low.print("low").setParallelism(1);
all.print("all").setParallelism(1);
env.execute();
}
}
Результат выглядит следующим образом
подключиться и комап
подключение (одна страна, две системы)
DataStream, DataStream → ConnectedStreams: Соедините два потока как потоки ConnectedStreams, но сохраните их различные типы данных неизменными, два внутренних потока независимы друг от друга, а типы входных данных могут быть одинаковыми или разными.
Схема выглядит следующим образом
comap coflatmap
Подключенные потоки → DataStreamФункция такая же, как у map и flatMap, но поскольку типы данных двух потоков класса connect различны, map и flatMap должны выполняться на потоках внутри, и окончательный результат может быть одинаковым или нет.
кейс
Добавьте предупреждающую метку к высокой температуре в соответствии с разделением и выберите случаи Нормальный вывод данных о температуре
часть кода
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectDataStream = highDataStream.connect(lowDataStream);
SingleOutputStreamOperator<Object> resultDataStream = connectDataStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> input) throws Exception {
// 处理高温数据
return new Tuple3<>(input.f0, input.f1, "warnning");
}
@Override
public Object map2(SensorReading input) throws Exception {
// 处理正常温度数据
return new Tuple3<>(input.getId(), input.getTimestamp(), input.getTemperature());
}
});
union
Поток данных → Поток данныхОбъединяет два или более DataStream для создания нового DataStream, содержащего все элементы DataStream.Примечание. Если вы объедините поток данных с самим собой, вы увидите, что каждый элемент появляется дважды в новом потоке данных.Схема выглядит следующим образом
Разница между Connect и Union
- Типы двух потоков перед Union должны быть одинаковыми, а Connect может быть разным
- Connect можно настроить так, чтобы он был одинаковым или другим в последующих coMaps.
- Connect может работать только с двумя потоками, Union может работать с несколькими
кейс
Объединение выполняется в соответствии со случаями разделения и выбора, а к высокой температуре добавляется предупредительная метка.Вывод данных нормальной температуры
часть кода
DataStream<SensorReading> unionDataStream = high.union(low);
SingleOutputStreamOperator<Tuple3<String, Long, Object>> resultDataStream = unionDataStream.map(new MapFunction<SensorReading, Tuple3<String, Long, Object>>() {
@Override
public Tuple3<String, Long, Object> map(SensorReading input) throws Exception {
if (input.getTemperature() >= 60) {
return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), "warnning");
} else {
return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), input.getTemperature());
}
}
});
Эта статья воспроизведена в моем личном блогеОператор преобразования API обработки потока FlinkследитьCC 4.0 BY-SA Авторское соглашение