4,230 читања
4,230 читања

Водич за инженер за податоци за PyIceberg

од страна на Confluent2025/06/20
Read on Terminal Reader

Премногу долго; Да чита

Овој водич ги води инженерите за податоци преку користење на PyIceberg, Python библиотека за управување со Apache Iceberg табели без големи JVM кластери. Тоа опфаќа поставување, креирање на шема, CRUD операции и прашање со DuckDB. Идеален за тимови кои работат со мали до средни податоци, PyIceberg го поедноставува работниот тек на отворен data lakehouse со користење на алатки како PyArrow и DuckDB.
featured image - Водич за инженер за податоци за PyIceberg
Confluent HackerNoon profile picture
0-item

Напишано од: Diptiman Raichaudhuri, Адвокат за развој на персонал во Confluent

Напишано од: Diptiman Raichaudhuri, Адвокат за развој на персонал во Confluent


Оваа статија им покажува на инженерите за податоци како да го користат PyIceberg, лесна и моќна библиотека за Python. PyIceberg го олеснува извршувањето на заеднички задачи за податоци како што се создавање, читање, модифицирање или бришење на податоци во Apache Iceberg, без да е потребен голем кластер.


Движени од сложените деловни барања и потребата да се анализираат поголеми количини на информации, платформите за податоци значително се трансформирале во изминатите неколку години за да им помогнат на бизнисите да извлечат повеќе увид и вредност од нивните разновидни извори на податоци.


For enterprise analytics use cases, the open data lakehouse platform has been at the forefront of this platform evolution. An open data lakehouse enables data teams to create ‘composable’ architectures within their ecosystem. With this pattern, data teams can design data platforms with storage, compute and data governance layers of their choice, to cater to businesses' ever-evolving needs. Open Table Formats (OTF) like Apache Iceberg are the engines driving the adoption of modern open lakehouses.



Composable Data Platform Architecture


Архитектура на композитна платформа за податоци


Податочните платформи изградени со Apache Iceberg обично имаат три слоеви:


  1. Физички датотеки со податоци (обично Parquet, Avro или ORC формати) се чуваат во овој слој.
  2. Метаподаточниот слој: Наместо да ги сортирате табелите во поединечни директориуми, Iceberg одржува листа на датотеки.
  3. Iceberg Каталог: Каталогот е централен депозитар кој го олеснува откривањето, креирањето и модификацијата на табелите и обезбедува трансакциска конзистентност на управувањето со Iceberg табелите.


Овој популарен дијаграм од документацијата Apache Iceberg ги илустрира овие слоеви:


 

Апсолутната егзистенција на јаглехидратите (Извор)



Што е PyIceberg?


PyIceberg им овозможува на аналитичарите и инженерите за податоци да изградат софистицирани платформи за отворени езера на широк спектар на облаци (AWS, Azure и Google Cloud) и на локално складирање. PyIceberg е имплементација на Python за пристап до табелите на Iceberg. Развивачите кои користат PyIceberg можат да користат Pythonic трансформации на податоци без потреба да работат високо ефикасни мотори за барање во Java Virtual Machine (JVM) кластери. PyIceberg користи каталози за да ги вчитаат табелите на Iceberg и да извршуваат операции за читање и пишување. Тој се занимава со аспектите на метаподатоците и табелите на Iceberg, овозможувајќи им


PyIceberg може да работи како самостојна програма или на кластери на Kubernetes за толеранција на грешки. Неговата природна интеграција со протоколите за каталог на Iceberg како што се REST, SQLCatalog или AWS Glue, го прави популарен и лесен избор за прашање на табели на Iceberg без потреба од JVM/py4j кластери.


Производствените имплементации на PyIceberg често интегрираат работни оптоварувања за протокол на податоци како што се:ТабелаFlow, каде што податоците за стриминг се произведуваат како теми на Апаче Кафка и се материјализираат како табели Ицеберг.


Зошто Пјонгјанг?


PyIceberg обезбедува Python-пријателски начин за извршување на операции на јазикот за манипулација на податоци (DML) на табелите на Iceberg. За мали до средни податоци платформи кои работат со 100 гигабајти на податоци - како оние кои се занимаваат со одделни анализи, внатрешно известување или специјализирани алатки - леснотијата за користење честопати е поважна за бизнисите отколку комплексни карактеристики. Ако обемот на податоци (и историски и инкорпорирани) не е огромен, распоредувањето на целосен кластер за извршување на барања на Iceberg може да изгледа преоптоварувачки и претежно. Тоа е затоа што овие мотори за барање (како Spark и Flink) се потпираат на Scala или Java програмски ја


Во следниот дел, да изградиме демо со PyIceberg за да разбереме како функционираат шемите за читање и пишување на Iceberg.


Како да се изгради Data Lakehouse со PyIceberg, PyArrow и DuckDB


Ајде да вежбаме и да изградиме демонстрација на IoT сензорска база на податоци со PyIceberg. За овој пример, ние ќе ги користиме PyIceberg и PyArrow за да ги вметнеме/upert и избришеме податоците на Iceberg и да ги изградиме во Visual Studio Code (VS Code).


Прво, се создава нова виртуелна средина на Python наречена 'pyiceberg_playground', со извршување на следната команда:


$>python -m venv iceberg_playground


Потоа овој директориум - 'iceberg_playground' - 'се отвора во VS Code каде што ќе биде хостиран проектот PyIceberg.



PyIceberg и другите библиотеки потоа се инсталираат во виртуелната средина со извршување на следните две команди:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


За овој пример, PyIceberg ќе го користи SqlCatalog, кој ги складира информациите за табелата Iceberg во локална база на податоци SQLite. Iceberg исто така поддржува каталози вклучувајќи REST, Hive и AWS Glue.


Конфигурациската датотека .pyiceberg.yaml е подготвена со следнава содржина, во коренот на проектот:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Забележете како каталогот Iceberg се чува под директориумот pyiceberg_catalog како датотека SQLite и во складиштето на податоци, кое ги чува сите податоци и метаподатоци во директориумот dw1.


И двата директориуми сега се создаваат на ниво на проектот. Овој каталог се вика pyarrowtest.


Потоа, поставувањето на PyIceberg се проверува со користење на следниот скрипт:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Забележете како PyIceberg го чита името на каталогот од датотеката YAML и создава локална SQLite база на податоци во директориумот pyiceberg_catalog.



Ако скриптот се изведува правилно, својствата „pyarrow_catalog“ треба да се прикажат во терминалот.



Скриптот го вчита каталогот од .YAML датотеката, бидејќи променливата на животната средина 'PYICEBEG_HOME' е наведена како корен на проектот.


Следно, се додава шема со користење на класата на шемата на PyIceberg. Бидејќи овој пример ги складира податоците од збир на сензори на IoT, шемата се конструира со три колони и нивните потребни типови на податоци.


Потоа се создава именуван простор заедно со табелата Iceberg со шемата. Именуван простор е логичко групирање на табели во рамките на складот (сетете се како складот веќе е создаден при дефинирањето на датотеката YAML).


Почеточното вчитање на податоците се врши со користење на листата во меморијата на PyArrow, а сензорната табела се чита со методот PyIceberg scan() за да се конвертираат податоците во панда рамка на податоци.


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Ако горенаведениот скрипт е успешно извршен, овој резултат се прикажува во терминалот:



Со успешното завршување на вметнувањето, може да се валидираат трите слоеви на Iceberg:


  1. Каталог: Тоа е SQLite датотека pyarrow_catalog.db, која ќе биде проверена малку подоцна во овој напис.
  2. Метаподатоци: Во директориумот „метаподатоци“ се создаваат метаподатоците, кои се од суштинско значење за овозможување на операциите за креирање, читање, ажурирање, бришење (CRUD). се создаваат две метаподатоци JSON датотеки, едната додека е создадена табелата, а другата по првото вметнување на податоците.
  3. Датотеки: Датотеките се пишуваат во .PARQUET формат, со device_id како клуч за партиција. Бидејќи постојат три различни уреди, се создаваат три директориуми. Табелата „sensor_data“ на Iceberg се создава со имениот простор „sensor_ns.db“ во складот „dw1“. Овие податоци се создаваат во директориумот „data“ на табелата „sensor_data“.



PyIceberg expressions can be used to filter records. Some of the common expressions used for filtering are: StartsWith, EqualTo, GreaterThan, And, Or, etc.


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


Следниот примерок на код го ажурира постоечкиот device_make за еден од сензорите од „Siemens“ до „Kepware“.


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


На сличен начин, операцијата DELETE исто така е поддржана:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


Вреди да се спомене дека бришењето во секој магацин е нијансирана операција, а Iceberg не е исклучок. Операциите на ниво на ред во Iceberg се дефинирани со две стратегии: Copy-on-Write (CoW) и Merge-on-Read (MoR).


PyIceberg во моментов поддржува MOR бришења, но со некои нијанси. Додека PyIceberg нуди способност за бришење на редови, првенствено го имплементира ова со користење на CoW бришења по претпоставка, што значи дека датотеките со податоци се препишуваат наместо да се создадат избришани датотеки.


Како последен чекор, да го користиме DuckDB за да го прашаме каталогот SQLite Iceberg, кој се чува во датотеката pyarrow_catalog.db. Следната команда се изведува на терминалот VS Code:


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


Ова ќе отвори прозорец на прелистувачот на порта 4213 (подрачливо), каде што може да се изврши SQL прашање на каталогот Iceberg, како што е прикажано:



Ова обезбедува лесен и едноставен начин за извлекување на увид од SQL каталогот


Отклучување на податоци со PyIceberg


За компании со волумени на податоци кои се помали од терабајти, PyIceberg и PyArrow се брзи опции за извршување на интерактивни прашања во Python.


Инженерите за податоци можат да започнат соДокументација на PyIcebergкој се одржува и одржува до ден-денес.ОгнотСтраницата има одлични примери на сите PyIceberg API.


Среќно кодирање

L O A D I N G
. . . comments & more!

About Author

Confluent HackerNoon profile picture
Confluent@confluent
Confluent is pioneering a fundamentally new category of data infrastructure focused on data in motion.

ВИСЕТЕ ТАГОВИ

ОВОЈ СТАТИЈА БЕШЕ ПРЕТСТАВЕН ВО...

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks