4,230 odczyty
4,230 odczyty

Przewodnik inżyniera danych do PyIceberg

przez Confluent2025/06/20
Read on Terminal Reader

Za długo; Czytać

Ten przewodnik prowadzi inżynierów danych za pomocą PyIceberg, biblioteki Python do zarządzania tabelami Apache Iceberg bez dużych klastrów JVM. Obejmuje konfigurację, tworzenie schematów, operacje CRUD i kwerendowanie z DuckDB. Idealny dla zespołów pracujących z małymi do średnich danych, PyIceberg usprawnia przepływy pracy otwartych jezior danych za pomocą narzędzi, takich jak PyArrow i DuckDB.
featured image - Przewodnik inżyniera danych do PyIceberg
Confluent HackerNoon profile picture
0-item

Autor: Diptiman Raichaudhuri, adwokat ds. rozwoju pracowników w Confluent

Autor: Diptiman Raichaudhuri, adwokat ds. rozwoju pracowników w Confluent


Ten artykuł pokazuje inżynierom danych, jak korzystać z PyIceberg, lekkiej i potężnej biblioteki Python. PyIceberg ułatwia wykonywanie wspólnych zadań danych, takich jak tworzenie, czytanie, modyfikowanie lub usuwanie danych w Apache Iceberg, bez konieczności tworzenia dużego klastra.


Napędzane złożonymi wymaganiami biznesowymi i potrzebą analizy większych ilości informacji, platformy danych zmieniły się znacznie w ciągu ostatnich kilku lat, aby pomóc firmom wyodrębnić więcej wglądu i wartości z ich różnorodnych źródeł danych.


W przypadku zastosowań analitycznych przedsiębiorstw, platforma Open Data Lakehouse stała na czele tej ewolucji platformy. Open Data Lakehouse umożliwia zespołom danych tworzenie „kompostowalnych” architektur w ich ekosystemie. Dzięki temu wzorcowi zespoły danych mogą projektować platformy danych z warstwami przechowywania, obliczeń i zarządzania danymi według własnego wyboru, aby zaspokoić stale zmieniające się potrzeby przedsiębiorstw.



Composable Data Platform Architecture


Architektura platformy danych kompozytowych


Platformy danych zbudowane z Apache Iceberg zazwyczaj mają trzy warstwy:


  1. Warstwa danych: Fizyczne pliki danych (zwykle w formacie Parquet, Avro lub ORC) są przechowywane w tej warstwie.
  2. Warstwa metadanych: Zamiast sortować tabele w poszczególne katalogi, Iceberg utrzymuje listę plików.
  3. Katalog Iceberg jest centralnym repozytorium, które ułatwia odkrywanie, tworzenie i modyfikowanie tabel i zapewnia spójność transakcyjną zarządzania tabelami Iceberg.


Ten popularny diagram z dokumentacji Apache Iceberg ilustruje te warstwy:


 

Pochodzący z Puszczy Górniczej (Źródło)



What is PyIceberg?


PyIceberg umożliwia analitykom i inżynierom danych budowanie zaawansowanych otwartych platform jeziorowych na szerokiej gamie chmur (AWS, Azure i Google Cloud) i magazynowania lokalnego. PyIceberg jest implementacją Python do dostępu do tabel Iceberg. Deweloperzy korzystający z PyIceberg mogą korzystać z transformacji danych Pythonic bez konieczności uruchamiania wysoce wydajnych silników zapytań w klastrach Java Virtual Machine (JVM). PyIceberg używa katalogów do ładowania tabel Iceberg i wykonywania operacji czytania i pisania. Obsługuje metadane i formaty tabel Iceberg, umożliwiając inżynierom danych korzystanie z małych, szybkich i wysoce wydajnych silników danych,


PyIceberg może być uruchamiany jako samodzielny program lub na klastrach Kubernetes dla tolerancji błędów.Naturalna integracja z protokołami katalogu Iceberg, takimi jak REST, SQLCatalog lub AWS Glue, sprawia, że jest to popularny i łatwy wybór do kwerendowania tabel Iceberg bez potrzeby klastrów JVM/py4j.


Rozmieszczenia produkcyjne PyIceberg często integrują obciążenia strumieniowe danych, takie jak:Tabliczka Flow, gdzie strumieniowe dane są produkowane jako tematy Apache Kafka i materializowane jako tabele Iceberg.


Dlaczego PyCeberg?


PyIceberg zapewnia Python-friendly sposób na uruchomienie operacji języka manipulacji danymi (DML) na tabelach Iceberg. Dla małych i średnich platform danych, które pracują z 100 gigabajtów danych – takich jak te, które obsługują analizy departamentalne, wewnętrzne raportowanie lub wyspecjalizowane narzędzia – łatwość użytkowania jest często ważniejsza dla przedsiębiorstw niż złożone funkcje. Jeśli objętość danych (zarówno historyczne, jak i stopniowe) nie jest ogromna, wdrażanie pełnego klastra do uruchamiania zapytań na Iceberg może wydawać się przytłaczające i nadmierne. To dlatego, że te silniki zapytań (takie jak Spark i Flink) polegają na językach programowania Scala lub Java działających na


W następnej sekcji zbudujmy demo z PyIceberg, aby zrozumieć, jak działają wzorce czytania i pisania Iceberg.


Jak zbudować jezioro danych z PyIceberg, PyArrow i DuckDB


W tym przykładzie będziemy używać PyIceberg i PyArrow do wstawiania / usuwania i usuwania danych Iceberg i budowania w Visual Studio Code (VS Code).


Po pierwsze, tworzy się nowe środowisko wirtualne Python o nazwie „pyiceberg_playground”, wykonując następujące polecenie:


$>python -m venv iceberg_playground


Następnie ten katalog – ‘iceberg_playground’ – ‘otwiera się w VS Code, gdzie miałby być hostowany projekt PyIceberg.



PyIceberg i inne biblioteki są następnie instalowane w środowisku wirtualnym, wykonując następujące dwa polecenia:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


W tym przykładzie PyIceberg użyłby SqlCatalog, który przechowuje informacje tabeli Iceberg w lokalnej bazie danych SQLite. Iceberg obsługuje również katalogi, w tym REST, Hive i AWS Glue.


Plik konfiguracyjny .pyiceberg.yaml jest przygotowywany z następującą zawartością w korzeniu projektu:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Zwróć uwagę, jak katalog Iceberg jest przechowywany w katalogu pyiceberg_catalog jako plik SQLite i w magazynie danych, który przechowuje wszystkie dane i metadane w katalogu dw1.


Oba te katalogi są teraz tworzone na poziomie korzenia projektu. Ten katalog nazywa się pyarrowtest.


Następnie konfiguracja PyIceberg jest sprawdzana przy użyciu następującego scenariusza:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Zauważ, jak PyIceberg odczytuje nazwę katalogu z pliku YAML i tworzy lokalną bazę danych SQLite w katalogu pyiceberg_catalog.



Jeśli skrypt jest prawidłowo uruchamiany, właściwości „pyarrow_catalog” powinny być wyświetlane w terminalu.



Skrypt pobrał katalog z pliku .YAML, ponieważ zmienna środowiska ‘PYICEBEG_HOME’ została określona jako korzeń projektu.


Następnie dodaje się schemat za pomocą klasy schematu PyIceberg. Ponieważ ten przykład przechowuje dane z zestawu czujników IoT, schemat jest zbudowany z trzech kolumn i ich wymaganych typów danych.


Przestrzeń nazw jest logiczną grupą tabel w magazynie (pamiętaj, jak magazyn został już utworzony przy definiowaniu pliku YAML).


Początkowe załadowanie danych odbywa się za pomocą listy w pamięci PyArrow, a sensor_table jest odczytywany za pomocą metody PyIceberg scan() w celu przekształcenia danych w ramę danych panda.


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Jeśli powyższy scenariusz zostanie uruchomiony pomyślnie, ten wynik zostanie wyświetlony w terminalu:



With the insertion successfully completed, the three layers of Iceberg can be validated: 


  1. Katalog: Jest to plik SQLite pyarrow_catalog.db, który zostanie zweryfikowany nieco później w tym artykule.
  2. Metadane: W katalogu „metadane” tworzone są pliki metadanych, które są kluczowe dla umożliwienia operacji tworzenia, odczytu, aktualizacji, usuwania (CRUD). tworzone są dwa pliki metadane JSON, jeden podczas tworzenia tabeli, a drugi po pierwszym wstawieniu danych. plik „snap-*.avro” jest manifestową listą, a manifestowy plik jest drugim plikiem .avro.
  3. Dane: Pliki danych są zapisywane w formacie .PARQUET, z device_id jako kluczem partycji. Ponieważ istnieją trzy odrębne urządzenia, tworzone są trzy katalogi. Tabela Iceberg ‘sensor_data’ jest tworzona przy użyciu przestrzeni nazw ‘sensor_ns.db’ w magazynie ‘dw1’. Pole te danych są tworzone w katalogu ‘dane’ tabeli ‘sensor_data’.



Wyrażenia PyIceberg można użyć do filtrowania rekordów.Niektóre z najczęściej używanych wyrazów do filtrowania to: StartsWith, EqualTo, GreaterThan, And, Or, itp.


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


PyIceberg obsługuje również operacje UPSERT.Następujący przykład kodu aktualizuje istniejący device_make dla jednego z czujników od „Siemens” do „Kepware”.


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


W podobny sposób obsługiwana jest również operacja DELETE:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


Warto wspomnieć, że usunięcie w dowolnym magazynie jest operacją niuansowaną, a Iceberg nie jest wyjątkiem. Operacje na poziomie linii w Iceberg są definiowane przez dwie strategie: Copy-on-Write (CoW) i Merge-on-Read (MoR).


PyIceberg obecnie obsługuje usuwania MOR, ale z pewnymi niuansami. Podczas gdy PyIceberg oferuje możliwość usuwania wierszy, wdraża to przede wszystkim za pomocą usuwania CoW domyślnie, co oznacza, że pliki danych są ponownie zapisywane zamiast tworzenia usuniętych plików.


Jako ostatni krok użyj DuckDB, aby wyszukiwać katalog SQLite Iceberg, przechowywany w pliku pyarrow_catalog.db. Następujące polecenie jest uruchamiane na terminalu VS Code:


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


W ten sposób otworzy się okno przeglądarki w porcie 4213 (domyślnie), w którym można uruchomić zapytanie SQL w katalogu Iceberg, jak pokazano:



Zapewnia to łatwy i prosty sposób wyodrębnienia informacji z katalogu SQL


Odblokowywanie danych z PyIceberg


Dla firm o objętości danych mniejszych niż terabajty, PyIceberg i PyArrow są szybkimi opcjami do uruchamiania interaktywnych zapytań w Pythonie.


Inżynierowie mogą zacząć odDokumentacja PyCebergktóre są utrzymywane i utrzymywane na bieżąco.OgieńStrona zawiera świetne przykłady wszystkich API PyIceberg.


Szczęśliwy kod!

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks