Напісанне: Diptiman Raichaudhuri, Advocate Developer на Confluent
Напісанне: Diptiman Raichaudhuri, Advocate Developer на Confluent
Гэты артыкул паказвае інжынерам дадзеных, як выкарыстоўваць PyIceberg, лёгкую і магутную бібліятэку Python. PyIceberg дазваляе лёгка выканаць агульныя задачы дадзеных, такія як стварэння, чытання, мадыфікацыя або выдаленне дадзеных у Apache Iceberg, без патрабавання вялікага кластера.
Фактычна, прысвечаныя такому спорту як бокс, онлайн гульні заўсёды прызнаваліся нашмат больш цікавымі чым звычайныя аднакарыстальніцкія цацкі.
Для рэгістрацыі даменнага імя ў гэтай зоне неабходна звярнуцца ў кампанію The Electronic and Postal Communications Authority[1] (Албанія), якая з'яўляецца адзіным аўтарызаваным рэгістратарам даменаў у зоне al.
Composable Data Platform архітэктура
Дадатковыя платформы, пабудаваныя з Apache Iceberg, звычайна маюць тры пласты:
- Фізічныя файлы дадзеных (звычайна форматы Parquet, Avro або ORC) захоўваюцца ў гэтым пласце.
- Метаданныя пласты: Замест размяшчэння табліц у асобныя каталогі, Iceberg падтрымлівае спіс файлаў.
- Iceberg каталог: каталог з'яўляецца цэнтральным рэпазітарам, які дазваляе адкрыць, стварыць і змяняць табліцы і забяспечвае транзакцыйную поспешнасць кіравання Iceberg табліцамі.
Гэтая папулярная схема з дакументацыі Apache Iceberg ілюструе гэтыя пласты:
Назіральнік: Авіямеханік 1-га класа Персі Бонэр (Крыніца)
Што такое PyIceberg?
PyIceberg аб'ядноўвае аналітыкі і інжынеры дадзеных для будаўніцтва складаных адкрытых платформаў на шырокім асяроддзі хмараў (AWS, Azure, і Google Cloud) і нармальнага захоўвання. PyIceberg з'яўляецца рэалізацыяй Python для доступу да табліц Iceberg. Распрацоўшчыкі, якія выкарыстоўваюць PyIceberg, могуць выкарыстоўваць пераўтварэнні Pythonic дадзеных без неабходнасці запускаць выдатныя рухавікі запытаў у кластарах Java Virtual Machine (JVM). PyIceberg выкарыстоўвае каталогі для загрузкі табліц Iceberg і выканання аперацый read-write-upsert.
PyIceberg можа працаваць як самастойнае праграмнае забеспячэнне або на кластэрах Kubernetes для талерантнасці да няправільнасцяў. Яго натуральная інтэграцыя з каталогамі Iceberg, такімі як REST, SQLCatalog або AWS Glue, робіць яго папулярным і лёгкім выбарам для даследавання Iceberg табліц без патрабавання кластэраў JVM/py4j.
Вытворчыя размяшчэнні PyIceberg часта інтэгравалі працоўныя нагрузкі для праток дадзеных, такія якТабліцаДля рэгістрацыі даменнага імя ў гэтай зоне неабходна звярнуцца ў кампанію The Electronic and Postal Communications Authority[1] (Албанія), якая з'яўляецца адзіным аўтарызаваным рэгістратарам даменаў у зоне al.
Чаму ў Пінску?
PyIceberg забяспечвае Python-прыемны спосаб запускаць аперацыі мовы маніпуляцыі дадзенымі (DML) на табліцах Iceberg. Для малых да сярэдняга размера платформ дадзеных, якія працуюць з 100 гігабайтамі дадзеных — такіх, якія займаюцца аддзелнай аналітыкай, ўнутранымі даследаваннямі або спецыялізаванымі інструментамі — лёгкасць выкарыстання часта важней для прадпрыемстваў, чым складаныя функцыі. Калі аб'ём дадзеных (і гістарычны, і індывідуальны) не велізарны, размяшчэнне поўных кластыроў для запуску запытаў на Iceberg можа здацца надзвычай вялікім і надзвычайным. Гэта таму, што гэтыя анкеты (як Spark і
У наступным раздзеле, давайце будзем будаваць дэма з PyIceberg, каб зразумець, як Iceberg's прачытаць і пісаць мадэлі працуюць.
Як пабудаваць Data Lakehouse з PyIceberg, PyArrow і DuckDB
Для гэтага мы будзем выкарыстоўваць PyIceberg і PyArrow для ўвядзення / ўпрыгожвання і выдалення Iceberg дадзеных і будаўніцтва ў Visual Studio Code (VS Code).
Перш за ўсё, новае віртуальнае асяроддзе Python ствараецца з назвай «pyiceberg_playground», запускаючы наступную каманда:
$>python -m venv iceberg_playground
Затым гэты каталог — «iceberg_playground» — «абвяшчаецца ў VS Code, дзе будзе размешчаны праект PyIceberg.
PyIceberg і іншыя бібліятэкі затым усталяваны ў віртуальнай асяроддзі, запускаючы наступныя дзве каманды:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
Для гэтага прыкладу PyIceberg будзе выкарыстоўваць SqlCatalog, які захоўвае інфармацыю табліцы Iceberg у локальнай базе баз SQLite. Iceberg таксама падтрымлівае каталогі, у тым ліку REST, Hive і AWS Glue.
Файл канфігурацыі .pyiceberg.yaml падрыхтоўваецца з наступным зместам, у коране праекта:
catalog:
pyarrowtest:
uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Памятаеце, як каталог Iceberg захоўваецца пад каталогам pyiceberg_catalog як файл SQLite і ў дадзеных, якія захоўваюць усе дадзеныя і метаданныя ў каталогу dw1.
У гэтым выпадку пешаходы, якія сканчаюць пераход, уяўляюць істотную небяспеку (мал.
Наступным чынам, усталяванне PyIceberg працэдуруецца з дапамогай наступнага сцэна:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Памятаеце, як PyIceberg чытае назву каталога з файла YAML і стварае локальную базу SQLite ў каталогу pyiceberg_catalog.
Калі сцэнар запускаецца правільна, у термінале паказваюцца ўласцівасці «pyarrow_catalog».
Скрипт загрузіў каталог з файла .YAML, бо прамысловасць «PYICEBEG_HOME» была зададзена ў якасці корана праекта.
Акрамя таго, для некаторых элементаў у экспазіцыі вядома нават імя майстра, які іх вырабіў – гэта знакаміты нямецкі даспешнік Кольман Хельмшміт, які выконваў заказы для каралеўскіх дамоў і найбуйнейшых магнатаў Еўропы.
Імпрасце — гэта логічная групоўка табліц у складзе (памятайце, як склад ужо створаны пры вызначэнні файла YAML).
Першапачатковая загрузка дадзеных робяцца з выкарыстаннем спісу PyArrow in-memory, а sensor_table прачытаецца з метадам PyIceberg scan() для пераўтварэння дадзеных у пэндзэму дадзеных.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
NestedField(1, "device_id", StringType(), required=True),
NestedField(2, "ampere_hour", FloatType(), required=True),
NestedField(3, "device_make", StringType(), required=True),
identifier_field_ids=[1] # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
identifier='sensor_ns.sensor_data',
schema=schema,
partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
Калі вышэйшы сцэнар запушчаны паспяхова, гэты вынік паказваецца ў термінале:
Калі ўкладка паспяхова завершана, можна валідаваць тры пласты Iceberg:
- Каталог: Гэта SQLite файл pyarrow_catalog.db, які будзе праверыцца трохі пазней у гэтай артыкуле.
- Метаданныя: У каталогу «метаданныя» створаны метаданныя файлы, якія важныя для ўдасканалення аперацый Create, Read, Update, Delete (CRUD). Два метаданныя JSON файлы створаны, адзін у той час, калі была створана табліца, а другі пасля першага ўстаўкі дадзеных.
- Дадзеныя: Файлы дадзеных напісаны ў фармаце .PARQUET, з device_id як клавіш-парцію. Паколькі ёсць тры розныя прылады, створаны тры каталогі. Табліца Iceberg «sensor_data» створана з намесным прасторам «sensor_ns.db» у складзе «dw1». Гэтыя поля дадзеных створаны ў каталогу «data» табліцы «sensor_data».
Выразы PyIceberg могуць быць выкарыстаны для фільтравання запісаў. Некаторыя з агульных выраз, якія выкарыстоўваюцца для фільтравання, з'яўляюцца: StartsWith, EqualTo, GreaterThan, And, Or, і г.д.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
PyIceberg таксама падтрымлівае аперацыі UPSERT. Наступны шаблон кода абнаўляе існуючы device_make для аднаго з датнікаў ад «Siemens» да «Kepware».
# Create an UPSERT batch of Arrow records where one fot he device_make is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
join_columns = ["device_id"]
upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))
except Exception as e:
print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
У такім выпадку функцыя DELETE таксама падтрымліваецца:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\n After Delete")
print(sensor_table.scan().to_pandas())
Варта адзначыць, што вылучэнне ў любым складзе з'яўляецца нюансавай аперацыяй, і Iceberg не выключэнне. Аперацыі на узроўні роўлі ў Iceberg вызначаюцца двума стратэгіямі: Copy-on-Write (CoW) і Merge-on-Read (MoR).
PyIceberg ў цяперашні час падтрымлівае MOR вылучэнняў, але з некаторых нюансаў. Хоць PyIceberg прапануе магчымасць вылучэнне рэйсаў, ён у першую чаргу ўжывае гэта з выкарыстаннем CoW вылучэнняў па параўнанні, што азначае, што файлы дадзеных перапісаны, а не стварыць выдаленыя файлы.
Як апошні крок, давайце выкарыстоўвайце DuckDB, каб запытаць каталог SQLite Iceberg, захоўваны ў файле pyarrow_catalog.db.
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
Гэта адкрые вокно браўзэра ў порту 4213 (падраздзяленне), дзе можна запускаць SQL-пытанне на каталогу Iceberg, як паказана:
Гэта дае лёгкі і просты спосаб вывучэння ўгляду з каталога SQL
Разблокаванне дадзеных з PyIceberg
Для кампаній з аб'ёмамі дадзеных, якія меншыя за тэрабайт, PyIceberg і PyArrow з'яўляюцца хуткай варыянтамі для запуску інтэрактыўных запытаў у Python.
Аналіз мёду можна зрабіць самомуПапярэдні Тэкст ДокументацыяПра тое, што ліст з Ліды, кажа толькі вайсковая печатка (мал.ПажарНа сайце ёсць вялікія прыклады усіх PyIceberg API.
Удачы ў кодэкс!