4,230 čtení
4,230 čtení

Průvodce datovým inženýrem do PyIceberg

podle Confluent2025/06/20
Read on Terminal Reader

Příliš dlouho; Číst

Tento průvodce prochází datovými inženýry pomocí PyIceberg, knihovny Python pro správu tabulek Apache Iceberg bez velkých klastrů JVM. Pokrývá nastavení, vytváření schémat, operace CRUD a dotazování s DuckDB. Ideální pro týmy pracující s malými až středně velkými daty, PyIceberg zjednodušuje pracovní postupy otevřeného datového jezera pomocí nástrojů, jako jsou PyArrow a DuckDB.
featured image - Průvodce datovým inženýrem do PyIceberg
Confluent HackerNoon profile picture
0-item

Scénář: Diptiman Raichaudhuri, advokát pro vývoj zaměstnanců v Confluent

Scénář: Diptiman Raichaudhuri, advokát pro vývoj zaměstnanců v Confluent


Tento článek ukazuje datovým inženýrům, jak používat PyIceberg, lehkou a výkonnou knihovnu Python. PyIceberg usnadňuje provádění běžných datových úkolů, jako je vytváření, čtení, úprava nebo smazání dat v Apache Iceberg, aniž byste potřebovali velký klastr.


Díky složitým obchodním požadavkům a potřebě analyzovat větší množství informací se datové platformy v posledních letech výrazně změnily, aby pomohly firmám získat více poznatků a hodnoty z různých zdrojů dat.


Pro případy použití podnikové analytiky byla otevřená datová jezírková platforma v popředí této evoluce platformy. Otevřené datové jezírko umožňuje datovým týmům vytvářet „kompozitní“ architektury v rámci svého ekosystému. S tímto vzorem mohou datové týmy navrhnout datové platformy s úložnými, výpočetními a datovými úrovněmi správy podle svého výběru, aby uspokojily neustále se vyvíjející potřeby podniků. Otevřené tabulkové formáty (OTF) jako Apache Iceberg jsou hnací silou přijetí moderních otevřených jezírkových domů.



Composable Data Platform Architecture


Architektura kompozitní datové platformy


Datové platformy postavené pomocí Apache Iceberg mají obvykle tři vrstvy:


  1. Fyzické datové soubory (obvykle formáty Parquet, Avro nebo ORC) jsou uloženy v této vrstvě.
  2. Metadatová vrstva: Namísto třídění tabulek do jednotlivých adresářů, Iceberg udržuje seznam souborů.
  3. Katalog Iceberg je centrální úložiště, které usnadňuje objevování, vytváření a modifikaci tabulek a zajišťuje transakční konzistenci při správě tabulek Iceberg.


Tento populární diagram z dokumentace Apache Iceberg ilustruje tyto vrstvy:


 

Původní název: Apache Iceberg Layers (Zdroje)



Co je PyIceberg?


PyIceberg umožňuje analytikům a datovým inženýrům vytvářet sofistikované platformy s otevřeným jezerem na široké škále cloudů (AWS, Azure a Google Cloud) a on-premise úložiště. PyIceberg je implementace Python pro přístup k tabulkám Iceberg. Vývojáři používající PyIceberg mohou používat transformace pythonových dat bez nutnosti spouštět vysoce výkonné dotazovací motory v klastrech Java Virtual Machine (JVM). PyIceberg používá katalogy k načítání tabulek Iceberg a provádění operací čtení a psaní. Řeší metadata a tabulkové formáty aspekty Iceberg, což umožňuje datovým inženýrům používat malé, rychlé a vysoce efektivní


Jeho nativní integrace s katalógovými protokoly Iceberg, jako jsou REST, SQLCatalog nebo AWS Glue, z něj činí populární a snadnou volbu pro dotazování tabulek Iceberg bez potřeby klastrů JVM/py4j.


Výrobní nasazení PyIceberg často integrují pracovní zatížení pro streamování dat, jako jsou:TabulkyFlow, kde je streamování dat produkováno jako témata Apache Kafka a materializováno jako tabulky Iceberg.


Proč právě PyCeberg?


PyIceberg poskytuje Python-přátelský způsob, jak spustit operace v jazyce manipulace s daty (DML) na tabulkách Iceberg. Pro malé až středně velké datové platformy, které pracují se 100 gigabajty dat – jako jsou ty, které se zabývají oddělenou analýzou, interním reportováním nebo specializovanými nástroji – je snadné použití pro podniky často důležitější než složité funkce. Pokud objem dat (jak historický, tak inkrementální) není obrovský, nasazení plně vypuštěného klastru pro spuštění dotazů na Iceberg se může zdát ohromující a zabíjející. To je proto, že tyto vyhledávače (jako Spark a Flink) se spoléhají na programovací jazy


V následující sekci vytvořme demo s PyIcebergem, abychom pochopili, jak fungují Icebergovy vzorce čtení a psaní.


Jak vytvořit datový jezírko s PyIceberg, PyArrow a DuckDB


Pro tento příklad použijeme PyIceberg a PyArrow pro vložení / upert a smazání dat Iceberg a budování v Visual Studio Code (VS Code).


Nejprve je vytvořeno nové virtuální prostředí Python s názvem „pyiceberg_playground“, a to spuštěním následujícího příkazu:


$>python -m venv iceberg_playground


Pak se tento adresář - 'iceberg_playground' - 'otevře ve VS Code, kde by byl projekt PyIceberg hostován.



PyIceberg a další knihovny jsou pak instalovány ve virtuálním prostředí spuštěním následujících dvou příkazů:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


Pro tento příklad by PyIceberg použil SqlCatalog, který ukládá informace o tabulce Iceberg v lokální databázi SQLite. Iceberg také podporuje katalogy včetně REST, Hive a AWS Glue.


Konfigurační soubor .pyiceberg.yaml je připraven s následujícím obsahem v kořeni projektu:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Všimněte si, jak je katalog Iceberg uložen pod adresářem pyiceberg_catalog jako soubor SQLite a v datovém skladu, který ukládá veškerá data a metadata v adresáři dw1.


Oba tyto adresáře jsou nyní vytvořeny v rámci úrovně kořenového projektu. Tento katalog se nazývá pyarrowtest.


Následně je nastavení PyIceberg zkontrolováno pomocí následujícího skriptu:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Všimněte si, jak PyIceberg čte název katalogu z souboru YAML a vytváří lokální databázi SQLite v adresáři pyiceberg_catalog.



Pokud je skript spuštěn správně, vlastnosti ‚pyarrow_catalog‘ by měly být zobrazeny v terminálu.



Skript nahrál katalog z souboru .YAML, protože environmentální proměnná ‚PYICEBEG_HOME‘ byla specifikována jako projektový kořen.


Protože tento příklad ukládá data ze sady senzorů IoT, je schéma konstruováno se třemi sloupci a jejich požadovanými datovými typy.


Namespace je logické seskupení tabulek v rámci skladu (pamatujte si, jak je sklad již vytvořen při definici souboru YAML).


Počáteční načítání dat se provádí pomocí seznamu v paměti PyArrow a sensor_table se čte pomocí metody PyIceberg scan() pro konverzi dat do pandového datového rámce.


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Pokud je výše uvedený skript úspěšně spuštěn, tento výsledek se zobrazí v terminálu:



Po úspěšném dokončení vložení lze ověřit tři vrstvy Iceberg:


  1. Katalog: Jedná se o SQLite soubor pyarrow_catalog.db, který bude ověřen trochu později v tomto článku.
  2. Metadata: V rámci adresáře „metadata“ jsou vytvořeny metadatové soubory, které jsou zásadní pro umožnění operací Create, Read, Update, Delete (CRUD). Dva metadatové JSON soubory jsou vytvořeny, jeden během vytváření tabulky a druhý po prvním vložení dat. Soubor „snap-*.avro“ je manifestní seznam a manifestní soubor je druhý .avro soubor.
  3. Data: Datové soubory jsou psány ve formátu .PARQUET, s device_id jako oddílovým klíčem. Vzhledem k tomu, že existují tři odlišná zařízení, jsou vytvořeny tři adresáře. Icebergská tabulka 'sensor_data' je vytvořena s názvovým prostorem 'sensor_ns.db' ve skladu 'dw1'. Tato datová pole jsou vytvořena v adresáři 'data' tabulky 'sensor_data'.



Některé běžné výrazy používané pro filtrování jsou: StartsWith, EqualTo, GreaterThan, And, Or, atd.


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


Následující vzorek kódu aktualizuje stávající device_make pro jeden z senzorů od „Siemens“ po „Kepware“.


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


Podobným způsobem je podporována i operace DELETE:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


Stojí za zmínku, že smazání v jakémkoli skladu je nuanční operace a Iceberg není výjimkou. Operace na úrovni řad v Iceberg jsou definovány dvěma strategiemi: Copy-on-Write (CoW) a Merge-on-Read (MoR).


PyIceberg v současné době podporuje MOR smazání, ale s některými nuance. Zatímco PyIceberg nabízí možnost smazat řádky, to primárně provádí pomocí CoW smazání ve výchozím nastavení, což znamená, že datové soubory jsou přepisovány namísto vytváření smazaných souborů.


Jako poslední krok použijeme DuckDB k vyhledávání katalogu SQLite Iceberg, uloženého v souboru pyarrow_catalog.db.


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


Tím se otevře okno prohlížeče na portu 4213 (default), kde lze spustit dotaz SQL v katalogu Iceberg, jak je znázorněno:



To poskytuje snadný a jednoduchý způsob, jak extrahovat poznatky z katalogu SQL


Odemknutí datových poznatků pomocí PyIceberg


Pro společnosti s datovými objemy menšími než terabyty jsou PyIceberg a PyArrow rychlé možnosti pro spuštění interaktivních dotazů v Pythonu.


Inženýři mohou začít sPyCeberg dokumentacekterý je udržován a udržován až do současnosti.OheňStránka má skvělé příklady všech API PyIceberg.


Šťastný kód!

L O A D I N G
. . . comments & more!

About Author

Confluent HackerNoon profile picture
Confluent@confluent
Confluent is pioneering a fundamentally new category of data infrastructure focused on data in motion.

ZAVĚŠIT ZNAČKY

TENTO ČLÁNEK BYL PŘEDSTAVEN V...

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks