Все практические упражнения в этой статье будут выполняться в интерфейсе командной строки Flink SQL, и весь процесс включает в себя только простой текст SQL, без единой строки кода Java/Scala и без установки IDE.
В прошлый четверг я поделился «Демо: создание потоковых приложений на основе Flink SQL» в группе DingTalk китайского сообщества Flink. Эта статья представляет собой краткое изложение живого контента, и в него были внесены некоторые улучшения, например, все компоненты, кроме Flink, устанавливаются с помощью Docker Compose, что упрощает процесс подготовки. Читатели также могут учиться вместе с видео и этой статьей. Вы можете посмотреть видеообзор для полной доли:воооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооооо
Только что был выпущен Flink 1.10.0 со множеством интересных новых функций. В частности, модуль Flink SQL развивается очень быстро, поэтому эта статья преднамеренно начинается с практической точки зрения и ведет вас к изучению того, как использовать Flink SQL для быстрого создания потоковых приложений.
В этой статье Flink SQL будет использоваться для создания приложения для анализа поведения пользователей электронной коммерции в реальном времени на основе Kafka, MySQL, Elasticsearch, Kibana. Все практические упражнения в этой статье будут выполняться в интерфейсе командной строки Flink SQL, и весь процесс включает в себя только простой текст SQL, без единой строки кода Java/Scala и без установки IDE. Окончательная визуализация этого практического упражнения:
Подготовить
Компьютер Linux или MacOS с Docker и Java8.
Запустите контейнер с помощью Docker Compose
Компоненты, на которых основана эта практическая демонстрация, организованы в контейнеры, поэтому их можноdocker-compose
запуск одной кнопкой. ты можешь пройтиwget
команда для автоматической загрузкиdocker-compose.yml
файлы также можно загрузить вручную.
mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
Контейнеры, включенные в этот Docker Compose:
-
DataGen:генератор данных. После запуска контейнер автоматически начнет генерировать данные о поведении пользователя и отправлять их в кластер Kafka. По умолчанию генерируется 1000 единиц данных в секунду, что длится около 3 часов. также можно изменить
docker-compose.yml
в датагенеspeedup
параметры для настройки скорости сборки (перезапустите docker compose, чтобы изменения вступили в силу). -
MySQL:MySQL 5.7 интегрирован, и таблица категорий предварительно создана (
category
), предварительно заполняется отношение сопоставления между подкатегорией и категорией верхнего уровня, которое позже будет использоваться в качестве таблицы измерений. - Kafka:В основном используется как источник данных. Компонент DataGen будет автоматически заливать данные в этот контейнер.
- Zookeeper:Зависимости контейнера Kafka.
- Elasticsearch:В основном он хранит данные, созданные Flink SQL.
- Kibana:Визуализируйте данные в Elasticsearch.
Перед запуском контейнера рекомендуется изменить конфигурацию Docker, чтобы настроить ресурсы на 4 ГБ и 4 ядра. Чтобы запустить все контейнеры, просто запуститеdocker-compose.yml
Запустите следующую команду в каталоге, где она находится.
docker-compose up -d
Эта команда автоматически запускает все контейнеры, определенные в конфигурации Docker Compose, в автономном режиме. ты можешь пройтиdocker ps
Посмотрим, нормально ли запускаются вышеперечисленные пять контейнеров. можно также посетитьhttp://localhost:5601/чтобы увидеть, правильно ли работает Kibana.
Кроме того, все контейнеры можно остановить с помощью следующей команды:
docker-compose down
Скачайте и установите локальный кластер Flink
Мы рекомендуем пользователям загружать и устанавливать Flink вручную вместо автоматического запуска Flink через Docker. Потому что это позволяет более интуитивно понимать различные компоненты, зависимости и сценарии Flink.
-
Загрузите установочный пакет Flink 1.10.0 и разархивируйте его (каталог распаковки
flink-1.10.0
):Woohoo.Apache.org/Day 3/Fulinkang/… -
Войдите в каталог flink-1.10.0:
cd flink-1.10.0
-
Загрузите зависимый пакет jar с помощью следующей команды и скопируйте его в
lib/
каталог, вы также можете загрузить и скопировать вручную. Потому что нам нужно полагаться на каждую реализацию коннектора во время выполнения.
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/org/… |
wget -P ./lib/ repo1.maven.org/maven2/… ничего…

4. 将 `conf/flink-conf.yaml` 中的 `taskmanager.numberOfTaskSlots` 修改成 10,因为我们会同时运行多个任务。
5. 执行 `./bin/start-cluster.sh`,启动集群。
运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。并且可以看到可用 Slots 数为 10 个。

6. 执行 `bin/sql-client.sh embedded` 启动 SQL CLI。便会看到如下的松鼠欢迎界面。

## 使用 DDL 创建 Kafka 表
Datagen 容器在启动后会往 Kafka 的 `user_behavior` topic 中持续不断地写入数据。数据包含了2017年11月27日一天的用户行为(行为包括点击、购买、加购、喜欢),每一行表示一条用户行为,以 JSON 的格式由用户ID、商品ID、商品类目ID、行为类型和时间组成。该原始数据集来自[阿里云天池公开数据集](https://tianchi.aliyun.com/dataset/dataDetail?dataId=649),特此鸣谢。
我们可以在 `docker-compose.yml` 所在目录下运行如下命令,查看 Kafka 集群中生成的前10条数据。
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} ...
有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic 了。在 Flink SQL CLI 中执行该 DDL。
СОЗДАТЬ ТАБЛИЦУ user_behavior ( user_id БОЛЬШОЕ, item_id БОЛЬШОЕ, id_категории БОЛЬШОЕ, поведение STRING, tsTIMESTAMP(3), proctime as PROCTIME(), -- создать столбец времени обработки из вычисляемого столбца ВОДЯНОЙ ЗНАК ДЛЯ ts as ts - ИНТЕРВАЛ '5' СЕКУНД - определить водяной знак на ts, ts становится столбцом времени события ) С УЧАСТИЕМ ( 'connector.type' = 'kafka', -- использовать коннектор kafka 'connector.version' = 'universal', -- версия kafka, универсальная поддерживает версии выше 0.11 'connector.topic' = 'user_behavior', -- тема кафки 'connector.startup-mode' = 'самое раннее смещение', -- читать с начального смещения 'connector.properties.zookeeper.connect' = 'localhost:2181', -- адрес zookeeper 'connector.properties.bootstrap.servers' = 'localhost:9092', -- адрес брокера kafka 'format.type' = 'json' -- формат источника данных json );
如上我们按照数据的格式声明了 5 个字段,除此之外,我们还通过计算列语法和 `PROCTIME()` 内置函数声明了一个产生处理时间的虚拟列。我们还通过 WATERMARK 语法,在 ts 字段上声明了 watermark 策略(容忍5秒乱序), ts 字段因此也成了事件时间列。关于时间属性以及 DDL 语法可以阅读官方文档了解更多:
- 时间属性:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html
- DDL:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table
在 SQL CLI 中成功创建 Kafka 表后,可以通过 `show tables;` 和 `describe user_behavior;` 来查看目前已注册的表,以及表的详细信息。我们也可以直接在 SQL CLI 中运行 `SELECT * FROM user_behavior;` 预览下数据(按`q`退出)。
接下来,我们会通过三个实战场景来更深入地了解 Flink SQL 。
## 统计每小时的成交量
### 使用 DDL 创建 Elasticsearch 表
我们先在 SQL CLI 中创建一个 ES 结果表,根据场景需求主要需要保存两个数据:小时、成交量。
СОЗДАТЬ ТАБЛИЦУ buy_cnt_per_hour ( hour_of_day БОЛЬШОЙ, buy_cnt БОЛЬШОЙ ) С УЧАСТИЕМ ( 'connector.type' = 'elasticsearch', -- использовать соединитель elasticsearch 'connector.version' = '6', -- версия elasticsearch, 6 может поддерживать версии es 6+ и 7+ 'connector.hosts' = 'http://localhost:9200', -- адрес elasticsearch 'connector.index' = 'buy_cnt_per_hour', -- имя индекса elasticsearch, эквивалентное имени таблицы базы данных 'connector.document-type' = 'user_behavior', -- тип elasticsearch, который эквивалентен имени библиотеки базы данных 'connector.bulk-flush.max-actions' = '1', -- обновлять все данные 'format.type' = 'json', -- формат выходных данных json 'режим обновления' = 'добавлять' );
我们不需要在 Elasticsearch 中事先创建 `buy_cnt_per_hour` 索引,Flink Job 会自动创建该索引。
### 提交 Query
统计每小时的成交量就是每小时共有多少 "buy" 的用户行为。因此会需要用到 TUMBLE 窗口函数,按照一小时切窗。然后每个窗口分别统计 "buy" 的个数,这可以通过先过滤出 "buy" 的数据,然后 `COUNT(*)` 实现。
```sql
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
Здесь мы используемHOUR
Встроенная функция для извлечения значения часа дня из столбца TIMESTAMP. использовалINSERT INTO
Результаты запроса непрерывно вставляются в таблицу результатов es, определенную выше (таблицу результатов es можно понимать как материализованное представление запроса). Также прочтите этот документ, чтобы узнать больше об объединении окон:this.apache.org/projects/legal…
После выполнения приведенного выше запроса в интерфейсе командной строки Flink SQL вы можете увидеть отправленную задачу в веб-интерфейсе Flink.Задача является потоковой задачей, поэтому она будет выполняться всегда.
Визуализируйте результаты с Kibana
Мы запустили контейнер Kibana через Docker Compose, доступ к которому можно получить черезhttp://localhost:5601Посетите Кибану. Сначала нам нужно настроить шаблон индекса. Нажмите «Управление» на левой панели инструментов, чтобы найти «Шаблоны индексирования». Нажмите «Создать шаблон индекса» и создайте шаблон индекса, введя полное имя индекса «buy_cnt_per_hour». После создания Kibana знает наш индекс, и мы можем начать изучение данных.
Сначала нажмите кнопку «Обнаружение» на левой панели инструментов, и Kibana отобразит содержимое только что созданного индекса.
Далее давайте создадим информационную панель для отображения каждой визуализации. Нажмите «Панель мониторинга» в левой части страницы, чтобы создать панель мониторинга под названием «Анализ журнала поведения пользователя». Затем нажмите «Создать новый», чтобы создать новое представление, выберите диаграмму с областями «Область», выберите индекс «buy_cnt_per_hour», нарисуйте диаграмму с областями объема в соответствии с конфигурацией на снимке экрана ниже (слева) и сохраните ее как «часовой график». объем "".
Видно, что раннее утро – это корыто объема дня.
Подсчитайте совокупное количество уникальных пользователей каждые 10 минут в течение одного дня.
Другая интересная визуализация заключается в подсчете кумулятивного количества уникальных пользователей (ув) в каждый момент дня, то есть количество ув в каждый момент представляет собой общее количество ув от 0 до текущего момента, поэтому кривая должна быть монотонно возрастает.
Мы по-прежнему начинаем с создания таблицы Elasticsearch в SQL CLI для хранения сводных данных результатов. Есть два основных поля: время и суммарное количество ультрафиолетовых лучей.
CREATE TABLE cumulative_uv (
time_str STRING,
uv BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'cumulative_uv',
'connector.document-type' = 'user_behavior',
'format.type' = 'json',
'update-mode' = 'upsert'
);
Чтобы реализовать эту кривую, мы можем сначала рассчитать текущую минуту для каждого данных через OVER WINDOW и текущую совокупную uv (количество независимых пользователей от 0 до текущей строки). Для статистики ув передаем встроенныйCOUNT(DISTINCT user_id)
Чтобы завершить, Flink SQL выполнил большую внутреннюю оптимизацию COUNT DISTINCT, поэтому вы можете использовать его с уверенностью.
CREATE VIEW uv_per_10min AS
SELECT
MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
Здесь мы используемSUBSTR
иDATE_FORMAT
и||
Встроенная функция для преобразования поля TIMESTAMP в строку времени в 10-минутных единицах, например:12:10
, 12:20
. Дополнительные сведения о OVER WINDOW см. в документации:this.apache.org/projects/legal…
Мы также использовали синтаксис CREATE VIEW для регистрации запроса как логического представления, на которое можно легко ссылаться в последующих запросах, что способствует разбору сложных запросов. Обратите внимание, что создание логического представления не приведет к выполнению задания, а результаты представления не будут переданы, поэтому оно очень легковесно и не требует дополнительных затрат. так какuv_per_10min
Каждая часть входных данных создает одну часть выходных данных, поэтому увеличивается нагрузка на хранилище. Мы можем основываться наuv_per_10min
Затем выполните агрегирование на основе минутного времени, чтобы каждые 10 минут в Elasticsearch сохранялась только одна точка, а нагрузка на визуализацию Elasticsearch и Kibana была намного меньше.
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
После отправки вышеуказанного запроса создайте в Kibanacumulative_uv
, затем создайте линейную диаграмму «Линия» в Dashboard, выберитеcumulative_uv
Индексируйте, нарисуйте кривую кумулятивного числа независимых пользователей в соответствии с конфигурацией на снимке экрана ниже (слева) и сохраните ее.
Таблица лидеров высшей категории
Последняя интересная визуализация — это таблица лидеров категорий, чтобы понять, какие категории являются основными категориями. Однако, поскольку категории в исходных данных слишком мелко классифицированы (около 5000 категорий), они не имеют большого значения для рейтинга, поэтому мы надеемся свести их к верхним категориям. Поэтому автор заранее подготовил данные сопоставления подкатегорий и категорий верхнего уровня в контейнере mysql и использовал их в качестве таблиц измерений.
Создайте таблицу MySQL в интерфейсе командной строки SQL, чтобы позже использовать ее в качестве запроса к таблице измерений.
CREATE TABLE category_dim (
sub_category_id BIGINT, -- 子类目
parent_category_id BIGINT -- 顶级类目
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink',
'connector.table' = 'category',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
В то же время мы создаем таблицу Elasticsearch для хранения статистики категорий.
CREATE TABLE top_category (
category_name STRING, -- 类目名称
buy_cnt BIGINT -- 销量
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://localhost:9200',
'connector.index' = 'top_category',
'connector.document-type' = 'user_behavior',
'format.type' = 'json',
'update-mode' = 'upsert'
);
Первым шагом является завершение имени категории через ассоциацию таблицы измерений. Мы по-прежнему используем CREATE VIEW для регистрации запроса как представления, упрощая логику. Связи таблиц измерений используют синтаксис временного соединения, дополнительную информацию см. в документации:this.apache.org/projects/legal…
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
CASE C.parent_category_id
WHEN 1 THEN '服饰鞋包'
WHEN 2 THEN '家装家饰'
WHEN 3 THEN '家电'
WHEN 4 THEN '美妆'
WHEN 5 THEN '母婴'
WHEN 6 THEN '3C数码'
WHEN 7 THEN '运动户外'
WHEN 8 THEN '食品'
ELSE '其他'
END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
Наконец, согласно группировке названий категорий, статистикаbuy
количество событий, записанных в Elasticsearch.
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
После отправки вышеуказанного запроса создайте в Kibanatop_category
, затем создайте гистограмму «горизонтальная гистограмма» в Dashboard, выберитеtop_category
Индексируйте, нарисуйте таблицу лидеров категории в соответствии с конфигурацией на скриншоте ниже (слева) и сохраните ее.
Видно, что объем сделок с одеждой, обувью и сумками намного опережает другие категории.
На данный момент мы завершили три практических кейса и их визуализации. Теперь вы можете вернуться на страницу информационной панели и перетащить каждое представление, чтобы наша информационная панель выглядела более формальной и интуитивно понятной (например, изображения в начале этой статьи). Конечно, Kibana также предоставляет множество вариантов графики и визуализации, и в данных о поведении пользователей есть много интересной информации, которую стоит изучить.Заинтересованные читатели могут использовать Flink SQL для анализа данных в большем количестве измерений и использовать Kibana, чтобы показать больше. графики и наблюдайте за изменениями данных графика в режиме реального времени.
конец
В этой статье мы показали, как использовать Flink SQL для интеграции Kafka, MySQL, Elasticsearch и Kibana для быстрого создания приложения для аналитики в реальном времени. Весь процесс не требует ни одной строки кода Java/Scala и может быть выполнен с использованием простого SQL. Мы надеемся, что благодаря этой статье читатели смогут понять простоту использования и возможности Flink SQL, включая простое подключение к различным внешним системам, встроенную поддержку времени события и обработки неупорядоченных данных, ассоциацию таблиц измерений, богатые встроенные функции. по функциям и прочее. Надеюсь, вам понравятся наши практические упражнения, и вы получите удовольствие и знания!