4,230 lukemat
4,230 lukemat

Tietojenkäsittelyteknikon opas PyIcebergille

kirjoittaja Confluent2025/06/20
Read on Terminal Reader

Liian pitkä; Lukea

Tämä opas opastaa data-insinöörejä käyttämällä PyIcebergia, Python-kirjastoa Apache Icebergin taulukoiden hallintaan ilman suuria JVM-klustereita. Se kattaa asennuksen, kaavion luomisen, CRUD-toiminnot ja DuckDB: n kanssa tehtävät kyselyt. Ihanteellinen tiimille, jotka työskentelevät pienten ja keskisuurten tietojen kanssa, PyIceberg yksinkertaistaa avoimen datalahden työnkulkuja käyttämällä työkaluja, kuten PyArrow ja DuckDB.
featured image - Tietojenkäsittelyteknikon opas PyIcebergille
Confluent HackerNoon profile picture
0-item

Kirjoittanut: Diptiman Raichaudhuri, henkilöstön kehittäjä asianajaja Confluentissa

Kirjoittanut: Diptiman Raichaudhuri, henkilöstön kehittäjä asianajaja Confluentissa


PyIceberg helpottaa yleisten tietojen tehtävien suorittamista, kuten tietojen luomista, lukemista, muuttamista tai poistamista Apache Icebergissä, ilman, että tarvitset suurta klusteria.


Monimutkaisten liiketoimintavaatimusten ja tarve analysoida suurempia määriä tietoa johtavat tietopalvelut ovat muuttuneet merkittävästi viime vuosina auttaakseen yrityksiä saamaan enemmän tietoa ja arvoa eri tietolähteistä.


Enterprise Analytics -käyttötapauksissa avoimen datalahden alusta on ollut tämän alustan kehityksen eturintamassa. Avoimen datalahden avulla tietoryhmät voivat luoda ”kompostoitavia” arkkitehtuuria ekosysteemissään. Tämän mallin avulla tietoryhmät voivat suunnitella datalähteitä valitsemillaan tallennus-, laskenta- ja tietohallinnon kerroksilla vastaamaan yritysten jatkuvasti muuttuviin tarpeisiin. Open Table Formats (OTF) kuten Apache Iceberg ovat moottoreita, jotka ohjaavat nykyaikaisten avoimien järvien käyttöönottoa.



Composable Data Platform Architecture


Composable Data Platform -arkkitehtuuri


Apache Icebergin avulla rakennetuilla tietokoneilla on tavallisesti kolme kerrosta:


  1. Fyysiset tiedostot (yleensä Parquet-, Avro- tai ORC-muodot) tallennetaan tähän kerrokseen.
  2. Metatietokerros: Sen sijaan, että taulukoita lajitellaan yksittäisiin hakemistoihin, Iceberg ylläpitää tiedostojen luetteloa.
  3. Icebergin luettelo: Luettelo on keskusvarasto, joka helpottaa taulukoiden löytämistä, luomista ja muuttamista sekä varmistaa Icebergin taulukoiden hallinnan transaktionaalisen johdonmukaisuuden.


Tämä suosittu kaavio Apache Icebergin dokumentaatiosta kuvaa näitä kerroksia:


 

Jääkiekko Jääkiekko Jääkiekko (Lähteet)



Mikä on PyIceberg?


PyIceberg antaa analyytikoille ja data-insinööreille mahdollisuuden rakentaa kehittyneitä avoimia lakehouse-alustoja monenlaisissa pilvipalveluissa (AWS, Azure ja Google Cloud) ja paikallisessa tallennuksessa. PyIceberg on Python-sovellus Iceberg-taulukoiden käyttämiseen. PyIcebergin käyttäjät voivat käyttää pythonisia data-muunnoksia ilman, että heidän tarvitsee suorittaa korkean suorituskyvyn kyselymoottoreita Java Virtual Machine (JVM) -ryhmissä. PyIceberg käyttää luetteloita ladatakseen Iceberg-taulukoita ja suorittaakseen lukutaitoja. Se käsittelee Icebergin metatietojen ja taulukkoformaatin näkökohtia, jolloin tiet


PyIceberg voi toimia itsenäisenä ohjelmana tai Kubernetes-ryhmissä vian toleranssin varmistamiseksi. Sen natiivinen integrointi Iceberg-luetteloaineistoihin, kuten REST, SQLCatalog tai AWS Glue, tekee siitä suositun ja helpon vaihtoehdon Iceberg-taulukoiden kyselyyn ilman JVM/py4j-ryhmien tarvetta.


PyIcebergin tuotannon käyttöönotot integroivat usein tiedonsiirtotyökuormia, kutenTaulukkoFlow, jossa suoratoistotiedot tuotetaan Apache Kafkan aiheina ja materialisoidaan Iceberg-taulukoina.


Miksi PyIceberg


PyIceberg tarjoaa Python-ystävällisen tavan suorittaa tietojen manipulointikielen (DML) toimintoja Icebergin taulukoissa. Pienille ja keskisuurille tietokantajärjestelmille, jotka työskentelevät 100 gigatavun tietojen kanssa – kuten ne, jotka käsittelevät osastoanalyyseja, sisäistä raportointia tai erikoistuneita työkaluja – helppokäyttöisyys on usein yrityksille tärkeämpää kuin monimutkaiset ominaisuudet. Jos tietomäärä (sekä historiallinen että asteittainen) ei ole valtava, täysimittaisen klusterin käyttöönotto kyselyiden suorittamiseksi Icebergissä voi tuntua ylivoimaiselta ja ylivoimaiselta. Tämä johtuu siitä, että nämä kyselymoottorit (kuten Spark ja Flink) luotta


Seuraavassa osassa rakennetaan demo PyIcebergin kanssa ymmärtääksemme, miten Icebergin lukeminen ja kirjoittaminen toimivat.


Miten rakentaa Data Lakehouse PyIceberg, PyArrow ja DuckDB


Käytämme PyIcebergia ja PyArrowia Iceberg-tietojen lisäämiseen / lisäämiseen ja poistamiseen ja rakentamiseen Visual Studio Codeissa (VS Code).


Ensinnäkin luodaan uusi Python-virtuaaliympäristö nimeltä ‘pyiceberg_playground’, suorittamalla seuraava komento:


$>python -m venv iceberg_playground


Sitten tämä hakemisto - 'iceberg_playground' - 'avataan VS Codeissa, jossa PyIceberg-projekti olisi isännöity.



PyIceberg ja muut kirjastot asennetaan sitten virtuaaliympäristöön suorittamalla seuraavat kaksi komentoa:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


Tässä esimerkissä PyIceberg käyttää SqlCatalogia, joka tallentaa Icebergin taulukon tiedot paikalliseen SQLite-tietokantaan. Iceberg tukee myös luetteloita, kuten REST, Hive ja AWS Glue.


Konfiguraatiotiedosto .pyiceberg.yaml valmistetaan seuraavalla sisällöllä projektin juuressa:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Huomaa, miten Iceberg-luettelo on tallennettu pyiceberg_katalog -kansioon SQLite-tiedostona ja tietovarastoon, joka tallentaa kaikki tiedot ja metatiedot dw1-kansioon.


Molemmat hakemistoista on nyt luotu projektin juuritasolla.Tämä luettelo on nimeltään pyarrowtest.


Seuraavaksi PyIceberg-asennus tarkistetaan käyttämällä seuraavaa skriptiä:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Huomaa, miten PyIceberg lukee luettelon nimen YAML-tiedostosta ja luo paikallisen SQLite-tietokannan pyiceberg_catalog -hakemistoon. Koska SQLite jaetaan Python-asentajien kanssa, sitä ei tarvitse asentaa erikseen.



Jos skripti suoritetaan oikein, ”pyarrow_catalog” -ominaisuudet tulisi näyttää terminaalissa.



Skripti lasi luettelon .YAML-tiedostosta, koska ”PYICEBEG_HOME” -ympäristömuuttuja on määritetty projektin juureksi.


Seuraavaksi lisätään kaavio käyttämällä PyIcebergin kaavio-luokkaa. Koska tämä esimerkki tallentaa tietoja joukosta IoT-antureita, kaavio on rakennettu kolmella sarakkeella ja niiden vaadituilla tietotyypeillä.


Nimitila on looginen ryhmittely taulukoita varastossa (muista, miten varasto on jo luotu YAML-tiedoston määrittelyssä).


Alkuperäinen tietojen lataus tehdään käyttämällä PyArrow in-memory -luetteloa, ja sensor_table luetaan PyIceberg scan() -menetelmällä tietojen muuntamiseksi panda-tietokehykseen.


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Jos edellä oleva käsikirjoitus suoritetaan onnistuneesti, tämä tulos näkyy terminaalissa:



Kun insertio on onnistuneesti suoritettu, Icebergin kolme kerrosta voidaan validoida:


  1. Luettelo: Se on SQLite-tiedosto pyarrow_catalog.db, joka tarkistetaan hieman myöhemmin tässä artikkelissa.
  2. Metadata: Metadata-kansiossa luodaan metadata-tiedostoja, jotka ovat ratkaisevan tärkeitä Create, Read, Update, Delete (CRUD) -toimintojen mahdollistamiseksi. Luodaan kaksi metadata JSON-tiedostoa, toinen taulukon luomisen aikana ja toinen tietojen ensimmäisen lisäämisen jälkeen.
  3. Tiedot: Tiedostot kirjoitetaan .PARQUET-muodossa ja device_id on osioavain. Koska on olemassa kolme eri laitetta, luodaan kolme hakemistoa. Icebergin taulukko ’sensor_data’ luodaan varastossa ’dw1’ olevalla nimipaikalla ’sensor_ns.db’. Nämä tietokentät luodaan ’data’ -kansiossa ’sensor_data’ -taulukossa.



PyIceberg-ilmauksia voidaan käyttää rekistereiden suodattamiseen. Joitakin suodattamiseen käytettyjä yleisiä ilmauksia ovat: StartsWith, EqualTo, GreaterThan, And, Or, jne.


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


PyIceberg tukee myös UPSERT-toimintoja. Seuraavassa koodinäytteessä päivitetään jo olemassa oleva device_make yhdelle antureista ”Siemens” – ”Kepware”.


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


Samalla tavoin DELETE-toimintoa tuetaan myös:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


On syytä mainita, että poistaminen missä tahansa varastossa on vivahteikas toiminta, ja Iceberg ei ole poikkeus. Icebergin rivitason toiminnot määritellään kahdella strategialla: Copy-on-Write (CoW) ja Merge-on-Read (MoR).


PyIceberg tukee tällä hetkellä MOR-poistoja, mutta joitakin vivahteita. Vaikka PyIceberg tarjoaa mahdollisuuden poistaa rivejä, se toteuttaa tämän ensisijaisesti käyttämällä CoW-poistoja oletusarvoisesti, mikä tarkoittaa, että tiedostoja kirjoitetaan uudelleen poistettujen tiedostojen luomisen sijaan.


Viimeisenä askeleena käytämme DuckDB:tä kyselemään SQLite Iceberg -luetteloa, joka on tallennettu pyarrow_catalog.db-tiedostoon.


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


Tämä avaa selaimen ikkunan portissa 4213 ( oletusarvoisesti), jossa SQL-kysely voidaan suorittaa Iceberg-luettelossa, kuten on esitetty:



Tämä tarjoaa helpon ja yksinkertaisen tavan poimia oivalluksia SQL-luettelosta


Data Insightsin avaaminen PyIcebergin avulla


Yrityksille, joiden tietomäärä on pienempi kuin teratavut, PyIceberg ja PyArrow ovat nopeita vaihtoehtoja interaktiivisten pyyntöjen suorittamiseen Pythonissa.


Tietotekniikan insinöörit voivat aloittaaPyIcebergin dokumentaatiojoka on säilytetty ja pidetty ajan tasalla.PaloaSivulla on hyviä esimerkkejä kaikista PyIceberg API: stä.


Hyvää koodausta!

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks