1,799 판독값
1,799 판독값

Salting 기술을 사용하여 Apache Spark에서 데이터 스키우를 수정하는 방법

~에 의해 Islam Elbanna7m2025/06/27
Read on Terminal Reader

너무 오래; 읽다

Apache Spark에서 salting 기술을 사용하여 성능 향상 및 Scala 및 PySpark에서 균형 잡힌 파티션을 해결하는 방법에 대해 알아보십시오.
featured image - Salting 기술을 사용하여 Apache Spark에서 데이터 스키우를 수정하는 방법
Islam Elbanna HackerNoon profile picture
0-item
1-item

큰 데이터 세트와 함께 작업할 때Apache Spark일반적인 성능 문제는data skew이것은 몇 개의 열쇠가dominate데이터의 분배, 가져오는uneven파티션 및 느린 쿼리.그것은 주로 필요한 작업 중에 발생합니다.shuffling마치joins또는 규칙적인aggregations.


스케이트를 줄이는 실용적인 방법은salting, 인공적으로 여러 파티션을 통해 무거운 키를 확산하는 것을 포함합니다.이 포스트에서, 나는 실용적인 예를 들어 이것을 통해 당신을 안내 할 것입니다.


Salting Data Skew 문제를 해결하는 방법

A 추가함으로써randomly결합 키에 숫자를 생성한 다음 결합 키를 통해 결합하면 큰 키를 더 균등하게 배포할 수 있습니다.이것은 데이터의 분포를 더 균등하게 만듭니다.This makes the data distribution more uniform and spreads the load across more workers, instead of sending most of the data to one worker and leaving the others idle.

소금의 혜택

  • Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

  • Improved Performance: Speeds up joins and aggregations by balancing the workload.

  • Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.


샐러드를 사용할 때

구부러진 키로 조합 또는 집합하는 동안, 오랜 흐름 시간이나 데이터 구부러지기 때문에 실행기 실패를 발견할 때 소금을 사용합니다.It is also helpful in real-time streaming applications where partitioning affects data processing efficiency, or when most workers are idle while a few are stuck in a running state.


Scala 에서 Salting Example

몇 가지 데이터를 생성해 보자.unbalanced하나는 큰 데이터 집합이고 다른 하나는 작은 데이터 집합이라고 가정할 수 있습니다.We can assume there are two datasets we need to join: one is a large data set, and the other is a small data set.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Simulated large dataset with skew
val largeDF = Seq(
  (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")

// Small dataset
val smallDF = Seq(
  (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")


우리가 사용하는 큰 데이터 세트에 소금 열을 추가하자.randomization큰 키의 값을 더 작은 파티션으로 확산하려면

// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
    withColumn("salt", (rand() * numBuckets).cast("int")).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
|          1|       txn1|   1|               1_1|
|          1|       txn2|   1|               1_1|
|          1|       txn3|   2|               1_2|
|          2|       txn4|   2|               2_2|
|          3|       txn5|   0|               3_0|
+-----------+-----------+----+------------------+


대용량 데이터 세트의 모든 가능한 무작위 소금 키를 다루는지 확인하려면, we need toexplode모든 가능한 소금 값을 가진 작은 데이터 세트

// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
    crossJoin(smallDF).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 

saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id|  name|salted_customer_id|
+----+-----------+------+------------------+
|   0|          1| Ahmed|               1_0|
|   1|          1| Ahmed|               1_1|
|   2|          1| Ahmed|               1_2|
|   0|          2|   Ali|               2_0|
|   1|          2|   Ali|               2_1|
|   2|          2|   Ali|               2_2|
|   0|          3|Hassan|               3_0|
|   1|          3|Hassan|               3_1|
|   2|          3|Hassan|               3_2|
+----+-----------+------+------------------+


이제 두 데이터 세트를 쉽게 연결할 수 있습니다.

// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
    join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
    select("customer_id", "transaction", "name")

joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction|  name|
+-----------+-----------+------+
|          1|       txn2| Ahmed|
|          1|       txn1| Ahmed|
|          1|       txn3| Ahmed|
|          2|       txn4|   Ali|
|          3|       txn5|Hassan|
+-----------+-----------+------+


Python 에서 샘플링

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType

# Simulated large dataset with skew
largeDF = spark.createDataFrame([
    (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])

# Small dataset
smallDF = spark.createDataFrame([
    (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])

# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
    saltedSmallDF,
    on=["salted_customer_id", "customer_id"],
    how="inner"
).select("customer_id", "transaction", "name")

노트

  • 이 코드는 spark.range(...)를 사용하여 Scala의 (0 까지 numBuckets).toDF("소금")를 모방합니다.
  • 열 표현은 col(...), lit(...), and concat(...)를 사용하여 처리됩니다.
  • 캐스트는 .cast(IntegerType())를 사용합니다.The cast to integer uses .cast(IntegerType()).


튜닝 팁: 선택numBuckets

숫자
  • numBuckets = 100을 설정하면 각 키가 100개의 하위 파티션으로 나눌 수 있습니다. 그러나 너무 많은 버킷을 사용하면 특히 데이터가 적은 키의 경우 성능이 낮아질 수 있기 때문에 조심하십시오.
  • If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0, e.x.

    •     // Step 1: create a salting key in the large dataset
              val numBuckets = 3
              val saltedLargeDF = largeDF.
                  withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
        
          // Step 2: Explode rows in smallDF for possible salted keys
              val saltedSmallDF = (0 until numBuckets).toDF("salt").
                  crossJoin(smallDF.filter($"customer_id" === 1)).
                  select("customer_id", "salt", "name").
                  union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
      


Rule of Thumb:소규모(예를 들어, 10-20)를 시작하고 관찰된 shuffle 크기와 작업 실행 시간에 따라 점차적으로 증가합니다.


최종 생각

Apache Spark에서 전통적인 파티션 또는 힌트를 관리하는 효과적이고 간단한 방법입니다 (SKEWED JOIN올바른 조정 및 모니터링을 통해 이 기술은 매우 왜곡된 데이터 세트에서 작업 실행 시간을 크게 줄일 수 있습니다.


원래 https://practical-software.com에 게시되었으며, 2025년 5월 11일에 게시되었습니다.

원래 출판된 athttps://practical-software.com2025년 5월 11일https://practical-software.comhttps://practical-software.com

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks