В области распределенных вычислений больших данных наиболее часто используемый Apache Spark уже поддерживает запись на языке Python, иML (машинное обучение) и DM (интеллектуальный анализ данных)Также есть поддержка API, а flink, как вычислительный движок третьего поколения, начинается с версии 1.9.0.Добавлена поддержка Python.(PyFlink), во Flink 1.10 в PyFlink добавлена поддержкаPython UDFs(пользовательская функция) поддержку можно найти вTable API/SQLРегистрация и использование пользовательских функций в Flink 1.11 также поддерживает локальное выполнение заданий PyFlink в Windows, поэтому вы можете разрабатывать и отлаживать задания PyFlink в Windows.
Установка Пифлинка
Установка Pyflink очень проста
- Сначала посмотрите на версию Python вашей системы (для PyFlink требуется версия Python (3.6, 3.7 или 3.8))
$ python3 --version
# the version printed here must be 3.6, 3.7 or 3.8
- Конфигурация среды
Поскольку система может содержать несколько версий Python, она также содержит несколько двоичных исполняемых файлов Python. запустите следующее
ls
команда, чтобы узнать, какие бинарные исполняемые файлы Python доступны в системе:
$ ls /usr/bin/python*
- выберите мягкую ссылку
python
указать на свойpython3
устный переводчик
ln -s /usr/bin/python3 python
- Установить Пифлинк Так как pyflink все еще подвергается горячим обновлениям, каждая версия сильно изменилась, поэтому просто устанавливайте последнюю версию без мозгов (последняя версия apache-flink1.13.2, когда эта статья обновляется)
$ python3 -m pip install apache-flink
Конфигурация среды разработки Windows
Здесь мы выбираем Pycharm IDE для разработки Windows Pyflink.
- Сначала настройте виртуальную среду Python.
путь конфигурации
PyCharm -> Preferences -> Project Interpreter
Помните, что выбранная версия Python должна быть 3.6, 3.7 или 3.8.
- Создайте новый проект, выберите среду, которую мы только что настроили, виртуальную среду python.
-
Установить Пифлинк Войдите в интерфейс терминала
Сначала посмотрите на версию Python
Затем установите Pyflink
После завершения вы можете
site-packages
найдено нижеpyflink
каталог, как показано ниже -
Привет мир пример
Создайте новый файл .py и введите следующий код
#!/usr/bin/env python38
#-*- coding:utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
def hello_world():
"""
从随机Source读取数据,然后直接利用PrintSink输出。
"""
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
source_ddl = """
CREATE TABLE random_source (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
)
"""
sink_ddl = """
CREATE TABLE print_sink (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
)
"""
# 注册source和sink
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
# 数据提取
tab = t_env.from_path("random_source")
# 这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
tab.execute_insert("print_sink").wait()
# 执行作业
t_env.execute_sql("Flink Hello World")
if __name__ == '__main__':
hello_world()
Результат выглядит следующим образом