Offloading your Informix data in Spark, Part 3

Complex analysis

Leverage data against other data sources

Where are we? Where are we going?

In the first part of this tutorial series, you learned how to get a single table from Informix to Spark, how to get the entire database in Spark, and how to build a specific dialect for Informix. The second part focused on basic analytics and gaining insight from the data.

This third part goes deeper into joins and more complex queries. You continue to use the Informix sample database, but you don't need any Informix knowledge.

More data!

You use these four tables:

  • Customer
  • Orders
  • Item (each row is one line of an order)
  • Stock (the description of the products you sell)

Figure: The flow of the populated tables

The only trick to be aware of in the structure of this database is that a product (in the stock table) is uniquely identified by two keys: stock_num and manu_code. The manu_code field identifies the manufacturer, which you are not going to analyze here. Despite its name, the stock table does not give you the number of articles in your stock/warehouse. It's a bit confusing, but when you inherit a legacy system, things can be disconcerting, no?
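
If you want to convince yourself that this composite key really is unique, a quick check like the following works once the stock table has been loaded into a dataframe. This is only a sketch: stockDf is created later in this tutorial.

// Sketch: verify that (stock_num, manu_code) uniquely identifies a product.
// Assumes stockDf has been loaded from the stock table, as shown later in this tutorial.
long rowCount = stockDf.count();
long keyCount = stockDf.select("stock_num", "manu_code").distinct().count();
System.out.println(rowCount == keyCount
    ? "(stock_num, manu_code) is a unique key"
    : "Duplicate (stock_num, manu_code) pairs found!");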

You might have designed this differently, but I think it is part of the challenge (and fun): doing analytics on a database designed in the late 80s. It is too easy to design a custom database for the purpose of a tutorial!

Unsold goods

Your second application digs into the data and looks for articles that you have in your catalog but have never sold. Your sales people might want to get rid of those products through discounts, returns to the manufacturer, or other means. But we are IT, so let's not interfere too much with the creativity of our sales and purchasing people.

There is a lot of common code between this example, UnsoldProductsApp.java (in the net.jgp.labs.informix2spark.l300 package), and the previous example, TimeToShipApp.java (in the net.jgp.labs.informix2spark.l200 package). I like to have full examples, but in this case, because we are building on top of the previous example, I focus on the changes.

Keep the "data lake," which contains four tables:

List<String> tables = new ArrayList<>();
tables.add("customer");
tables.add("orders");
tables.add("items");
tables.add("stock");

Ask Spark to load them:

Map<String, Dataset<Row>> datalake = new HashMap<>();
for (String table : tables) {
  System.out.print("Loading table [" + table + "] ... ");
  Dataset<Row> df = spark.read()
      .format("jdbc")
      .option("url", config.getJdbcUrl())
      .option("dbtable", table)
      .option("user", config.getUser())
      .option("password", config.getPassword())
      .option("driver", config.getDriver())
      .load();
  datalake.put(table, df);
  System.out.println("done");
}

System.out.println("We have loaded " + datalake.size() + " table(s) in our data lake");

So far, executing this program gives the following:

Loading table [customer] ... done
Loading table [orders] ... done
Loading table [items] ... done
Loading table [stock] ... done
We have loaded 4 table(s) in our data lake
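
As a side note, the JDBC data source does not force you to load whole tables: the dbtable option also accepts a derived table, which lets the database do some filtering before the data reaches Spark. The following is only a sketch; the subquery and its alias are illustrative, and it assumes your Informix version accepts this derived-table syntax.

// Sketch: push a query down to the database instead of loading the full table.
Dataset<Row> californiaCustomersDf = spark.read()
    .format("jdbc")
    .option("url", config.getJdbcUrl())
    .option("dbtable", "(SELECT * FROM customer WHERE state = 'CA') cal_customers")
    .option("user", config.getUser())
    .option("password", config.getPassword())
    .option("driver", config.getDriver())
    .load();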

You can have a quick peek at the data:

for (String table : tables) {
  Dataset<Row> df = datalake.get(table);
  System.out.println("Number of rows in " + table + ": " + df.count());
  df.show(10);
  df.printSchema();
}

And you should get something like the following (limited to the fourth dataset, stock):

...
Number of rows in stock: 74
+---------+---------+---------------+----------+----+---------------+
|stock_num|manu_code|    description|unit_price|unit|     unit_descr|
+---------+---------+---------------+----------+----+---------------+
|        1|      HRO|baseball gloves|    250.00|case|10 gloves/case |
|        1|      HSK|baseball gloves|    800.00|case|10 gloves/case |
|        1|      SMT|baseball gloves|    450.00|case|10 gloves/case |
|        2|      HRO|baseball       |    126.00|case|24/case        |
|        3|      HSK|baseball bat   |    240.00|case|12/case        |
|        3|      SHM|baseball bat   |    280.00|case|12/case        |
|        4|      HSK|football       |    960.00|case|24/case        |
|        4|      HRO|football       |    480.00|case|24/case        |
|        5|      NRG|tennis racquet |     28.00|each|each           |
|        5|      SMT|tennis racquet |     25.00|each|each           |
+---------+---------+---------------+----------+----+---------------+
only showing top 10 rows

root
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)

You can assign each dataset to a dataframe (Dataset<Row>), which makes them easier to reuse later.

Dataset<Row> ordersDf = datalake.get("orders");
Dataset<Row> customerDf = datalake.get("customer");
Dataset<Row> itemsDf = datalake.get("items");
Dataset<Row> stockDf = datalake.get("stock");

Join to see more data

As you know, to get more insights, you have to join the tables:

Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();

Dataset<Row> allDf = customerDf
    .join(
        ordersDf,
        customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")),
        "full_outer")
    .join(
        itemsDf,
        ordersDf.col("order_num").equalTo(itemsDf.col("order_num")),
        "full_outer")
    .join(stockDf, stockColumns, "full_outer")
    .drop(ordersDf.col("customer_num"))
    .drop(itemsDf.col("order_num"))
    .drop(stockDf.col("stock_num"))
    .drop(stockDf.col("manu_code"));

Because it uses method chaining, which is becoming more and more popular, this transformation looks pretty complex. Here, you join four tables on four columns and drop the duplicate columns.

Splitting the join

To get a better understanding of the join process, you can split the transformation into more atomic operations:

Dataset<Row> allDf = customerDf.join(
    ordersDf,
    customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")),
    "full_outer");
allDf = allDf.join(
    itemsDf,
    ordersDf.col("order_num").equalTo(itemsDf.col("order_num")),
    "full_outer");
Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();
allDf = allDf.join(stockDf, stockColumns, "full_outer");
...

You can execute it step by step. The step-by-step code is also in the GitHub repository; look for the UnsoldProductsSplitJoinApp class in the net.jgp.labs.informix2spark.l301 package.

You can now join the customer table with the orders table on the customer_num column, performing a full outer join.

Dataset<Row> allDf = customerDf.join(
    ordersDf,
    customerDf.col("customer_num").equalTo(ordersDf.col("customer_num")),
    "full_outer");
allDf.printSchema();
allDf.show();

Your dataframe's schema looks like:

root
 |-- customer_num: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- order_num: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)

Notice that you have two columns called customer_num. One comes from the customer table (more precisely, the customerDf dataframe) on the left, and the other comes from the orders table (the ordersDf dataframe) on the right.

Joining dataframes is similar to joining tables with Informix and other RDBMSes: You join from the left to the right, specifying a condition (here we have equijoins) and a way to join (full_outer, left, and so on).
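
If you are more comfortable with SQL, you can express the same first join with Spark SQL by registering the dataframes as temporary views. This is only a sketch, not the approach used in the rest of this tutorial; the view names are arbitrary.

// Sketch: the customer/orders full outer join expressed in Spark SQL.
customerDf.createOrReplaceTempView("customer_v");
ordersDf.createOrReplaceTempView("orders_v");
Dataset<Row> sqlJoinDf = spark.sql(
    "SELECT * FROM customer_v c "
        + "FULL OUTER JOIN orders_v o "
        + "ON c.customer_num = o.customer_num");
sqlJoinDf.show(5);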

I simplified the output by removing extra columns. As you look at the data, notice that there are two customer_num columns, yet they do not have the same values. Why?

+------------+--------------------+---------+----------+------------+----------+----------+
|customer_num|             company|order_num|order_date|customer_num|    po_num| paid_date|
+------------+--------------------+---------+----------+------------+----------+----------+
|         108|Quinn's Sports      |     null|      null|        null|      null|      null|
|         101|All Sports Supplies |     1002|2008-05-21|         101|9270      |2008-06-03|
|         115|Gold Medal Sports   |     1010|2008-06-17|         115|429Q      |2008-08-22|
|         126|Neelie's Discount Sp|     1022|2008-07-24|         126|W9925     |2008-09-02|
|         103|Phil's Sports       |     null|      null|        null|      null|      null|
|         128|Phoenix University  |     null|      null|        null|      null|      null|
|         122|The Sporting Life   |     1019|2008-07-11|         122|Z55709    |2008-08-06|
|         111|Sports Center       |     1009|2008-06-14|         111|4745      |2008-08-21|
|         120|Century Pro Shop    |     1017|2008-07-09|         120|DM354331  |      null|
|         117|Kids Korner         |     1007|2008-05-31|         117|278693    |      null|
|         117|Kids Korner         |     1012|2008-06-18|         117|278701    |      null|
|         112|Runners & Others    |     1006|2008-05-30|         112|Q13557    |      null|
|         127|Big Blue Bike Shop  |     1023|2008-07-24|         127|KF2961    |2008-08-22|
|         107|Athletic Supplies   |     null|      null|        null|      null|      null|
|         114|Sporting Place      |     null|      null|        null|      null|      null|
|         102|Sports Spot         |     null|      null|        null|      null|      null|
|         113|Sportstown          |     null|      null|        null|      null|      null|
|         121|City Sports         |     1018|2008-07-10|         121|S22942    |2008-08-06|
|         125|Total Fitness Sports|     null|      null|        null|      null|      null|
|         109|Sport Stuff         |     null|      null|        null|      null|      null|
|         105|Los Altos Sports    |     null|      null|        null|      null|      null|
|         110|AA Athletics        |     1008|2008-06-07|         110|LZ230     |2008-07-21|
|         110|AA Athletics        |     1015|2008-06-27|         110|MA003     |2008-08-31|
|         106|Watson & Son        |     1004|2008-05-22|         106|8006      |      null|
|         106|Watson & Son        |     1014|2008-06-25|         106|8052      |2008-07-10|
|         116|Olympic City        |     1005|2008-05-24|         116|2865      |2008-06-21|
|         123|Bay Sports          |     1020|2008-07-11|         123|W2286     |2008-09-20|
|         119|The Triathletes Club|     1016|2008-06-29|         119|PC6782    |      null|
|         131|JGP.net             |     null|      null|        null|      null|      null|
|         118|Blue Ribbon Sports  |     null|      null|        null|      null|      null|
|         124|Putnum's Putters    |     1021|2008-07-23|         124|C3288     |2008-08-22|
|         104|Play Ball!          |     1001|2008-05-20|         104|B77836    |2008-07-22|
|         104|Play Ball!          |     1003|2008-05-22|         104|B77890    |2008-06-14|
|         104|Play Ball!          |     1011|2008-06-18|         104|B77897    |2008-08-29|
|         104|Play Ball!          |     1013|2008-06-22|         104|B77930    |2008-07-31|
+------------+--------------------+---------+----------+------------+----------+----------+

The first customer_num column belongs to the left dataframe. Because you did a full outer join, you get all the data, even data from the customers who did not place an order. This is why some entries in the second customer_num column are null (for example, for customers 108 or 103, who have no matching order).

Later, if you want to reference this column, Spark cannot tell which customer_num column you mean. Therefore, it makes sense to remove the column you do not want (here, the second one).

To delete the column you do not want, call the drop() method on the dataframe. Pay attention to the argument: you specify which column to drop by using the col() method of the dataframe it originally came from (here, ordersDf).

allDf = allDf.drop(ordersDf.col("customer_num"));
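
As an alternative to dropping, you could alias the dataframes before joining so that both columns remain addressable without ambiguity. This is a sketch of a variant that the rest of the tutorial does not use.

// Sketch: alias the dataframes, then disambiguate columns by prefix instead of dropping.
Dataset<Row> c = customerDf.alias("c");
Dataset<Row> o = ordersDf.alias("o");
Dataset<Row> aliasedDf = c.join(
    o,
    c.col("customer_num").equalTo(o.col("customer_num")),
    "full_outer");
// Both columns stay accessible as aliasedDf.col("c.customer_num") and aliasedDf.col("o.customer_num").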

Then, you can join the two other tables.

The following code joins the current dataframe (allDf) with the items table on the order_num column, then drops the extra column:

allDf = allDf.join(
    itemsDf,
    ordersDf.col("order_num").equalTo(itemsDf.col("order_num")),
    "full_outer");
allDf = allDf.drop(itemsDf.col("order_num"));

Two columns link the last table: stock_num and manu_code. You need to assemble them into a sequence (Seq). The sequence and set used here are Scala objects, but you can use them easily from your Java code.

Seq<String> stockColumns = new Set2<>("stock_num", "manu_code").toSeq();

The syntax for joining is now even simpler:

allDf = allDf.join(stockDf, stockColumns, "full_outer");

And you can remove the last two duplicate columns:

allDf = allDf.drop(stockDf.col("stock_num"));
allDf = allDf.drop(stockDf.col("manu_code"));

This ends the detailed walkthrough of the atomic join operations versus the method chaining. If you have any questions, feel free to use the comments section at the bottom of this tutorial.

Exploiting the full dataframe

Now you have all your data in a single dataframe! You can start drilling through it to extract value.

A good reflex is to cache your data, so it does not have to be recomputed every time you need it.

allDf.cache();
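
Note that cache() stores the dataset with Spark's default storage level. If you want more control, for example when memory is tight, you can call persist() with an explicit storage level instead. This is only a sketch.

// Sketch: like cache(), but with an explicit storage level (memory, spilling to disk).
// Requires org.apache.spark.storage.StorageLevel.
allDf.persist(StorageLevel.MEMORY_AND_DISK());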

You can also print the schema or see a few lines of data.

allDf.printSchema();
allDf.show(5);
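
By default, show() truncates long cell values. If you want the full content of each cell, at the cost of very wide output, you can pass false as the second argument. A sketch:

// Sketch: show 5 rows without truncating cell contents.
allDf.show(5, false);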

In the schema, you can see all the columns coming from the four tables.

root
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)
 |-- stock_num: integer (nullable = true)
 |-- manu_code: string (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- order_num: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- customer_num: integer (nullable = true)
 |-- ship_instruct: string (nullable = true)
 |-- backlog: string (nullable = true)
 |-- po_num: string (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_weight: decimal(8,2) (nullable = true)
 |-- ship_charge: decimal(6,2) (nullable = true)
 |-- paid_date: date (nullable = true)
 |-- item_num: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: decimal(8,2) (nullable = true)
 |-- description: string (nullable = true)
 |-- unit_price: decimal(6,2) (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_descr: string (nullable = true)

And the data is indeed wide, as it now contains all the columns resulting from the merge:
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
|stock_num|manu_code|customer_num|          fname|          lname|             company|            address1|address2|           city|state|zipcode|             phone|order_num|order_date|customer_num|       ship_instruct|backlog|    po_num| ship_date|ship_weight|ship_charge| paid_date|item_num|quantity|total_price|    description|unit_price|unit|     unit_descr|
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
|      101|      SHM|         119|Bob            |Shorter        |The Triathletes Club|2405 Kings Highway  |    null|Cherry Hill    |   NJ|  08002|609-663-6079      |     1016|2008-06-29|         119|delivery entrance...|      n|PC6782    |2008-07-12|      35.00|      11.80|      null|       1|       2|     136.00|bicycle tires  |     68.00|box |4/box          |
|      309|      SHM|        null|           null|           null|                null|                null|    null|           null| null|   null|              null|     null|      null|        null|                null|   null|      null|      null|       null|       null|      null|    null|    null|       null|ear drops      |     40.00|case|20/case        |
|      109|      PRC|         119|Bob            |Shorter        |The Triathletes Club|2405 Kings Highway  |    null|Cherry Hill    |   NJ|  08002|609-663-6079      |     1016|2008-06-29|         119|delivery entrance...|      n|PC6782    |2008-07-12|      35.00|      11.80|      null|       2|       3|      90.00|pedal binding  |     30.00|case|6 pairs/case   |
|        6|      ANZ|         116|Jean           |Parmelee       |Olympic City        |1104 Spinosa Drive  |    null|Mountain View  |   CA|  94040|415-534-8822      |     1005|2008-05-24|         116|call before deliv...|      n|2865      |2008-06-09|      80.80|      16.20|2008-06-21|       4|       1|      48.00|tennis ball    |     48.00|case|24 cans/case   |
|        6|      ANZ|         115|Alfred         |Grant          |Gold Medal Sports   |776 Gary Avenue     |    null|Menlo Park     |   CA|  94025|415-356-1123      |     1010|2008-06-17|         115|deliver 776 King ...|      n|429Q      |2008-06-29|      40.60|      12.30|2008-08-22|       2|       1|      48.00|tennis ball    |     48.00|case|24 cans/case   |
+---------+---------+------------+---------------+---------------+--------------------+--------------------+--------+---------------+-----+-------+------------------+---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+--------+--------+-----------+---------------+----------+----+---------------+
only showing top 5 rows

Empty your warehouse

Your exercise is to find the products you never sold.

Start by removing the columns that you do not need. In the previous example, you removed one column at a time. You can be a little more efficient now that you are a Spark expert, no? In the same vein, as you are mastering method chaining, you can now do this operation in one call, right?

First, create a list of columns.

List<String> columnsToDrop = new ArrayList<>();
columnsToDrop.add("zipcode");
columnsToDrop.add("phone");
columnsToDrop.add("customer_num");
columnsToDrop.add("fname");
columnsToDrop.add("lname");
columnsToDrop.add("company");
columnsToDrop.add("address1");
columnsToDrop.add("address2");
columnsToDrop.add("city");
columnsToDrop.add("state");
columnsToDrop.add("order_num");
columnsToDrop.add("order_date");
columnsToDrop.add("customer_num");
columnsToDrop.add("ship_instruct");
columnsToDrop.add("backlog");
columnsToDrop.add("po_num");
columnsToDrop.add("ship_date");
columnsToDrop.add("ship_weight");
columnsToDrop.add("ship_charge");
columnsToDrop.add("paid_date");
columnsToDrop.add("time_to_ship");
columnsToDrop.add("item_num");
columnsToDrop.add("quantity");
columnsToDrop.add("total_price");

Then, in one operation:

  1. Filter, using Spark SQL syntax, to keep only the stock references that never appear on an order (do this before dropping order_num, or Spark can no longer resolve the column).
  2. Drop all the columns you do not need. Notice how you can convert a Java list to a Scala sequence to use with drop().

Dataset<Row> unsoldProductsDf = allDf
    .filter("order_num IS NULL")
    .filter("description IS NOT NULL")
    .drop(JavaConversions.asScalaBuffer(columnsToDrop));
unsoldProductsDf.cache();
System.out.println("We have " + unsoldProductsDf.count()
    + " unsold references in our warehouse, time to do something!");
unsoldProductsDf.show();
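
The same filters can also be written with the Column API instead of SQL strings, which gives you a little more compile-time help. This sketch assumes the same allDf and columnsToDrop as above.

// Sketch: the same filter expressed with the Column API.
// Requires a static import of org.apache.spark.sql.functions.col.
Dataset<Row> unsoldProductsDf = allDf
    .filter(col("order_num").isNull()
        .and(col("description").isNotNull()))
    .drop(JavaConversions.asScalaBuffer(columnsToDrop));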

You now know how many product references there are in your warehouse that were never sold:

We have 35 unsold references in our warehouse, time to do something!
+---------+---------+---------------+----------+----+---------------+
|stock_num|manu_code|    description|unit_price|unit|     unit_descr|
+---------+---------+---------------+----------+----+---------------+
|      309|      SHM|ear drops      |     40.00|case|20/case        |
|      312|      SHM|racer goggles  |     96.00|box |12/box         |
|      203|      NKL|irons/wedge    |    670.00|case|2 sets/case    |
|      102|      PRC|bicycle brakes |    480.00|case|4 sets/case    |
|      302|      HRO|ice pack       |      4.50|each|each           |
|        3|      SHM|baseball bat   |    280.00|case|12/case        |
|      205|      HRO|3 golf balls   |    312.00|case|24/case        |
|      313|      ANZ|swim cap       |     60.00|box |12/box         |
|      110|      ANZ|helmet         |    244.00|case|4/case         |
|      301|      HRO|running shoes  |     42.50|each|each           |
|      108|      SHM|crankset       |     45.00|each|each           |
|      110|      HRO|helmet         |    260.00|case|4/case         |
|      205|      NKL|3 golf balls   |    312.00|case|24/case        |
|      311|      SHM|water gloves   |     48.00|box |4 pairs/box    |
|      310|      SHM|kick board     |     80.00|case|10/case        |
|      303|      KAR|socks          |     36.00|box |24 pairs/box   |
|      310|      ANZ|kick board     |     84.00|case|12/case        |
|      101|      PRC|bicycle tires  |     88.00|box |4/box          |
|      109|      SHM|pedal binding  |    200.00|case|4 pairs/case   |
|      312|      HRO|racer goggles  |     72.00|box |12/box         |
+---------+---------+---------------+----------+----+---------------+
only showing top 20 rows

There are 35 unsold references in this warehouse; it's time to do something!

You can now go to the sales and purchasing departments with this information.

What you learned

In this slightly longer part of the series, you learned how to join data in Spark, drop columns, cache a dataframe, and perform analytics on the resulting dataframe using SQL syntax. With this information, you can give your purchasing department insights into products that never sold and help your sales people decide which products to get rid of!
