4,230 lượt đọc
4,230 lượt đọc

Hướng dẫn của một kỹ sư dữ liệu đến PyIceberg

từ tác giả Confluent2025/06/20
Read on Terminal Reader

dài quá đọc không nổi

Hướng dẫn này hướng dẫn các kỹ sư dữ liệu bằng cách sử dụng PyIceberg, một thư viện Python để quản lý các bảng Apache Iceberg mà không cần các cụm JVM lớn. Nó bao gồm thiết lập, tạo sơ đồ, hoạt động CRUD và truy vấn với DuckDB. Lý tưởng cho các nhóm làm việc với dữ liệu nhỏ đến trung bình, PyIceberg đơn giản hóa quy trình làm việc hồ dữ liệu mở bằng cách sử dụng các công cụ như PyArrow và DuckDB.
featured image - Hướng dẫn của một kỹ sư dữ liệu đến PyIceberg
Confluent HackerNoon profile picture
0-item

Viết bởi: Diptiman Raichaudhuri, Luật sư phát triển nhân sự tại Confluent

Viết bởi: Diptiman Raichaudhuri, Luật sư phát triển nhân sự tại Confluent


Bài viết này cho thấy các kỹ sư dữ liệu làm thế nào để sử dụng PyIceberg, một thư viện Python nhẹ và mạnh mẽ. PyIceberg làm cho nó dễ dàng hơn để thực hiện các nhiệm vụ dữ liệu phổ biến như tạo, đọc, sửa đổi hoặc xóa dữ liệu trong Apache Iceberg, mà không cần một cụm lớn.


Được thúc đẩy bởi nhu cầu kinh doanh phức tạp và nhu cầu phân tích khối lượng thông tin lớn hơn, các nền tảng dữ liệu đã thay đổi đáng kể trong vài năm qua để giúp các doanh nghiệp trích xuất nhiều hiểu biết và giá trị hơn từ các nguồn dữ liệu đa dạng của họ.


Đối với các trường hợp sử dụng phân tích doanh nghiệp, nền tảng hồ dữ liệu mở đã đi đầu trong sự phát triển của nền tảng này. hồ dữ liệu mở cho phép các nhóm dữ liệu tạo ra các kiến trúc có thể “sản xuất” trong hệ sinh thái của họ. Với mô hình này, các nhóm dữ liệu có thể thiết kế các nền tảng dữ liệu với các lớp lưu trữ, tính toán và quản trị dữ liệu của sự lựa chọn của họ, để đáp ứng nhu cầu liên tục phát triển của doanh nghiệp. Các định dạng bảng mở (OTF) như Apache Iceberg là động cơ thúc đẩy việc áp dụng các hồ nước mở hiện đại.



Composable Data Platform Architecture


Composable Data Platform Architecture


Các nền tảng dữ liệu được xây dựng với Apache Iceberg thường có ba lớp:


  1. Lớp dữ liệu: Các tệp dữ liệu vật lý (thường là định dạng Parquet, Avro hoặc ORC) được lưu trữ trong lớp này.
  2. The metadata layer: Instead of sorting tables into individual directories, Iceberg maintains a list of files. The metadata layer manages table snapshots, schema, and file statistics. 
  3. Iceberg Catalog: Catalog là kho lưu trữ trung tâm tạo điều kiện cho việc khám phá, tạo và sửa đổi các bảng và đảm bảo tính nhất quán của giao dịch trong việc quản lý các bảng Iceberg.


Biểu đồ phổ biến này từ tài liệu Apache Iceberg minh họa các lớp này:


 

Bảng xếp hạng Bảng xếp hạng Bảng xếp hạng (Nguồn)



PyIceberg là gì?


PyIceberg trao quyền cho các kỹ sư phân tích và dữ liệu để xây dựng các nền tảng hồ nước mở tinh vi trên nhiều loại đám mây (AWS, Azure và Google Cloud) và lưu trữ tại chỗ. PyIceberg là một ứng dụng Python để truy cập vào các bảng Iceberg. Các nhà phát triển sử dụng PyIceberg có thể sử dụng chuyển đổi dữ liệu Pythonic mà không cần phải chạy các công cụ truy vấn hiệu suất cao trong các cụm máy ảo Java (JVM). PyIceberg sử dụng danh mục để tải các bảng Iceberg và thực hiện các hoạt động đọc-viết-thiết. Nó xử lý các khía cạnh định dạng siêu dữ liệu và bảng của Iceberg, cho phép các kỹ sư dữ liệu sử dụng các công cụ dữ liệu nhỏ, nhanh chóng và hiệu quả cao như PyArrow


PyIceberg có thể chạy như một chương trình độc lập hoặc trên các cụm Kubernetes cho khả năng dung nạp lỗi. Tích hợp bản địa với các giao thức danh mục Iceberg như REST, SQLCatalog hoặc AWS Glue, làm cho nó trở thành một lựa chọn phổ biến và dễ dàng để truy vấn các bảng Iceberg mà không cần các cụm JVM/py4j.


Việc triển khai sản xuất của PyIceberg thường tích hợp tải công việc truyền dữ liệu nhưBảng flow, nơi dữ liệu phát trực tuyến được sản xuất dưới dạng các chủ đề Apache Kafka và được thực hiện dưới dạng bảng Iceberg. Đây là một công cụ mạnh mẽ để thu hẹp khoảng cách giữa các hệ thống dữ liệu hoạt động và hệ thống dữ liệu phân tích.


Tại sao PyIceberg?


PyIceberg cung cấp một cách thân thiện với Python để chạy các hoạt động ngôn ngữ thao tác dữ liệu (DML) trên các bảng Iceberg. Đối với các nền tảng dữ liệu nhỏ đến trung bình làm việc với 100 gigabytes dữ liệu - chẳng hạn như những người xử lý phân tích bộ phận, báo cáo nội bộ hoặc công cụ chuyên dụng - dễ sử dụng thường quan trọng hơn đối với các doanh nghiệp so với các tính năng phức tạp. Nếu khối lượng dữ liệu (cả lịch sử và gia tăng) không lớn, triển khai một cụm phun đầy đủ để chạy truy vấn trên Iceberg có thể có vẻ áp đảo và gây chết người. Đó là bởi vì các công cụ truy vấn (như Spark và Flink) dựa vào các ngôn ngữ lập trình Scala hoặc Java chạy trên Máy ảo Java (JVM) để áp đặt và tối ưu hóa xử lý đa


Trong phần tiếp theo, chúng ta hãy xây dựng một bản demo với PyIceberg để hiểu cách các mô hình đọc và viết của Iceberg hoạt động.


Làm thế nào để xây dựng một hồ dữ liệu với PyIceberg, PyArrow và DuckDB


Chúng ta hãy thực hành và xây dựng một bản demo của hồ dữ liệu cảm biến IoT với PyIceberg.Đối với ví dụ này, chúng ta sẽ sử dụng PyIceberg và PyArrow để chèn/upert và xóa dữ liệu Iceberg và xây dựng trong Visual Studio Code (VS Code).


Đầu tiên, một môi trường ảo Python mới được tạo ra được gọi là 'pyiceberg_playground', bằng cách chạy lệnh sau:


$>python -m venv iceberg_playground


Sau đó thư mục này - 'iceberg_playground' - 'được mở trong VS Code nơi dự án PyIceberg sẽ được lưu trữ.



PyIceberg và các thư viện khác sau đó được cài đặt trong môi trường ảo bằng cách chạy hai lệnh sau:


$>source bin/activate

(iceberg_playground)$>pip install pyiceberg daft duckdb sqlalchemy pandas


Đối với ví dụ này, PyIceberg sẽ sử dụng SqlCatalog, lưu trữ thông tin bảng Iceberg trong cơ sở dữ liệu SQLite cục bộ. Iceberg cũng hỗ trợ các danh mục bao gồm REST, Hive và AWS Glue.


Một tệp cấu hình .pyiceberg.yaml được chuẩn bị với nội dung sau, tại gốc dự án:


catalog:

 pyarrowtest:

   uri: sqlite:///pyiceberg_catalog/pyarrow_catalog.db

   warehouse: file:////Users/diptimanraichaudhuri/testing_space/iceberg_playground/dw1


Lưu ý cách catalog Iceberg được lưu trữ dưới thư mục pyiceberg_catalog dưới dạng tệp SQLite và trong kho dữ liệu, lưu trữ tất cả dữ liệu và siêu dữ liệu trong thư mục dw1.


Cả hai thư mục này bây giờ được tạo ra trong mức gốc của dự án. danh mục này được gọi là pyarrowtest.


Tiếp theo, cài đặt PyIceberg được kiểm tra bằng cách sử dụng kịch bản sau:


import os

from pyiceberg.catalog import load_catalog

os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)


Lưu ý cách PyIceberg đọc tên thư mục từ tệp YAML và tạo cơ sở dữ liệu SQLite cục bộ trong thư mục pyiceberg_catalog. vì SQLite được phân phối với các trình cài đặt Python, nó không cần phải được cài đặt riêng.



Nếu kịch bản được chạy đúng cách, các thuộc tính 'pyarrow_catalog' nên được hiển thị trong thiết bị đầu cuối.



Kịch bản đã tải danh mục từ tệp .YAML, vì biến môi trường ‘PYICEBEG_HOME’ đã được chỉ định là gốc dự án.


Tiếp theo, một sơ đồ được thêm vào bằng cách sử dụng lớp sơ đồ của PyIceberg. Vì ví dụ này lưu trữ dữ liệu từ một tập hợp các cảm biến IoT, sơ đồ được xây dựng với ba cột và các loại dữ liệu cần thiết của chúng.


Sau đó, một namespace được tạo ra cùng với bảng Iceberg với sơ đồ. một namespace là một nhóm hợp lý của các bảng trong một kho (hãy nhớ làm thế nào một kho đã được tạo ra khi xác định tập tin YAML).


Việc tải dữ liệu ban đầu được thực hiện bằng cách sử dụng danh sách trong bộ nhớ PyArrow, và bảng sensor_table được đọc bằng phương pháp PyIceberg scan() để chuyển đổi dữ liệu thành khung dữ liệu pandas.


import os

from pyiceberg.catalog import load_catalog

from pyiceberg.schema import Schema

from pyiceberg.types import (NestedField,

                             StringType, FloatType)

from pyiceberg.partitioning import PartitionSpec, PartitionField

from pyiceberg.transforms import IdentityTransform

import pyarrow as pa


os.environ["PYICEBERG_HOME"] = os.getcwd()

catalog = load_catalog(name='pyarrowtest')

print(catalog.properties)

# Define the schema

schema = Schema(

    NestedField(1, "device_id", StringType(), required=True),

    NestedField(2, "ampere_hour", FloatType(), required=True),

    NestedField(3, "device_make", StringType(), required=True),

    identifier_field_ids=[1]  # 'device_id' is the primary key

)

# Define the partition spec - device_id as partition key

partition_spec = PartitionSpec(

    PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="device_id")

)

# Create a namespace and an iceberg table

catalog.create_namespace_if_not_exists('sensor_ns')

sensor_table = catalog.create_table_if_not_exists(

    identifier='sensor_ns.sensor_data',

    schema=schema,

    partition_spec=partition_spec

)

# Insert initial data

initial_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Siemens'])

], schema=schema.as_arrow())

# Write initial data

sensor_table.overwrite(initial_data)

# Print a Pandas dataframe representation of the table data

print("\nInsert operation completed")

print(sensor_table.scan().to_pandas())


Nếu kịch bản trên được chạy thành công, kết quả này sẽ được hiển thị trong thiết bị đầu cuối:



Với việc chèn thành công, ba lớp của Iceberg có thể được xác nhận:


  1. Catalog: Đây là tệp SQLite pyarrow_catalog.db, sẽ được xác minh một chút sau trong bài viết này.
  2. Metadata: Trong thư mục ‘metadata’, các tập tin siêu dữ liệu được tạo ra, điều này rất quan trọng để cho phép các hoạt động Create, Read, Update, Delete (CRUD). hai tập tin siêu dữ liệu JSON được tạo ra, một trong khi bảng được tạo ra, và thứ hai sau khi chèn dữ liệu đầu tiên. tập tin ‘snap-*.avro’ là danh sách hiển nhiên, và tập tin hiển nhiên là tập tin .avro khác.
  3. Dữ liệu: Các tệp dữ liệu được viết ở định dạng .PARQUET, với device_id làm phím phân vùng. Vì có ba thiết bị riêng biệt, ba thư mục được tạo ra. Bảng Iceberg 'sensor_data' được tạo bằng không gian tên 'sensor_ns.db' trong kho 'dw1'. Những trường dữ liệu này được tạo trong thư mục 'dữ liệu' của bảng 'sensor_data'.



Các biểu thức PyIceberg có thể được sử dụng để lọc hồ sơ. Một số biểu thức phổ biến được sử dụng để lọc là: StartsWith, EqualTo, GreaterThan, And, Or, v.v.


from pyiceberg.expressions import StartsWith, EqualTo

# Filter columns

print("\nFilter records with device_make == ABB ")

print(sensor_table.scan(row_filter=EqualTo('device_make', 'ABB')).to_pandas())


PyIceberg cũng hỗ trợ các hoạt động UPSERT. mẫu mã sau đây cập nhật thiết bị_make hiện có cho một trong các cảm biến từ 'Siemens' đến 'Kepware'.


# Create an UPSERT batch of Arrow records where one fot he device_make is changed

upsert_data = pa.table([

    pa.array(['b8:27:eb:bf:9d:51', '00:0f:00:70:91:0a', '1c:bf:ce:15:ec:4d']),

    pa.array([21.43, 17.86, 31.65]),

    pa.array(['ABB', 'Honeywell', 'Kepware'])

], schema=schema.as_arrow())

# UPSERT changed data

try:

    join_columns = ["device_id"]

    upsert_result = sensor_table.upsert(upsert_data.select(["device_id", "ampere_hour", "device_make"]))

except Exception as e:

    print(e)

print("\nUpsert operation completed")

print(sensor_table.scan().to_pandas())


Tương tự, hoạt động DELETE cũng được hỗ trợ:


# Delete row

sensor_table.delete(delete_filter=EqualTo('device_id', '1c:bf:ce:15:ec:4d'))

print("\n After Delete")

print(sensor_table.scan().to_pandas())


Điều đáng nói là xóa trong bất kỳ kho là một hoạt động sắc thái, và Iceberg không phải là ngoại lệ. hoạt động cấp hàng ở Iceberg được xác định bởi hai chiến lược: Copy-on-Write (CoW) và Merge-on-Read (MoR). hoạt động xóa cũng tạo ra các tập tin xóa cho chiến lược MoR.


PyIceberg hiện đang hỗ trợ xóa MOR, nhưng với một số sắc thái. Trong khi PyIceberg cung cấp khả năng xóa hàng, nó chủ yếu thực hiện điều này bằng cách sử dụng xóa CoW theo mặc định, có nghĩa là các tệp dữ liệu được viết lại thay vì tạo các tệp bị xóa. tuy nhiên, có công việc đang được tiến hành để tăng cường MoR của PyIceberg để hỗ trợ xóa bình đẳng và làm cho nó hiệu quả hơn cho các bản cập nhật nhỏ thường xuyên.


Như một bước cuối cùng, chúng ta hãy sử dụng DuckDB để truy vấn danh mục SQLite Iceberg, được lưu trữ trong tệp pyarrow_catalog.db.


duckdb -ui pyiceberg_catalog/pyarrow_catalog.db


Điều này sẽ mở ra một cửa sổ trình duyệt tại cổng 4213 (mặc định), nơi một truy vấn SQL có thể được chạy trên danh mục Iceberg, như được hiển thị:



Điều này cung cấp một cách dễ dàng và đơn giản để trích xuất những hiểu biết từ danh mục SQL


Mở khóa dữ liệu với PyIceberg


Đối với các công ty có khối lượng dữ liệu nhỏ hơn terabyte, PyIceberg và PyArrow là các lựa chọn nhanh để chạy các truy vấn tương tác trong Python. Sử dụng sự kết hợp này có thể giảm đáng kể thời gian để tạo ra thông tin chi tiết cho các tòa nhà hồ nhỏ đến trung bình.


Kỹ sư dữ liệu có thể bắt đầu vớiTài liệu PyIceberg, được duy trì và duy trì cho đến nay.LửaTrang này có những ví dụ tuyệt vời về tất cả các API PyIceberg.


Hạnh phúc coding!

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks