инструмент Python ETL pyetl

Python

pyetl — это платформа ETL, разработанная на чистом python.По сравнению с инструментами ETL, такими как sqoop и datax, pyetl может добавлять функции udf в каждое поле, делая процесс преобразования данных более гибким.По сравнению с профессиональными инструментами ETL, pyetl легче и содержит чистый код python. операция, больше соответствует привычкам разработчиков

Установить

pip3 install pyetl

Пример использования

база данныхСинхронизация данных между таблицами

from pyetl import Task, DatabaseReader, DatabaseWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = DatabaseWriter("sqlite:///db2.sqlite3", table_name="target")
Task(reader, writer).start()

Синхронизация таблицы базы данных с таблицей куста

from pyetl import Task, DatabaseReader, HiveWriter2
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = HiveWriter2("hive://localhost:10000/default", table_name="target")
Task(reader, writer).start()

синхронизация таблиц базы данных

from pyetl import Task, DatabaseReader, ElasticSearchWriter
reader = DatabaseReader("sqlite:///db1.sqlite3", table_name="source")
writer = ElasticSearchWriter(hosts=["localhost"], index_name="tartget")
Task(reader, writer).start()

Исходное имя поля целевой таблицы отличается, вам нужно добавить сопоставление полей Добавить к

# 原始表source包含uuid,full_name字段
reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
# 目标表target包含id,name字段
writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
# columns配置目标表和原始表的字段映射关系
columns = {"id": "uuid", "name": "full_name"}
Task(reader, writer, columns=columns).start()

Отображение полей UDF, проверка правил, стандартизация данных, очистка данных и т. д.

# functions配置字段的udf映射,如下id转字符串,name去除前后空格
functions={"id": str, "name": lambda x: x.strip()}
Task(reader, writer, columns=columns, functions=functions).start()

Наследуйте класс Task для гибкого расширения задач ETL.

import json
from pyetl import Task, DatabaseReader, DatabaseWriter

class NewTask(Task):
  reader = DatabaseReader("sqlite:///db.sqlite3", table_name="source")
  writer = DatabaseWriter("sqlite:///db.sqlite3", table_name="target")
  
  def get_columns(self):
    """通过函数的方式生成字段映射配置,使用更灵活"""
    # 以下示例将数据库中的字段映射配置取出后转字典类型返回
    sql = "select columns from task where name='new_task'"
    columns = self.writer.db.read_one(sql)["columns"]
    return json.loads(columns)
   
  def get_functions(self):
    """通过函数的方式生成字段的udf映射"""
    # 以下示例将每个字段类型都转换为字符串
    return {col: str for col in self.columns}
   
  def apply_function(self, record):
    """数据流中对一整条数据的udf"""
    record["flag"] = int(record["id"]) % 2
    return record

  def before(self):
    """任务开始前要执行的操作, 如初始化任务表,创建目标表等"""
    sql = "create table destination_table(id int, name varchar(100))"
    self.writer.db.execute(sql)
  
  def after(self):
    """任务完成后要执行的操作,如更新任务状态等"""
    sql = "update task set status='done' where name='new_task'"
    self.writer.db.execute(sql)

NewTask().start()

В настоящее время реализованы списки чтения и записи

Reader вводить
DatabaseReader Поддерживает чтение из всех реляционных баз данных
FileReader Чтение структурированных текстовых данных, таких как файлы csv
ExcelReader Чтение листового файла Excel
Writer вводить
DatabaseWriter Поддерживает запись во все реляционные базы данных
ElasticSearchWriter Пакетная запись данных в индекс es
HiveWriter Массовая вставка в таблицу улья
HiveWriter2 Импорт таблицы куста в режиме загрузки данных (рекомендуется)
FileWriter записать данные в текстовый файл

Суммировать

Пока что эта статья об инструменте ETL для Python pyetl представлена ​​здесь.