Демонстрация: создание потоковых приложений на основе Flink SQL

Flink

Все практические упражнения в этой статье будут выполняться в интерфейсе командной строки Flink SQL, и весь процесс включает в себя только простой текст SQL, без единой строки кода Java/Scala и без установки IDE.

В прошлый четверг я поделился «Демо: создание потоковых приложений на основе Flink SQL» в группе DingTalk китайского сообщества Flink. Эта статья представляет собой краткое изложение живого контента, и в него были внесены некоторые улучшения, например, все компоненты, кроме Flink, устанавливаются с помощью Docker Compose, что упрощает процесс подготовки. Читатели также могут учиться вместе с видео и этой статьей. Вы можете посмотреть видеообзор для полной доли:воооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо

Только что был выпущен Flink 1.10.0 со множеством интересных новых функций. В частности, модуль Flink SQL развивается очень быстро, поэтому эта статья преднамеренно начинается с практической точки зрения и ведет вас к изучению того, как использовать Flink SQL для быстрого создания потоковых приложений.

В этой статье Flink SQL будет использоваться для создания приложения для анализа поведения пользователей электронной коммерции в реальном времени на основе Kafka, MySQL, Elasticsearch, Kibana. Все практические упражнения в этой статье будут выполняться в интерфейсе командной строки Flink SQL, и весь процесс включает в себя только простой текст SQL, без единой строки кода Java/Scala и без установки IDE. Окончательная визуализация этого практического упражнения:

1.png

Подготовить

Компьютер Linux или MacOS с Docker и Java8.

Запустите контейнер с помощью Docker Compose

Компоненты, на которых основана эта практическая демонстрация, организованы в контейнеры, поэтому их можноdocker-composeзапуск одной кнопкой. ты можешь пройтиwgetкоманда для автоматической загрузкиdocker-compose.ymlфайлы также можно загрузить вручную.

mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml

Контейнеры, включенные в этот Docker Compose:

  • DataGen:генератор данных. После запуска контейнер автоматически начнет генерировать данные о поведении пользователя и отправлять их в кластер Kafka. По умолчанию генерируется 1000 единиц данных в секунду, что длится около 3 часов. также можно изменитьdocker-compose.ymlв датагенеspeedupпараметры для настройки скорости сборки (перезапустите docker compose, чтобы изменения вступили в силу).
  • MySQL:MySQL 5.7 интегрирован, и таблица категорий предварительно создана (category), предварительно заполняется отношение сопоставления между подкатегорией и категорией верхнего уровня, которое позже будет использоваться в качестве таблицы измерений.
  • Kafka:В основном используется как источник данных. Компонент DataGen будет автоматически заливать данные в этот контейнер.
  • Zookeeper:Зависимости контейнера Kafka.
  • Elasticsearch:В основном он хранит данные, созданные Flink SQL.
  • Kibana:Визуализируйте данные в Elasticsearch.

Перед запуском контейнера рекомендуется изменить конфигурацию Docker, чтобы настроить ресурсы на 4 ГБ и 4 ядра. Чтобы запустить все контейнеры, просто запуститеdocker-compose.ymlЗапустите следующую команду в каталоге, где она находится.

docker-compose up -d

Эта команда автоматически запускает все контейнеры, определенные в конфигурации Docker Compose, в автономном режиме. ты можешь пройтиdocker psПосмотрим, нормально ли запускаются вышеперечисленные пять контейнеров. можно также посетитьhttp://localhost:5601/чтобы увидеть, правильно ли работает Kibana.

Кроме того, все контейнеры можно остановить с помощью следующей команды:

docker-compose down

Скачайте и установите локальный кластер Flink

Мы рекомендуем пользователям загружать и устанавливать Flink вручную вместо автоматического запуска Flink через Docker. Потому что это позволяет более интуитивно понимать различные компоненты, зависимости и сценарии Flink.

  1. Загрузите установочный пакет Flink 1.10.0 и разархивируйте его (каталог распаковкиflink-1.10.0):Woohoo.Apache.org/Day 3/Fulinkang/…

  2. Войдите в каталог flink-1.10.0:cd flink-1.10.0

  3. Загрузите зависимый пакет jar с помощью следующей команды и скопируйте его вlib/каталог, вы также можете загрузить и скопировать вручную. Потому что нам нужно полагаться на каждую реализацию коннектора во время выполнения.

wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/… ничего…


![](https://p1-jj.byteimg.com/tos-cn-i-t2oaga2asx/gold-user-assets/2020/2/27/17086efb1beff28e~tplv-t2oaga2asx-image.image)


4. 将 `conf/flink-conf.yaml` 中的 `taskmanager.numberOfTaskSlots` 修改成 10,因为我们会同时运行多个任务。
5. 执行 `./bin/start-cluster.sh`,启动集群。
运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。并且可以看到可用 Slots 数为 10 个。

![2.jpg](https://ucc.alicdn.com/pic/developer-ecology/ff8fa73598584c83aadea3414ecc4b98.jpg)


6. 执行 `bin/sql-client.sh embedded` 启动 SQL CLI。便会看到如下的松鼠欢迎界面。

![3.png](https://ucc.alicdn.com/pic/developer-ecology/35a3fab21cbe4fe4bda635c7964b34a6.png)


## 使用 DDL 创建 Kafka 表

Datagen 容器在启动后会往 Kafka 的 `user_behavior` topic 中持续不断地写入数据。数据包含了2017年11月27日一天的用户行为(行为包括点击、购买、加购、喜欢),每一行表示一条用户行为,以 JSON 的格式由用户ID、商品ID、商品类目ID、行为类型和时间组成。该原始数据集来自[阿里云天池公开数据集](https://tianchi.aliyun.com/dataset/dataDetail?dataId=649),特此鸣谢。

我们可以在 `docker-compose.yml` 所在目录下运行如下命令,查看 Kafka 集群中生成的前10条数据。

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'


{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} ...


有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic 了。在 Flink SQL CLI 中执行该 DDL。

СОЗДАТЬ ТАБЛИЦУ user_behavior ( user_id БОЛЬШОЕ, item_id БОЛЬШОЕ, id_категории БОЛЬШОЕ, поведение STRING, tsTIMESTAMP(3), proctime as PROCTIME(), -- создать столбец времени обработки из вычисляемого столбца ВОДЯНОЙ ЗНАК ДЛЯ ts as ts - ИНТЕРВАЛ '5' СЕКУНД - определить водяной знак на ts, ts становится столбцом времени события ) С УЧАСТИЕМ ( 'connector.type' = 'kafka', -- использовать коннектор kafka 'connector.version' = 'universal', -- версия kafka, универсальная поддерживает версии выше 0.11 'connector.topic' = 'user_behavior', -- тема кафки 'connector.startup-mode' = 'самое раннее смещение', -- читать с начального смещения 'connector.properties.zookeeper.connect' = 'localhost:2181', -- адрес zookeeper 'connector.properties.bootstrap.servers' = 'localhost:9092', -- адрес брокера kafka 'format.type' = 'json' -- формат источника данных json );


如上我们按照数据的格式声明了 5 个字段,除此之外,我们还通过计算列语法和 `PROCTIME()` 内置函数声明了一个产生处理时间的虚拟列。我们还通过 WATERMARK 语法,在 ts 字段上声明了 watermark 策略(容忍5秒乱序), ts 字段因此也成了事件时间列。关于时间属性以及 DDL 语法可以阅读官方文档了解更多:

 - 时间属性:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
 - DDL:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table

在 SQL CLI 中成功创建 Kafka 表后,可以通过 `show tables;` 和 `describe user_behavior;` 来查看目前已注册的表,以及表的详细信息。我们也可以直接在 SQL CLI 中运行 `SELECT * FROM user_behavior;` 预览下数据(按`q`退出)。

接下来,我们会通过三个实战场景来更深入地了解 Flink SQL 。

## 统计每小时的成交量

### 使用 DDL 创建 Elasticsearch 表

我们先在 SQL CLI 中创建一个 ES 结果表,根据场景需求主要需要保存两个数据:小时、成交量。

СОЗДАТЬ ТАБЛИЦУ buy_cnt_per_hour ( hour_of_day БОЛЬШОЙ, buy_cnt БОЛЬШОЙ ) С УЧАСТИЕМ ( 'connector.type' = 'elasticsearch', -- использовать соединитель elasticsearch 'connector.version' = '6', -- версия elasticsearch, 6 может поддерживать версии es 6+ и 7+ 'connector.hosts' = 'http://localhost:9200', -- адрес elasticsearch 'connector.index' = 'buy_cnt_per_hour', -- имя индекса elasticsearch, эквивалентное имени таблицы базы данных 'connector.document-type' = 'user_behavior', -- тип elasticsearch, который эквивалентен имени библиотеки базы данных 'connector.bulk-flush.max-actions' = '1', -- обновлять все данные 'format.type' = 'json', -- формат выходных данных json 'режим обновления' = 'добавлять' );


我们不需要在 Elasticsearch 中事先创建 `buy_cnt_per_hour` 索引,Flink Job 会自动创建该索引。

### 提交 Query

统计每小时的成交量就是每小时共有多少 "buy" 的用户行为。因此会需要用到 TUMBLE 窗口函数,按照一小时切窗。然后每个窗口分别统计 "buy" 的个数,这可以通过先过滤出 "buy" 的数据,然后 `COUNT(*)` 实现。

```sql
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

Здесь мы используемHOURВстроенная функция для извлечения значения часа дня из столбца TIMESTAMP. использовалINSERT INTOРезультаты запроса непрерывно вставляются в таблицу результатов es, определенную выше (таблицу результатов es можно понимать как материализованное представление запроса). Также прочтите этот документ, чтобы узнать больше об объединении окон:this.apache.org/projects/legal…

После выполнения приведенного выше запроса в интерфейсе командной строки Flink SQL вы можете увидеть отправленную задачу в веб-интерфейсе Flink.Задача является потоковой задачей, поэтому она будет выполняться всегда.

4.jpg

Визуализируйте результаты с Kibana

Мы запустили контейнер Kibana через Docker Compose, доступ к которому можно получить черезhttp://localhost:5601Посетите Кибану. Сначала нам нужно настроить шаблон индекса. Нажмите «Управление» на левой панели инструментов, чтобы найти «Шаблоны индексирования». Нажмите «Создать шаблон индекса» и создайте шаблон индекса, введя полное имя индекса «buy_cnt_per_hour». После создания Kibana знает наш индекс, и мы можем начать изучение данных.

Сначала нажмите кнопку «Обнаружение» на левой панели инструментов, и Kibana отобразит содержимое только что созданного индекса.

5.jpg

Далее давайте создадим информационную панель для отображения каждой визуализации. Нажмите «Панель мониторинга» в левой части страницы, чтобы создать панель мониторинга под названием «Анализ журнала поведения пользователя». Затем нажмите «Создать новый», чтобы создать новое представление, выберите диаграмму с областями «Область», выберите индекс «buy_cnt_per_hour», нарисуйте диаграмму с областями объема в соответствии с конфигурацией на снимке экрана ниже (слева) и сохраните ее как «часовой график». объем "".

6.jpg

Видно, что раннее утро – это корыто объема дня.

Подсчитайте совокупное количество уникальных пользователей каждые 10 минут в течение одного дня.

Другая интересная визуализация заключается в подсчете кумулятивного количества уникальных пользователей (ув) в каждый момент дня, то есть количество ув в каждый момент представляет собой общее количество ув от 0 до текущего момента, поэтому кривая должна быть монотонно возрастает.

Мы по-прежнему начинаем с создания таблицы Elasticsearch в SQL CLI для хранения сводных данных результатов. Есть два основных поля: время и суммарное количество ультрафиолетовых лучей.

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

Чтобы реализовать эту кривую, мы можем сначала рассчитать текущую минуту для каждого данных через OVER WINDOW и текущую совокупную uv (количество независимых пользователей от 0 до текущей строки). Для статистики ув передаем встроенныйCOUNT(DISTINCT user_id)Чтобы завершить, Flink SQL выполнил большую внутреннюю оптимизацию COUNT DISTINCT, поэтому вы можете использовать его с уверенностью.

CREATE VIEW uv_per_10min AS
SELECT 
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, 
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

Здесь мы используемSUBSTRиDATE_FORMATи||Встроенная функция для преобразования поля TIMESTAMP в строку времени в 10-минутных единицах, например:12:10, 12:20. Дополнительные сведения о OVER WINDOW см. в документации:this.apache.org/projects/legal…

Мы также использовали синтаксис CREATE VIEW для регистрации запроса как логического представления, на которое можно легко ссылаться в последующих запросах, что способствует разбору сложных запросов. Обратите внимание, что создание логического представления не приведет к выполнению задания, а результаты представления не будут переданы, поэтому оно очень легковесно и не требует дополнительных затрат. так какuv_per_10minКаждая часть входных данных создает одну часть выходных данных, поэтому увеличивается нагрузка на хранилище. Мы можем основываться наuv_per_10minЗатем выполните агрегирование на основе минутного времени, чтобы каждые 10 минут в Elasticsearch сохранялась только одна точка, а нагрузка на визуализацию Elasticsearch и Kibana была намного меньше.

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

После отправки вышеуказанного запроса создайте в Kibanacumulative_uv, затем создайте линейную диаграмму «Линия» в Dashboard, выберитеcumulative_uvИндексируйте, нарисуйте кривую кумулятивного числа независимых пользователей в соответствии с конфигурацией на снимке экрана ниже (слева) и сохраните ее.

7.jpg

Таблица лидеров высшей категории

Последняя интересная визуализация — это таблица лидеров категорий, чтобы понять, какие категории являются основными категориями. Однако, поскольку категории в исходных данных слишком мелко классифицированы (около 5000 категорий), они не имеют большого значения для рейтинга, поэтому мы надеемся свести их к верхним категориям. Поэтому автор заранее подготовил данные сопоставления подкатегорий и категорий верхнего уровня в контейнере mysql и использовал их в качестве таблиц измерений.

Создайте таблицу MySQL в интерфейсе командной строки SQL, чтобы позже использовать ее в качестве запроса к таблице измерений.

CREATE TABLE category_dim (
    sub_category_id BIGINT,  -- 子类目
    parent_category_id BIGINT -- 顶级类目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

В то же время мы создаем таблицу Elasticsearch для хранения статистики категорий.

CREATE TABLE top_category (
    category_name STRING,  -- 类目名称
    buy_cnt BIGINT  -- 销量
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);

Первым шагом является завершение имени категории через ассоциацию таблицы измерений. Мы по-прежнему используем CREATE VIEW для регистрации запроса как представления, упрощая логику. Связи таблиц измерений используют синтаксис временного соединения, дополнительную информацию см. в документации:this.apache.org/projects/legal…

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, 
  CASE C.parent_category_id
    WHEN 1 THEN '服饰鞋包'
    WHEN 2 THEN '家装家饰'
    WHEN 3 THEN '家电'
    WHEN 4 THEN '美妆'
    WHEN 5 THEN '母婴'
    WHEN 6 THEN '3C数码'
    WHEN 7 THEN '运动户外'
    WHEN 8 THEN '食品'
    ELSE '其他'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

Наконец, согласно группировке названий категорий, статистикаbuyколичество событий, записанных в Elasticsearch.

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

После отправки вышеуказанного запроса создайте в Kibanatop_category, затем создайте гистограмму «горизонтальная гистограмма» в Dashboard, выберитеtop_categoryИндексируйте, нарисуйте таблицу лидеров категории в соответствии с конфигурацией на скриншоте ниже (слева) и сохраните ее.

8.jpg

Видно, что объем сделок с одеждой, обувью и сумками намного опережает другие категории.

На данный момент мы завершили три практических кейса и их визуализации. Теперь вы можете вернуться на страницу информационной панели и перетащить каждое представление, чтобы наша информационная панель выглядела более формальной и интуитивно понятной (например, изображения в начале этой статьи). Конечно, Kibana также предоставляет множество вариантов графики и визуализации, и в данных о поведении пользователей есть много интересной информации, которую стоит изучить.Заинтересованные читатели могут использовать Flink SQL для анализа данных в большем количестве измерений и использовать Kibana, чтобы показать больше. графики и наблюдайте за изменениями данных графика в режиме реального времени.

конец

В этой статье мы показали, как использовать Flink SQL для интеграции Kafka, MySQL, Elasticsearch и Kibana для быстрого создания приложения для аналитики в реальном времени. Весь процесс не требует ни одной строки кода Java/Scala и может быть выполнен с использованием простого SQL. Мы надеемся, что благодаря этой статье читатели смогут понять простоту использования и возможности Flink SQL, включая простое подключение к различным внешним системам, встроенную поддержку времени события и обработки неупорядоченных данных, ассоциацию таблиц измерений, богатые встроенные функции. по функциям и прочее. Надеюсь, вам понравятся наши практические упражнения, и вы получите удовольствие и знания!