4,230 чытанні
4,230 чытанні

Дадатковы інжынер Руководство для PyIceberg

па Confluent2025/06/20
Read on Terminal Reader

Занадта доўга; Чытаць

Гэты кіраўнік праводзіць інжынеры дадзеных, выкарыстоўваючы PyIceberg, бібліятэку Python для кіравання табліцамі Apache Iceberg без вялікіх кластараў JVM. Ён ахоплівае ўстаноўку, стварэння схемы, CRUD-аперацыі і запыт з DuckDB. Ідэальны для камандаў, якія працуюць з малымі да сярэдніх дадзеных, PyIceberg рэгулюе адкрытыя працоўныя працэсы дадзеных, выкарыстоўваючы інструменты, такія як PyArrow і DuckDB.
featured image - Дадатковы інжынер Руководство для PyIceberg
Confluent HackerNoon profile picture
0-item

Напісанне: Diptiman Raichaudhuri, Advocate Developer на Confluent

Напісанне: Diptiman Raichaudhuri, Advocate Developer на Confluent


Гэты артыкул паказвае інжынерам дадзеных, як выкарыстоўваць PyIceberg, лёгкую і магутную бібліятэку Python. PyIceberg дазваляе лёгка выканаць агульныя задачы дадзеных, такія як стварэння, чытання, мадыфікацыя або выдаленне дадзеных у Apache Iceberg, без патрабавання вялікага кластера.


Фактычна, прысвечаныя такому спорту як бокс, онлайн гульні заўсёды прызнаваліся нашмат больш цікавымі чым звычайныя аднакарыстальніцкія цацкі.


Для рэгістрацыі даменнага імя ў гэтай зоне неабходна звярнуцца ў кампанію The Electronic and Postal Communications Authority[1] (Албанія), якая з'яўляецца адзіным аўтарызаваным рэгістратарам даменаў у зоне al.



Composable Data Platform Architecture


Composable Data Platform архітэктура


Дадатковыя платформы, пабудаваныя з Apache Iceberg, звычайна маюць тры пласты:


  1. Фізічныя файлы дадзеных (звычайна форматы Parquet, Avro або ORC) захоўваюцца ў гэтым пласце.
  2. Метаданныя пласты: Замест размяшчэння табліц у асобныя каталогі, Iceberg падтрымлівае спіс файлаў.
  3. Iceberg каталог: каталог з'яўляецца цэнтральным рэпазітарам, які дазваляе адкрыць, стварыць і змяняць табліцы і забяспечвае транзакцыйную поспешнасць кіравання Iceberg табліцамі.


Гэтая папулярная схема з дакументацыі Apache Iceberg ілюструе гэтыя пласты:


 

Назіральнік: Авіямеханік 1-га класа Персі Бонэр (Крыніца)



Што такое PyIceberg?


PyIceberg аб'ядноўвае аналітыкі і інжынеры дадзеных для будаўніцтва складаных адкрытых платформаў на шырокім асяроддзі хмараў (AWS, Azure, і Google Cloud) і нармальнага захоўвання. PyIceberg з'яўляецца рэалізацыяй Python для доступу да табліц Iceberg. Распрацоўшчыкі, якія выкарыстоўваюць PyIceberg, могуць выкарыстоўваць пераўтварэнні Pythonic дадзеных без неабходнасці запускаць выдатныя рухавікі запытаў у кластарах Java Virtual Machine (JVM). PyIceberg выкарыстоўвае каталогі для загрузкі табліц Iceberg і выканання аперацый read-write-upsert.


PyIceberg можа працаваць як самастойнае праграмнае забеспячэнне або на кластэрах Kubernetes для талерантнасці да няправільнасцяў. Яго натуральная інтэграцыя з каталогамі Iceberg, такімі як REST, SQLCatalog або AWS Glue, робіць яго папулярным і лёгкім выбарам для даследавання Iceberg табліц без патрабавання кластэраў JVM/py4j.


Вытворчыя размяшчэнні PyIceberg часта інтэгравалі працоўныя нагрузкі для праток дадзеных, такія якТабліцаДля рэгістрацыі даменнага імя ў гэтай зоне неабходна звярнуцца ў кампанію The Electronic and Postal Communications Authority[1] (Албанія), якая з'яўляецца адзіным аўтарызаваным рэгістратарам даменаў у зоне al.


Чаму ў Пінску?


PyIceberg забяспечвае Python-прыемны спосаб запускаць аперацыі мовы маніпуляцыі дадзенымі (DML) на табліцах Iceberg. Для малых да сярэдняга размера платформ дадзеных, якія працуюць з 100 гігабайтамі дадзеных — такіх, якія займаюцца аддзелнай аналітыкай, ўнутранымі даследаваннямі або спецыялізаванымі інструментамі — лёгкасць выкарыстання часта важней для прадпрыемстваў, чым складаныя функцыі. Калі аб'ём дадзеных (і гістарычны, і індывідуальны) не велізарны, размяшчэнне поўных кластыроў для запуску запытаў на Iceberg можа здацца надзвычай вялікім і надзвычайным. Гэта таму, што гэтыя анкеты (як Spark і


У наступным раздзеле, давайце будзем будаваць дэма з PyIceberg, каб зразумець, як Iceberg's прачытаць і пісаць мадэлі працуюць.


Як пабудаваць Data Lakehouse з PyIceberg, PyArrow і DuckDB


Для гэтага мы будзем выкарыстоўваць PyIceberg і PyArrow для ўвядзення / ўпрыгожвання і выдалення Iceberg дадзеных і будаўніцтва ў Visual Studio Code (VS Code).


Перш за ўсё, новае віртуальнае асяроддзе Python ствараецца з назвай «pyiceberg_playground», запускаючы наступную каманда:


$>python -m venv iceberg_playground


Затым гэты каталог — «iceberg_playground» — «абвяшчаецца ў VS Code, дзе будзе размешчаны праект PyIceberg.



PyIceberg і іншыя бібліятэкі затым усталяваны ў віртуальнай асяроддзі, запускаючы наступныя дзве каманды:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


Для гэтага прыкладу PyIceberg будзе выкарыстоўваць SqlCatalog, які захоўвае інфармацыю табліцы Iceberg у локальнай базе баз SQLite. Iceberg таксама падтрымлівае каталогі, у тым ліку REST, Hive і AWS Glue.


Файл канфігурацыі .pyiceberg.yaml падрыхтоўваецца з наступным зместам, у коране праекта:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Памятаеце, як каталог Iceberg захоўваецца пад каталогам pyiceberg_catalog як файл SQLite і ў дадзеных, якія захоўваюць усе дадзеныя і метаданныя ў каталогу dw1.


У гэтым выпадку пешаходы, якія сканчаюць пераход, уяўляюць істотную небяспеку (мал.


Наступным чынам, усталяванне PyIceberg працэдуруецца з дапамогай наступнага сцэна:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Памятаеце, як PyIceberg чытае назву каталога з файла YAML і стварае локальную базу SQLite ў каталогу pyiceberg_catalog.



Калі сцэнар запускаецца правільна, у термінале паказваюцца ўласцівасці «pyarrow_catalog».



Скрипт загрузіў каталог з файла .YAML, бо прамысловасць «PYICEBEG_HOME» была зададзена ў якасці корана праекта.


Акрамя таго, для некаторых элементаў у экспазіцыі вядома нават імя майстра, які іх вырабіў – гэта знакаміты нямецкі даспешнік Кольман Хельмшміт, які выконваў заказы для каралеўскіх дамоў і найбуйнейшых магнатаў Еўропы.


Імпрасце — гэта логічная групоўка табліц у складзе (памятайце, як склад ужо створаны пры вызначэнні файла YAML).


Першапачатковая загрузка дадзеных робяцца з выкарыстаннем спісу PyArrow in-memory, а sensor_table прачытаецца з метадам PyIceberg scan() для пераўтварэння дадзеных у пэндзэму дадзеных.


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Калі вышэйшы сцэнар запушчаны паспяхова, гэты вынік паказваецца ў термінале:



Калі ўкладка паспяхова завершана, можна валідаваць тры пласты Iceberg:


  1. Каталог: Гэта SQLite файл pyarrow_catalog.db, які будзе праверыцца трохі пазней у гэтай артыкуле.
  2. Метаданныя: У каталогу «метаданныя» створаны метаданныя файлы, якія важныя для ўдасканалення аперацый Create, Read, Update, Delete (CRUD). Два метаданныя JSON файлы створаны, адзін у той час, калі была створана табліца, а другі пасля першага ўстаўкі дадзеных.
  3. Дадзеныя: Файлы дадзеных напісаны ў фармаце .PARQUET, з device_id як клавіш-парцію. Паколькі ёсць тры розныя прылады, створаны тры каталогі. Табліца Iceberg «sensor_data» створана з намесным прасторам «sensor_ns.db» у складзе «dw1». Гэтыя поля дадзеных створаны ў каталогу «data» табліцы «sensor_data».



Выразы PyIceberg могуць быць выкарыстаны для фільтравання запісаў. Некаторыя з агульных выраз, якія выкарыстоўваюцца для фільтравання, з'яўляюцца: StartsWith, EqualTo, GreaterThan, And, Or, і г.д.


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


PyIceberg таксама падтрымлівае аперацыі UPSERT. Наступны шаблон кода абнаўляе існуючы device_make для аднаго з датнікаў ад «Siemens» да «Kepware».


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


У такім выпадку функцыя DELETE таксама падтрымліваецца:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


Варта адзначыць, што вылучэнне ў любым складзе з'яўляецца нюансавай аперацыяй, і Iceberg не выключэнне. Аперацыі на узроўні роўлі ў Iceberg вызначаюцца двума стратэгіямі: Copy-on-Write (CoW) і Merge-on-Read (MoR).


PyIceberg ў цяперашні час падтрымлівае MOR вылучэнняў, але з некаторых нюансаў. Хоць PyIceberg прапануе магчымасць вылучэнне рэйсаў, ён у першую чаргу ўжывае гэта з выкарыстаннем CoW вылучэнняў па параўнанні, што азначае, што файлы дадзеных перапісаны, а не стварыць выдаленыя файлы.


Як апошні крок, давайце выкарыстоўвайце DuckDB, каб запытаць каталог SQLite Iceberg, захоўваны ў файле pyarrow_catalog.db.


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


Гэта адкрые вокно браўзэра ў порту 4213 (падраздзяленне), дзе можна запускаць SQL-пытанне на каталогу Iceberg, як паказана:



Гэта дае лёгкі і просты спосаб вывучэння ўгляду з каталога SQL


Разблокаванне дадзеных з PyIceberg


Для кампаній з аб'ёмамі дадзеных, якія меншыя за тэрабайт, PyIceberg і PyArrow з'яўляюцца хуткай варыянтамі для запуску інтэрактыўных запытаў у Python.


Аналіз мёду можна зрабіць самомуПапярэдні Тэкст ДокументацыяПра тое, што ліст з Ліды, кажа толькі вайсковая печатка (мал.ПажарНа сайце ёсць вялікія прыклады усіх PyIceberg API.


Удачы ў кодэкс!

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks