IBM Support

IBM Spectrum Conductor with Spark shared applications improve performance for SQL queries on large data sets

Technical Blog Post


Abstract

IBM Spectrum Conductor with Spark shared applications improve performance for SQL queries on large data sets

Body

Use case: Running SQL queries on large data sets

Clients need to run multi-user/multiple SQL-like queries, which include filtering, aggregations with user-defined keys, and so on, on large data sets (in terabyte (TB) sizes that are created from multiple files that are frequently updated (daily or even multiple times in a day). The requirement for performance is very strict due to the large TB data sizes coupled with complex query runtimes in seconds. For example, clients can request to have query runtimes under 10 seconds for hundreds of millions of rows.

Traditionally, databases are used to solve this sort of problems, however, they have scalability issues. When databases grow in size, you are limited to the number of machines and you have to maintain the database itself (perform installation, administrative tasks, backup etc.)

Spark SQL allows to overcome the data scalability limitation, but it takes a long time in Spark to load such data files and then query the files, since loading data from disks is slow by definition. A second issue is that each Spark job's query should load the same data, so for multiple queries of the same input data at any moment in time, many machines in the cluster may have in memory multiple copies of the same input data partitions existing. This in turn is very inefficient for overall cluster utilization, requires more memory, etc.

 

Our solution

The main solution is to have a first job (that we call a "loader" job) to put partitioned data in heap memory of executors processes. Then, multiple query jobs get “attached” to the first loader job, so that they can run query job's tasks inside the same executor process space where the corresponding data is loaded. The multiple query jobs can directly and simultaneously use the input data that is partitioned in memory.

The solution uses our Spark extension for shared applications. IBM Spectrum Conductor with Spark version 2.2.0 has what we call shared applications, where multiple users can submit Spark jobs to an application and use the same Resilient Distributed Datasets (RDDs) or corresponding data frames represented as temporary tables.

With shared applications, the first loader job creates a shared Spark context (that has a user-defined unique name) and then the Spark query job is submitted with the same named shared Spark context. These job’s driver is attached to the same loader's Spark driver process (deployed in 'cluster' mode and started on dynamically chosen host) and run its tasks in the same Spark executors processes, which (as for any shared applications) are still kept alive after the loader job had finished. The life-time of the executors can be defined by the user and it is controlled by Spark Master scheduler.

 

Advantages over other solutions

Spark SQL has advantages against other solutions for sharing data in memory (such as distributed in-memory caches and databases) where Spark applications provide database functionality on the fly. The loader is creating the temporary tables, which has most the general SQL database features without any external database integration. Then loader's programmer has full control not only on the queries but also on tables creations, schemas of the tables, etc. 

With IBM Conductor with Spark shared applications, query job’s tasks are run in the same process space where the input data was loaded, so the query doesn't have any overhead for accessing the data (that is, it doesn't need data serialization or de-serialization). Then general Spark scalability is applied in the solution to the data and queries. Clients, through IBM Spectrum Conductor with Spark configurations and programmatically can control how data is partitioned and then how many query tasks run in parallel, etc.

 

Shared application example

In the following example, the shared application’s first job loads a large file in comma separated format, which represent a table that is loaded and cached in memory. Then query jobs attach to the same application using the same named shared context and query the cached table with any complex aggregation functions.

In the tests, IBM Conductor with Spark used an input file that was 400 GB in size, which loaded into the table with 100 million rows.

 

Here is an example of the loader code:

object CacheLoader extends SharedContextJob {

  override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {

    // creates Spark SQL session using passed shared context
    val spark = SparkSession.builder().getOrCreate()
 

    // read input file to data frame
    val dataDF = spark.read.format("csv").option("header", "true").load(args(0)+"/dataset.csv")

    // create named temporary table view for the data frame
    dataDF.createOrReplaceTempView("data")
    

    // explicitly cache the temporary table to memory
    spark.sqlContext.cacheTable("data")

    // count rows in the cached table - trigger action to load the table, etc.
    val data_counts = spark.sql("SELECT count(*) FROM data")
    System.out.println(" Total # rows in the cache: " + data_counts.collect().mkString(""))

    return ()
    // no SparkContext termination so other jobs will reuse it
  }
}

  • The loader job is submitted once with the following command:

<SIG-deployment-path>/bin/spark-submit --deploy-mode cluster --master spark:<SIG-URL> --class com.ibm.cws.examples.CacheLoader --conf spark.ego.shared.context.name=SampleCache samplecache_2.11-1.0.jar <dataset-path>

 

Now the table is loaded into memory and you can see it in the Spark UI.

image

Here is an example of the query code:

object CacheLoader extends SharedContextJob {

  override def runJob(sc: SharedSparkContext, args: Array[String]): Any = {

    // creates Spark SQL session using passed shared context
    val spark = SparkSession.builder().getOrCreate()
 
    // "attach" to the same table by name 
    val csdataDF = spark.sqlContext.table("data");
 
    //  then the table can be queried in Spark SQL while using any arguments to the query job
    queryDF = spark.sql("SELECT ... FROM data")

    // and query results either collected or saved
    System.out.println(" Query results: "  + queryDF.collect().mkString(""))


    return ()
    // no SparkContext termination so other jobs will reuse it
  }
}

  • Query jobs can be submitted multiple times with the following command:

<SIG-deployment-path>/bin/spark-submit --deploy-mode cluster --master spark:<SIG-URL> --class com.ibm.cws.examples.CacheQuery --conf spark.ego.shared.context.name=SampleCache samplecache_2.11-1.0.jar <query-arguments>

 

Performance results:

IBM Spectrum Conductor with Spark tested this example on a cluster that consisted of five hosts that each had 28 CPUs and 250 GB of RAM.

image
Loader job's performance

The file took about five minutes to load.

Query workload results:

6000 tasks across 2 stages
755 parallel tasks
14000 distinct rows
Query time: 6 -12 seconds 

 

IBM Spectrum Conductor with Spark is a good solution for working with SQL queries on large data sets. Stay tuned for a sample that will be posted on our IBM Bluemix DevOps Services page. For more information on our product, see IBM Knowledge Center.

[{"Business Unit":{"code":"BU059","label":"IBM Software w\/o TPS"},"Product":{"code":"SS4H63","label":"IBM Spectrum Conductor"},"Component":"","Platform":[{"code":"PF025","label":"Platform Independent"}],"Version":"","Edition":"","Line of Business":{"code":"LOB10","label":"Data and AI"}}]

UID

ibm16163689