When you work with large datasets in Apache Spark, a common performance problem is data skew. It happens when a few keys dominate the data distribution, leading to uneven partitions and slow queries. It mainly shows up in operations that require shuffling, such as joins or even regular aggregations.
A practical way to reduce skew is salting, which means artificially spreading the heavy keys across multiple partitions. In this article, I will walk you through it with a practical example.
How Salting Solves Data Skew
By concatenating a randomly generated number to the join key, and then joining on this combined key, we can split the heavy keys more evenly. This makes the data distribution more uniform and spreads the load across more workers, instead of sending most of the data to a single worker while the others sit idle.
Benefits of Salting
- Reduced Skew: Spreads data evenly across partitions, preventing a few workers from being overloaded and improving utilization.
- Improved Performance: Speeds up joins and aggregations by balancing the workload.
- Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.
When to Use Salting
Use salting when you join or aggregate on skewed keys, when you notice long task times or executor failures caused by data skew, or when most workers sit idle while only a few are busy. It is also useful in real-time streaming applications, where partitioning affects processing efficiency.
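Before reaching for salting, it is worth confirming that skew is actually the cause. Below is a minimal sketch (it reuses the largeDF and customer_id names from the example that follows) that counts rows per join key; a few keys with counts far above the rest are the skewed keys.
import org.apache.spark.sql.functions.desc
// Count rows per join key and sort descending; keys whose counts dwarf
// the rest are the candidates for salting.
val keyCounts = largeDF.groupBy("customer_id").count().orderBy(desc("count"))
keyCounts.show(10)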
Example in Scala
Let's create some data with an unbalanced key. We can assume we need to join two datasets: a large one and a small one.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().getOrCreate() // not needed in spark-shell, where spark is predefined
import spark.implicits._ // needed for toDF on local Seqs and the $"col" syntax
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
Let's add a salt column to the large dataset. We use randomization to spread the values of the heavy keys across smaller partitions.
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
To make sure we cover all possible random salt keys in the large dataset, we need to explode the small dataset with every possible salt value.
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
Now we can easily join the two datasets.
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
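As a quick sanity check (a small sketch, not part of the original walkthrough; directDF is just an illustrative name), the salted join should return exactly the same rows as a plain join on customer_id; only how the work is distributed across partitions changes.
// Optional verification: compare against a direct, unsalted join
val directDF = largeDF.join(smallDF, Seq("customer_id"), "inner")
assert(joinedDF.count() == directDF.count())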
Example in Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()  # not needed in the pyspark shell, where spark is predefined
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
Notes
- This code uses spark.range(...) to mimic Scala's (0 until numBuckets).toDF("salt").
- Column expressions are handled with col(...), lit(...), and concat(...).
- The cast to integer uses .cast(IntegerType()).
Tuning the Approach: Choosing numBuckets
- If you set numBuckets to 100, each key can be split into up to 100 sub-partitions. Be careful, though: using too many buckets can hurt performance, especially for keys with little data.
- If you know how to identify the skewed keys, you can apply salting to those keys only and set the salt for all other keys to a literal 0, e.g.:
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
  withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))).
  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
  crossJoin(smallDF.filter($"customer_id" === 1)).
  select("customer_id", "salt", "name").
  union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")).
  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
- Rule of Thumb: Start small (e.g., 10-20) and increase gradually based on the observed partition sizes and task runtimes (see the sketch after this list).
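One way to guide that tuning, sketched below with the same example DataFrames, is to inspect how many rows sit behind each salted key and adjust numBuckets until the counts look reasonably balanced.
// Inspect row counts per salted key; if a few salted keys still dominate,
// increase numBuckets (or salt only the known hot keys, as shown above).
saltedLargeDF.groupBy("salted_customer_id").count().orderBy(desc("count")).show(20)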
Final Thoughts
Salting is a practical technique for handling a skewed join. With the right tuning and monitoring, it can significantly reduce task execution times on highly skewed datasets.
Originally published at https://practical-software.com on May 11, 2025.