Escrito por: Diptiman Raichaudhuri, Abogado de Desarrollo de Personal en Confluent
Escrito por: Diptiman Raichaudhuri, Abogado de Desarrollo de Personal en Confluent
Este artículo muestra a los ingenieros de datos cómo usar PyIceberg, una biblioteca Python ligera y potente. PyIceberg hace que sea más fácil realizar tareas de datos comunes como crear, leer, modificar o eliminar datos en Apache Iceberg, sin necesidad de un gran clúster.
Driven por las complejas demandas de los negocios y la necesidad de analizar volúmenes más grandes de información, las plataformas de datos se han transformado significativamente en los últimos años para ayudar a las empresas a extraer más insights y valor de sus diversas fuentes de datos.
Para los casos de uso de análisis empresarial, la plataforma de lagos de datos abiertos ha estado en la vanguardia de esta evolución de la plataforma. Un lago de datos abiertos permite a los equipos de datos crear arquitecturas «compostables» dentro de su ecosistema. Con este patrón, los equipos de datos pueden diseñar plataformas de datos con capas de almacenamiento, computación y gobernanza de datos de su elección, para satisfacer las necesidades en constante evolución de las empresas. Formatos de tabla abierta (OTF) como Apache Iceberg son los motores que impulsan la adopción de lagos abiertos modernos.
Arquitectura de Plataforma de Datos Composables
Las plataformas de datos construidas con Apache Iceberg normalmente tienen tres capas:
- La capa de datos: Los archivos de datos físicos (normalmente formatos Parquet, Avro o ORC) se almacenan en esta capa.
- La capa de metadatos: En lugar de ordenar las tablas en directorios individuales, Iceberg mantiene una lista de archivos.
- Catálogo de Iceberg: El catálogo es el repositorio central que facilita el descubrimiento, creación y modificación de tablas y asegura la coherencia transaccional de la gestión de tablas de Iceberg.
Este popular diagrama de la documentación de Apache Iceberg ilustra estas capas:
Página web de la Fundación Iceberg (Source)
¿Qué es PyIceberg?
PyIceberg empodera a los ingenieros de análisis y datos para construir sofisticadas plataformas de lago abierto en una amplia variedad de nubes (AWS, Azure y Google Cloud) y almacenamiento on-premise. PyIceberg es una implementación de Python para acceder a las tablas de Iceberg. Los desarrolladores que utilizan PyIceberg pueden utilizar transformaciones de datos Pythonic sin necesidad de ejecutar motores de consulta de alto rendimiento en los clústeres de Java Virtual Machine (JVM). PyIceberg utiliza catálogos para cargar tablas de Iceberg y realizar operaciones de lectura-escritura-superación. Trata los aspectos de formato de metadatos y tablas de Iceberg, permitiendo a los ingenieros de datos utilizar motores de
PyIceberg puede ejecutarse como un programa independiente o en los clústeres de Kubernetes para la tolerancia de errores. Su integración nativa con protocolos de catálogo de Iceberg como REST, SQLCatalog o AWS Glue, lo hace una opción popular y fácil para la consulta de tablas de Iceberg sin la necesidad de clústeres de JVM/py4j.
Las implementaciones de producción de PyIceberg a menudo integran cargas de trabajo de transmisión de datos comoTabla de flujo, donde los datos de streaming se producen como temas de Apache Kafka y se materializan como tablas de iceberg. Esta es una poderosa herramienta para cerrar la brecha entre los sistemas de datos operativos y los sistemas de datos analíticos.
¿Por qué PyIceberg?
PyIceberg proporciona una forma amigable a Python para ejecutar operaciones de lenguaje de manipulación de datos (DML) en las tablas de Iceberg. Para plataformas de datos de tamaño pequeño a mediano que trabajan con 100 gigabytes de datos, como las que manejan análisis departamentales, informes internos o herramientas especializadas, la facilidad de uso a menudo es más importante para las empresas que las características complejas. Si el volumen de datos (tanto histórico como incremental) no es enorme, el despliegue de un clúster lleno para ejecutar consultas en Iceberg puede parecer abrumador y overkill. Esto es porque estos motores de consultas (como Spark y Flink) dependen de lenguajes de programación Scala o Java que funcionan en Java Virtual Machine (JVM)
En la siguiente sección, vamos a construir una demostración con PyIceberg para comprender cómo funcionan los patrones de lectura y escritura de Iceberg.
Cómo construir un lago de datos con PyIceberg, PyArrow y DuckDB
Vamos a practicar y construir una demostración de un lago de datos de sensores de IoT con PyIceberg. Para este ejemplo, usaremos PyIceberg y PyArrow para insertar/upertar y borrar datos de Iceberg y construir en Visual Studio Code (VS Code).
Primero, se crea un nuevo entorno virtual de Python llamado 'pyiceberg_playground', ejecutando el siguiente comando:
$>python -m venv iceberg_playground
Entonces este directorio - 'iceberg_playground' - 'se abre en VS Code donde se alojaría el proyecto PyIceberg.
PyIceberg y otras bibliotecas se instalan en el entorno virtual ejecutando los siguientes dos comandos:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
Para este ejemplo, PyIceberg usaría SqlCatalog, que almacena la información de la tabla de Iceberg en una base de datos local SQLite. Iceberg también admite catálogos como REST, Hive y AWS Glue.
Se prepara un archivo de configuración .pyiceberg.yaml con el siguiente contenido, en la raíz del proyecto:
catalog:
pyarrowtest:
uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Observe cómo el catálogo Iceberg se almacena bajo el directorio pyiceberg_catalog como un archivo SQLite y en el almacén de datos, que almacena todos los datos y metadatos en el directorio dw1.
Ambos directorios se crean ahora dentro del nivel raíz del proyecto. Este catálogo se llama pyarrowtest.
Luego, la configuración de PyIceberg se verifica utilizando el siguiente script:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Observe cómo PyIceberg lee el nombre del catálogo del archivo YAML y crea una base de datos SQLite local en el directorio pyiceberg_catalog.
Si el script se ejecuta correctamente, las propiedades 'pyarrow_catalog' deben aparecer en el terminal.
El script cargó el catálogo del archivo .YAML, ya que la variable de entorno 'PYICEBEG_HOME' ha sido especificada como la raíz del proyecto.
A continuación, se añade un esquema utilizando la clase de esquema de PyIceberg. Dado que este ejemplo almacena datos de un conjunto de sensores de IoT, el esquema se construye con tres columnas y sus tipos de datos requeridos.
A continuación, se crea un espacio de nombres junto con la tabla Iceberg con el esquema. Un espacio de nombres es un agrupamiento lógico de tablas dentro de un almacén (recordemos cómo se crea un almacén ya al definir el archivo YAML).
La carga inicial de datos se realiza utilizando una lista de memoria de PyArrow, y la sensor_table se lee con el método PyIceberg scan() para convertir los datos en un marco de datos de pandas.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
NestedField(1, "device_id", StringType(), required=True),
NestedField(2, "ampere_hour", FloatType(), required=True),
NestedField(3, "device_make", StringType(), required=True),
identifier_field_ids=[1] # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
identifier='sensor_ns.sensor_data',
schema=schema,
partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
Si el script anterior se ejecuta con éxito, este resultado se muestra en el terminal:
Con la inserción concluida con éxito, se pueden validar las tres capas de Iceberg:
- Catálogo: Es el archivo SQLite pyarrow_catalog.db, que se verificará un poco más tarde en este artículo.
- Metadatos: En el directorio de ‘metadatos’, se crean archivos de metadatos, que son cruciales para permitir las operaciones de Crear, Leer, Actualizar, Eliminar (CRUD). Se crean dos archivos JSON de metadatos, uno mientras se creó la tabla, y el otro después de la primera inserción de datos. El archivo ‘snap-*.avro’ es la lista manifiesta, y el archivo manifiesto es el otro archivo .avro.
- Datos: Los archivos de datos se escriben en el formato .PARQUET, con device_id como clave de partición. Dado que existen tres dispositivos distintos, se crean tres directorios. La tabla Iceberg ‘sensor_data’ se crea con el espacio de nombres ‘sensor_ns.db’ en el almacén ‘dw1’. Estos campos de datos se crean en el directorio ‘data’ de la tabla ‘sensor_data’.
Las expresiones PyIceberg se pueden utilizar para filtrar registros.Algunas de las expresiones comunes utilizadas para filtrar son: StartsWith, EqualTo, GreaterThan, And, Or, etc.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
PyIceberg también soporta las operaciones de UPSERT.La muestra de código siguiente actualiza el dispositivo_make existente para uno de los sensores de “Siemens” a “Kepware”.
# Create an UPSERT batch of Arrow records where one fot he device_make is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
join_columns = ["device_id"]
upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))
except Exception as e:
print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
De manera similar, la operación DELETE también se apoya:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\n After Delete")
print(sensor_table.scan().to_pandas())
Vale la pena mencionar que la eliminación en cualquier almacén es una operación matizada, y Iceberg no es una excepción. las operaciones de nivel de rango en Iceberg se definen por dos estrategias: Copy-on-Write (CoW) y Merge-on-Read (MoR).
PyIceberg actualmente admite la eliminación de MOR, pero con algunos matices. Mientras PyIceberg ofrece la capacidad de borrar líneas, primariamente lo implementa usando la eliminación de CoW por defecto, lo que significa que los archivos de datos se reescriben en lugar de crear archivos eliminados. Sin embargo, hay trabajos en curso para mejorar el MoR de PyIceberg para soportar la eliminación de igualdad y hacer que sea más eficiente para actualizaciones frecuentes y pequeñas.
Como último paso, usemos DuckDB para consultar el catálogo SQLite Iceberg, almacenado en el archivo pyarrow_catalog.db. El siguiente comando se ejecuta en el terminal VS Code:
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
Esto abrirá una ventana del navegador en el puerto 4213 (default), donde se podría ejecutar una consulta SQL en el catálogo de Iceberg, como se muestra:
Esto proporciona una forma fácil y sencilla de extraer insights del catálogo SQL
Desbloqueo de datos con PyIceberg
Para empresas con volúmenes de datos que son menores que terabytes, PyIceberg y PyArrow son opciones rápidas para ejecutar consultas interactivas en Python.
Los ingenieros de datos pueden comenzarDocumentación de PyIcebergque se mantiene y mantiene hasta el día de hoy.FuegoLa página tiene excelentes ejemplos de todas las API de PyIceberg.
¡Feliz Codificación!