предисловие
Теперь Hive стал основным компонентом экосистемы хранилища данных. Это не только механизм SQL для анализа больших данных и ETL, но и платформа управления данными, которую можно использовать для обнаружения, определения и развития данных. В настоящее время Flink является самым популярным механизмом потоковых вычислений, который может выполнять вычисления состояния для неограниченных и ограниченных потоков данных. Flink поддерживает интеграцию с Hive, начиная с версии 1.9, но версия 1.9 является бета-версией и не рекомендуется для использования в производственных средах. В версии 1.10, после завершения интеграции Alibaba Blink, интеграция Flink с Hive также достигла требований производственного уровня. Стоит отметить, что разные версии Flink интегрируют Hive по-разному.В этой статье в качестве примеров будут взяты последняя версия Flink 1.12 и версия Hive 3.1.2, чтобы кратко описать шаги Flink по интеграции Hive.
Интеграция
Интеграция Flink и Hive включает следующие два уровня:
-
Прежде всего, Flink использует хранилище метаданных Hive в качестве постоянного каталога, мы можем передать
HiveCatalog
Сохраняйте метаданные Flink из разных сеансов в хранилище метаданных Hive. Например, мы можем использоватьHiveCatalog
Сохраняйте метаданные для таблиц Kafka или Elasticsearch в хранилище метаданных Hive и повторно используйте их в последующих запросах SQL. -
Во-вторых, используйте Flink для чтения и записи таблиц Hive точно так же, как SparkSQL или Impala для запроса данных в Hive.
HiveCatalog
Дизайн Hive обеспечивает хорошую совместимость с Hive, и пользователи могут получить доступ к своим существующим хранилищам данных Hive «из коробки». Нет необходимости изменять существующее хранилище метаданных Hive или изменять расположение данных или разделение таблицы.
Поддерживаемые версии Hive
Flink поддерживает следующие версии Hive.
большая версия | V1 | V2 | V3 | V4 | V5 | V6 | V7 |
---|---|---|---|---|---|---|---|
1.0 | 1.0.0 | 1.0.1 | |||||
1.1 | 1.1.0 | 1.1.1 | |||||
1.2 | 1.2.0 | 1.2.1 | 1.2.2 | ||||
2.0 | 2.0.0 | 2.0.1 | |||||
2.1 | 2.1.0 | 2.1.1 | |||||
2.2 | 2.2.0 | ||||||
2.3 | 2.3.0 | 2.3.1 | 2.3.2 | 2.3.3 | 2.3.4 | 2.3.5 | 2.3.6 |
3.1 | 3.1.0 | 3.1.1 | 3.1.2 |
Обратите внимание, что доступность некоторых функций зависит от используемой вами версии Hive, эти ограничения не связаны с Flink:
- Встроенные функции Hive поддерживаются при использовании Hive-1.2.0 и более поздних версий.
- Ограничения столбцов, а именно PRIMARY KEY и NOT NULL, поддерживаются при использовании Hive-3.1.0 и более поздних версий.
- Статистика таблиц изменений, поддерживается при использовании Hive-1.2.0 и более поздних версий.
-
DATE
Статистика столбцов, поддерживаемая при использовании Hive-1.2.0 и более поздних версий. - Запись в таблицы ORC не поддерживается при использовании версий Hive-2.0.x.
зависимости
Чтобы интегрировать Hive с Flink, вам нужно добавить несколько дополнительных JAR-пакетов зависимостей и поместить их в папку lib каталога установки Flink, чтобы вы могли взаимодействовать с Hive через Table API или SQL-клиент.
Официальный сайт Flink предоставляет два способа добавления зависимостей Hive. Первый — использовать банку Hive, предоставленную Flink, и соответствующую банку Hive можно выбрать в соответствии с используемой версией Metastore. Второй способ — добавить каждую требуемую банку отдельно, например, используемая вами версия Hive отличается от той, которая совместима с банкой Hive, предоставленной Flink.
Примечание. Рекомендуется использовать банку Hive, предоставленную Flink.
Используйте банку Hive, предоставленную Flink.
Все доступные jar-файлы Hive перечислены ниже, мы можем загрузить соответствующие jar-файлы в соответствии с используемой версией Hive. Например, в этой статье используется версия Hive 3.1.2, поэтому вам нужно всего лишь загрузитьflink-sql-connector-hive-3.1.2.jarИ поместите его в папку lib каталога установки Flink.
Metastore version | Maven dependency | SQL Client JAR |
---|---|---|
1.0.0 ~ 1.2.2 | flink-sql-connector-hive-1.2.2 |
скачать |
2.0.0 ~2.2.0 | flink-sql-connector-hive-2.2.0 |
скачать |
2.3.0 ~2.3.6 | flink-sql-connector-hive-2.3.6 |
скачать |
3.0.0 ~ 3.1.2 | flink-sql-connector-hive-3.1.2 |
скачать |
определяемые пользователем зависимости
Возьмем Hive 3.1.2 в качестве примера, за исключениемflink-sql-connector-hive-3.1.2.jarВ дополнение к jar-файлам, которые нам нужны при использовании Flink SQL Cli, нам также нужно добавить следующие jar-файлы:
банка зависимостей | ссылка для скачивания |
---|---|
flink-connector-hive_2.12-1.12.0.jar | скачать |
flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar | скачать |
hive-exec-3.1.2.jar | В каталоге lib в каталоге установки Hive |
libfb303-0.9.3.jar | В каталоге lib в каталоге установки Hive |
Подключиться к улью
Следующее использует Flink SQL Cli для подключения к Hive.
Настройте sql-client-defaults.yaml
Файл sql-client-defaults.yaml — это файл конфигурации, используемый при запуске Flink SQL Cli. Он находится в папке conf каталога установки Flink. Конкретная конфигурация выглядит следующим образом, в основном, для настройки каталога:
catalogs:
- name: myhive
type: hive
default-database: default
hive-conf-dir: /opt/hive/conf/
hadoop-conf-dir: /opt/hadoop/etc/hadoop/
В следующей таблице перечислены определения через файлы YAML или DDL.HiveCatalog
Параметры поддерживаются, когда:
параметр | обязательный | По умолчанию | тип | описывать |
---|---|---|---|---|
type | да | (никто) | String | Тип каталога. При создании HiveCatalog для этого параметра необходимо установить значение'hive' . |
name | да | (никто) | String | Название каталога. Требуется только при использовании файла YAML. |
hive-conf-dir | нет | (никто) | String | URI, указывающий на каталог, содержащий файл hive-site.xml. URI должен относиться к типу, поддерживаемому файловой системой Hadoop. Если указан относительный URI, то есть не содержит схемы, по умолчанию используется локальная файловая система. Если этот параметр не указан, мы будем искать hive-site.xml в пути к классам. |
default-database | нет | default | String | Текущая база данных по умолчанию для использования, когда каталог установлен в качестве текущего каталога. |
hive-version | нет | (никто) | String | HiveCatalog может автоматически определять используемую версию Hive. мы предлагаемне хотетьВручную установите версию Hive, если механизм автоматического определения не дал сбой. |
hadoop-conf-dir | нет | (никто) | String | Путь к каталогу файла конфигурации Hadoop. В настоящее время поддерживаются только пути локальной файловой системы. Мы рекомендуем использоватьHADOOP_CONF_DIRпеременные среды для указания конфигурации Hadoop. Поэтому рассмотрите возможность использования этого параметра только в том случае, если переменная среды не соответствует вашим потребностям, например, если вы хотите настроить конфигурацию Hadoop индивидуально для каждого HiveCatalog. |
Управление таблицами в Hive
Сначала запустите FlinkSQL Cli, команда выглядит следующим образом:
sql-client.sh embedded
Запуск завершается со следующей ошибкой:
Причина ошибки в том, что Flink не поддерживает встроенное хранилище метаданных при интеграции Hive.При настройке Hive необходимо запустить службу хранилища метаданных Hive и установить правильное значение свойства hive.metastore.uris в файле conf/hive-site. xml-файл конфигурации.
Запустите службу хранилища метаданных Hive.
hive --service metastore
Настройте hive.metastore.uris в файле hive-site.xml.
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
Снова успешно запустите FlinkSQL Cli, после чего мы сможем просмотреть зарегистрированный каталог.
Flink SQL> show catalogs;
default_catalog
myhive
Используйте зарегистрированный каталог myhive
Flink SQL> use catalog myhive;
Запросите базу данных, proghive - это библиотека Hive, которую я создал, когда практиковал "Руководство по программированию Hive".
Flink SQL> show databases;
default
proghive
посмотреть все столы
Flink SQL> use proghive;
Flink SQL> show tables;
dividends
employees
stocks
Запросите таблицу сотрудников в Hive:
hive> select * from employees;
OK
John Doe 100000.0 ["Mary Smith","Todd Jones"] {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1} {"street":"1 Michigan Ave.","city":"Chicago","state":"IL","zip":60600}
Mary Smith 80000.0 ["Bill King"] {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1} {"street":"100 Ontario St.","city":"Chicago","state":"IL","zip":60601}
Todd Jones 70000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"200 Chicago Ave.","city":"Oak Park","state":"IL","zip":60700}
Bill King 60000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"300 Obscure Dr.","city":"Obscuria","state":"IL","zip":60100}
Boss Man 200000.0 ["John Doe","Fred Finance"] {"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05} {"street":"1 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Fred Finance 150000.0 ["Stacy Accountant"] {"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05} {"street":"2 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Stacy Accountant 60000.0 [] {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1} {"street":"300 Main St.","city":"Naperville","state":"IL","zip":60563}
Time taken: 0.21 seconds, Fetched: 7 row(s)
Теперь мы используем Flink SQL для запроса таблицы в Hive.
Flink SQL> select * from employees;
Затем мы создаем таблицу источника данных Kafka в FlinkSQL Cli:
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用户id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品类id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT, -- 用户行为发生的时间戳
`proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json', -- 数据源格式为json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
Посмотреть структуру таблицы
Flink SQL> DESCRIBE user_behavior;
Мы можем выполнить следующую команду в клиенте Hive, чтобы просмотреть таблицу, только что созданную в Flink SQL Cli.
hive> desc formatted user_behavior;
OK
# col_name data_type comment
# Detailed Table Information
Database: proghive
OwnerType: USER
Owner: null
CreateTime: Thu Dec 24 15:52:18 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://localhost:9000/user/hive/warehouse/proghive.db/user_behavior
Table Type: MANAGED_TABLE
Table Parameters:
flink.connector kafka
flink.format json
flink.json.fail-on-missing-field true
flink.json.ignore-parse-errors false
flink.properties.bootstrap.servers localhost:9092
flink.properties.group.id group1
flink.scan.startup.mode earliest-offset
flink.schema.0.data-type BIGINT
flink.schema.0.name user_id
flink.schema.1.data-type BIGINT
flink.schema.1.name item_id
flink.schema.2.data-type BIGINT
flink.schema.2.name cat_id
flink.schema.3.data-type VARCHAR(2147483647)
flink.schema.3.name action
flink.schema.4.data-type INT
flink.schema.4.name province
flink.schema.5.data-type BIGINT
flink.schema.5.name ts
flink.schema.6.data-type TIMESTAMP(3) NOT NULL
flink.schema.6.expr PROCTIME()
flink.schema.6.name proctime
flink.schema.7.data-type TIMESTAMP(3)
flink.schema.7.expr TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
flink.schema.7.name eventTime
flink.schema.watermark.0.rowtime eventTime
flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
flink.schema.watermark.0.strategy.expr `eventTime` - INTERVAL '5' SECOND
flink.topic user_behavior
is_generic true
transient_lastDdlTime 1608796338
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.212 seconds, Fetched: 54 row(s)
Метаданные таблицы user_behavior, созданной Flink SQL Cli, будут сохранены в хранилище метаданных Hive.В этой статье используется MySQL. Выполните следующую команду:
SELECT
a.tbl_id, -- 表id
from_unixtime(create_time) AS create_time, -- 创建时间
a.db_id, -- 数据库id
b.name AS db_name, -- 数据库名称
a.tbl_name -- 表名称
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";