Напишано од: Diptiman Raichaudhuri, Адвокат за развој на персонал во Confluent
Напишано од: Diptiman Raichaudhuri, Адвокат за развој на персонал во Confluent
Оваа статија им покажува на инженерите за податоци како да го користат PyIceberg, лесна и моќна библиотека за Python. PyIceberg го олеснува извршувањето на заеднички задачи за податоци како што се создавање, читање, модифицирање или бришење на податоци во Apache Iceberg, без да е потребен голем кластер.
Движени од сложените деловни барања и потребата да се анализираат поголеми количини на информации, платформите за податоци значително се трансформирале во изминатите неколку години за да им помогнат на бизнисите да извлечат повеќе увид и вредност од нивните разновидни извори на податоци.
For enterprise analytics use cases, the open data lakehouse platform has been at the forefront of this platform evolution. An open data lakehouse enables data teams to create ‘composable’ architectures within their ecosystem. With this pattern, data teams can design data platforms with storage, compute and data governance layers of their choice, to cater to businesses' ever-evolving needs. Open Table Formats (OTF) like Apache Iceberg are the engines driving the adoption of modern open lakehouses.
Архитектура на композитна платформа за податоци
Податочните платформи изградени со Apache Iceberg обично имаат три слоеви:
- Физички датотеки со податоци (обично Parquet, Avro или ORC формати) се чуваат во овој слој.
- Метаподаточниот слој: Наместо да ги сортирате табелите во поединечни директориуми, Iceberg одржува листа на датотеки.
- Iceberg Каталог: Каталогот е централен депозитар кој го олеснува откривањето, креирањето и модификацијата на табелите и обезбедува трансакциска конзистентност на управувањето со Iceberg табелите.
Овој популарен дијаграм од документацијата Apache Iceberg ги илустрира овие слоеви:
Апсолутната егзистенција на јаглехидратите (Извор)
Што е PyIceberg?
PyIceberg им овозможува на аналитичарите и инженерите за податоци да изградат софистицирани платформи за отворени езера на широк спектар на облаци (AWS, Azure и Google Cloud) и на локално складирање. PyIceberg е имплементација на Python за пристап до табелите на Iceberg. Развивачите кои користат PyIceberg можат да користат Pythonic трансформации на податоци без потреба да работат високо ефикасни мотори за барање во Java Virtual Machine (JVM) кластери. PyIceberg користи каталози за да ги вчитаат табелите на Iceberg и да извршуваат операции за читање и пишување. Тој се занимава со аспектите на метаподатоците и табелите на Iceberg, овозможувајќи им
PyIceberg може да работи како самостојна програма или на кластери на Kubernetes за толеранција на грешки. Неговата природна интеграција со протоколите за каталог на Iceberg како што се REST, SQLCatalog или AWS Glue, го прави популарен и лесен избор за прашање на табели на Iceberg без потреба од JVM/py4j кластери.
Производствените имплементации на PyIceberg често интегрираат работни оптоварувања за протокол на податоци како што се:ТабелаFlow, каде што податоците за стриминг се произведуваат како теми на Апаче Кафка и се материјализираат како табели Ицеберг.
Зошто Пјонгјанг?
PyIceberg обезбедува Python-пријателски начин за извршување на операции на јазикот за манипулација на податоци (DML) на табелите на Iceberg. За мали до средни податоци платформи кои работат со 100 гигабајти на податоци - како оние кои се занимаваат со одделни анализи, внатрешно известување или специјализирани алатки - леснотијата за користење честопати е поважна за бизнисите отколку комплексни карактеристики. Ако обемот на податоци (и историски и инкорпорирани) не е огромен, распоредувањето на целосен кластер за извршување на барања на Iceberg може да изгледа преоптоварувачки и претежно. Тоа е затоа што овие мотори за барање (како Spark и Flink) се потпираат на Scala или Java програмски ја
Во следниот дел, да изградиме демо со PyIceberg за да разбереме како функционираат шемите за читање и пишување на Iceberg.
Како да се изгради Data Lakehouse со PyIceberg, PyArrow и DuckDB
Ајде да вежбаме и да изградиме демонстрација на IoT сензорска база на податоци со PyIceberg. За овој пример, ние ќе ги користиме PyIceberg и PyArrow за да ги вметнеме/upert и избришеме податоците на Iceberg и да ги изградиме во Visual Studio Code (VS Code).
Прво, се создава нова виртуелна средина на Python наречена 'pyiceberg_playground', со извршување на следната команда:
$>python -m venv iceberg_playground
Потоа овој директориум - 'iceberg_playground' - 'се отвора во VS Code каде што ќе биде хостиран проектот PyIceberg.
PyIceberg и другите библиотеки потоа се инсталираат во виртуелната средина со извршување на следните две команди:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
За овој пример, PyIceberg ќе го користи SqlCatalog, кој ги складира информациите за табелата Iceberg во локална база на податоци SQLite. Iceberg исто така поддржува каталози вклучувајќи REST, Hive и AWS Glue.
Конфигурациската датотека .pyiceberg.yaml е подготвена со следнава содржина, во коренот на проектот:
catalog:
pyarrowtest:
uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Забележете како каталогот Iceberg се чува под директориумот pyiceberg_catalog како датотека SQLite и во складиштето на податоци, кое ги чува сите податоци и метаподатоци во директориумот dw1.
И двата директориуми сега се создаваат на ниво на проектот. Овој каталог се вика pyarrowtest.
Потоа, поставувањето на PyIceberg се проверува со користење на следниот скрипт:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Забележете како PyIceberg го чита името на каталогот од датотеката YAML и создава локална SQLite база на податоци во директориумот pyiceberg_catalog.
Ако скриптот се изведува правилно, својствата „pyarrow_catalog“ треба да се прикажат во терминалот.
Скриптот го вчита каталогот од .YAML датотеката, бидејќи променливата на животната средина 'PYICEBEG_HOME' е наведена како корен на проектот.
Следно, се додава шема со користење на класата на шемата на PyIceberg. Бидејќи овој пример ги складира податоците од збир на сензори на IoT, шемата се конструира со три колони и нивните потребни типови на податоци.
Потоа се создава именуван простор заедно со табелата Iceberg со шемата. Именуван простор е логичко групирање на табели во рамките на складот (сетете се како складот веќе е создаден при дефинирањето на датотеката YAML).
Почеточното вчитање на податоците се врши со користење на листата во меморијата на PyArrow, а сензорната табела се чита со методот PyIceberg scan() за да се конвертираат податоците во панда рамка на податоци.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
NestedField(1, "device_id", StringType(), required=True),
NestedField(2, "ampere_hour", FloatType(), required=True),
NestedField(3, "device_make", StringType(), required=True),
identifier_field_ids=[1] # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
identifier='sensor_ns.sensor_data',
schema=schema,
partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
Ако горенаведениот скрипт е успешно извршен, овој резултат се прикажува во терминалот:
Со успешното завршување на вметнувањето, може да се валидираат трите слоеви на Iceberg:
- Каталог: Тоа е SQLite датотека pyarrow_catalog.db, која ќе биде проверена малку подоцна во овој напис.
- Метаподатоци: Во директориумот „метаподатоци“ се создаваат метаподатоците, кои се од суштинско значење за овозможување на операциите за креирање, читање, ажурирање, бришење (CRUD). се создаваат две метаподатоци JSON датотеки, едната додека е создадена табелата, а другата по првото вметнување на податоците.
- Датотеки: Датотеките се пишуваат во .PARQUET формат, со device_id како клуч за партиција. Бидејќи постојат три различни уреди, се создаваат три директориуми. Табелата „sensor_data“ на Iceberg се создава со имениот простор „sensor_ns.db“ во складот „dw1“. Овие податоци се создаваат во директориумот „data“ на табелата „sensor_data“.
PyIceberg expressions can be used to filter records. Some of the common expressions used for filtering are: StartsWith, EqualTo, GreaterThan, And, Or, etc.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
Следниот примерок на код го ажурира постоечкиот device_make за еден од сензорите од „Siemens“ до „Kepware“.
# Create an UPSERT batch of Arrow records where one fot he device_make is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
join_columns = ["device_id"]
upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))
except Exception as e:
print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
На сличен начин, операцијата DELETE исто така е поддржана:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\n After Delete")
print(sensor_table.scan().to_pandas())
Вреди да се спомене дека бришењето во секој магацин е нијансирана операција, а Iceberg не е исклучок. Операциите на ниво на ред во Iceberg се дефинирани со две стратегии: Copy-on-Write (CoW) и Merge-on-Read (MoR).
PyIceberg во моментов поддржува MOR бришења, но со некои нијанси. Додека PyIceberg нуди способност за бришење на редови, првенствено го имплементира ова со користење на CoW бришења по претпоставка, што значи дека датотеките со податоци се препишуваат наместо да се создадат избришани датотеки.
Како последен чекор, да го користиме DuckDB за да го прашаме каталогот SQLite Iceberg, кој се чува во датотеката pyarrow_catalog.db. Следната команда се изведува на терминалот VS Code:
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
Ова ќе отвори прозорец на прелистувачот на порта 4213 (подрачливо), каде што може да се изврши SQL прашање на каталогот Iceberg, како што е прикажано:
Ова обезбедува лесен и едноставен начин за извлекување на увид од SQL каталогот
Отклучување на податоци со PyIceberg
За компании со волумени на податоци кои се помали од терабајти, PyIceberg и PyArrow се брзи опции за извршување на интерактивни прашања во Python.
Инженерите за податоци можат да започнат соДокументација на PyIcebergкој се одржува и одржува до ден-денес.ОгнотСтраницата има одлични примери на сите PyIceberg API.
Среќно кодирање