Lorsque vous travaillez avec de grands ensembles de données dansApache SparkUn problème de performance estdata skewCela se produit lorsque quelques clésdominatela distribution des données, conduisant àunevenpartitions et des requêtes lentes. Cela se produit principalement pendant les opérations qui nécessitentshufflingcommejoinsOu même régulieraggregations.
Une méthode pratique pour réduire les déchets estsalting, qui implique la diffusion artificielle de clés lourdes sur plusieurs partitions. Dans ce post, je vous guiderai à travers cela avec un exemple pratique.
Comment Salting résout les problèmes de détournement de données
En ajoutant arandomlygénéré le nombre à la clé jointe, puis en se joignant sur cette clé combinée, nous pouvons distribuer les grandes clés plus uniformément.Cela rend la distribution des données plus uniforme et répartit la charge sur plus de travailleurs, au lieu d'envoyer la plupart des données à un travailleur et de laisser les autres inactives.
Les bienfaits du salage
-
Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.
-
Improved Performance: Speeds up joins and aggregations by balancing the workload.
-
Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.
Quand utiliser le saumon
Lors de joints ou d'agrégations avec des clés déformées, utilisez le saltage lorsque vous remarquez de longs temps de déformation ou des erreurs d'exécuteur en raison d'une déformation des données. Il est également utile dans les applications de streaming en temps réel où la partitionnement affecte l'efficacité du traitement des données, ou lorsque la plupart des travailleurs sont inactives alors que quelques-uns sont coincés dans un état d'exécution.
L’exemple de l’escalade
Nous allons générer quelques données avec ununbalancedNous pouvons supposer qu'il y a deux ensembles de données que nous devons joindre: l'un est un grand ensemble de données, et l'autre est un petit ensemble de données.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
Ajoutez la colonne salée aux grands ensembles de données, que nous utilisonsrandomizationpour distribuer les valeurs de la grande clé dans des partitions plus petites
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
Pour nous assurer que nous couvrons toutes les clés salées aléatoires possibles dans les grands ensembles de données, nous devonsexplodele petit ensemble de données avec toutes les valeurs salées possibles
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
Maintenant, nous pouvons facilement joindre les deux ensembles de données
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
Exemple en Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
Notes
- Ce code utilise spark.range(...) pour imiter Scala (0 à numBuckets).toDF (« sel »).
- Les expressions de colonne sont traitées en utilisant col (...), lit (...), et concat (...).
- Le cast à integer utilise .cast (IntegerType()).
Tuning : le choixnumBuckets
Numérotation
- Si vous définissez numBuckets = 100, chaque clé peut être divisée en 100 sous-partitions. Cependant, soyez prudent car l'utilisation de trop de buckets peut diminuer la performance, en particulier pour les clés avec peu de données.
- If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal
0
, e.x.-
// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
-
Rule of Thumb:Commencez petit (par exemple, 10-20) et augmentez progressivement en fonction des tailles de shuffle observées et du temps d'exécution des tâches.
Pensées finales
Le saltage est une méthode efficace et simple pour gérer la déviation dans Apache Spark lorsqu'il s'agit de partitions traditionnelles ou d'indices (SKEWED JOIN
Avec le bon réglage et la surveillance, cette technique peut réduire considérablement les temps d'exécution des tâches sur des ensembles de données très déformés.
Publié à https://practical-software.com le 11 mai 2025.
Publié à l'origine à