Создайте свое первое приложение Apache Flink с нуля за 5 минут

Flink

Автор: У Чун

В этой статье мы начнем с нуля и научим вас, как создать свое первое приложение Apache Flink (далее Flink).

Подготовка среды разработки

Flink может работать в Linux, Max OS X или Windows. Для разработки приложений Flink вам необходимо иметь на локальном компьютереJava 8.xиmavenокрестности.

Если у вас есть среда Java 8, выполнение следующей команды выведет следующую информацию о версии:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

Если у вас есть среда maven, выполнение следующей команды выведет следующую информацию о версии:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"

Кроме того, мы рекомендуем использовать ItelliJ IDEA (достаточно бесплатной версии сообщества) в качестве среды разработки для приложений Flink. Eclipse также возможен, но Eclipse имеет некоторые известные проблемы в смешанных проектах Scala и Java, поэтому Eclipse не рекомендуется. В следующей главе мы расскажем, как создать проект Flink и импортировать его в ItelliJ IDEA.

Создайте проект Maven

Мы будем использовать архетип Flink Maven для создания структуры нашего проекта и некоторых исходных зависимостей по умолчанию. В рабочем каталоге выполните следующую команду, чтобы создать проект:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

Вы можете отредактировать указанный выше идентификатор группы, идентификатор артефакта, пакет по своему любимому пути. Используя вышеуказанные параметры, Maven автоматически создаст для вас структуру проекта, которая выглядит следующим образом:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

Наш файл pom.xml уже содержит необходимые зависимости Flink, и в src/main/java есть несколько образцов скелетов. Далее мы начнем писать нашу первую программу Flink.

Написание программ Flink

Запустите IntelliJ IDEA, выберите «Импортировать проект», выберите pom.xml в корневом каталоге проекта my-flink. Согласно руководству, завершите импорт проекта.

Создано в src/main/java/myflinkSocketWindowWordCount.javaдокумент:

package myflink;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

    }
}

Теперь эта программа все еще очень проста, мы будем заполнять код шаг за шагом. Обратите внимание, что ниже мы не будем записывать операторы импорта, потому что IDE добавит их автоматически. В конце этого раздела я покажу полный код, если вы хотите пропустить шаги ниже, вы можете вставить окончательный полный код прямо в редактор.

Первым шагом в программе Flink является созданиеStreamExecutionEnvironment. Это начальный класс, который можно использовать для установки параметров, создания источников данных и отправки задач. Итак, давайте добавим это к основной функции:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Далее мы создадим источник данных, который считывает данные из сокета на локальном порту с номером 9000:

DataStream text = env.socketTextStream("localhost", 9000, "\n");

Это создает строку типаDataStream.DataStreamЭто основной API для потоковой обработки в Flink, который определяет множество общих операций (таких как фильтрация, преобразование, агрегирование, управление окнами, ассоциация и т. д.). В этом примере нас интересует, сколько раз каждое слово встречается в определенном временном окне, скажем, в 5-секундном окне. Для этого мы сначала разбираем строковые данные на слова и числа (используяTuple2<String, Integer>Представление), первое поле — слово, второе поле — количество раз, а начальное значение раз установлено равным 1. мы внедрилиflatmapдля синтаксического анализа, так как в строке данных может быть несколько слов.

DataStream> wordCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                });

Затем мы группируем поток данных в соответствии с полем слова (то есть полем индекса 0), которое можно просто использовать здесь.keyBy(int index)способ получить ключ со словом в качестве ключаTuple2<String, Integer>поток данных. Затем мы можем указать желаемое окно в потоке и вычислить результат на основе данных в окне. В нашем случае мы хотим агрегировать количество слов каждые 5 секунд, каждое окно считается с нуля: .

DataStream> windowCounts = wordCounts
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

второй звонок.timeWindow()Указываем, что нам нужно переворачивающееся окно в 5 секунд (Tumble). Третий вызов указывает ключ для каждого окна.sumАгрегатная функция в нашем случае добавляется по частотному полю (т.е. индексному полю 1). Результирующий поток данных будет выводить количество вхождений каждого слова за эти 5 секунд каждые 5 секунд.

Последнее, что нужно сделать, это вывести поток данных на консоль и начать выполнение:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

Последнийenv.executeВызов необходим для запуска фактического задания Flink. Все операторские операции (например, создание источника, агрегирование, печать) просто строят график внутренних операций оператора. только вexecute()При вызове он будет отправлен в кластер или выполнен на локальном компьютере.

Полный код ниже, часть кода упрощена (код вGitHubтакже можно получить):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // 创建 execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 通过连接 socket 获取输入数据,这里连接到本地9000端口,如果9000端口已被占用,请换一个端口
        DataStream text = env.socketTextStream("localhost", 9000, "\n");

        // 解析数据,按 word 分组,开窗,聚合
        DataStream> windowCounts = text
                .flatMap(new FlatMapFunction>() {
                    @Override
                    public void flatMap(String value, Collector> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }
}

запустить программу

Чтобы запустить пример программы, сначала мы запускаем netcat в терминале, чтобы получить входной поток:

nc -lk 9000

Если это платформа Windows, вы можете пройтиnmap.org/ncat/Установите ncat и запустите:

ncat -lk 9000

затем запустить напрямуюSocketWindowWordCountосновной метод.

Просто введите слово в консоли netcat, и вы сможетеSocketWindowWordCountКонсоль вывода видит статистику частоты слов для каждого слова. Если вы хотите увидеть число больше 1, введите одно и то же слово снова и снова в течение 5 секунд.

Для получения дополнительной информации, пожалуйста, посетитеВеб-сайт китайского сообщества Apache Flink