1,823 रीडिंग
1,823 रीडिंग

Salting तकनीक के साथ Apache Spark में डेटा स्केव कैसे ठीक करें

द्वारा Islam Elbanna7m2025/06/27
Read on Terminal Reader

बहुत लंबा; पढ़ने के लिए

सीखें कि Apache Spark में डेटा स्क्वाउट को सुधारने के लिए सॉलिंग तकनीक का उपयोग करके Scala और PySpark में बेहतर प्रदर्शन और संतुलित विभाजन कैसे करें।
featured image - Salting तकनीक के साथ Apache Spark में डेटा स्केव कैसे ठीक करें
Islam Elbanna HackerNoon profile picture
0-item
1-item

जब आप बड़े डेटा सेट के साथ काम करते हैंApache Sparkएक सामान्य प्रदर्शन समस्या हैdata skewयह तब होता है जब कुछ कुंजीdominateडेटा वितरण, जिससेunevenयह मुख्य रूप से उन ऑपरेशनों के दौरान होता है जिनकी आवश्यकता होती हैshufflingजैसेjoinsया नियमित रूप सेaggregations.


स्केव को कम करने का एक व्यावहारिक तरीका हैsalting, जिसमें कई विभाजनों पर कृत्रिम रूप से भारी कुंजीों को फैलाना शामिल है. इस पोस्ट में, मैं आपको इस के माध्यम से एक व्यावहारिक उदाहरण के साथ मार्गदर्शन करूंगा।


डेटा स्केव मुद्दों को कैसे हल करता है

जोड़कर Arandomlyजोड़ कुंजी पर संख्या उत्पन्न और फिर इस संयुक्त कुंजी पर जोड़कर, हम बड़े कुंजी को अधिक समान रूप से वितरित कर सकते हैं. यह डेटा वितरण को अधिक समान बनाता है और अधिक श्रमिकों पर भार फैलाता है, एक श्रमिकों को अधिकांश डेटा भेजने के बजाय और दूसरों को बेकार छोड़ देता है.

नमक के लाभ

  • Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.

  • Improved Performance: Speeds up joins and aggregations by balancing the workload.

  • Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.


जब सलाद का उपयोग करें

विचलित कुंजी के साथ जोड़ों या जोड़ों के दौरान, जब आप डेटा विचलन के कारण लंबे shuffle समय या निष्पादक विफलताओं को नोटिस करते हैं तो नमन का उपयोग करें. यह वास्तविक समय स्ट्रीमिंग अनुप्रयोगों में भी उपयोगी है जहां विभाजन डेटा प्रसंस्करण दक्षता को प्रभावित करता है, या जब अधिकांश कर्मचारियों बेकार हैं जबकि कुछ चलने की स्थिति में फंस गए हैं.


स्केल में उदाहरण

आइए हम एक के साथ कुछ डेटा उत्पन्न करते हैंunbalancedहम मान सकते हैं कि हमें जोड़ने की आवश्यकता है दो डेटासेट हैं: एक एक बड़ा डेटासेट है, और दूसरा एक छोटा डेटासेट है।

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Simulated large dataset with skew
val largeDF = Seq(
  (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")

// Small dataset
val smallDF = Seq(
  (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")


चलो बड़े डेटा सेट में नमकीन कॉलम जोड़ते हैं, जिसे हम उपयोग करते हैंrandomizationबड़े कुंजी के मूल्यों को छोटे विभाजनों में फैलाने के लिए

// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
    withColumn("salt", (rand() * numBuckets).cast("int")).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))

saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
|          1|       txn1|   1|               1_1|
|          1|       txn2|   1|               1_1|
|          1|       txn3|   2|               1_2|
|          2|       txn4|   2|               2_2|
|          3|       txn5|   0|               3_0|
+-----------+-----------+----+------------------+


यह सुनिश्चित करने के लिए कि हम बड़े डेटा सेट में सभी संभावित यादृच्छिक नमकीन कुंजी कवर करते हैं, हमेंexplodeसभी संभावित नमकीन मूल्यों के साथ छोटे डेटासेट

// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
    crossJoin(smallDF).
    withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) 

saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id|  name|salted_customer_id|
+----+-----------+------+------------------+
|   0|          1| Ahmed|               1_0|
|   1|          1| Ahmed|               1_1|
|   2|          1| Ahmed|               1_2|
|   0|          2|   Ali|               2_0|
|   1|          2|   Ali|               2_1|
|   2|          2|   Ali|               2_2|
|   0|          3|Hassan|               3_0|
|   1|          3|Hassan|               3_1|
|   2|          3|Hassan|               3_2|
+----+-----------+------+------------------+


अब हम दोनों डेटा सेट को आसानी से जोड़ सकते हैं

// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
    join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
    select("customer_id", "transaction", "name")

joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction|  name|
+-----------+-----------+------+
|          1|       txn2| Ahmed|
|          1|       txn1| Ahmed|
|          1|       txn3| Ahmed|
|          2|       txn4|   Ali|
|          3|       txn5|Hassan|
+-----------+-----------+------+


Python में उदाहरण

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType

# Simulated large dataset with skew
largeDF = spark.createDataFrame([
    (1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])

# Small dataset
smallDF = spark.createDataFrame([
    (1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])

# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
    .withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))

# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
    saltedSmallDF,
    on=["salted_customer_id", "customer_id"],
    how="inner"
).select("customer_id", "transaction", "name")

नोट्स

  • इस कोड में spark.range(...) का उपयोग Scala का अनुकरण करने के लिए किया जाता है (0 से numBuckets).toDF ("साल") तक।
  • कॉलम अभिव्यक्तियों को col(...), lit(...), और concat(...) का उपयोग करके संसाधित किया जाता है।
  • अद्वितीय का उपयोग .cast (IntegerType()) में किया जाता है।


ट्यूनिंग टिप: चुनेंnumBuckets

संख्याएं
  • यदि आप numBuckets = 100 सेट करते हैं, तो प्रत्येक कुंजी को 100 उप विभाजनों में विभाजित किया जा सकता है. हालांकि, सावधान रहें क्योंकि बहुत अधिक बक्के का उपयोग करने से प्रदर्शन कम हो सकता है, खासकर छोटे डेटा वाले कुंजी के लिए।
  • If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal 0, e.x.

    •     // Step 1: create a salting key in the large dataset
              val numBuckets = 3
              val saltedLargeDF = largeDF.
                  withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
        
          // Step 2: Explode rows in smallDF for possible salted keys
              val saltedSmallDF = (0 until numBuckets).toDF("salt").
                  crossJoin(smallDF.filter($"customer_id" === 1)).
                  select("customer_id", "salt", "name").
                  union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")).
                  withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
      


Rule of Thumb:छोटे (उदाहरण के लिए, 10-20) से शुरू करें और ध्यान में रखे गए shuffle आकार और कार्य संचालन समय के आधार पर धीरे-धीरे बढ़ें।


अंतिम विचार

सॉलिंग एक प्रभावी और सरल तरीका है जो पारंपरिक विभाजन या सुझावों पर Apache Spark में स्क्वाइव को प्रबंधित करने के लिए है (SKEWED JOINसही ट्यूनिंग और निगरानी के साथ, यह तकनीक अत्यधिक विचलित डेटा सेट पर कार्य निष्पादन समय को काफी कम कर सकती है।


मूल रूप से 11 मई, 2025 को https://practical-software.com पर प्रकाशित किया गया था।

मूल में प्रकाशितhttps://practical-software.com11 मई 2025 को।https://practical-software.comhttps://practical-software.com

Trending Topics

blockchaincryptocurrencyhackernoon-top-storyprogrammingsoftware-developmenttechnologystartuphackernoon-booksBitcoinbooks