Kirjoittanut: Diptiman Raichaudhuri, henkilöstön kehittäjä asianajaja Confluentissa
Kirjoittanut: Diptiman Raichaudhuri, henkilöstön kehittäjä asianajaja Confluentissa
PyIceberg helpottaa yleisten tietojen tehtävien suorittamista, kuten tietojen luomista, lukemista, muuttamista tai poistamista Apache Icebergissä, ilman, että tarvitset suurta klusteria.
Monimutkaisten liiketoimintavaatimusten ja tarve analysoida suurempia määriä tietoa johtavat tietopalvelut ovat muuttuneet merkittävästi viime vuosina auttaakseen yrityksiä saamaan enemmän tietoa ja arvoa eri tietolähteistä.
Enterprise Analytics -käyttötapauksissa avoimen datalahden alusta on ollut tämän alustan kehityksen eturintamassa. Avoimen datalahden avulla tietoryhmät voivat luoda ”kompostoitavia” arkkitehtuuria ekosysteemissään. Tämän mallin avulla tietoryhmät voivat suunnitella datalähteitä valitsemillaan tallennus-, laskenta- ja tietohallinnon kerroksilla vastaamaan yritysten jatkuvasti muuttuviin tarpeisiin. Open Table Formats (OTF) kuten Apache Iceberg ovat moottoreita, jotka ohjaavat nykyaikaisten avoimien järvien käyttöönottoa.
Composable Data Platform -arkkitehtuuri
Apache Icebergin avulla rakennetuilla tietokoneilla on tavallisesti kolme kerrosta:
- Fyysiset tiedostot (yleensä Parquet-, Avro- tai ORC-muodot) tallennetaan tähän kerrokseen.
- Metatietokerros: Sen sijaan, että taulukoita lajitellaan yksittäisiin hakemistoihin, Iceberg ylläpitää tiedostojen luetteloa.
- Icebergin luettelo: Luettelo on keskusvarasto, joka helpottaa taulukoiden löytämistä, luomista ja muuttamista sekä varmistaa Icebergin taulukoiden hallinnan transaktionaalisen johdonmukaisuuden.
Tämä suosittu kaavio Apache Icebergin dokumentaatiosta kuvaa näitä kerroksia:
Jääkiekko Jääkiekko Jääkiekko (Lähteet)
Mikä on PyIceberg?
PyIceberg antaa analyytikoille ja data-insinööreille mahdollisuuden rakentaa kehittyneitä avoimia lakehouse-alustoja monenlaisissa pilvipalveluissa (AWS, Azure ja Google Cloud) ja paikallisessa tallennuksessa. PyIceberg on Python-sovellus Iceberg-taulukoiden käyttämiseen. PyIcebergin käyttäjät voivat käyttää pythonisia data-muunnoksia ilman, että heidän tarvitsee suorittaa korkean suorituskyvyn kyselymoottoreita Java Virtual Machine (JVM) -ryhmissä. PyIceberg käyttää luetteloita ladatakseen Iceberg-taulukoita ja suorittaakseen lukutaitoja. Se käsittelee Icebergin metatietojen ja taulukkoformaatin näkökohtia, jolloin tiet
PyIceberg voi toimia itsenäisenä ohjelmana tai Kubernetes-ryhmissä vian toleranssin varmistamiseksi. Sen natiivinen integrointi Iceberg-luetteloaineistoihin, kuten REST, SQLCatalog tai AWS Glue, tekee siitä suositun ja helpon vaihtoehdon Iceberg-taulukoiden kyselyyn ilman JVM/py4j-ryhmien tarvetta.
PyIcebergin tuotannon käyttöönotot integroivat usein tiedonsiirtotyökuormia, kutenTaulukkoFlow, jossa suoratoistotiedot tuotetaan Apache Kafkan aiheina ja materialisoidaan Iceberg-taulukoina.
Miksi PyIceberg
PyIceberg tarjoaa Python-ystävällisen tavan suorittaa tietojen manipulointikielen (DML) toimintoja Icebergin taulukoissa. Pienille ja keskisuurille tietokantajärjestelmille, jotka työskentelevät 100 gigatavun tietojen kanssa – kuten ne, jotka käsittelevät osastoanalyyseja, sisäistä raportointia tai erikoistuneita työkaluja – helppokäyttöisyys on usein yrityksille tärkeämpää kuin monimutkaiset ominaisuudet. Jos tietomäärä (sekä historiallinen että asteittainen) ei ole valtava, täysimittaisen klusterin käyttöönotto kyselyiden suorittamiseksi Icebergissä voi tuntua ylivoimaiselta ja ylivoimaiselta. Tämä johtuu siitä, että nämä kyselymoottorit (kuten Spark ja Flink) luotta
Seuraavassa osassa rakennetaan demo PyIcebergin kanssa ymmärtääksemme, miten Icebergin lukeminen ja kirjoittaminen toimivat.
Miten rakentaa Data Lakehouse PyIceberg, PyArrow ja DuckDB
Käytämme PyIcebergia ja PyArrowia Iceberg-tietojen lisäämiseen / lisäämiseen ja poistamiseen ja rakentamiseen Visual Studio Codeissa (VS Code).
Ensinnäkin luodaan uusi Python-virtuaaliympäristö nimeltä ‘pyiceberg_playground’, suorittamalla seuraava komento:
$>python -m venv iceberg_playground
Sitten tämä hakemisto - 'iceberg_playground' - 'avataan VS Codeissa, jossa PyIceberg-projekti olisi isännöity.
PyIceberg ja muut kirjastot asennetaan sitten virtuaaliympäristöön suorittamalla seuraavat kaksi komentoa:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
Tässä esimerkissä PyIceberg käyttää SqlCatalogia, joka tallentaa Icebergin taulukon tiedot paikalliseen SQLite-tietokantaan. Iceberg tukee myös luetteloita, kuten REST, Hive ja AWS Glue.
Konfiguraatiotiedosto .pyiceberg.yaml valmistetaan seuraavalla sisällöllä projektin juuressa:
catalog:
pyarrowtest:
uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Huomaa, miten Iceberg-luettelo on tallennettu pyiceberg_katalog -kansioon SQLite-tiedostona ja tietovarastoon, joka tallentaa kaikki tiedot ja metatiedot dw1-kansioon.
Molemmat hakemistoista on nyt luotu projektin juuritasolla.Tämä luettelo on nimeltään pyarrowtest.
Seuraavaksi PyIceberg-asennus tarkistetaan käyttämällä seuraavaa skriptiä:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Huomaa, miten PyIceberg lukee luettelon nimen YAML-tiedostosta ja luo paikallisen SQLite-tietokannan pyiceberg_catalog -hakemistoon. Koska SQLite jaetaan Python-asentajien kanssa, sitä ei tarvitse asentaa erikseen.
Jos skripti suoritetaan oikein, ”pyarrow_catalog” -ominaisuudet tulisi näyttää terminaalissa.
Skripti lasi luettelon .YAML-tiedostosta, koska ”PYICEBEG_HOME” -ympäristömuuttuja on määritetty projektin juureksi.
Seuraavaksi lisätään kaavio käyttämällä PyIcebergin kaavio-luokkaa. Koska tämä esimerkki tallentaa tietoja joukosta IoT-antureita, kaavio on rakennettu kolmella sarakkeella ja niiden vaadituilla tietotyypeillä.
Nimitila on looginen ryhmittely taulukoita varastossa (muista, miten varasto on jo luotu YAML-tiedoston määrittelyssä).
Alkuperäinen tietojen lataus tehdään käyttämällä PyArrow in-memory -luetteloa, ja sensor_table luetaan PyIceberg scan() -menetelmällä tietojen muuntamiseksi panda-tietokehykseen.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
NestedField(1, "device_id", StringType(), required=True),
NestedField(2, "ampere_hour", FloatType(), required=True),
NestedField(3, "device_make", StringType(), required=True),
identifier_field_ids=[1] # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
identifier='sensor_ns.sensor_data',
schema=schema,
partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
Jos edellä oleva käsikirjoitus suoritetaan onnistuneesti, tämä tulos näkyy terminaalissa:
Kun insertio on onnistuneesti suoritettu, Icebergin kolme kerrosta voidaan validoida:
- Luettelo: Se on SQLite-tiedosto pyarrow_catalog.db, joka tarkistetaan hieman myöhemmin tässä artikkelissa.
- Metadata: Metadata-kansiossa luodaan metadata-tiedostoja, jotka ovat ratkaisevan tärkeitä Create, Read, Update, Delete (CRUD) -toimintojen mahdollistamiseksi. Luodaan kaksi metadata JSON-tiedostoa, toinen taulukon luomisen aikana ja toinen tietojen ensimmäisen lisäämisen jälkeen.
- Tiedot: Tiedostot kirjoitetaan .PARQUET-muodossa ja device_id on osioavain. Koska on olemassa kolme eri laitetta, luodaan kolme hakemistoa. Icebergin taulukko ’sensor_data’ luodaan varastossa ’dw1’ olevalla nimipaikalla ’sensor_ns.db’. Nämä tietokentät luodaan ’data’ -kansiossa ’sensor_data’ -taulukossa.
PyIceberg-ilmauksia voidaan käyttää rekistereiden suodattamiseen. Joitakin suodattamiseen käytettyjä yleisiä ilmauksia ovat: StartsWith, EqualTo, GreaterThan, And, Or, jne.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
PyIceberg tukee myös UPSERT-toimintoja. Seuraavassa koodinäytteessä päivitetään jo olemassa oleva device_make yhdelle antureista ”Siemens” – ”Kepware”.
# Create an UPSERT batch of Arrow records where one fot he device_make is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
join_columns = ["device_id"]
upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))
except Exception as e:
print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
Samalla tavoin DELETE-toimintoa tuetaan myös:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\n After Delete")
print(sensor_table.scan().to_pandas())
On syytä mainita, että poistaminen missä tahansa varastossa on vivahteikas toiminta, ja Iceberg ei ole poikkeus. Icebergin rivitason toiminnot määritellään kahdella strategialla: Copy-on-Write (CoW) ja Merge-on-Read (MoR).
PyIceberg tukee tällä hetkellä MOR-poistoja, mutta joitakin vivahteita. Vaikka PyIceberg tarjoaa mahdollisuuden poistaa rivejä, se toteuttaa tämän ensisijaisesti käyttämällä CoW-poistoja oletusarvoisesti, mikä tarkoittaa, että tiedostoja kirjoitetaan uudelleen poistettujen tiedostojen luomisen sijaan.
Viimeisenä askeleena käytämme DuckDB:tä kyselemään SQLite Iceberg -luetteloa, joka on tallennettu pyarrow_catalog.db-tiedostoon.
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
Tämä avaa selaimen ikkunan portissa 4213 ( oletusarvoisesti), jossa SQL-kysely voidaan suorittaa Iceberg-luettelossa, kuten on esitetty:
Tämä tarjoaa helpon ja yksinkertaisen tavan poimia oivalluksia SQL-luettelosta
Data Insightsin avaaminen PyIcebergin avulla
Yrityksille, joiden tietomäärä on pienempi kuin teratavut, PyIceberg ja PyArrow ovat nopeita vaihtoehtoja interaktiivisten pyyntöjen suorittamiseen Pythonissa.
Tietotekniikan insinöörit voivat aloittaaPyIcebergin dokumentaatiojoka on säilytetty ja pidetty ajan tasalla.PaloaSivulla on hyviä esimerkkejä kaikista PyIceberg API: stä.
Hyvää koodausta!