Установка Pyflink и настройка среды разработки Windows

Flink
Установка Pyflink и настройка среды разработки Windows

Screenshot2021053019231001.jpg

В области распределенных вычислений больших данных наиболее часто используемый Apache Spark уже поддерживает запись на языке Python, иML (машинное обучение) и DM (интеллектуальный анализ данных)Также есть поддержка API, а flink, как вычислительный движок третьего поколения, начинается с версии 1.9.0.Добавлена ​​поддержка Python.(PyFlink), во Flink 1.10 в PyFlink добавлена ​​поддержкаPython UDFs(пользовательская функция) поддержку можно найти вTable API/SQLРегистрация и использование пользовательских функций в Flink 1.11 также поддерживает локальное выполнение заданий PyFlink в Windows, поэтому вы можете разрабатывать и отлаживать задания PyFlink в Windows.

Установка Пифлинка

Установка Pyflink очень проста

  • Сначала посмотрите на версию Python вашей системы (для PyFlink требуется версия Python (3.6, 3.7 или 3.8))
$ python3 --version
# the version printed here must be 3.6, 3.7 or 3.8
  • Конфигурация среды Поскольку система может содержать несколько версий Python, она также содержит несколько двоичных исполняемых файлов Python. запустите следующееlsкоманда, чтобы узнать, какие бинарные исполняемые файлы Python доступны в системе:
$ ls /usr/bin/python*
  • выберите мягкую ссылкуpythonуказать на свойpython3устный переводчик
ln -s /usr/bin/python3 python
  • Установить Пифлинк Так как pyflink все еще подвергается горячим обновлениям, каждая версия сильно изменилась, поэтому просто устанавливайте последнюю версию без мозгов (последняя версия apache-flink1.13.2, когда эта статья обновляется)
$ python3 -m pip install apache-flink

Конфигурация среды разработки Windows

Здесь мы выбираем Pycharm IDE для разработки Windows Pyflink.

  • Сначала настройте виртуальную среду Python. путь конфигурацииPyCharm -> Preferences -> Project Interpreter

image.png

Помните, что выбранная версия Python должна быть 3.6, 3.7 или 3.8.

  • Создайте новый проект, выберите среду, которую мы только что настроили, виртуальную среду python.

image.png

  • Установить Пифлинк Войдите в интерфейс терминалаimage.pngСначала посмотрите на версию Pythonimage.png

    Затем установите Pyflinkimage.pngПосле завершения вы можетеsite-packagesнайдено нижеpyflinkкаталог, как показано нижеimage.png

  • Привет мир пример

Создайте новый файл .py и введите следующий код

#!/usr/bin/env python38
#-*- coding:utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

def hello_world():
    """
    从随机Source读取数据,然后直接利用PrintSink输出。
    """
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    source_ddl = """
                    CREATE TABLE random_source (
                        f_sequence INT,
                        f_random INT,
                        f_random_str STRING
                    ) WITH (
                        'connector' = 'datagen',
                        'rows-per-second'='5',
                        'fields.f_sequence.kind'='sequence',
                        'fields.f_sequence.start'='1',
                        'fields.f_sequence.end'='1000',
                        'fields.f_random.min'='1',
                        'fields.f_random.max'='1000',
                        'fields.f_random_str.length'='10'
                    )
                    """

    sink_ddl = """
                  CREATE TABLE print_sink (
                    f_sequence INT,
                    f_random INT,
                    f_random_str STRING 
                ) WITH (
                  'connector' = 'print'
                )
        """

    # 注册source和sink
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    # 数据提取
    tab = t_env.from_path("random_source")
    # 这里我们暂时先使用 标注了 deprecated 的API, 因为新的异步提交测试有待改进...
    tab.execute_insert("print_sink").wait()
    # 执行作业
    t_env.execute_sql("Flink Hello World")



if __name__ == '__main__':
    hello_world()

Результат выглядит следующим образомimage.png

Добро пожаловать на обучение по обмену

личный блог

домашняя страница csdn