Geschreven door: Diptiman Raichaudhuri, Staff Developer Advocate bij Confluent
Geschreven door: Diptiman Raichaudhuri, Staff Developer Advocate bij Confluent
Dit artikel laat data-ingenieurs zien hoe ze PyIceberg kunnen gebruiken, een lichte en krachtige Python-bibliotheek. PyIceberg maakt het gemakkelijker om gemeenschappelijke gegevenstaken uit te voeren, zoals het maken, lezen, wijzigen of verwijderen van gegevens in Apache Iceberg, zonder dat er een grote cluster nodig is.
Geleid door complexe bedrijfseisen en de noodzaak om grotere hoeveelheden informatie te analyseren, zijn dataplatforms de afgelopen jaren aanzienlijk getransformeerd om bedrijven te helpen meer inzichten en waarde te halen uit hun diverse gegevensbronnen.
Voor gebruiksgebieden voor bedrijfsanalyses staat het open data lakehouse-platform voorop in de evolutie van dit platform. Een open data lakehouse stelt data-teams in staat om ‘composable’ architecturen te creëren in hun ecosysteem. Met dit patroon kunnen data-teams data-platforms ontwerpen met opslag-, computing- en data governance-lagen van hun keuze om tegemoet te komen aan de steeds evoluerende behoeften van bedrijven. Open Table Formats (OTF) zoals Apache Iceberg zijn de motoren die de adoptie van moderne open lakehouses stimuleren.
Composable Data Platform Architectuur
Gegevensplatforms gebouwd met Apache Iceberg hebben normaal gesproken drie lagen:
- De gegevenslaag: In deze laag worden fysieke gegevensbestanden (meestal Parquet, Avro of ORC-indelingen) opgeslagen.
- De metagegevenslaag: In plaats van tabellen in afzonderlijke mappen te sorteren, onderhoudt Iceberg een lijst met bestanden.
- Iceberg Catalog: De catalogus is de centrale repository die het ontdekken, creëren en wijzigen van tabellen vergemakkelijkt en zorgt voor transactieconsistentie bij het beheren van Iceberg tabellen.
Dit populaire diagram uit de Apache Iceberg-documentatie illustreert deze lagen:
Iceberg Layers van Apache (bron)
Wat is PyIceberg eigenlijk?
PyIceberg machtigt analytics en data engineers om geavanceerde open lakehouse platforms te bouwen op een breed scala aan clouds (AWS, Azure en Google Cloud) en on-premise opslag. PyIceberg is een Python-implementatie voor toegang tot Iceberg-tabellen. Ontwikkelaars die PyIceberg gebruiken, kunnen Pythonic-gegevenstransformaties gebruiken zonder de noodzaak om hoogwaardige query-motoren in Java Virtual Machine (JVM) clusters uit te voeren. PyIceberg gebruikt catalogi om Iceberg-tabellen te laden en lees-schrijf-upsert-operaties uit te voeren. Het behandelt de metadata- en tabelformaataspecten van Iceberg, waardoor data-ingenieurs kleine, snelle en zeer efficiënte
De native integratie met Iceberg catalogus protocollen zoals REST, SQLCatalog, of AWS Glue, maakt het een populaire en gemakkelijke keuze voor het queryen Iceberg tabellen zonder de behoefte aan JVM/py4j clusters.
Productie-implementaties van PyIceberg integreren vaak data-streamingwerklasten zoalsTabelflow, waar streaminggegevens worden geproduceerd als Apache Kafka-thema's en materialiseren als Iceberg-tabellen.Dit is een krachtig hulpmiddel om de kloof tussen besturingssystemen en analytische datasystemen te overbruggen.
Waarom PyIceberg?
PyIceberg biedt een Python-vriendelijke manier om gegevensmanipulatietaal (DML) operaties op Iceberg tabellen uit te voeren. Voor kleine tot middelgrote gegevensplatforms die werken met 100 gigabytes van gegevens - zoals die met afdelingsanalyse, interne rapportage of gespecialiseerde gereedschappen - gebruiksgemak is vaak belangrijker voor bedrijven dan complexe functies. Als het gegevensvolume (zowel historisch als incrementeel) niet enorm is, kan het implementeren van een full-blown cluster om queries op Iceberg uit te voeren overweldigend en overweldigend lijken. Dat komt omdat deze query engines (zoals Spark en Flink) vertrouwen op Scala of Java-programmeertaal op Java Virtual Machine (JVM) om multithreaded en mul
In de volgende sectie maken we een demo met PyIceberg om te begrijpen hoe Iceberg's lees- en schrijfpatronen werken.
Hoe maak je een Data Lakehouse bouwen met PyIceberg, PyArrow en DuckDB
Laten we oefenen en een demo bouwen van een IoT sensor data lakehouse met PyIceberg. Voor dit voorbeeld zullen we PyIceberg en PyArrow gebruiken om Iceberg-gegevens in te voeren / te uploaden en te verwijderen en te bouwen in Visual Studio Code (VS Code).
Eerst wordt een nieuwe virtuele Python-omgeving gemaakt met de naam 'pyiceberg_playground', door het volgende commando uit te voeren:
$>python -m venv iceberg_playground
Dan wordt deze directory - 'iceberg_playground' - 'geopend in VS Code waar het PyIceberg-project zou worden gehost.
PyIceberg en andere bibliotheken worden vervolgens geïnstalleerd in de virtuele omgeving door de volgende twee opdrachten uit te voeren:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
Voor dit voorbeeld zou PyIceberg SqlCatalog gebruiken, die Iceberg-tabelinformatie opslaat in een lokale SQLite-database.
Een configuratiebestand .pyiceberg.yaml wordt voorbereid met de volgende inhoud, bij de wortel van het project:
catalog:
pyarrowtest:
uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Merk op hoe de Iceberg catalogus wordt opgeslagen onder de pyiceberg_catalog directory als een SQLite-bestand en in de data warehouse, die alle gegevens en metagegevens opslaat in de directory dw1.
Beide mappen worden nu op het projectwortelniveau gemaakt.Deze catalogus heet pyarrowtest.
Vervolgens wordt de PyIceberg-installatie gecontroleerd met behulp van het volgende script:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Merk op hoe PyIceberg de catalogusnaam leest uit het YAML-bestand en een lokale SQLite-database maakt in de pyiceberg_catalog directory.
Als het script correct wordt uitgevoerd, moeten de ‘pyarrow_catalog’-eigenschappen in de terminal worden weergegeven.
Het script heeft de catalogus geladen uit het .YAML-bestand, omdat de ‘PYICEBEG_HOME’-omgevingsvariabele is gespecificeerd als de projectwortel.
Vervolgens wordt een schema toegevoegd met behulp van de schema-klasse van PyIceberg. Aangezien dit voorbeeld gegevens van een set IoT-sensoren opslaat, wordt het schema opgebouwd met drie kolommen en hun vereiste gegevenstypen.
Een namespace is een logische groepering van tabellen binnen een warehouse (vergeet niet hoe een warehouse al is gemaakt bij het definiëren van het YAML-bestand).
De eerste gegevenslading wordt gedaan met behulp van een PyArrow in-memory lijst, en de sensor_table wordt gelezen met de PyIceberg scan() methode om de gegevens om te zetten in een pandas dataframe.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
NestedField(1, "device_id", StringType(), required=True),
NestedField(2, "ampere_hour", FloatType(), required=True),
NestedField(3, "device_make", StringType(), required=True),
identifier_field_ids=[1] # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
identifier='sensor_ns.sensor_data',
schema=schema,
partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
Als het bovenstaande script succesvol wordt uitgevoerd, wordt dit resultaat weergegeven in de terminal:
Wanneer de insertie met succes is voltooid, kunnen de drie lagen van Iceberg worden gevalideerd:
- Catalogus: het is het SQLite-bestand pyarrow_catalog.db, dat later in dit artikel zal worden gecontroleerd.
- Metadata: In de ‘metadata’ directory worden metadata-bestanden gemaakt, die cruciaal zijn voor het mogelijk maken van Create, Read, Update, Delete (CRUD) operaties. Twee metadata JSON-bestanden worden gemaakt, een terwijl de tabel werd gemaakt, en de andere na de eerste invoeging van gegevens.
- Gegevens: Gegevensbestanden worden geschreven in het .PARQUET-formaat, met device_id als partitie sleutel. Aangezien er drie verschillende apparaten zijn, worden drie mappen gemaakt. De Iceberg-tabel 'sensor_data' wordt gemaakt met de naamruimte 'sensor_ns.db' in de warehouse 'dw1'. Deze gegevensvelden worden gemaakt in de 'data' directory van de tabel 'sensor_data'.
PyIceberg-uitdrukkingen kunnen worden gebruikt om records te filteren.Sommige van de gemeenschappelijke uitdrukkingen die worden gebruikt voor het filteren zijn: StartsWith, EqualTo, GreaterThan, And, Or, etc.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
PyIceberg ondersteunt ook UPSERT-operaties.De volgende code sample actualiseert de bestaande device_make voor een van de sensoren van ‘Siemens’ naar ‘Kepware’.
# Create an UPSERT batch of Arrow records where one fot he device_make is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
join_columns = ["device_id"]
upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))
except Exception as e:
print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
Op dezelfde manier wordt de DELETE-operatie ook ondersteund:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\n After Delete")
print(sensor_table.scan().to_pandas())
Het is vermeldenswaard dat het verwijderen in elk magazijn een nuancerende operatie is en Iceberg is geen uitzondering. Row-level operaties in Iceberg worden gedefinieerd door twee strategieën: Copy-on-Write (CoW) en Merge-on-Read (MoR).
PyIceberg ondersteunt momenteel MOR-verwijderingen, maar met enkele nuances. Terwijl PyIceberg de mogelijkheid biedt om rijen te verwijderen, implementeert het dit voornamelijk met behulp van CoW-verwijderingen standaard, wat betekent dat gegevensbestanden worden opnieuw geschreven in plaats van verwijderde bestanden te maken.
Als laatste stap gebruiken we DuckDB om de SQLite Iceberg-catalogus te query, opgeslagen in het pyarrow_catalog.db-bestand.
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
Dit opent een browservenster bij poort 4213 (default), waar een SQL-query op de Iceberg-catalogus kan worden uitgevoerd, zoals weergegeven:
Dit biedt een eenvoudige en eenvoudige manier om inzichten uit de SQL-catalogus te halen.
Data Insights ontgrendelen met PyIceberg
Voor bedrijven met gegevensvolumes die kleiner zijn dan terabytes, zijn PyIceberg en PyArrow snelle opties om interactieve query's in Python uit te voeren.
Data engineers kunnen beginnen metPyIceberg documentatiedie nog steeds wordt gehandhaafd en bijgehouden; ook deVuurpagina heeft geweldige voorbeelden van alle PyIceberg API's.
Gelukkige codering!