Data Skew Problem in Spark

Problem description

When joining skewed data sets, we can face stragglers during execution that slow down the whole application.

Solution

Such slow tasks are mostly the result of a non-uniform data distribution across the workers of a Spark application. A possible solution is to redistribute the large data set uniformly across the available workers and, at the same time, broadcast the smaller data set, pre-sorted by the join columns, to all workers for consolidation; a minimal sketch of this pattern follows the list below.

Solution to the skew problem

  • uniform repartitioning of the large data set (the number of partitions depends on the available resources)
  • broadcasting of the smaller, sorted data set (it can be shrunk to fewer attributes)
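Here is a minimal sketch of the pattern, assuming a large users data set and a small deps data set; the file names, column names and the partition count are illustrative placeholders, not part of the original code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Sketch: repartition the large side uniformly, broadcast the small, trimmed, sorted side.
def skewFreeJoinSketch(spark: SparkSession): Unit = {
  val users = spark.read.option("header", "true").csv("users.csv") // large, skewed data set
  val deps  = spark.read.option("header", "true").csv("deps.csv")  // small data set

  val usersEven = users.repartition(200)          // spread the large data set evenly
  val depsSmall = deps.select("id").sort("id")    // keep only the needed columns, pre-sorted

  val joined = usersEven.join(broadcast(depsSmall), usersEven("department_id") === depsSmall("id"))
  joined.explain()
}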

The large data set can be repartitioned with one of the following (see the sketch after this list):

  • the default DataFrame repartitioner
  • a DataFrame repartitioner with an expression based on data columns
  • a custom RDD partitioner
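Roughly, the three options could look like this; the column names, partition counts and the ModPartitioner class are hypothetical examples, not taken from the original application:

import org.apache.spark.Partitioner
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// 1. Default repartitioner: round-robin into a fixed number of partitions
def defaultRepartition(users: DataFrame): DataFrame =
  users.repartition(200)

// 2. Repartitioner with an expression based on data columns (hash-partitioned by the join keys)
def expressionRepartition(users: DataFrame): DataFrame =
  users.repartition(200, col("department_id"), col("company_id"))

// 3. Custom RDD partitioner (hypothetical modulo partitioner over the key's hash)
class ModPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int =
    if (key == null) 0 else math.abs(key.hashCode) % parts
}

def customRepartition(users: DataFrame) =
  users.rdd
    .keyBy(row => row.getAs[String]("department_id"))
    .partitionBy(new ModPartitioner(200))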

The repartitioning strategy depends on the nature of the data and on domain specifics. The DataFrame/Dataset APIs are preferred.

Additionally, we can tune the number of executors and cores for better utilization of cluster resources; the relevant settings are sketched below.
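For example, the executor count, cores and memory can be set when the session is created; the values below are hypothetical and should be tuned to the actual cluster (they take effect when running on a cluster manager such as YARN):

import org.apache.spark.sql.SparkSession

// Hypothetical resource settings; adjust to the cluster's capacity
val spark = SparkSession
  .builder()
  .appName("DataSkew")
  .config("spark.executor.instances", "10") // number of executors
  .config("spark.executor.cores", "4")      // cores per executor
  .config("spark.executor.memory", "8g")    // memory per executor
  .getOrCreate()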

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
import utils.SparkActions

object DataSkew extends SparkActions {

  def initSpark(appName: String, dynamicAllocation: Boolean = false): SparkSession = {
    SparkSession
      .builder()
      .appName(appName)
      .config("spark.dynamicAllocation.enabled", dynamicAllocation)
      .getOrCreate()
  }

  // Executors currently registered with the application, excluding the driver host
  def activeExecutors(spark: SparkSession): Seq[String] = {
    val allExecutors = spark.sparkContext.getExecutorMemoryStatus.map(_._1)
    val driverHost: String = spark.sparkContext.getConf.get("spark.driver.host")
    allExecutors.filter(!_.split(":")(0).equals(driverHost)).toList
  }

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("Error: paths to <users-dataset> and <deps-dataset> are required!")
      System.exit(1)
    }

    val usersCSV = args(0)
    val depsCSV  = args(1)

    val spark = initSpark("DataSkew")

    // load data
    val depsIn  = spark.read.format("csv").option("header", "true").load(depsCSV)
    val usersIn = spark.read.format("csv").option("header", "true").load(usersCSV)

    // cluster tweak: derive the partition count from the number of active executors
    val executors   = activeExecutors(spark)
    val multiFactor = config.getInt("executor.multiFactor") // `config` is assumed to come from the SparkActions trait

    // prepare data: sort the small side, repartition the large side uniformly
    import spark.implicits._
    val deps  = depsIn.sort('id, 'assigned_date, 'company_id, 'factory_id)
    val users = usersIn.repartition(executors.size * multiFactor)

    // process data: broadcast the small, sorted data set to every worker
    val joinExp = $"users.department_id" === $"deps.id" &&
      $"users.date_of_birth" === $"deps.assigned_date" &&
      ($"users.company_id" === $"deps.company_id" || $"users.company_id" === $"deps.factory_id")
    val countExpr = users.as('users).join(broadcast(deps).as('deps), joinExp, "inner")

    countExpr.explain()
    println(s"\n\nresult: = ${countExpr.count()}\n\n")
  }
}

Looking to learn Big Data? Join our training program, Best Big Data Spark Training in Bangalore, to learn the concepts in depth according to industry standards.

--

Naveen - (Founder & Trainer @ NPN Training)

A software training institute which believes that technology has to be learnt under experienced practitioners — www.npntraining.com