Добро пожаловать, чтобы следовать за мнойличный блогвыучить больше
инфраструктура обработки потоков flink
Подобно шторму и искре, у flink также есть среда выполнения. Напишите минимальный кадр ниже
public class SourceTest {
public static void main(String[] args) throws Exception{
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 导入数据
DataStreamSource<String> stream = env.readTextFile("");
//处理数据
stream.print();
// 执行
env.execute();
}
}
StreamExecutionEnvironment.getExecutionEnvironment(); Функция может автоматически вернуться в локальную среду выполнения или в среду выполнения кластера в зависимости от ситуации. Конечно, это также можно установить с помощью функции
Вернитесь в локальную среду выполнения, вам нужно указать параллелизм по умолчанию при вызове
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1)
Вернитесь в среду выполнения кластера и отправьте файл Jar на удаленный сервер. Необходимо указать JobManager при вызове IP-адрес и номер порта, а также укажите пакет Jar для запуска в кластере.
StreamExecutionEnvironment env =ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",
6123,"YOURPATH//wordcount.jar")
Source
Чтение данных из коллекции (ограниченный поток)
Сначала определите пример класса источника данных
//对于数据流中的数据类型 我们通常使用POJP(java)和 case class(Scala)来定义一个数据类型类
//其中对于POJO方法
// 当我们要用keyby进行分组时候
// POJO类的定义必须满足一下条件
//1.字段名必须声明为public的;
//2.必须有默认的无参构造器;
//3.所有构造器必须声明为public的
// 定义样例类 传感器 id 时间戳 温度
class SensorReading {
// 传感器 id
private String id;
// 传感器时间戳
private Long timestamp;
// 传感器温度
private Double temperature;
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
Напишите основной логический код
public class SourceTest {
public static void main(String[] args) throws Exception{
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 导入集合数据 这个是个有界数据流
DataStreamSource<SensorReading> stream1 = env.fromCollection(Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1),
new SensorReading("sensor_15", 1547716505L, 39.1),
new SensorReading("sensor_18", 1547718685L, 78.1)
)
);
// 打印数据
stream1.print(); //并行度默认cpu核心数
// 执行
env.execute("source test");
}
}
выполнять взгляд подождите несколько секунд
Видно, что порядок не в порядке
читать данные из файла
Это легко основной код
DataStreamSource stream2 = env.readTextFile("");
Использовать данные очереди сообщений kafka в качестве источника (курсив)
Знакомство с зависимостью коннектора Kafka
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Настройка информации, связанной с Kafka
//kafka 配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.216.111:9092,192.168.216.112:9092,192.168.216.113:9092");
properties.setProperty("group.id", "flink-kafka");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
читать данные
// 读取Kafka topic中的数据
DataStreamSource<String> stream3 = env.addSource(new FlinkKafkaConsumer011<String>(
"sensor",
new SimpleStringSchema(),
properties
));
stream3.print();
Функция addSource() в flink предоставляет метод для импорта источников данных, в котором вы можете использовать источник, определенный flink (например, kafka выше), и, конечно же, вы также можете настроить источник Давайте посмотрим, как настроить Source
Пользовательский источник
В дополнение к указанным выше исходным источникам данных мы также можем настроить источник. Все, что вам нужно сделать, это пройти в SourceFunction подойдет. Конкретный вызов выглядит следующим образом:
DataStreamSource<String> stream4 = env.addSource(new new CustomSource())
Следующее определяет источник данных, который может случайным образом генерировать данные датчиков.
public class SourceFromCustom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<SensorReading> inputDataStream = env.addSource(new CustomSource());
inputDataStream.print();
env.execute();
}
// 继承SourceFunction的富函数RichSourceFunction
public static class CustomSource implements SourceFunction<SensorReading> {
boolean running = true;
@Override
public void run(SourceContext<SensorReading> sourceContext) throws Exception {
Random random = new Random();
// 定义无线循环,不断产生数据,除非被cancel
while (running) {
// 每隔 100 秒数据
for (int i = 0; i < 5; i++) {
String id = UUID.randomUUID().toString().substring(0, 8);
long timestamp = System.currentTimeMillis(); //获取毫秒时间
double temperature = 60 + random.nextGaussian() * 20; //nextGaussian():返回平均值为0.0,标准差为1.0的下一个伪随机高斯分布双精度数
sourceContext.collect(new SensorReading(id, timestamp, temperature));
Thread.sleep(100L);
}
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
Текущий результат выглядит следующим образом
Эта статья воспроизведена в моем личном блогеИсточник API обработки потока FlinkследитьCC 4.0 BY-SA Соглашение об авторском праве