Flink 1.12.0 интегрирует Hive 3.1.2

Flink

предисловие

Теперь Hive стал основным компонентом экосистемы хранилища данных. Это не только механизм SQL для анализа больших данных и ETL, но и платформа управления данными, которую можно использовать для обнаружения, определения и развития данных. В настоящее время Flink является самым популярным механизмом потоковых вычислений, который может выполнять вычисления состояния для неограниченных и ограниченных потоков данных. Flink поддерживает интеграцию с Hive, начиная с версии 1.9, но версия 1.9 является бета-версией и не рекомендуется для использования в производственных средах. В версии 1.10, после завершения интеграции Alibaba Blink, интеграция Flink с Hive также достигла требований производственного уровня. Стоит отметить, что разные версии Flink интегрируют Hive по-разному.В этой статье в качестве примеров будут взяты последняя версия Flink 1.12 и версия Hive 3.1.2, чтобы кратко описать шаги Flink по интеграции Hive.

Интеграция

Интеграция Flink и Hive включает следующие два уровня:

  • Прежде всего, Flink использует хранилище метаданных Hive в качестве постоянного каталога, мы можем передатьHiveCatalog Сохраняйте метаданные Flink из разных сеансов в хранилище метаданных Hive. Например, мы можем использоватьHiveCatalogСохраняйте метаданные для таблиц Kafka или Elasticsearch в хранилище метаданных Hive и повторно используйте их в последующих запросах SQL.

  • Во-вторых, используйте Flink для чтения и записи таблиц Hive точно так же, как SparkSQL или Impala для запроса данных в Hive.

HiveCatalogДизайн Hive обеспечивает хорошую совместимость с Hive, и пользователи могут получить доступ к своим существующим хранилищам данных Hive «из коробки». Нет необходимости изменять существующее хранилище метаданных Hive или изменять расположение данных или разделение таблицы.

Поддерживаемые версии Hive

Flink поддерживает следующие версии Hive.

большая версия V1 V2 V3 V4 V5 V6 V7
1.0 1.0.0 1.0.1
1.1 1.1.0 1.1.1
1.2 1.2.0 1.2.1 1.2.2
2.0 2.0.0 2.0.1
2.1 2.1.0 2.1.1
2.2 2.2.0
2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6
3.1 3.1.0 3.1.1 3.1.2

Обратите внимание, что доступность некоторых функций зависит от используемой вами версии Hive, эти ограничения не связаны с Flink:

  • Встроенные функции Hive поддерживаются при использовании Hive-1.2.0 и более поздних версий.
  • Ограничения столбцов, а именно PRIMARY KEY и NOT NULL, поддерживаются при использовании Hive-3.1.0 и более поздних версий.
  • Статистика таблиц изменений, поддерживается при использовании Hive-1.2.0 и более поздних версий.
  • DATEСтатистика столбцов, поддерживаемая при использовании Hive-1.2.0 и более поздних версий.
  • Запись в таблицы ORC не поддерживается при использовании версий Hive-2.0.x.

зависимости

Чтобы интегрировать Hive с Flink, вам нужно добавить несколько дополнительных JAR-пакетов зависимостей и поместить их в папку lib каталога установки Flink, чтобы вы могли взаимодействовать с Hive через Table API или SQL-клиент.

Официальный сайт Flink предоставляет два способа добавления зависимостей Hive. Первый — использовать банку Hive, предоставленную Flink, и соответствующую банку Hive можно выбрать в соответствии с используемой версией Metastore. Второй способ — добавить каждую требуемую банку отдельно, например, используемая вами версия Hive отличается от той, которая совместима с банкой Hive, предоставленной Flink.

Примечание. Рекомендуется использовать банку Hive, предоставленную Flink.

Используйте банку Hive, предоставленную Flink.

Все доступные jar-файлы Hive перечислены ниже, мы можем загрузить соответствующие jar-файлы в соответствии с используемой версией Hive. Например, в этой статье используется версия Hive 3.1.2, поэтому вам нужно всего лишь загрузитьflink-sql-connector-hive-3.1.2.jarИ поместите его в папку lib каталога установки Flink.

Metastore version Maven dependency SQL Client JAR
1.0.0 ~ 1.2.2 flink-sql-connector-hive-1.2.2 скачать
2.0.0 ~2.2.0 flink-sql-connector-hive-2.2.0 скачать
2.3.0 ~2.3.6 flink-sql-connector-hive-2.3.6 скачать
3.0.0 ~ 3.1.2 flink-sql-connector-hive-3.1.2 скачать

определяемые пользователем зависимости

Возьмем Hive 3.1.2 в качестве примера, за исключениемflink-sql-connector-hive-3.1.2.jarВ дополнение к jar-файлам, которые нам нужны при использовании Flink SQL Cli, нам также нужно добавить следующие jar-файлы:

банка зависимостей ссылка для скачивания
flink-connector-hive_2.12-1.12.0.jar скачать
flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar скачать
hive-exec-3.1.2.jar В каталоге lib в каталоге установки Hive
libfb303-0.9.3.jar В каталоге lib в каталоге установки Hive

Подключиться к улью

Следующее использует Flink SQL Cli для подключения к Hive.

Настройте sql-client-defaults.yaml

Файл sql-client-defaults.yaml — это файл конфигурации, используемый при запуске Flink SQL Cli. Он находится в папке conf каталога установки Flink. Конкретная конфигурация выглядит следующим образом, в основном, для настройки каталога:

catalogs:
    - name: myhive
      type: hive
      default-database: default
      hive-conf-dir: /opt/hive/conf/
      hadoop-conf-dir: /opt/hadoop/etc/hadoop/

В следующей таблице перечислены определения через файлы YAML или DDL.HiveCatalogПараметры поддерживаются, когда:

параметр обязательный По умолчанию тип описывать
type да (никто) String Тип каталога. При создании HiveCatalog для этого параметра необходимо установить значение'hive'.
name да (никто) String Название каталога. Требуется только при использовании файла YAML.
hive-conf-dir нет (никто) String URI, указывающий на каталог, содержащий файл hive-site.xml. URI должен относиться к типу, поддерживаемому файловой системой Hadoop. Если указан относительный URI, то есть не содержит схемы, по умолчанию используется локальная файловая система. Если этот параметр не указан, мы будем искать hive-site.xml в пути к классам.
default-database нет default String Текущая база данных по умолчанию для использования, когда каталог установлен в качестве текущего каталога.
hive-version нет (никто) String HiveCatalog может автоматически определять используемую версию Hive. мы предлагаемне хотетьВручную установите версию Hive, если механизм автоматического определения не дал сбой.
hadoop-conf-dir нет (никто) String Путь к каталогу файла конфигурации Hadoop. В настоящее время поддерживаются только пути локальной файловой системы. Мы рекомендуем использоватьHADOOP_CONF_DIRпеременные среды для указания конфигурации Hadoop. Поэтому рассмотрите возможность использования этого параметра только в том случае, если переменная среды не соответствует вашим потребностям, например, если вы хотите настроить конфигурацию Hadoop индивидуально для каждого HiveCatalog.

Управление таблицами в Hive

Сначала запустите FlinkSQL Cli, команда выглядит следующим образом:

sql-client.sh embedded

Запуск завершается со следующей ошибкой:

启动报错

Причина ошибки в том, что Flink не поддерживает встроенное хранилище метаданных при интеграции Hive.При настройке Hive необходимо запустить службу хранилища метаданных Hive и установить правильное значение свойства hive.metastore.uris в файле conf/hive-site. xml-файл конфигурации.

Запустите службу хранилища метаданных Hive.

hive --service metastore

Настройте hive.metastore.uris в файле hive-site.xml.

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>

Снова успешно запустите FlinkSQL Cli, после чего мы сможем просмотреть зарегистрированный каталог.

Flink SQL> show catalogs;
default_catalog
myhive

Используйте зарегистрированный каталог myhive

Flink SQL> use catalog myhive;

Запросите базу данных, proghive - это библиотека Hive, которую я создал, когда практиковал "Руководство по программированию Hive".

Flink SQL> show databases;
default
proghive

посмотреть все столы

Flink SQL> use proghive;
Flink SQL> show tables;
dividends
employees
stocks

Запросите таблицу сотрудников в Hive:

hive> select * from employees;
OK
John Doe	100000.0	["Mary Smith","Todd Jones"]	{"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1}	{"street":"1 Michigan Ave.","city":"Chicago","state":"IL","zip":60600}
Mary Smith	80000.0	["Bill King"]	{"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1}	{"street":"100 Ontario St.","city":"Chicago","state":"IL","zip":60601}
Todd Jones	70000.0	[]	{"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}	{"street":"200 Chicago Ave.","city":"Oak Park","state":"IL","zip":60700}
Bill King	60000.0	[]	{"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}	{"street":"300 Obscure Dr.","city":"Obscuria","state":"IL","zip":60100}
Boss Man	200000.0	["John Doe","Fred Finance"]	{"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05}	{"street":"1 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Fred Finance	150000.0	["Stacy Accountant"]	{"Federal Taxes":0.3,"State Taxes":0.07,"Insurance":0.05}	{"street":"2 Pretentious Drive.","city":"Chicago","state":"IL","zip":60500}
Stacy Accountant	60000.0	[]	{"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}	{"street":"300 Main St.","city":"Naperville","state":"IL","zip":60563}
Time taken: 0.21 seconds, Fetched: 7 row(s)

Теперь мы используем Flink SQL для запроса таблицы в Hive.

Flink SQL> select * from employees;

employees表数据

Затем мы создаем таблицу источника данных Kafka в FlinkSQL Cli:

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为发生的时间戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'localhost:9092', 
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

Посмотреть структуру таблицы

Flink SQL> DESCRIBE user_behavior;

user_behavior表结构

Мы можем выполнить следующую команду в клиенте Hive, чтобы просмотреть таблицу, только что созданную в Flink SQL Cli.

hive> desc formatted  user_behavior;
OK
# col_name            	data_type           	comment

# Detailed Table Information
Database:           	proghive
OwnerType:          	USER
Owner:              	null
CreateTime:         	Thu Dec 24 15:52:18 CST 2020
LastAccessTime:     	UNKNOWN
Retention:          	0
Location:           	hdfs://localhost:9000/user/hive/warehouse/proghive.db/user_behavior
Table Type:         	MANAGED_TABLE
Table Parameters:
	flink.connector     	kafka
	flink.format        	json
	flink.json.fail-on-missing-field	true
	flink.json.ignore-parse-errors	false
	flink.properties.bootstrap.servers	localhost:9092
	flink.properties.group.id	group1
	flink.scan.startup.mode	earliest-offset
	flink.schema.0.data-type	BIGINT
	flink.schema.0.name 	user_id
	flink.schema.1.data-type	BIGINT
	flink.schema.1.name 	item_id
	flink.schema.2.data-type	BIGINT
	flink.schema.2.name 	cat_id
	flink.schema.3.data-type	VARCHAR(2147483647)
	flink.schema.3.name 	action
	flink.schema.4.data-type	INT
	flink.schema.4.name 	province
	flink.schema.5.data-type	BIGINT
	flink.schema.5.name 	ts
	flink.schema.6.data-type	TIMESTAMP(3) NOT NULL
	flink.schema.6.expr 	PROCTIME()
	flink.schema.6.name 	proctime
	flink.schema.7.data-type	TIMESTAMP(3)
	flink.schema.7.expr 	TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
	flink.schema.7.name 	eventTime
	flink.schema.watermark.0.rowtime	eventTime
	flink.schema.watermark.0.strategy.data-type	TIMESTAMP(3)
	flink.schema.watermark.0.strategy.expr	`eventTime` - INTERVAL '5' SECOND
	flink.topic         	user_behavior
	is_generic          	true
	transient_lastDdlTime	1608796338

# Storage Information
SerDe Library:      	org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:        	org.apache.hadoop.mapred.TextInputFormat
OutputFormat:       	org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:         	No
Num Buckets:        	-1
Bucket Columns:     	[]
Sort Columns:       	[]
Storage Desc Params:
	serialization.format	1
Time taken: 0.212 seconds, Fetched: 54 row(s)                

Метаданные таблицы user_behavior, созданной Flink SQL Cli, будут сохранены в хранилище метаданных Hive.В этой статье используется MySQL. Выполните следующую команду:

SELECT 
    a.tbl_id, -- 表id
    from_unixtime(create_time) AS create_time, -- 创建时间
    a.db_id, -- 数据库id
    b.name AS db_name, -- 数据库名称
    a.tbl_name -- 表名称
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";

Hive元数据库