যখন আপনি বড় ডাটা সেট সঙ্গে কাজApache Sparkএকটি কার্যকারিতা সমস্যা হচ্ছেdata skewএটি ঘটে যখন কয়েকটি চাবিdominateতথ্য বিতরণ, যাতেunevenপৃষ্ঠাগুলি এবং ধীরে ধীরে অনুরোধগুলি. এটি প্রধানত অপারেশনগুলির সময় ঘটে যাshufflingযেমনjoinsএমনকি নিয়মিতaggregations.
বিচ্যুতি কমাতে একটি কার্যকর উপায়salting, যা কৃত্রিমভাবে কঠোর কীগুলি একাধিক পার্টিশনগুলিতে ছড়িয়ে দেয়. এই পোস্টে, আমি আপনাকে একটি বাস্তব উদাহরণ দিয়ে এই মাধ্যমে পরিচালনা করব।
কিভাবে সাল্টিং ডেটা স্কাইভ সমস্যা সমাধান করে
যোগ করলে Arandomlyসংযুক্ত চাবি থেকে সংযুক্ত চাবি গঠিত সংখ্যা এবং তারপর সংযুক্ত চাবির উপর সংযুক্ত করে, আমরা বড় চাবি আরও সমানভাবে বিতরণ করতে পারি. এটি ডেটা বিতরণকে আরও সমান করে তোলে এবং বেশিরভাগ ডেটা এক শ্রমিকের কাছে পাঠানোর পরিবর্তে আরো শ্রমিকদের উপর লোড ছড়িয়ে দেয় এবং অন্যদের বাদ দেয়।
সালমানের সুবিধা
-
Reduced Skew: Spreads data evenly across partitions, preventing a few workers overload and improves utilization.
-
Improved Performance: Speeds up joins and aggregations by balancing the workload.
-
Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large, uneven partitions.
কখন সালমান ব্যবহার করবেন
ডেটা ভেঙে যাওয়ার কারণে দীর্ঘ shuffle times বা executor ব্যর্থতা লক্ষ্য করলে সাল্টিং ব্যবহার করুন. এটি রিয়েল টাইম স্ট্রিমিং অ্যাপ্লিকেশনগুলিতে সহায়ক যেখানে পার্টিশনিং ডেটা প্রক্রিয়াকরণের দক্ষতা প্রভাবিত করে, অথবা যখন বেশিরভাগ কর্মী নিখোঁজ হয় যখন কয়েকটি চলমান অবস্থায় আটকে আছে.
Scala এর উদাহরণ
আমরা একটি ডাটা সঙ্গে কিছু ডাটা তৈরি করিunbalancedআমরা অনুমান করতে পারি যে আমাদের সংযুক্ত করতে হবে দুটি ডেটা সেট: একটি একটি বড় ডেটা সেট এবং অন্যটি একটি ছোট ডেটা সেট।
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Simulated large dataset with skew
val largeDF = Seq(
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
).toDF("customer_id", "transaction")
// Small dataset
val smallDF = Seq(
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
).toDF("customer_id", "name")
আমরা যে বড় ডেটা সেটগুলি ব্যবহার করি তাতে লোডিং কলাম যোগ করি।randomizationবড় কী এর মানগুলি ছোট পার্টিশনগুলিতে ছড়িয়ে দিতে
// Step 1: create a salting key in the large dataset
val numBuckets = 3
val saltedLargeDF = largeDF.
withColumn("salt", (rand() * numBuckets).cast("int")).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedLargeDF.show()
+-----------+-----------+----+------------------+
|customer_id|transaction|salt|salted_customer_id|
+-----------+-----------+----+------------------+
| 1| txn1| 1| 1_1|
| 1| txn2| 1| 1_1|
| 1| txn3| 2| 1_2|
| 2| txn4| 2| 2_2|
| 3| txn5| 0| 3_0|
+-----------+-----------+----+------------------+
বড় ডেটা সেটগুলিতে সম্ভাব্য সমস্ত র্যান্ডম সলিড কীগুলি কভারেজ করার জন্য, আমাদের নিশ্চিত করতে হবেexplodeসমস্ত সম্ভাব্য লবণযুক্ত মানগুলির সাথে ছোট ডেটা সেট
// Step 2: Explode rows in smallDF for possible salted keys
val saltedSmallDF = (0 until numBuckets).toDF("salt").
crossJoin(smallDF).
withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
saltedSmallDF.show()
+----+-----------+------+------------------+
|salt|customer_id| name|salted_customer_id|
+----+-----------+------+------------------+
| 0| 1| Ahmed| 1_0|
| 1| 1| Ahmed| 1_1|
| 2| 1| Ahmed| 1_2|
| 0| 2| Ali| 2_0|
| 1| 2| Ali| 2_1|
| 2| 2| Ali| 2_2|
| 0| 3|Hassan| 3_0|
| 1| 3|Hassan| 3_1|
| 2| 3|Hassan| 3_2|
+----+-----------+------+------------------+
এখন আমরা সহজেই দুটি ডাটা সেট সংযুক্ত করতে পারি
// Step 3: Perform salted join
val joinedDF = saltedLargeDF.
join(saltedSmallDF, Seq("salted_customer_id", "customer_id"), "inner").
select("customer_id", "transaction", "name")
joinedDF.show()
+-----------+-----------+------+
|customer_id|transaction| name|
+-----------+-----------+------+
| 1| txn2| Ahmed|
| 1| txn1| Ahmed|
| 1| txn3| Ahmed|
| 2| txn4| Ali|
| 3| txn5|Hassan|
+-----------+-----------+------+
Python এর উদাহরণ
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, lit, concat
from pyspark.sql.types import IntegerType
# Simulated large dataset with skew
largeDF = spark.createDataFrame([
(1, "txn1"), (1, "txn2"), (1, "txn3"), (2, "txn4"), (3, "txn5")
], ["customer_id", "transaction"])
# Small dataset
smallDF = spark.createDataFrame([
(1, "Ahmed"), (2, "Ali"), (3, "Hassan")
], ["customer_id", "name"])
# Step 1: create a salting key in the large dataset
numBuckets = 3
saltedLargeDF = largeDF.withColumn("salt", (rand() * numBuckets).cast(IntegerType())) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 2: Explode rows in smallDF for possible salted keys
salt_range = spark.range(0, numBuckets).withColumnRenamed("id", "salt")
saltedSmallDF = salt_range.crossJoin(smallDF) \
.withColumn("salted_customer_id", concat(col("customer_id"), lit("_"), col("salt")))
# Step 3: Perform salted join
joinedDF = saltedLargeDF.join(
saltedSmallDF,
on=["salted_customer_id", "customer_id"],
how="inner"
).select("customer_id", "transaction", "name")
নোট
- এই কোডটি spark.range(...) ব্যবহার করে Scala এর (0 থেকে numBuckets).toDF ("সাল্ট") অনুসরণ করে।
- কলাম বাক্যগুলি col (...), lit (...), এবং concat ( ...) ব্যবহার করে পরিচালিত হয়।
- অন্তর্নিহিত অ্যাকাউন্ট ব্যবহার করে .cast (IntegerType())।
ট্যাগ টাইপ: নির্বাচননমুনা
নমুনা
- আপনি যদি numBuckets = 100 সেট করে থাকেন, তবে প্রতিটি কী 100 সাব-পার্টিফিকেশনগুলিতে বিভক্ত করা যেতে পারে। তবে, সাবধান থাকুন কারণ অতিরিক্ত বকেট ব্যবহার করে পারফরম্যান্স কমে যেতে পারে, বিশেষ করে ছোট ডেটা সহ কীগুলির জন্য।
- If you know how to identify the skewed keys, then you can apply the salting for those keys only, and set the salting for other keys as literal
0
, e.x.-
// Step 1: create a salting key in the large dataset val numBuckets = 3 val saltedLargeDF = largeDF. withColumn("salt", when($"customer_id" === 1, (rand() * numBuckets).cast("int")).otherwise(lit(0))). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt")) // Step 2: Explode rows in smallDF for possible salted keys val saltedSmallDF = (0 until numBuckets).toDF("salt"). crossJoin(smallDF.filter($"customer_id" === 1)). select("customer_id", "salt", "name"). union(smallDF.filter($"customer_id" =!= 1).withColumn("salt", lit(0)).select("customer_id", "salt", "name")). withColumn("salted_customer_id", concat($"customer_id", lit("_"), $"salt"))
-
Rule of Thumb:ছোট (উদাহরণস্বরূপ, 10-20) শুরু করুন এবং পর্যবেক্ষিত shuffle আকার এবং task runtime উপর ভিত্তি করে ধীরে ধীরে বৃদ্ধি করুন।
চূড়ান্ত চিন্তা
স্লিপিং একটি কার্যকরী এবং সহজ পদ্ধতি Apache Spark যখন ঐতিহ্যগত বিভাজন বা টিপস (SKEWED JOIN
সঠিক সেটিং এবং নজরদারির সাথে, এই প্রযুক্তিটি ব্যাপকভাবে উচ্চতর বিচ্ছিন্ন ডেটা সেটগুলিতে কাজ সম্পাদন সময় হ্রাস করতে পারে।
মূলত 11 মে, 2025 এ https://practical-software.com এ প্রকাশিত হয়েছে।
প্রাথমিকভাবে প্রকাশিত হয়েছে at