Rakstījis: Diptiman Raichaudhuri, personāla izstrādātāja advokāts Confluent
Rakstījis: Diptiman Raichaudhuri, personāla izstrādātāja advokāts Confluent
Šis raksts parāda datu inženieriem, kā izmantot PyIceberg, vieglu un spēcīgu Python bibliotēku. PyIceberg padara to vieglāk veikt kopīgus datu uzdevumus, piemēram, izveidot, lasīt, modificēt vai dzēst datus Apache Iceberg, bez nepieciešamības lielu klasteri.
Pamatojoties uz sarežģītajām uzņēmējdarbības prasībām un nepieciešamību analizēt lielākus informācijas apjomus, datu platformas pēdējos gados ir ievērojami pārveidojušās, lai palīdzētu uzņēmumiem iegūt vairāk ieskatu un vērtību no dažādiem datu avotiem.
Uzņēmējdarbības analītikas lietojumu gadījumos atvērtā datu ezera platforma ir bijusi šīs platformas evolūcijas priekšgalā. Atvērtā datu ezera māja ļauj datu komandām radīt “kompostējamas” arhitektūras savā ekosistēmā. Ar šo modeli datu komandas var izstrādāt datu platformas ar glabāšanas, aprēķina un datu pārvaldības slāņiem pēc izvēles, lai apmierinātu uzņēmuma pastāvīgi mainīgās vajadzības. Atvērtā galda formāti (OTF) kā Apache Iceberg ir dzinēji, kas virza mūsdienu atvērto ezeru māju pieņemšanu.
Kompozitīvās datu platformas arhitektūra
Datu platformām, kas izveidotas ar Apache Iceberg, parasti ir trīs slāņi:
- Datu slānis: Fiziskie datu faili (parasti Parquet, Avro vai ORC formāti) tiek glabāti šajā slānī.
- Metadatu slānis: Tā vietā, lai klasificētu tabulas atsevišķās direktorijās, Iceberg uztur failu sarakstu.
- Iceberg katalogs ir centrālais repozitorijs, kas atvieglo tabulu atklāšanu, izveidi un modifikāciju un nodrošina Iceberg tabulu pārvaldības darījumu konsekvenci.
Šī populārā diagramma no Apache Iceberg dokumentācijas ilustrē šos slāņus:
Lāčplēša diena Lāčplēša diena (avots)
Kas ir PyIceberg?
PyIceberg ļauj analītiķiem un datu inženieriem veidot sarežģītas atvērtās ezeru platformas plašā mākoņu klāstā (AWS, Azure un Google Cloud) un vietējā uzglabāšanā. PyIceberg ir Python īstenošana, lai piekļūtu Iceberg tabulām. Izstrādātāji, kas izmanto PyIceberg, var izmantot Pythonic datu transformācijas bez nepieciešamības palaist augstas veiktspējas vaicājuma dzinējus Java Virtual Machine (JVM) klasteros. PyIceberg izmanto katalogus, lai ielādētu Iceberg tabulas un veiktu lasīšanas-rakstīšanas-upert operācijas. Tā apstrādā Iceberg metadatu un tabulas formāta aspektus, ļaujot datu inženieriem izmantot mazus, ātrus un
PyIceberg var darboties kā atsevišķa programma vai Kubernetes klasteros, lai nodrošinātu kļūdu toleranci.Tā dzimtā integrācija ar Iceberg kataloga protokoliem, piemēram, REST, SQLCatalog vai AWS Glue, padara to par populāru un vieglu izvēli Iceberg tabulu vaicāšanai bez JVM/py4j klasteru nepieciešamības.
PyIceberg ražošanas izvietošana bieži vien integrē datu straumēšanas darba slodzes, piemēram,Tabulas plūsma, kur straumēšanas dati tiek ražoti kā Apache Kafka tēmas un materializēti kā Iceberg tabulas.
Kāpēc PyCeberg?
PyIceberg nodrošina Python draudzīgu veidu, kā darbināt datu manipulācijas valodas (DML) operācijas uz Iceberg tabulām. Mazām un vidēja izmēra datu platformām, kas darbojas ar 100 gigabaitiem datu, piemēram, tām, kas nodarbojas ar departamenta analīzi, iekšējo ziņošanu vai specializētus rīkus, lietošanas ērtums bieži vien ir svarīgāks uzņēmumiem nekā sarežģītas funkcijas. Ja datu apjoms (gan vēsturiski, gan pakāpeniski) nav milzīgs, pilna blūza klāsta izvietošana, lai veiktu vaicājumus uz Iceberg var šķist apgrūtinoša un pārsteidzoša. Tas ir tāpēc, ka šīs vaicājumu sistēmas (piemēram, Spark un Flink) paļaujas uz Scala
Nākamajā sadaļā veidosim demo ar PyIceberg, lai saprastu, kā darbojas Iceberg lasīšanas un rakstīšanas modeļi.
Kā izveidot datu ezeru ar PyIceberg, PyArrow un DuckDB
Apmācīsimies un izveidosim IoT sensoru datu lauku māju ar PyIceberg. Šajā piemērā mēs izmantosim PyIceberg un PyArrow, lai ievietotu / ievietotu un dzēstu Iceberg datus un izveidotu Visual Studio Code (VS Code).
Pirmkārt, tiek izveidota jauna Python virtuālā vide, ko sauc par ‘pyiceberg_playground’, izpildot šādu komandu:
$>python -m venv iceberg_playground
Tad šis direktorijs - 'iceberg_playground' - 'ir atvērts VS kodā, kur tiks hostēts PyIceberg projekts.
Tad PyIceberg un citas bibliotēkas tiek instalētas virtuālajā vidē, izpildot šādas divas komandas:
$>source bin/activate
(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas
Šajā piemērā PyIceberg izmantotu SqlCatalog, kas glabā Iceberg tabulas informāciju vietējā SQLite datubāzē. Iceberg atbalsta arī katalogus, tostarp REST, Hive un AWS Glue.
Konfigurācijas fails .pyiceberg.yaml tiek sagatavots ar šādu saturu projekta saknē:
catalog:
pyarrowtest:
uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db
warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1
Ievērojiet, kā Iceberg katalogs tiek glabāts zem pyiceberg_catalog direktorijas kā SQLite fails un datu noliktavā, kas glabā visus datus un metadatus adresē dw1.
Abas šīs direktorijas tagad tiek izveidotas projekta sakņu līmenī. Šis katalogs tiek saukts par pyarrowtest.
Pēc tam PyIceberg iestatījums tiek pārbaudīts, izmantojot šādu skriptu:
import os
from pyiceberg.catalog import load_catalog
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
Ievērojiet, kā PyIceberg lasa kataloga nosaukumu no YAML faila un izveido lokālu SQLite datubāzi pyiceberg_catalog direktorijā.
Ja skripts tiek darbināts pareizi, terminālā jāuzrāda “pyarrow_catalog” īpašības.
Skripts ielādēja katalogu no .YAML faila, jo vides mainīgais ‘PYICEBEG_HOME’ ir norādīts kā projekta sakne.
Pēc tam tiek pievienota shēma, izmantojot PyIceberg shēmas klasi. Tā kā šis piemērs saglabā datus no IoT sensoru kopas, shēma tiek veidota ar trim kolonnām un to nepieciešamajiem datu veidiem. lauka device_id ir iestatīts gan kā primārā atslēga, gan kā dalīšanas atslēga.
Nosaukuma telpa ir loģiska tabulu grupēšana noliktavā (atcerieties, kā noliktava jau ir izveidota, definējot YAML failu).
Sākotnējā datu ielādēšana tiek veikta, izmantojot PyArrow in-memory sarakstu, un sensor_table tiek lasīts, izmantojot PyIceberg scan() metodi, lai pārvērstu datus panda datu rāmī.
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (NestedField,
StringType, FloatType)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
import pyarrow as pa
os.environ["PYICEBERG_HOME"] = os.getcwd()
catalog = load_catalog(name='pyarrowtest')
print(catalog.properties)
# Define the schema
schema = Schema(
NestedField(1, "device_id", StringType(), required=True),
NestedField(2, "ampere_hour", FloatType(), required=True),
NestedField(3, "device_make", StringType(), required=True),
identifier_field_ids=[1] # 'device_id' is the primary key
)
# Define the partition spec - device_id as partition key
partition_spec = PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")
)
# Create a namespace and an iceberg table
catalog.create_namespace_if_not_exists('sensor_ns')
sensor_table = catalog.create_table_if_not_exists(
identifier='sensor_ns.sensor_data',
schema=schema,
partition_spec=partition_spec
)
# Insert initial data
initial_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Siemens'])
], schema=schema.as_arrow())
# Write initial data
sensor_table.overwrite(initial_data)
# Print a Pandas dataframe representation of the table data
print("\nInsert operation completed")
print(sensor_table.scan().to_pandas())
Ja iepriekš minētais skripts tiek veiksmīgi palaists, šis rezultāts tiek parādīts terminālā:
Kad ievietošana ir veiksmīgi pabeigta, var validēt trīs Iceberg slāņus:
- Katalogs: Tas ir SQLite fails pyarrow_catalog.db, kas tiks pārbaudīts nedaudz vēlāk šajā rakstā.
- Metadati: “metadatu” direktorijā tiek izveidoti metadatu faili, kas ir būtiski, lai varētu veikt Create, Read, Update, Delete (CRUD) operācijas. Divi metadatu JSON faili tiek izveidoti, viens tabulas izveides laikā un otrs pēc pirmā datu ievietošanas.
- Dati: Datu faili tiek rakstīti .PARQUET formātā, ar device_id kā nodalījuma atslēgu. Tā kā ir trīs atsevišķas ierīces, tiek izveidotas trīs direktorijas. Iceberg tabula ‘sensor_data’ tiek izveidota ar nosaukuma telpu ‘sensor_ns.db’ noliktavā ‘dw1’. Šie datu lauki tiek izveidoti tabulas ‘sensor_data’ “datu” direktorijā.
PyIceberg izteiksmes var izmantot, lai filtrētu ierakstus.Daži no kopējiem izteiksmēm, ko izmanto filtrēšanai, ir: StartsWith, EqualTo, GreaterThan, And, Or, utt.
from pyiceberg.expressions import StartsWith, EqualTo
# Filter columns
print("\nFilter records with device_make == ABB ")
print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())
Turpmākais koda paraugs atjaunina esošo device_make vienam no sensoriem no “Siemens” līdz “Kepware”.
# Create an UPSERT batch of Arrow records where one fot he device_make is changed
upsert_data = pa.table([
pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),
pa.array([21.43, 17.86, 31.65]),
pa.array(['ABB', 'Honeywell', 'Kepware'])
], schema=schema.as_arrow())
# UPSERT changed data
try:
join_columns = ["device_id"]
upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))
except Exception as e:
print(e)
print("\nUpsert operation completed")
print(sensor_table.scan().to_pandas())
Līdzīgā veidā tiek atbalstīta arī operācija DELETE:
# Delete row
sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))
print("\n After Delete")
print(sensor_table.scan().to_pandas())
Ir vērts pieminēt, ka dzēšana jebkurā noliktavā ir niansēta operācija, un Iceberg nav izņēmums. Iceberg līmeņa operācijas ir definētas ar divām stratēģijām: Copy-on-Write (CoW) un Merge-on-Read (MoR). dzēšanas operācijas arī rada dzēšanas failus MoR stratēģijai.
PyIceberg pašlaik atbalsta MOR dzēšanu, bet ar dažām niansēm. Kaut arī PyIceberg piedāvā iespēju dzēst rindas, tas galvenokārt īsteno to, izmantojot CoW dzēš pēc noklusējuma, kas nozīmē, ka datu faili tiek pārrakstīti, nevis izveidoti dzēsti faili.
Kā pēdējais solis, izmantojiet DuckDB, lai vaicātu SQLite Iceberg katalogu, kas tiek glabāts failā pyarrow_catalog.db.
duckdb -ui pyiceberg_catalog/pyarrow_catalog.db
Tas atvērs pārlūkprogrammas logu portā 4213 (parastā), kur SQL vaicājumu varētu palaist Iceberg katalogā, kā parādīts:
Tas nodrošina vienkāršu un vienkāršu veidu, kā iegūt ieskatu no SQL kataloga.
Datu ieskatu atbloķēšana ar PyIceberg
Uzņēmumiem ar datu apjomiem, kas ir mazāki par terabaitiem, PyIceberg un PyArrow ir ātras iespējas, lai palaistu interaktīvus vaicājumus Python.
Datu inženieri var sākt arPyIceberg dokumentācijakas tiek saglabāts un saglabāts līdz šim.Ugunsgrēkslapā ir lieliski piemēri visiem PyIceberg API.
Laimīgs kods!