4,230 ընթերցումներ
4,230 ընթերցումներ

Data Engineer- ը PyIceberg- ի համար

կողմից Confluent2025/06/20
Read on Terminal Reader

Չափազանց երկար; Կարդալ

Այս ուղեցույցում տվյալների ինժեներները օգտագործում են PyIceberg, մի Python գրասենյակ, որը կառավարում է Apache Iceberg տեքստեր, առանց մեծ JVM կլտերներ: Այն ներառում է տեղադրման, schema creation, CRUD գործառույթներ, եւ querying հետ DuckDB. Հիմնական համար թիմերի աշխատելու համար փոքր եւ միջին չափի տվյալների, PyIceberg հեշտացնել open data lakehouse workflows օգտագործելով գործիքներ, ինչպիսիք են PyArrow եւ DuckDB.
featured image - Data Engineer- ը PyIceberg- ի համար
Confluent HackerNoon profile picture
0-item

Նկարագրված է: Diptiman Raichaudhuri, Staff Developer Advocate at Confluent

Written By: Diptiman Raichaudhuri, Staff Developer Advocate at Confluent 


Այս հոդվածը ցույց է տալիս տվյալների ինժեներներին, թե ինչպես օգտագործել PyIceberg- ը, հեշտ եւ հզոր Python գրասենյակ: PyIceberg- ը հեշտությամբ կատարում է սովորական տվյալների գործիքները, ինչպիսիք են ստեղծել, կարդալ, փոխել կամ փրկել տվյալները Apache Iceberg- ում, առանց մեծ բաղադրիչի անհրաժեշտության:


Բարձր բիզնեսի պահանջների եւ ավելի մեծ մանրամասն տեղեկատվության վերլուծման անհրաժեշտության համար, տվյալների պլանտոմսերը մեծապես փոխել են վերջին տարիների ընթացքում, որպեսզի օգնում են ընկերությունները ավելի շատ տեղեկատվություն եւ արժեքը արտադրել իրենց տարբեր տվյալների մանրամասները:


Արդյոք, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս կարող եք ստանալ, թե ինչպե՞ս ստանալ:



Composable Data Platform Architecture


Composable Data Platform- ի կառուցվածքը


Apache Iceberg- ի հետ կառուցված տվյալների պլատֆորմերը սովորաբար ունի երեք մակերեսներ:


  1. Data layer: Physical data files (հարկե Parquet, Avro, կամ ORC ֆայլերը) տեղադրվում են այս layer- ում:
  2. The metadata layer: Մինչեւ սերտիֆիկների սերտիֆիկների համար, Iceberg- ը պահպանում է ֆայլերի ցուցակը: The metadata layer manages table snapshots, schema, and file statistics.
  3. Iceberg Catalog: Կատեգորիան է կենտրոնական բաղադրիչը, որը օգնում է դիզայնի, ստեղծման եւ փոխարինման տեքստի եւ ապահովում է Iceberg տեքստի կառավարման առեւտրային միավորությունը:


Apache Iceberg Documentation- ի այս հայտնի դիզայնը ցույց է տալիս այդ մակարդակները:


 

Հիմնական հոդված՝ «Տեսանյութ» (Հիմնական)



Ի՞նչ է PyIceberg


PyIceberg- ը թույլ է տալիս analytics- ի եւ տվյալների ինժեներների ստեղծել հարմարեցված open lakehouse- ի պլատֆորմերը լայն տարբերակներում (AWS, Azure, եւ Google Cloud) եւ տեղական սարքավորումների վրա: PyIceberg- ը Iceberg- ի տեքստի հասնելի համար Python- ի գործառույթ է: PyIceberg- ի օգտագործման մշակողները կարող են օգտագործել Pythonic data transformations- ը, առանց օգտագործման բարձր արդյունավետության հարցման մեքենաներ Java Virtual Machine- ի (JVM- ի) բաղադրիչների մեջ: PyIceberg- ը օգտագործում է տեքստի բաղադրիչները Iceberg- ի տեքստի բաղադրիչների համար եւ կատարում է read-write-upsert


PyIceberg- ը կարող է աշխատել որպես ինքնաթիռ ծրագրի կամ Kubernetes- ի բաղադրիչների վրա սխալների tolerance- ի համար: Այս բաղադրիչի բաղադրիչը REST, SQLCatalog, կամ AWS Glue- ի նման Iceberg- ի բաղադրիչների բաղադրիչների հետ բաղադրիչ է, ինչպիսիք են Iceberg- ի բաղադրիչների բաղադրիչների բաղադրիչների բաղադրիչների բաղադրիչների բաղադրիչները:


PyIceberg- ի արտադրանքի տեղադրումները հաճախ ներառում են տվյալների պրեմիման աշխատանքային բեռնություններ, ինչպիսիք են:Տեղինակային Flow, որտեղ Streaming տվյալները արտադրվում են որպես Apache Kafka տեքստեր եւ բաղադրված են որպես Iceberg տեքստեր. Սա հզոր գործիք է բաղադրել թափանցման միջեւ օպերացիոն տվյալների համակարգերի եւ analytical տվյալների համակարգերի.


Ինչու է PyIceberg


PyIceberg- ը առաջարկում է Python- ի համար հարմարավետ մեթոդ, որը օգտագործում է Data Manipulation Language (DML) գործառույթները Iceberg- ի տեքստերում: Երկու փոքր եւ միջին մանրամասն տվյալների պլատֆորմերի համար, որոնք աշխատում են 100 gigabytes- ի տվյալների հետ, ինչպիսիք են, որոնք աշխատում են departmental analytics, internal reporting, կամ մասնագիտացված գործիքների հետ, օգտագործման հեշտությունը հաճախ ավելի կարեւոր է բիզնեսի համար, քան միասնական առանձնահատկություններ: Եթե տվյալների քանակը (կամ պատմական եւ կասկածված) չի մեծ, օգտագործելով ամբողջ խոշոր պլատֆորմը Iceberg- ի վրա փորձարկելու համար կարող է դառնալ խոշոր եւ overkill: Սա


Հաջորդ մասում, խնդրում ենք կառուցել Demo- ը PyIceberg- ի հետ, որպեսզի իմանալ, թե ինչպես է Iceberg- ի կարդալ եւ գրել մոդելները աշխատում:


Ինչպես ստեղծել Data Lakehouse- ը PyIceberg, PyArrow եւ DuckDB- ի հետ


Խնդրում ենք գործել եւ կառուցել Iceberg- ի IoT- ի սենսորների տվյալների նավահանգիստը PyIceberg- ի հետ: Այս օրինակում, մենք օգտագործենք PyIceberg- ը եւ PyArrow- ը Iceberg- ի տվյալների տեղադրման / վերահսկման եւ սեղմման համար եւ ստեղծելու համար Visual Studio Code- ում (VS Code).


Առաջին, մի նոր Python թվային միջավայք ստեղծվում է, որը կոչվում է `pyiceberg_playground`:


$>python -m venv iceberg_playground


«Iceberg_playground» -ը բացվում է VS Code- ում, որտեղ PyIceberg- ի նախագծը կունենա: Հաջորդ տեսակը ցույց է տալիս VS Code-ը, որը կունենա clean slate- ում:



PyIceberg- ը եւ այլ գրասենյակները ապա տեղադրվում են virtualized- ում հետեւյալ երկու գրասենյակներում:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


Այս օրինակում PyIceberg- ը օգտագործում է SqlCatalog- ը, որը մատակարարում է Iceberg- ի տեքստային տվյալները տեղական SQLite- ի բազանում: Iceberg- ը նաեւ աջակցում է կատեգորիաները, ինչպիսիք են REST, Hive եւ AWS Glue.


Համակարգչային ֆայլը .pyiceberg.yaml- ը պատրաստվում է հետեւյալ բաղադրիչների հետ, project root- ում:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Տեսեք, թե ինչպես է Iceberg կատեգորիան տեղադրվում է pyiceberg_catalog ֆայլում, ինչպես նաեւ SQLite ֆայլում, եւ տվյալների սարքավորում, որը տեղադրում է բոլոր տվյալները եւ մետաբատոմսերը կատեգորիան dw1- ում:


Երկու այս մետաղադրամները այժմ ստեղծվում են project root level- ում: Այս մետաղադրամը կոչվում է pyarrowtest.


Հաջորդը, PyIceberg տեղադրման ստուգվում է օգտագործելով հետեւյալ գրասենյակ:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Գիտեք, թե ինչպես PyIceberg- ը YAML ֆայլից թարգմանում է թարգմանական անունը եւ ստեղծում է տեղական SQLite բազան pyiceberg_catalog թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգմանական թարգ



Եթե սեղմիչը կատարվում է ճիշտ, «pyarrow_catalog» բաղադրիչները պետք է ցույց են տալիս սեղմիչում:



Սպիտակը բեռնում է կատեգորիանը .YAML ֆայլից, քանի որ «PYICEBEG_HOME» միջավայրի փոխանակը նշված է project root- ում:


Հաջորդը, սմարթը ավելացվում է, օգտագործելով PyIceberg- ի սմարթային դասընթացը: Երբ այս օրինակը տեղադրում է տվյալները մի շարք IoT- ի սմարթների հետ, սմարթը կառուցված է երեք սմարթների եւ իրենց պահանջվող տվյալների տեսակի հետ: Սմարթը device_id- ը տեղադրվել է որպես առաջին սմարթը եւ որպես փաթեթային սմարթը:


Հաջորդը, Iceberg սմարթի հետ մի անունային տարածք ստեղծվում է: A name space is a logical grouping of tables within a warehouse (մոռացեք, թե ինչպես է տեղադրել YAML ֆայլը):


Առաջին տվյալների բեռնելը կատարվում է PyArrow in-memory ցուցակով, եւ sensor_table- ը կարդալ է PyIceberg scan() մեթոդով, որպեսզի տվյալները փոխել են panda dataframe- ում:


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Եթե վերեւում է վերեւում, այս արդյունքը ցույց է տալիս Terminal- ում:



Երբ տեղադրումը հաջողությամբ ավարտվում է, Iceberg- ի երեք մակերեսները կարող են ստուգվել:


  1. Դա SQLite ֆայլը pyarrow_catalog.db, որը պետք է ստուգվել մի փոքր հետո այս հոդվածում.
  2. MetaData: «MetaData» ֆայլերը ստեղծվում են, որոնք կարեւոր են Create, Read, Update, Delete (CRUD) գործառույթների կատարման համար: Երկու MetaData JSON ֆայլերը ստեղծվում են, մեկը այն ժամանակ, երբ թերթը ստեղծվել է, եւ մեկը առաջին տվյալների տեղադրմանից հետո: «Snap-*.avro» ֆայլը պիտակային ցուցակ է, եւ պիտակային ֆայլը պիտակային ֆայլ է.
  3. Տեսակներ: Տեսակները գրվում են .PARQUET ձեւով, device_id- ի համար միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին միասին մի



PyIceberg ցուցանիշները կարող են օգտագործվել գրառումների փաթեթավորման համար: Նրանք օգտագործվում են փաթեթավորման համար: StartsWith, EqualTo, GreaterThan, And, Or, եւ այլն:


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


PyIceberg- ը աջակցում է UPSERT- ի գործառույթները: Հաջորդ կոդը նմուշը `Siemens`- ից `Kepware`- ից `Siemens`- ից `Siemens_Make`- ում կատարվում է:


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


Ավելի լավ է, որ Delete գործառույթը աջակցվում է:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


Արդյոք, ինչպիսիք են այն, որ Iceberg- ում գործառույթները տեղադրվում են երկու ռեժիմով: Copy-on-Write (CoW) եւ Merge-on-Read (MoR) - ում: The Delete operations also create delete files for the MoR strategy.


PyIceberg- ը այժմ աջակցում է MOR սխալները, բայց մի քանի սխալների հետ: Երբ PyIceberg- ը առաջարկում է սխալների սխալման հզորությունը, դա հիմնականում օգտագործում է CoW սխալները, որը նշանակում է, որ տվյալների ֆայլերը վերլուծվում են, այլեւ ստեղծում են սխալված ֆայլերը: Այնուամենայնիվ, աշխատում է PyIceberg- ի MoR- ի բարելավման համար, որը աջակցում է սխալների հզորությունը եւ դա ավելի արդյունավետ է հաճախական, փոքր update- ի համար:


Որպես վերջնական քայլը, օգտագործենք DuckDB- ը SQLite Iceberg կատեգորիանը, որը տեղադրվում է pyarrow_catalog.db ֆայլում:


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


Այսպիսով բացվում է բեռնելը բեռնելը նավահանգիստ 4213 (դիմնական), որտեղ SQL փոստի կարող է կատարվել Iceberg կատեգորիանում, ինչպես ցույց է տալիս:



Դա ապահովում է հեշտ եւ հեշտ մեթոդ է արտադրել տեսանյութերը SQL կատեգորիանից


Data Insights- ի բացառումը PyIceberg- ի հետ


PyIceberg- ը եւ PyArrow- ը պրոֆեսիոնալ հարցերի կատարման արագ տարբերակներ են Python- ում: Այս համատեղի օգտագործումը կարող է մեծապես հեշտացնել ժամանակը, որը պետք է ստեղծել փոքր-սահմանափակ նավահանգիստների համար:


Data Engineers- ը կարող է սկսելPyIceberg գրասենյակԱհա, թե ինչ է անում, եւ ինչ է անում, այնպես էլԱպրիլԱյս էջը ունի բոլոր PyIceberg API-ների մեծ օրինակներ:


Շնորհակալություն Coding!

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks