Процесс установки кратко описан здесь, а IDEA используется для разработки простой программы обработки потоков, которую можно отлаживать локально или запускать в Flink Установка Maven и JDK здесь не описывается.
1. Введение во Флинк
Flink родился из StratoSphere, исследовательского проекта больших данных в Европе. Проект является исследовательским проектом Берлинского технического университета. Сначала Flink выполнял пакетные вычисления, но в 2014 году основные члены StratoSphere вывели Flink, в том же году передали Flink в Apache, а позже он стал главным проектом Apache по работе с большими данными. Вычисления Flink позиционировались как Streaming, то есть использующие потоковые вычисления для выполнения всех вычислений с большими данными, что является предпосылкой рождения технологии Flink.
В 2015 году Alibaba начала вмешиваться в работу flink, отвечая за планирование ресурсов и оптимизацию потокового SQL, и установила внутреннюю версию Alibaba blink.В недавно обновленной версии 1.9, blink начал объединяться с flink.
В будущем flink также будет поддерживать больше языков, таких как java, scala, python и т. д., и проявит свои сильные стороны в области машинного обучения.
2. Создание среды разработки Flink
Во-первых, чтобы запустить Flink, нам нужно скачать и распаковать бинарный пакет Flink.Адрес загрузки выглядит следующим образом:Felicity.apache.org/downloads Также…
Мы можем выбрать комбинированную версию Flink и Scala, здесь мы выбираем последнюю версию 1.9.Apache Flink 1.9.0 for Scala 2.12Скачать.
Установку и развертывание Flink под Windows и Linux можно посмотретьБыстрый запуск Flink — установка и запуск примера, версия для Windows демонстрируется здесь.
После успешной установки запустите окно командной строки cmd, войдите в папку flink и запустите команду в каталоге bin.start-cluster.bat
$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.
Показав, что запуск прошел успешно, мы обращаемся к нему в браузере.http://localhost:8081/ Вы можете увидеть страницу управления flink.
3. Быстрое перелистывание
Убедитесь, что установлен flink и требуются Maven 3.0.4 и Java 8 или более поздней версии. Вот краткое описание процесса сборки Maven.
Другие подробные методы строительства приветствуются для просмотра:Быстро создайте свой первый проект Flink
1. Создайте проект Maven
Создайте проект, используя Flink Maven Archetype.
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.0
Вы можете редактировать свой собственный идентификатор группы
Структура каталогов следующая:
$ tree quickstart/
quickstart/
├── pom.xml
└── src
└── main
├── java
│ └── org
│ └── myorg
│ └── quickstart
│ ├── BatchJob.java
│ └── StreamingJob.java
└── resources
└── log4j.properties
Основные зависимости в pom:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2. Напишите код
StreamingJob
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStreaming = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStreaming.print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for(String word : sentence.split(" ")){
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
3. Отладка программы
Установите инструмент netcat для простой отладки.
Запустите netcat и введите:
nc -l 9999
стартовая программа
Введите несколько слов в netcat через запятую
Посмотреть результаты на стороне программы
4. Отправьте программу на Flink
начать флинк
windows为 start-cluster.bat linux为start-cluster.sh
localhost:8081 Посмотреть страницу управления
Код упаковки через maven
Отправьте упакованный пакет на flink
Посмотреть журнал
tail -f log/flink-***-jobmanager.out
Продолжайте вводить слова в netcat, просматривайте статус задания в разделе «Выполняемые задания» и просматривайте выходные данные в журнале.
4. Модель программирования Flink
Flink предоставляет различные уровни абстракции для разработки потоковых/пакетных приложений.
Абстракция самого низкого уровня обеспечивает толькопоток с отслеживанием состояния.
На практике большинству приложений не нужны вышеупомянутые низкоуровневые абстракции, но они нацелены наCore APIпрограммирование, напримерDataStream API(ограниченный/неограниченный поток) иDataSet API(ограниченный набор данных).
Table Api объявляет таблицу в соответствии с реляционной моделью.
Абстракция высшего уровняSQL.
Здесь мы использовали только DataStream API.
Основные строительные блоки программы Flink:потокиконвертировать.
Базовая структура программы:
l Получить среду выполнения
l Загрузить/создать необработанные данные
l Укажите метод преобразования этих данных
l Указать место хранения результата расчета
l Запустить выполнение программы
5. Использование DataStreaming API
1. Получите среду выполнения
StreamExecutionEnvironment является основой всех программ Flink, и способы их получения:
getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String ... jarFiles)
В общем случае используйте getExecutionEnvironment. Если вы выполняете в IDE или обычной Java-программе, вы можете создать StreamExecutionEnvironment на локальном компьютере с помощью createLocalEnvironment. Если вы создали jar-программу и хотите получить в ней метод getExecutionEnvironment через метод вызова, вы можете использовать метод createRemoteEnvironment.
2. Загрузите/создайте необработанные данные
Некоторые интерфейсы для доступа к источникам данных, предоставляемые StreamExecutionEnvironment.
(1) Файловые источники данных
readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
(2) Источник данных на основе сокетов (используется в этой статье)
Приходить socketTextStream
(3) Источник данных на основе коллекции
fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)
3. Метод преобразования
(1)Метод карты: Поток данных -> Поток данных
Функция: получить элемент и вывести элемент, аналогично функции UDF в Hive.
Пример:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
(2)Метод FlatMap: Поток данных -> Поток данных
Функция: получить элемент и вывести несколько значений, аналогично функции UDTF в Hive.
Пример:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
(3)Метод фильтрации: Поток данных -> Поток данных
Функция: определите, возвращает ли функция значение true для каждого элемента, и сохраните только тот элемент, который возвращает значение true в конце.
Пример:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
(4)Метод KeyBy: DataStream -> KeyedStream
Функция: логически разделить поток на непересекающиеся части, каждая из которых является элементом одного ключа.
Пример:
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
(5)Метод уменьшения: KeyedStream -> DataStream
Функция: Уменьшение поворота в потоке данных с ключом.
Пример:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
(6)Метод агрегации: KeyedStream -> DataStream
Функция: Агрегатная операция в ключевом потоке данных
Пример:
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
(7)Оконный метод:KeyedStream -> WindowedStream
Функция: используется в KeyedStream, группируется по окнам для каждой клавиши по определенному признаку.
Пример:
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(8)Метод WindowAll: DataStream -> AllWindowedStream
Функция: группировка по определенному признаку в DataStream.
Пример:
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
(9)Метод объединения: Поток данных* -> Поток данных
Функция: объединить несколько потоков данных в новый поток данных.
Пример:
dataStream.union(otherStream1, otherStream2, ...);
(10)Метод разделения: Поток данных -> Разделенный поток
Функция: разделить поток на несколько потоков.
Пример:
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
(11)Выберите метод: Разделенный поток -> Поток данных
Функция: выбрать поток из разделенного потока
Пример:
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
4. Выходные данные
writeAsText()
writeAsCsv(...)
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
Другие принципы, связанные с Flink:
Король обработки больших данных в реальном времени — Flink
Быстрый запуск Flink — установка и запуск примера
Быстро создайте свой первый проект Flink
Для получения дополнительных сведений о вычислениях в реальном времени, Flink, Kafka и других связанных технических блогах, пожалуйста, обратите внимание на потоковые вычисления в реальном времени: