1,799 讀數
1,799 讀數

如何用 Salting 技术在 Apache Spark 中修复数据转移

经过 Islam Elbanna7m2025/06/27
Read on Terminal Reader

太長; 讀書

了解如何在 Apache Spark 中使用盐化技术修复数据偏差,以提高 Scala 和 PySpark 中的性能和分区平衡。
featured image - 如何用 Salting 技术在 Apache Spark 中修复数据转移
Islam Elbanna HackerNoon profile picture
0-item
1-item

当您在大数据集中工作时Apache Spark一个共同的性能问题是data skew这种情况发生时,一些钥匙dominate数据分布,导致uneven分区和缓慢查询. 主要发生在需要shufflingjoins甚至是定期aggregations.


减少偏差的实际方法是salting,这涉及在多个分区中人工散布重密钥. 在本文中,我将用一个实际的例子指导您。


Salting 如何解决数据转移问题

通过添加Arandomly通过将生成的数字连接到连接密钥,然后连接到这个组合密钥,我们可以更均匀地分配大密钥,这使得数据的分布更均匀,并将负荷分布在更多的工人身上,而不是将大部分数据发送给一个工人,而让其他人空闲。

盐的益处

  • Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

  • Improved Performance: Speeds up joins and aggregations by balancing the workload.

  • Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.


什么时候使用盐

在使用扭曲的键进行合并或聚合时,当您注意到由于数据扭曲而出现长时间的缝时间或执行器故障时,使用盐化,这也是实时流媒体应用程序中有用的,其中分区影响数据处理效率,或者当大多数工人处于空闲状态时,而少数人处于运行状态时。


在 Scala 中举例

让我们将一些数据与一个unbalanced我们可以假设我们需要连接两个数据集:一个是大数据集,另一个是小数据集。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Simulated large dataset with skew
val largeDF = Seq(
  (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")

// Small dataset
val smallDF = Seq(
  (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")


让我们将盐列添加到我们使用的大数据集中randomization将大密钥的值扩散到较小的分区

// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
    withColumn("salt", (rand() * numBuckets).cast("int")).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
|          1|       txn1|   1|               1_1|
|          1|       txn2|   1|               1_1|
|          1|       txn3|   2|               1_2|
|          2|       txn4|   2|               2_2|
|          3|       txn5|   0|               3_0|
+-----------+-----------+----+------------------+


为了确保我们涵盖大数据集中的所有可能的随机盐键,我们需要explode包含所有可能的盐值的小数据集

// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
    crossJoin(smallDF).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 

saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id|  name|salted_customer_id|
+----+-----------+------+------------------+
|   0|          1| Ahmed|               1_0|
|   1|          1| Ahmed|               1_1|
|   2|          1| Ahmed|               1_2|
|   0|          2|   Ali|               2_0|
|   1|          2|   Ali|               2_1|
|   2|          2|   Ali|               2_2|
|   0|          3|Hassan|               3_0|
|   1|          3|Hassan|               3_1|
|   2|          3|Hassan|               3_2|
+----+-----------+------+------------------+


现在我们可以轻松连接这两个数据集。

// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
    join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
    select("customer_id", "transaction", "name")

joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction|  name|
+-----------+-----------+------+
|          1|       txn2| Ahmed|
|          1|       txn1| Ahmed|
|          1|       txn3| Ahmed|
|          2|       txn4|   Ali|
|          3|       txn5|Hassan|
+-----------+-----------+------+


使用 Python 的例子

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType

# Simulated large dataset with skew
largeDF = spark.createDataFrame([
    (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])

# Small dataset
smallDF = spark.createDataFrame([
    (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])

# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
    saltedSmallDF,
    on=["salted_customer_id", "customer_id"],
    how="inner"
).select("customer_id", "transaction", "name")

笔记

  • 此代码使用 spark.range(...) 来模仿 Scala 的 (0 到 numBuckets).toDF(“盐”).
  • 列表达式是使用col(...), lit(...),和 concat(...)来处理的。
  • 演示到整数使用 .cast(IntegerType())。


调制方法:选择numBuckets

号码
  • 如果您将 numBuckets 设置为 100,每个密钥可以分为 100 个子分区,但是要小心,因为使用过多的桶可能会降低性能,特别是对于数据少的密钥。
  • If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0, e.x.

    •     // Step 1: create a salting key in the large dataset
              val numBuckets = 3
              val saltedLargeDF = largeDF.
                  withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
        
          // Step 2: Explode rows in smallDF for possible salted keys
              val saltedSmallDF = (0 until numBuckets).toDF("salt").
                  crossJoin(smallDF.filter($"customer_id" === 1)).
                  select("customer_id", "salt", "name").
                  union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
      


Rule of Thumb:开始小(例如,10-20),并根据观察到的尺寸和任务运行时间逐步增加。


最后的想法

SKEWED JOIN有了正确的调节和监控,这种技术可以在高度扭曲的数据集上显著减少任务执行时间。


最初发表在 https://practical-software.com 于 2025 年 5 月 11 日。

最初发表在https://practical-software.com2025年5月11日。https://practical-software.comhttps://practical-software.com

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks