Руководство для начинающих по Flink (подробная версия скриншота)

Flink

file
В этой статье создается версия начальной программы Flink1.9 на основе Java, для которой требуются Maven 3.0.4 и Java 8 или выше. Необходимо установить Netcat для простой отладки.

Процесс установки кратко описан здесь, а IDEA используется для разработки простой программы обработки потоков, которую можно отлаживать локально или запускать в Flink Установка Maven и JDK здесь не описывается.

1. Введение во Флинк

Flink родился из StratoSphere, исследовательского проекта больших данных в Европе. Проект является исследовательским проектом Берлинского технического университета. Сначала Flink выполнял пакетные вычисления, но в 2014 году основные члены StratoSphere вывели Flink, в том же году передали Flink в Apache, а позже он стал главным проектом Apache по работе с большими данными. Вычисления Flink позиционировались как Streaming, то есть использующие потоковые вычисления для выполнения всех вычислений с большими данными, что является предпосылкой рождения технологии Flink.

В 2015 году Alibaba начала вмешиваться в работу flink, отвечая за планирование ресурсов и оптимизацию потокового SQL, и установила внутреннюю версию Alibaba blink.В недавно обновленной версии 1.9, blink начал объединяться с flink.

В будущем flink также будет поддерживать больше языков, таких как java, scala, python и т. д., и проявит свои сильные стороны в области машинного обучения.

2. Создание среды разработки Flink

Во-первых, чтобы запустить Flink, нам нужно скачать и распаковать бинарный пакет Flink.Адрес загрузки выглядит следующим образом:Felicity.apache.org/downloads Также…

Мы можем выбрать комбинированную версию Flink и Scala, здесь мы выбираем последнюю версию 1.9.Apache Flink 1.9.0 for Scala 2.12Скачать.

Установку и развертывание Flink под Windows и Linux можно посмотретьБыстрый запуск Flink — установка и запуск примера, версия для Windows демонстрируется здесь.

После успешной установки запустите окно командной строки cmd, войдите в папку flink и запустите команду в каталоге bin.start-cluster.bat

$ cd flink
$ cd bin
$ start-cluster.bat
Starting a local cluster with one JobManager process and one TaskManager process.
You can terminate the processes via CTRL-C in the spawned shell windows.
Web interface by default on http://localhost:8081/.

Показав, что запуск прошел успешно, мы обращаемся к нему в браузере.http://localhost:8081/ Вы можете увидеть страницу управления flink.

file

3. Быстрое перелистывание

Убедитесь, что установлен flink и требуются Maven 3.0.4 и Java 8 или более поздней версии. Вот краткое описание процесса сборки Maven.

Другие подробные методы строительства приветствуются для просмотра:Быстро создайте свой первый проект Flink

1. Создайте проект Maven

Создайте проект, используя Flink Maven Archetype.

 $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

Вы можете редактировать свой собственный идентификатор группы

Структура каталогов следующая:

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

Основные зависимости в pom:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2. Напишите код

StreamingJob

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamingJob {

	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Tuple2<String, Integer>> dataStreaming = env
				.socketTextStream("localhost", 9999)
				.flatMap(new Splitter())
				.keyBy(0)
				.timeWindow(Time.seconds(5))
				.sum(1);

		dataStreaming.print();

		// execute program
		env.execute("Flink Streaming Java API Skeleton");
	}
	public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

		@Override
		public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
			for(String word : sentence.split(" ")){
				out.collect(new Tuple2<String, Integer>(word, 1));
			}
		}

	}
}

3. Отладка программы

Установите инструмент netcat для простой отладки.

Запустите netcat и введите:

nc -l 9999

стартовая программа

file

Введите несколько слов в netcat через запятую

file

Посмотреть результаты на стороне программы

file

4. Отправьте программу на Flink

начать флинк

windows为 start-cluster.bat    linux为start-cluster.sh

localhost:8081 Посмотреть страницу управления

file

Код упаковки через maven

file

Отправьте упакованный пакет на flink

file

Посмотреть журнал

tail -f log/flink-***-jobmanager.out

Продолжайте вводить слова в netcat, просматривайте статус задания в разделе «Выполняемые задания» и просматривайте выходные данные в журнале.

file

4. Модель программирования Flink

Flink предоставляет различные уровни абстракции для разработки потоковых/пакетных приложений.

file

Абстракция самого низкого уровня обеспечивает толькопоток с отслеживанием состояния.

На практике большинству приложений не нужны вышеупомянутые низкоуровневые абстракции, но они нацелены наCore APIпрограммирование, напримерDataStream API(ограниченный/неограниченный поток) иDataSet API(ограниченный набор данных).

Table Api объявляет таблицу в соответствии с реляционной моделью.

Абстракция высшего уровняSQL.

Здесь мы использовали только DataStream API.

Основные строительные блоки программы Flink:потокиконвертировать.

Базовая структура программы:

l Получить среду выполнения

l Загрузить/создать необработанные данные

l Укажите метод преобразования этих данных

l Указать место хранения результата расчета

l Запустить выполнение программы

file

5. Использование DataStreaming API

1. Получите среду выполнения

StreamExecutionEnvironment является основой всех программ Flink, и способы их получения:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String ... jarFiles)

В общем случае используйте getExecutionEnvironment. Если вы выполняете в IDE или обычной Java-программе, вы можете создать StreamExecutionEnvironment на локальном компьютере с помощью createLocalEnvironment. Если вы создали jar-программу и хотите получить в ней метод getExecutionEnvironment через метод вызова, вы можете использовать метод createRemoteEnvironment.

2. Загрузите/создайте необработанные данные

Некоторые интерфейсы для доступа к источникам данных, предоставляемые StreamExecutionEnvironment.

(1) Файловые источники данных

readTextFile(path)
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

(2) Источник данных на основе сокетов (используется в этой статье)

Приходить socketTextStream

(3) Источник данных на основе коллекции

fromCollection(Collection)
fromCollection(Iterator, Class)
fromElements(T ...)
fromParallelCollection(SplittableIterator, Class)
generateSequence(from, to)

3. Метод преобразования

(1)Метод карты: Поток данных -> Поток данных

Функция: получить элемент и вывести элемент, аналогично функции UDF в Hive.

Пример:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

(2)Метод FlatMap: Поток данных -> Поток данных

Функция: получить элемент и вывести несколько значений, аналогично функции UDTF в Hive.

Пример:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

(3)Метод фильтрации: Поток данных -> Поток данных

Функция: определите, возвращает ли функция значение true для каждого элемента, и сохраните только тот элемент, который возвращает значение true в конце.

Пример:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

(4)Метод KeyBy: DataStream -> KeyedStream

Функция: логически разделить поток на непересекающиеся части, каждая из которых является элементом одного ключа.

Пример:

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

(5)Метод уменьшения: KeyedStream -> DataStream

Функция: Уменьшение поворота в потоке данных с ключом.

Пример:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

(6)Метод агрегации: KeyedStream -> DataStream

Функция: Агрегатная операция в ключевом потоке данных

Пример:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

(7)Оконный метод:KeyedStream -> WindowedStream

Функция: используется в KeyedStream, группируется по окнам для каждой клавиши по определенному признаку.

Пример:

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

(8)Метод WindowAll: DataStream -> AllWindowedStream

Функция: группировка по определенному признаку в DataStream.

Пример:

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

(9)Метод объединения: Поток данных* -> Поток данных

Функция: объединить несколько потоков данных в новый поток данных.

Пример:

dataStream.union(otherStream1, otherStream2, ...);

(10)Метод разделения: Поток данных -> Разделенный поток

Функция: разделить поток на несколько потоков.

Пример:

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

(11)Выберите метод: Разделенный поток -> Поток данных

Функция: выбрать поток из разделенного потока

Пример:

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

4. Выходные данные

writeAsText()
writeAsCsv(...)
print() / printToErr() 
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink

Другие принципы, связанные с Flink:

Фреймворк для вычислений в реальном времени, путешествующий во времени и пространстве — обработка времени Flink.

Король обработки больших данных в реальном времени — Flink

Унифицированная пакетная обработка потока — принцип интегрированной реализации пакетного потока Flink

Быстрый запуск Flink — установка и запуск примера

Быстро создайте свой первый проект Flink

Для получения дополнительных сведений о вычислениях в реальном времени, Flink, Kafka и других связанных технических блогах, пожалуйста, обратите внимание на потоковые вычисления в реальном времени:

file