Источник API обработки потока Flink

Flink
Источник API обработки потока Flink

Добро пожаловать, чтобы следовать за мнойличный блогвыучить больше

инфраструктура обработки потоков flink

Подобно шторму и искре, у flink также есть среда выполнения. Напишите минимальный кадр ниже

public class SourceTest {
public static void main(String[] args) throws Exception{
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 导入数据
DataStreamSource<String> stream = env.readTextFile("");
//处理数据
stream.print();
// 执行
env.execute();
}
}

StreamExecutionEnvironment.getExecutionEnvironment(); Функция может автоматически вернуться в локальную среду выполнения или в среду выполнения кластера в зависимости от ситуации. Конечно, это также можно установить с помощью функции

Вернитесь в локальную среду выполнения, вам нужно указать параллелизм по умолчанию при вызове

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1)

Вернитесь в среду выполнения кластера и отправьте файл Jar на удаленный сервер. Необходимо указать JobManager при вызове IP-адрес и номер порта, а также укажите пакет Jar для запуска в кластере.

StreamExecutionEnvironment env =ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",
6123,"YOURPATH//wordcount.jar")

Source

Чтение данных из коллекции (ограниченный поток)

Сначала определите пример класса источника данных

//对于数据流中的数据类型 我们通常使用POJP(java)和 case class(Scala)来定义一个数据类型类

//其中对于POJO方法
// 当我们要用keyby进行分组时候
// POJO类的定义必须满足一下条件
//1.字段名必须声明为public的;
//2.必须有默认的无参构造器;
//3.所有构造器必须声明为public的
// 定义样例类 传感器 id 时间戳 温度
 class SensorReading {
    // 传感器 id
    private String id;
    // 传感器时间戳
    private Long timestamp;
    // 传感器温度
    private Double temperature;


    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}

Напишите основной логический код

public class SourceTest {
    public static void main(String[] args) throws Exception{
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 导入集合数据   这个是个有界数据流
        DataStreamSource<SensorReading> stream1 = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1547718199L, 35.8),
                new SensorReading("sensor_6", 1547718201L, 15.4),
                new SensorReading("sensor_7", 1547718202L, 6.7),
                new SensorReading("sensor_10", 1547718205L, 38.1),
                new SensorReading("sensor_15", 1547716505L, 39.1),
                new SensorReading("sensor_18", 1547718685L, 78.1)
                )
        );
        // 打印数据
        stream1.print(); //并行度默认cpu核心数
        // 执行
        env.execute("source test");
    }
}

выполнять взгляд подождите несколько секундimage.png

Видно, что порядок не в порядке

читать данные из файла

Это легко основной код

DataStreamSource stream2 = env.readTextFile("");

Использовать данные очереди сообщений kafka в качестве источника (курсив)

Знакомство с зависимостью коннектора Kafka

<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>

Настройка информации, связанной с Kafka

//kafka 配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092");
        properties.setProperty("group.id", "flink-kafka");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

читать данные

// 读取Kafka topic中的数据
        DataStreamSource<String> stream3 = env.addSource(new FlinkKafkaConsumer011<String>(
                "sensor",
                new SimpleStringSchema(),
                properties
        ));
stream3.print();

Функция addSource() в flink предоставляет метод для импорта источников данных, в котором вы можете использовать источник, определенный flink (например, kafka выше), и, конечно же, вы также можете настроить источник Давайте посмотрим, как настроить Source

Пользовательский источник

В дополнение к указанным выше исходным источникам данных мы также можем настроить источник. Все, что вам нужно сделать, это пройти в SourceFunction подойдет. Конкретный вызов выглядит следующим образом:

DataStreamSource<String> stream4 = env.addSource(new new CustomSource())

Следующее определяет источник данных, который может случайным образом генерировать данные датчиков.

public class SourceFromCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new CustomSource());
        inputDataStream.print();
        env.execute();
    }

    // 继承SourceFunction的富函数RichSourceFunction
    public static class CustomSource implements SourceFunction<SensorReading> {
        boolean running = true;
        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {

            Random random = new Random();
            // 定义无线循环,不断产生数据,除非被cancel
            while (running) {
                // 每隔 100 秒数据
                for (int i = 0; i < 5; i++) {
                    String id = UUID.randomUUID().toString().substring(0, 8);
                    long timestamp = System.currentTimeMillis();  //获取毫秒时间
                    double temperature = 60 + random.nextGaussian() * 20;  //nextGaussian():返回平均值为0.0,标准差为1.0的下一个伪随机高斯分布双精度数
                    sourceContext.collect(new SensorReading(id, timestamp, temperature));

                    Thread.sleep(100L);
                }

                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Текущий результат выглядит следующим образомimage.png

Эта статья воспроизведена в моем личном блогеИсточник API обработки потока FlinkследитьCC 4.0 BY-SA Соглашение об авторском праве

Добро пожаловать на обмен и обучение

личный блог

домашняя страница csdn