Автор: У Чун
В этой статье мы начнем с нуля и научим вас, как создать свое первое приложение Apache Flink (далее Flink).
Подготовка среды разработки
Flink может работать в Linux, Max OS X или Windows. Для разработки приложений Flink вам необходимо иметь на локальном компьютереJava 8.xиmavenокрестности.
Если у вас есть среда Java 8, выполнение следующей команды выведет следующую информацию о версии:
$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
Если у вас есть среда maven, выполнение следующей команды выведет следующую информацию о версии:
$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
Кроме того, мы рекомендуем использовать ItelliJ IDEA (достаточно бесплатной версии сообщества) в качестве среды разработки для приложений Flink. Eclipse также возможен, но Eclipse имеет некоторые известные проблемы в смешанных проектах Scala и Java, поэтому Eclipse не рекомендуется. В следующей главе мы расскажем, как создать проект Flink и импортировать его в ItelliJ IDEA.
Создайте проект Maven
Мы будем использовать архетип Flink Maven для создания структуры нашего проекта и некоторых исходных зависимостей по умолчанию. В рабочем каталоге выполните следующую команду, чтобы создать проект:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.1 \
-DgroupId=my-flink-project \
-DartifactId=my-flink-project \
-Dversion=0.1 \
-Dpackage=myflink \
-DinteractiveMode=false
Вы можете отредактировать указанный выше идентификатор группы, идентификатор артефакта, пакет по своему любимому пути. Используя вышеуказанные параметры, Maven автоматически создаст для вас структуру проекта, которая выглядит следующим образом:
$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
└── main
├── java
│ └── myflink
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
Наш файл pom.xml уже содержит необходимые зависимости Flink, и в src/main/java есть несколько образцов скелетов. Далее мы начнем писать нашу первую программу Flink.
Написание программ Flink
Запустите IntelliJ IDEA, выберите «Импортировать проект», выберите pom.xml в корневом каталоге проекта my-flink. Согласно руководству, завершите импорт проекта.
Создано в src/main/java/myflinkSocketWindowWordCount.java
документ:
package myflink;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
}
}
Теперь эта программа все еще очень проста, мы будем заполнять код шаг за шагом. Обратите внимание, что ниже мы не будем записывать операторы импорта, потому что IDE добавит их автоматически. В конце этого раздела я покажу полный код, если вы хотите пропустить шаги ниже, вы можете вставить окончательный полный код прямо в редактор.
Первым шагом в программе Flink является созданиеStreamExecutionEnvironment
. Это начальный класс, который можно использовать для установки параметров, создания источников данных и отправки задач. Итак, давайте добавим это к основной функции:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Далее мы создадим источник данных, который считывает данные из сокета на локальном порту с номером 9000:
DataStream text = env.socketTextStream("localhost", 9000, "\n");
Это создает строку типаDataStream
.DataStream
Это основной API для потоковой обработки в Flink, который определяет множество общих операций (таких как фильтрация, преобразование, агрегирование, управление окнами, ассоциация и т. д.). В этом примере нас интересует, сколько раз каждое слово встречается в определенном временном окне, скажем, в 5-секундном окне. Для этого мы сначала разбираем строковые данные на слова и числа (используяTuple2<String, Integer>
Представление), первое поле — слово, второе поле — количество раз, а начальное значение раз установлено равным 1. мы внедрилиflatmap
для синтаксического анализа, так как в строке данных может быть несколько слов.
DataStream> wordCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});
Затем мы группируем поток данных в соответствии с полем слова (то есть полем индекса 0), которое можно просто использовать здесь.keyBy(int index)
способ получить ключ со словом в качестве ключаTuple2<String, Integer>
поток данных. Затем мы можем указать желаемое окно в потоке и вычислить результат на основе данных в окне. В нашем случае мы хотим агрегировать количество слов каждые 5 секунд, каждое окно считается с нуля: .
DataStream> windowCounts = wordCounts
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
второй звонок.timeWindow()
Указываем, что нам нужно переворачивающееся окно в 5 секунд (Tumble). Третий вызов указывает ключ для каждого окна.sum
Агрегатная функция в нашем случае добавляется по частотному полю (т.е. индексному полю 1). Результирующий поток данных будет выводить количество вхождений каждого слова за эти 5 секунд каждые 5 секунд.
Последнее, что нужно сделать, это вывести поток данных на консоль и начать выполнение:
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
Последнийenv.execute
Вызов необходим для запуска фактического задания Flink. Все операторские операции (например, создание источника, агрегирование, печать) просто строят график внутренних операций оператора. только вexecute()
При вызове он будет отправлен в кластер или выполнен на локальном компьютере.
Полный код ниже, часть кода упрощена (код вGitHubтакже можно получить):
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// 创建 execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
DataStream text = env.socketTextStream("localhost", 9000, "\n");
// 解析数据,按 word 分组,开窗,聚合
DataStream> windowCounts = text
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
запустить программу
Чтобы запустить пример программы, сначала мы запускаем netcat в терминале, чтобы получить входной поток:
nc -lk 9000
Если это платформа Windows, вы можете пройтиnmap.org/ncat/Установите ncat и запустите:
ncat -lk 9000
затем запустить напрямуюSocketWindowWordCount
основной метод.
Просто введите слово в консоли netcat, и вы сможетеSocketWindowWordCount
Консоль вывода видит статистику частоты слов для каждого слова. Если вы хотите увидеть число больше 1, введите одно и то же слово снова и снова в течение 5 секунд.
Для получения дополнительной информации, пожалуйста, посетитеВеб-сайт китайского сообщества Apache Flink