Wenn Sie mit großen Datensätzen inApache SparkEin gemeinsames Leistungsproblem istdata skewDies geschieht, wenn ein paar SchlüsseldominateDie Datenverteilung führt zuunevenPartitionen und langsame Abfragen. Dies geschieht hauptsächlich während von Operationen, dieshufflingSo wiejoinsOder sogar regelmäßigaggregations.
Ein praktischer Weg, um den Schwanz zu reduzieren, istsalting, was die künstliche Verbreitung schwerer Schlüssel über mehrere Partitionen beinhaltet. In diesem Beitrag werde ich Sie durch dies mit einem praktischen Beispiel führen.
Wie Salting Data Skew-Probleme löst
Durch Hinzufügen arandomlyWir können große Schlüssel gleichmäßiger verteilen, indem wir die erzeugte Zahl auf den Join-Schlüssel übertragen und dann über diesen kombinierten Schlüssel übertragen.Dies macht die Datenverteilung einheitlicher und verteilt die Last auf mehr Arbeiter, anstatt den Großteil der Daten an einen Arbeiter zu senden und die anderen leer zu lassen.
Vorteile des Salzens
-
Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.
-
Improved Performance: Speeds up joins and aggregations by balancing the workload.
-
Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.
Wann Salz verwenden
Bei Joins oder Aggregationen mit verzerrten Tasten verwenden Sie Salting, wenn Sie lange Shuffle-Zeiten oder Ausführungsfehler aufgrund von Datenverschiebungen bemerken. Es ist auch hilfreich in Echtzeit-Streaming-Anwendungen, bei denen die Partitionierung die Effizienz der Datenverarbeitung beeinflusst, oder wenn die meisten Mitarbeiter leer sind, während einige in einem laufenden Zustand stecken.
Ein Beispiel in der Scala
Lassen Sie uns einige Daten mit einemunbalancedWir können annehmen, dass es zwei Datensätze gibt, die wir verbinden müssen: Eine ist ein großer Datensatz und die andere ist ein kleiner Datensatz.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
Fügen wir die Salzspalte zu den großen Datensätzen hinzu, die wir verwendenrandomizationum die Werte des großen Schlüssels in kleinere Partitionen zu verteilen
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
Um sicherzustellen, dass wir alle möglichen randomisierten Salzschlüssel in den großen Datensätzen abdecken, müssen wirexplodeDer kleine Datensatz mit allen möglichen Salzwerten
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
Jetzt können wir die beiden Datensätze leicht miteinander verbinden
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
Ein Beispiel in Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
Notizen
- Dieser Code verwendet spark.range(...) um Scala (0 bis numBuckets).toDF („Salz“) zu imitieren.
- Spalteausdrücke werden mithilfe von col (...), lit (...), und concat (...).
- Der Cast to integer verwendet .cast (IntegerType()).
Tuning Tipp: WählennumBuckets
Numberspiele
- Wenn Sie numBuckets = 100 festlegen, kann jeder Schlüssel in 100 Unterpartitionen aufgeteilt werden. Seien Sie jedoch vorsichtig, da die Verwendung von zu vielen Buckets die Leistung verringern kann, insbesondere bei Schlüsseln mit wenig Daten.
- If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal
0
, e.x.-
// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
-
Rule of Thumb:Beginnen Sie klein (z.B. 10-20) und erhöhen Sie allmählich auf der Grundlage der beobachteten Shuffle-Größen und der Aufgabenlaufzeit.
Endgültige Gedanken
Salting ist eine effektive und einfache Methode zur Verwaltung von Abweichungen in Apache Spark, wenn traditionelle Partitionierung oder Hinweise (SKEWED JOIN
Mit der richtigen Tuning und Überwachung kann diese Technik die Ausführungszeiten bei stark verzerrten Datensätzen deutlich reduzieren.
Ursprünglich veröffentlicht unter https://practical-software.com am 11. Mai 2025.
Ursprünglich veröffentlicht in