Offloading your Informix data in Spark, Part 2

Basic analysis of your data


In Part 1 of this tutorial series, you learned how to move a single table from Informix® to Spark, how to put the entire database in Spark, and how to build a specific dialect for Informix.

The focus of this second part is on analytics and gaining basic insight from the data. Absolutely no knowledge of Informix is required.

Enterprise ready

You are using the Informix stores demo database, which mimics an enterprise database. The company is a sports equipment wholesaler that sells to mom-and-pop stores. This tutorial uses only one table: the orders table. The schema of the orders table after it is imported into Apache Spark is shown below.

Figure 1. The orders table schema

A small note on the database diagram: in the original database, there are primary keys, foreign keys, constraints, and native data types (like SERIAL). All of this is translated into Spark's data types and a simplified structure; there is no need for indices, constraints, or keys.

Time to ship

Your first exercise is to measure how much time the warehouse needs between the moment the customer places the order and the moment the order ships.

Granted, using Spark for this operation is a bit like using a nuclear power station to charge your cellphone, but it lets you progressively discover the world of Spark analytics!

You can download the code from GitHub.

Walk through the code

Be prepared. The first import might seem a bit unusual because Spark has a lot of predefined functions. The list of functions in the org.apache.spark.sql.functions package is a helpful reference, if needed.

In the following code, you could replace datediff with * to import all of the functions. But, for this tutorial, only datediff is needed. You probably already have an idea of what it does.

import static org.apache.spark.sql.functions.datediff;
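The same package contains many more functions that you can import and use in the same way. A few illustrative examples (my own picks; none of them are needed in this tutorial):

import static org.apache.spark.sql.functions.avg;          // average of a numeric column
import static org.apache.spark.sql.functions.upper;        // uppercase a string column
import static org.apache.spark.sql.functions.current_date; // today's date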

The rest of the imported classes are similar to the ones used in Part 1. I like to share the imports as we walk through the code; otherwise, it could be very confusing to reference classes that have the same name in different packages.

import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;

Time to start the application:

public class TimeToShipApp {

  public static void main(String[] args) {
    TimeToShipApp app = new TimeToShipApp();
    app.start();
  }

  private void start() {

Start a session with Spark's local mode:

SparkSession spark = SparkSession
        .builder()
        .appName("Time to Ship")
        .master("local")
        .getOrCreate();

Even though this example does not require it, it is good practice to register the Informix dialect:

JdbcDialect dialect = new InformixJdbcDialect();
    JdbcDialects.registerDialect(dialect);

You get your Informix parameters:

 Config config = ConfigManager.getConfig(K.INFORMIX);

Create a list of all the tables that you want to work with (and add to your data lake). It might seem like overkill in this example because we only use one table, but the same idea is reused in other examples (and repetition helps you learn).

List<String> tables = new ArrayList<>();
    tables.add("orders");

As you know, a map contains values indexed by keys. Each dataframe coming from Spark is stored in a map whose key is the name of the table and whose value is the dataframe containing your data.

Map<String, Dataset<Row>> datalake = new HashMap<>();
    for (String table : tables) {
      System.out.print("Loading table [" + table
          + "] ... ");
      Dataset<Row> df = spark.read()
          .format("jdbc")
          .option("url", config.getJdbcUrl())
          .option("dbtable", table)
          .option("user", config.getUser())
          .option("password", config.getPassword())
          .option("driver", config.getDriver())
          .load();

      datalake.put(table, df);
      System.out.println("done.");
    }

Up to this point, the result of the execution is:

Loading table [orders] ... done.
We have loaded 1 table(s) in our data lake.

You are now ready to exploit the data. Get the ordersDf dataframe from the data lake:

 Dataset<Row> ordersDf = datalake.get("orders");

Create a new column using the withColumn() method. This new column is the result of the datediff() Spark SQL function. This function takes 2 parameters: the end date and the start date.

ordersDf = ordersDf.withColumn(
        "time_to_ship", 
        datediff(ordersDf.col("ship_date"), ordersDf.col("order_date")));

Note that:

datediff(ordersDf.col("ship_date"), ordersDf.col("order_date"))

is not the same as:

datediff(ordersDf.col("order_date"), ordersDf.col("ship_date"))

What you're really doing is a subtraction: the result is the first date minus the second. So put the more recent date (ship_date here) first; otherwise, you will get a negative number.
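If you want to convince yourself, here is a quick sanity check (my own aside, not part of the application); Spark SQL applies datediff() to date literals just as well:

// An aside: datediff() returns the first argument minus the second.
spark.sql(
    "SELECT datediff('2008-06-01', '2008-05-20') AS forward, "
        + "datediff('2008-05-20', '2008-06-01') AS backward")
    .show();
// forward is 12, backward is -12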

At this point, if you print the schema of the dataframe, you will see:

 ordersDf.printSchema();
root
 |-- order_num: integer (nullable = false)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = false)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)
 |-- time_to_ship: integer (nullable = true)

Do you see the column that you added? It's called time_to_ship; its type is integer, and it can be null.

And if you look at the data:

ordersDf.show(10);
    System.out.println("We have " + ordersDf.count() + " orders");
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|time_to_ship|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|     1001|2008-05-20|         104|express          ...|      n|B77836    |2008-06-01|      20.40|      10.00|2008-07-22|          12|
|     1002|2008-05-21|         101|PO on box; delive...|      n|9270      |2008-05-26|      50.60|      15.30|2008-06-03|           5|
|     1003|2008-05-22|         104|express          ...|      n|B77890    |2008-05-23|      35.60|      10.80|2008-06-14|           1|
|     1004|2008-05-22|         106|ring bell twice  ...|      y|8006      |2008-05-30|      95.80|      19.20|      null|           8|
|     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|          16|
|     1006|2008-05-30|         112|after 10 am      ...|      y|Q13557    |      null|      70.80|      14.20|      null|        null|
|     1007|2008-05-31|         117|                null|      n|278693    |2008-06-05|     125.90|      25.20|      null|           5|
|     1008|2008-06-07|         110|closed Monday    ...|      y|LZ230     |2008-07-06|      45.60|      13.80|2008-07-21|          29|
|     1009|2008-06-14|         111|next door to groc...|      n|4745      |2008-06-21|      20.40|      10.00|2008-08-21|           7|
|     1010|2008-06-17|         115|deliver 776 King ...|      n|429Q      |2008-06-29|      40.60|      12.30|2008-08-22|          12|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
only showing top 10 rows
We have 23 orders

You can see that order #1006 has null in the time_to_ship column because it has not shipped yet. When you analyze the time to ship, you realize it's not Amazon Prime!
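If you want to put a number on that feeling, the avg() function from the same org.apache.spark.sql.functions package (statically imported, like datediff) computes the average time to ship; it simply ignores the null rows. This is my own aside, not part of the application:

// Average time to ship, in days (null values are ignored by avg()).
// Requires: import static org.apache.spark.sql.functions.avg;
ordersDf
    .agg(avg("time_to_ship").alias("avg_time_to_ship_in_days"))
    .show();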

Next, you'll want to get rid of the null values. But the dataframe is immutable, which means the data cannot be changed. To get around this, create a new dataframe that excludes the values you do not want. Nope, a dataframe is not an SQL table; there is no DELETE FROM.

 Dataset<Row> ordersDf2 = ordersDf.filter(
        "time_to_ship IS NOT NULL");
    ordersDf2.printSchema();
    ordersDf2.show(5);
    System.out.println("We have " + ordersDf2.count()
        + " delivered orders");

  }
}

And the output is:

+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|time_to_ship|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
|     1001|2008-05-20|         104|express          ...|      n|B77836    |2008-06-01|      20.40|      10.00|2008-07-22|          12|
|     1002|2008-05-21|         101|PO on box; delive...|      n|9270      |2008-05-26|      50.60|      15.30|2008-06-03|           5|
|     1003|2008-05-22|         104|express          ...|      n|B77890    |2008-05-23|      35.60|      10.80|2008-06-14|           1|
|     1004|2008-05-22|         106|ring bell twice  ...|      y|8006      |2008-05-30|      95.80|      19.20|      null|           8|
|     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|          16|
+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
only showing top 5 rows
We have 22 delivered orders

But wait, what exactly happened?

Spark is based on a concept of lazy evaluation. I think of this as when you are asking a child to do something: clean your room, put the laundry in the basket, put your books on the shelf... Usually those transformations do not happen until you say something like "now" (and sometimes with a slightly louder voice). This is the action signal that they are waiting for to get started. It is exactly the same for Spark.
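In the code you just ran, the "now" is an action such as show() or count(); calls like withColumn() and filter() are transformations that Spark only records. Here is a minimal sketch of the distinction, reusing the ordersDf dataframe from above (the variable names are mine):

// Transformation: nothing is executed yet; Spark just extends its plan.
Dataset<Row> pendingDf = ordersDf.filter("time_to_ship IS NULL");

// Action: this is the "now"; Spark actually loads, filters, and counts.
long pendingOrders = pendingDf.count();
System.out.println(pendingOrders + " order(s) not shipped yet");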

The guy in charge here is called Catalyst. Catalyst is the optimizer that finds the best way to perform the transformations. Once more, think of this as a kid who needs to remove his clothes from the shelf before he can put the books back. With version 2.2, Spark added a cost-based optimizer to Catalyst, an important contribution from IBM.
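You cannot watch Catalyst at work directly, but you can ask a dataframe for the plans it produced. A quick peek, again as my own aside: explain(true) prints the parsed, analyzed, optimized, and physical plans.

// Prints the parsed, analyzed, optimized, and physical plans for the dataframe.
ordersDf2.explain(true);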

What is a dataframe?

For Apache Spark, a dataframe is almost like a table in the relational databases world. Well, almost...

  • The data is immutable, meaning it does not change. Using SQL, you can easily UPDATE data and INSERT new rows in your table. A dataframe is different: to perform these actions, you have to create a new dataframe each time (see the sketch after this list). This is something you're going to do often.
  • Like a table, a dataframe has metadata: columns have names, types, and whether they require a value (a nullable property). Unlike a table, you won't find indices, foreign keys, composite keys, or triggers.
  • Because of Spark's analytical nature, it is easy to add a column and perform transformations, which can be tricky with RDBMS tables.
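To illustrate the first point, here is a minimal sketch of what an SQL UPDATE becomes in the dataframe world; the 10 percent increase on ship_charge is purely hypothetical:

// SQL: UPDATE orders SET ship_charge = ship_charge * 1.1;
// Dataframe: derive a new (immutable) dataframe with the modified column.
Dataset<Row> increasedChargeDf = ordersDf.withColumn(
    "ship_charge",
    ordersDf.col("ship_charge").multiply(1.1));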

The dataframe API is what developers use. You will visit the Class Dataset page more than Facebook! Directly from this API, you can access the column information, perform joins and unions, and more. (Part 3 of this article series will explore this API.)

In Java, a dataframe is implemented as a Dataset<Row>. The storage relies on partitions spread across the cluster. Spark offers a lower-level API so that you can repartition the data and inspect the execution plan, but that is outside the scope of this article. Since Spark 2, storage is handled by a component called Tungsten, which is more efficient than storing plain Java/Scala objects.
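For the curious, here is what a peek at that lower level looks like (a sketch only; the choice of 4 partitions is arbitrary):

// How many partitions currently hold the orders data?
int numPartitions = ordersDf.javaRDD().getNumPartitions();
System.out.println("orders is spread over " + numPartitions + " partition(s)");

// Redistribute the data over 4 partitions (this returns a new dataframe).
Dataset<Row> repartitionedDf = ordersDf.repartition(4);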

Figure 2 shows the API, implementation, and storage of a Spark dataframe.

Figure 2. API, implementation, and storage of a Spark dataframe

What you learned

Hopefully, you now have a better understanding of how Spark stores data and how to manipulate it through the dataframe API. You also learned about built-in functions and where to find them. And most of all, you now have the fundamentals on which to build more complex analytics and machine learning… and that's what's next in this tutorial series.
