Оператор преобразования API обработки потока Flink

Flink
Оператор преобразования API обработки потока Flink

Добро пожаловать, чтобы следовать за мнойличный блогвыучить больше

transform

Функция: преобразование данных Soure (исходных данных) в требуемые данные.

Общие функции

map

Оператор карты похож на карту в питоне, в питоне данные преобразуются влямбда-выражениеДанные в , а карта в flink более обширна, черезновая функция карты, пользовательский метод map() определяет процесс преобразования, преобразуя один тип данных (входные) в другой тип данных (выходные). Формат следующий

dataStream.map(new Mapfunction<input,output>(){
	@Override
	map(input){xxx};
})

Лучше понять, рисуяimage.png

Прямоугольник стал эллипсом, но цвет не изменился (логика не изменилась)

flatMap

Оператор выравнивания flatMap: преобразует входной тип ввода в выходной тип.В отличие от карты, плоская карта выводит несколько выходных типов привести пример Строка "привет, слово" Вывод в виде Tuple2("hello", 1), Tuple2("word", 1)

inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = input.split(",");
                for (String word : words) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        });

filter

Оператор filter отфильтровывает входные данные входного типа. true, чтобы остаться, false, чтобы отфильтровать Схема выглядит следующим образомimage.png

keyby (группировка)

Поток данных → KeyedStream: Логически разбивает поток на непересекающиеся разделы, каждый из которых содержит элементы с одним и тем же ключом, реализованным внутри в виде хэша.image.png

использовать dataStream.keyby (параметр)

param: нижний индекс поля данных начинается с 0 по умолчанию Вы также можете ввести поле идентификатора, и оно будет разделено в соответствии с идентификатором

Скользящее агрегирование

  • сумма : суммирование
  • max : выберите максимальное значение для каждого потока
  • min : выберите минимальное значение для каждого потока
  • minby : выберите минимальное значение для данных поля в keyedStream
  • maxby : выберите максимальное значение для данных поля в keyedStream

уменьшить (сложная агрегация)

KeyedStream → DataStream: объединить текущий элемент и результат последней агрегации для получения нового значения,Возвращаемый поток содержит результаты каждой агрегации, а не только окончательный результат последней агрегации.. Случай: Сравните температуру последней метки времени по данным с идентификатора датчика и выберите данные метки времени с максимальной температурой.

import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_keyed_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 数据源为自定义产生的传感器数据
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        //根据传感器id传来的数据比较上一次时间戳的温度,选择最大温度的时间戳数据
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
                .reduce(new CustomReduceFunction());
        resultDataStream.print();
        env.execute();
    }

    public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
        @Override
        public SensorReading reduce(SensorReading sensorReading, SensorReading input) throws Exception {
            String id = sensorReading.getId();
            Long timestamp = input.getTimestamp();
            //按照时间戳选温度最大的值
            double temperature = Math.max(sensorReading.getTemperature(), input.getTemperature());
            return new SensorReading(id, timestamp, temperature);
        }
    }
}

разделить и выбрать

split

Поток данных → Разделенный поток: разделить поток данных на два или более потоков данных в соответствии с определенными характеристиками. Схема выглядит следующим образом

image.png

select

Разделенный поток→Поток данных: получить один или несколько потоков данных из SplitStream.

Схема выглядит следующим образомimage.png

кейс

В зависимости от температуры датчика примите 60 градусов за стандарт, если она больше или равна 60 градусам, это высокий поток, а остальные - низкий поток.

import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.chengyuyang.apitest.SensorReading;
import com.chengyuyang.apitest.SourceFromCustom;

public class Transform_Split_Select {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDatStream = env.addSource(new SourceFromCustom.CustomSource());
        // 按照温度60标准进行分流 split里面其实是实现的select(选择)操作
        SplitStream<SensorReading> splitStream = inputDatStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                Double temperature = sensorReading.getTemperature();
                if (temperature >= 60) {
                    return Lists.newArrayList("high");
                } else {
                    return Lists.newArrayList("low");
                }
            }
        });
        //SplitStream→DataStream操作 select变量可以为多个 上面两种split 可以select成3种DataStream流
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");

        high.print("high").setParallelism(1);
        low.print("low").setParallelism(1);
        all.print("all").setParallelism(1);
        env.execute();
    }
}

Результат выглядит следующим образомimage.png

подключиться и комап

подключение (одна страна, две системы)

DataStream, DataStream → ConnectedStreams: Соедините два потока как потоки ConnectedStreams, но сохраните их различные типы данных неизменными, два внутренних потока независимы друг от друга, а типы входных данных могут быть одинаковыми или разными.

Схема выглядит следующим образомimage.png

comap coflatmap

Подключенные потоки → DataStreamФункция такая же, как у map и flatMap, но поскольку типы данных двух потоков класса connect различны, map и flatMap должны выполняться на потоках внутри, и окончательный результат может быть одинаковым или нет.

кейс

Добавьте предупреждающую метку к высокой температуре в соответствии с разделением и выберите случаи Нормальный вывод данных о температуре

часть кода

ConnectedStreams<Tuple2<String, Double>, SensorReading> connectDataStream = highDataStream.connect(lowDataStream);
        SingleOutputStreamOperator<Object> resultDataStream = connectDataStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> input) throws Exception {
                // 处理高温数据
                return new Tuple3<>(input.f0, input.f1, "warnning");
            }

            @Override
            public Object map2(SensorReading input) throws Exception {
                // 处理正常温度数据
                return new Tuple3<>(input.getId(), input.getTimestamp(), input.getTemperature());
            }
        });

union

Поток данных → Поток данныхОбъединяет два или более DataStream для создания нового DataStream, содержащего все элементы DataStream.Примечание. Если вы объедините поток данных с самим собой, вы увидите, что каждый элемент появляется дважды в новом потоке данных.Схема выглядит следующим образомimage.png

Разница между Connect и Union

  • Типы двух потоков перед Union должны быть одинаковыми, а Connect может быть разным
  • Connect можно настроить так, чтобы он был одинаковым или другим в последующих coMaps.
  • Connect может работать только с двумя потоками, Union может работать с несколькими

кейс

Объединение выполняется в соответствии со случаями разделения и выбора, а к высокой температуре добавляется предупредительная метка.Вывод данных нормальной температуры

часть кода

DataStream<SensorReading> unionDataStream = high.union(low);
        SingleOutputStreamOperator<Tuple3<String, Long, Object>> resultDataStream = unionDataStream.map(new MapFunction<SensorReading, Tuple3<String, Long, Object>>() {
            @Override
            public Tuple3<String, Long, Object> map(SensorReading input) throws Exception {
                if (input.getTemperature() >= 60) {
                    return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), "warnning");
                } else {
                    return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), input.getTemperature());
                }
            }
        });

Эта статья воспроизведена в моем личном блогеОператор преобразования API обработки потока FlinkследитьCC 4.0 BY-SA Авторское соглашение

Добро пожаловать на обмен и обучение

личный блог

домашняя страница csdn