предисловие
Теперь Hive стал основным компонентом экосистемы хранилища данных. Это не только механизм SQL для анализа больших данных и ETL, но и платформа управления данными, которую можно использовать для обнаружения, определения и развития данных. В настоящее время Flink является самым популярным механизмом потоковых вычислений, который может выполнять вычисления состояния для неограниченных и ограниченных потоков данных. Flink поддерживает интеграцию с Hive, начиная с версии 1.9, но версия 1.9 является бета-версией и не рекомендуется для использования в производственных средах. В версии 1.10, после завершения интеграции Alibaba Blink, интеграция Flink с Hive также достигла требований производственного уровня. Стоит отметить, что разные версии Flink интегрируют Hive по-разному.В этой статье в качестве примеров будут взяты последняя версия Flink 1.12 и версия Hive 3.1.2, чтобы кратко описать шаги Flink по интеграции Hive.
Интеграция
Интеграция Flink и Hive включает следующие два уровня:
-
Прежде всего, Flink использует хранилище метаданных Hive в качестве постоянного каталога, мы можем передать
HiveCatalogСохраняйте метаданные Flink из разных сеансов в хранилище метаданных Hive. Например, мы можем использоватьHiveCatalogСохраняйте метаданные для таблиц Kafka или Elasticsearch в хранилище метаданных Hive и повторно используйте их в последующих запросах SQL. -
Во-вторых, используйте Flink для чтения и записи таблиц Hive точно так же, как SparkSQL или Impala для запроса данных в Hive.
HiveCatalogДизайн Hive обеспечивает хорошую совместимость с Hive, и пользователи могут получить доступ к своим существующим хранилищам данных Hive «из коробки». Нет необходимости изменять существующее хранилище метаданных Hive или изменять расположение данных или разделение таблицы.
Поддерживаемые версии Hive
Flink поддерживает следующие версии Hive.
| большая версия | V1 | V2 | V3 | V4 | V5 | V6 | V7 |
|---|---|---|---|---|---|---|---|
| 1.0 | 1.0.0 | 1.0.1 | |||||
| 1.1 | 1.1.0 | 1.1.1 | |||||
| 1.2 | 1.2.0 | 1.2.1 | 1.2.2 | ||||
| 2.0 | 2.0.0 | 2.0.1 | |||||
| 2.1 | 2.1.0 | 2.1.1 | |||||
| 2.2 | 2.2.0 | ||||||
| 2.3 | 2.3.0 | 2.3.1 | 2.3.2 | 2.3.3 | 2.3.4 | 2.3.5 | 2.3.6 |
| 3.1 | 3.1.0 | 3.1.1 | 3.1.2 |
Обратите внимание, что доступность некоторых функций зависит от используемой вами версии Hive, эти ограничения не связаны с Flink:
- Встроенные функции Hive поддерживаются при использовании Hive-1.2.0 и более поздних версий.
- Ограничения столбцов, а именно PRIMARY KEY и NOT NULL, поддерживаются при использовании Hive-3.1.0 и более поздних версий.
- Статистика таблиц изменений, поддерживается при использовании Hive-1.2.0 и более поздних версий.
-
DATEСтатистика столбцов, поддерживаемая при использовании Hive-1.2.0 и более поздних версий. - Запись в таблицы ORC не поддерживается при использовании версий Hive-2.0.x.
зависимости
Чтобы интегрировать Hive с Flink, вам нужно добавить несколько дополнительных JAR-пакетов зависимостей и поместить их в папку lib каталога установки Flink, чтобы вы могли взаимодействовать с Hive через Table API или SQL-клиент.
Официальный сайт Flink предоставляет два способа добавления зависимостей Hive. Первый — использовать банку Hive, предоставленную Flink, и соответствующую банку Hive можно выбрать в соответствии с используемой версией Metastore. Второй способ — добавить каждую требуемую банку отдельно, например, используемая вами версия Hive отличается от той, которая совместима с банкой Hive, предоставленной Flink.
Примечание. Рекомендуется использовать банку Hive, предоставленную Flink.
Используйте банку Hive, предоставленную Flink.
Все доступные jar-файлы Hive перечислены ниже, мы можем загрузить соответствующие jar-файлы в соответствии с используемой версией Hive. Например, в этой статье используется версия Hive 3.1.2, поэтому вам нужно всего лишь загрузитьflink-sql-connector-hive-3.1.2.jarИ поместите его в папку lib каталога установки Flink.
| Metastore version | Maven dependency | SQL Client JAR |
|---|---|---|
| 1.0.0 ~ 1.2.2 | flink-sql-connector-hive-1.2.2 |
скачать |
| 2.0.0 ~2.2.0 | flink-sql-connector-hive-2.2.0 |
скачать |
| 2.3.0 ~2.3.6 | flink-sql-connector-hive-2.3.6 |
скачать |
| 3.0.0 ~ 3.1.2 | flink-sql-connector-hive-3.1.2 |
скачать |
определяемые пользователем зависимости
Возьмем Hive 3.1.2 в качестве примера, за исключениемflink-sql-connector-hive-3.1.2.jarВ дополнение к jar-файлам, которые нам нужны при использовании Flink SQL Cli, нам также нужно добавить следующие jar-файлы:
| банка зависимостей | ссылка для скачивания |
|---|---|
| flink-connector-hive_2.12-1.12.0.jar | скачать |
| flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar | скачать |
| hive-exec-3.1.2.jar | В каталоге lib в каталоге установки Hive |
| libfb303-0.9.3.jar | В каталоге lib в каталоге установки Hive |
Подключиться к улью
Следующее использует Flink SQL Cli для подключения к Hive.
Настройте sql-client-defaults.yaml
Файл sql-client-defaults.yaml — это файл конфигурации, используемый при запуске Flink SQL Cli. Он находится в папке conf каталога установки Flink. Конкретная конфигурация выглядит следующим образом, в основном, для настройки каталога:
catalogs:
- name: myhive
type: hive
default-database: default
hive-conf-dir: /opt/hive/conf/
hadoop-conf-dir: /opt/hadoop/etc/hadoop/
В следующей таблице перечислены определения через файлы YAML или DDL.HiveCatalogПараметры поддерживаются, когда:
| параметр | обязательный | По умолчанию | тип | описывать |
|---|---|---|---|---|
| type | да | (никто) | String | Тип каталога. При создании HiveCatalog для этого параметра необходимо установить значение'hive'. |
| name | да | (никто) | String | Название каталога. Требуется только при использовании файла YAML. |
| hive-conf-dir | нет | (никто) | String | URI, указывающий на каталог, содержащий файл hive-site.xml. URI должен относиться к типу, поддерживаемому файловой системой Hadoop. Если указан относительный URI, то есть не содержит схемы, по умолчанию используется локальная файловая система. Если этот параметр не указан, мы будем искать hive-site.xml в пути к классам. |
| default-database | нет | default | String | Текущая база данных по умолчанию для использования, когда каталог установлен в качестве текущего каталога. |
| hive-version | нет | (никто) | String | HiveCatalog может автоматически определять используемую версию Hive. мы предлагаемне хотетьВручную установите версию Hive, если механизм автоматического определения не дал сбой. |
| hadoop-conf-dir | нет | (никто) | String | Путь к каталогу файла конфигурации Hadoop. В настоящее время поддерживаются только пути локальной файловой системы. Мы рекомендуем использоватьHADOOP_CONF_DIRпеременные среды для указания конфигурации Hadoop. Поэтому рассмотрите возможность использования этого параметра только в том случае, если переменная среды не соответствует вашим потребностям, например, если вы хотите настроить конфигурацию Hadoop индивидуально для каждого HiveCatalog. |
Управление таблицами в Hive
Сначала запустите FlinkSQL Cli, команда выглядит следующим образом:
sql-client.sh embedded
Запуск завершается со следующей ошибкой:
Причина ошибки в том, что Flink не поддерживает встроенное хранилище метаданных при интеграции Hive.При настройке Hive необходимо запустить службу хранилища метаданных Hive и установить правильное значение свойства hive.metastore.uris в файле conf/hive-site. xml-файл конфигурации.
Запустите службу хранилища метаданных Hive.
hive --service metastore
Настройте hive.metastore.uris в файле hive-site.xml.
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
Снова успешно запустите FlinkSQL Cli, после чего мы сможем просмотреть зарегистрированный каталог.
Flink SQL> show catalogs;
default_catalog
myhive
Используйте зарегистрированный каталог myhive
Flink SQL> use catalog myhive;
Запросите базу данных, proghive - это библиотека Hive, которую я создал, когда практиковал "Руководство по программированию Hive".
Flink SQL> show databases;
default
proghive
посмотреть все столы
Flink SQL> use proghive;
Flink SQL> show tables;
dividends
employees
stocks
Запросите таблицу сотрудников в Hive:
hive> select * from employees;
OK
John Doe 100000.0 ["Mary Smith","Todd Jones"] {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1} {"street":"1 Michigan Ave.","city":"Chicago","state":"IL","zip":60600}
Mary Smith 80000.0 ["Bill King"] {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1} {"street":"100 Ontario St.","city":"Chicago","state":"IL","zip":60601}
Todd Jones 70000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"200 Chicago Ave.","city":"Oak Park","state":"IL","zip":60700}
Bill King 60000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"300 Obscure Dr.","city":"Obscuria","state":"IL","zip":60100}
Boss Man 200000.0 ["John Doe","Fred Finance"] {"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05} {"street":"1 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Fred Finance 150000.0 ["Stacy Accountant"] {"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05} {"street":"2 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Stacy Accountant 60000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"300 Main St.","city":"Naperville","state":"IL","zip":60563}
Time taken: 0.21 seconds, Fetched: 7 row(s)
Теперь мы используем Flink SQL для запроса таблицы в Hive.
Flink SQL> select * from employees;
Затем мы создаем таблицу источника данных Kafka в FlinkSQL Cli:
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用户id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品类id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT, -- 用户行为发生的时间戳
`proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json', -- 数据源格式为json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
Посмотреть структуру таблицы
Flink SQL> DESCRIBE user_behavior;
Мы можем выполнить следующую команду в клиенте Hive, чтобы просмотреть таблицу, только что созданную в Flink SQL Cli.
hive> desc formatted user_behavior;
OK
# col_name data_type comment
# Detailed Table Information
Database: proghive
OwnerType: USER
Owner: null
CreateTime: Thu Dec 24 15:52:18 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/proghive.db/user_behavior
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector kafka
flink.format json
flink.json.fail-on-missing-field true
flink.json.ignore-parse-errors false
flink.properties.bootstrap.servers localhost:9092
flink.properties.group.id group1
flink.scan.startup.mode earliest-offset
flink.schema.0.data-type BIGINT
flink.schema.0.name user_id
flink.schema.1.data-type BIGINT
flink.schema.1.name item_id
flink.schema.2.data-type BIGINT
flink.schema.2.name cat_id
flink.schema.3.data-type VARCHAR(2147483647)
flink.schema.3.name action
flink.schema.4.data-type INT
flink.schema.4.name province
flink.schema.5.data-type BIGINT
flink.schema.5.name ts
flink.schema.6.data-type TIMESTAMP(3) NOT NULL
flink.schema.6.expr PROCTIME()
flink.schema.6.name proctime
flink.schema.7.data-type TIMESTAMP(3)
flink.schema.7.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
flink.schema.7.name eventTime
flink.schema.watermark.0.rowtime eventTime
flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '5' SECOND
flink.topic user_behavior
is_generic true
transient_lastDdlTime 1608796338
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.212 seconds, Fetched: 54 row(s)
Метаданные таблицы user_behavior, созданной Flink SQL Cli, будут сохранены в хранилище метаданных Hive.В этой статье используется MySQL. Выполните следующую команду:
SELECT
a.tbl_id, -- 表id
from_unixtime(create_time) AS create_time, -- 创建时间
a.db_id, -- 数据库id
b.name AS db_name, -- 数据库名称
a.tbl_name -- 表名称
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";