Offloading your Informix data in Spark, Part 4

Leverage data against other data sources

Add external data sources, bring more insights!

Where are we? Where are we going?

In the first three parts of this series, you learned how to:

You could argue that everything you have done so far is doable with a traditional relational database. And you would be completely right. Now it's time to discover some of the power of Spark by adding two other data sources.

Why more data?

Yesterday morning, as you were celebrating International Coffee Day, you overheard the vice president of sales talking to the new sales guy. They were discussing sales zones, where people make money, and the retail shops in the area. As a quick reminder, your company sells sports equipment wholesale to retail shops around the United States. Although you did not fully understand their sales lingo, you were pretty upset by how little they knew about the data. You had to jump in: "I can analyze the sales data and cross it with the median revenue per ZIP code and the population size for each area. You can then see which ZIP codes need more attention."

You knew you would make a few new friends with such statements. Now you need to deliver.

Even more data!

Your first reflex of visiting IBM's developerWorks is good. Nevertheless, you need two additional datasets: the population per ZIP code and the revenue per ZIP code.

For those datasets, rely on two U.S. government agencies: the United States Census Bureau and the Internal Revenue Service (IRS). For the purposes of this tutorial, a curated version of the Census Bureau data is used because the raw data is a bit challenging to understand.

Experiment with the IRS

With the IRS data, the first exercise is to find the number of households in your sales area with a revenue above a predefined threshold of $75,000 per year, using the original IRS dataset.

You can download the code and data from GitHub. For this part, the labs are in the net.jgp.labs.informix2spark.l4xx package. The data is in the data directory. The IRS provides a technical explanation of the dataset. It is named 14zpdoc.doc and available in the repository's data directory.

Basically, each area is defined by its ZIP code. Each area is divided into six adjusted gross income (AGI) groups, based on revenue brackets:

  1. Below $25,000
  2. From $25,000 to below $50,000
  3. From $50,000 to below $75,000
  4. From $75,000 to below $100,000
  5. From $100,000 to below $200,000
  6. $200,000 or more

The target for this tutorial is the three upper income groups.

All the code is provided to give you a global view.

package net.jgp.labs.informix2spark.l400;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HouseholdsAboveMedianRevenuePerZipApp {

  public static void main(String[] args) {
    HouseholdsAboveMedianRevenuePerZipApp app = new HouseholdsAboveMedianRevenuePerZipApp();

Run the application by passing the ZIP code to analyze.

    app.start(27514);
  }

  private void start(int zip) {

You create a Spark session in local mode.

    SparkSession spark = SparkSession
        .builder()
        .appName("Number of households in your ZIP Code")
        .master("local")
        .getOrCreate();

It is simple to load CSV files in Spark: use the format method with the csv argument. The CSV loader accepts many options (as you know, CSV can be tricky). The IRS files follow a fairly simple scenario, where you use two options:

  • The first row in the file is a header
  • Have Spark infer the schema so it can determine what datatypes you will use.

If you don't have Spark infer the schema, all columns of this dataframe will be considered strings. CSV parsing has many more options, and each option is explained in my blog.

Finally, note that the filename uses a wildcard. Spark loads all the files matching 14zpallagi*.csv in the data directory. The path accepts glob-style patterns (not full regular expressions):

  • 14zpallagi*.csv reads all the files starting with 14zpallagi and finishing with .csv,
  • 14zpallagi-part[1-3].csv reads: 14zpallagi-part1.csv, 14zpallagi-part2.csv, and 14zpallagi-part3.csv.

    String filename = "data/14zpallagi*.csv";
    Dataset<Row> df = spark
        .read()
        .format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filename);
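
To get a feel for which file names such patterns select, here is a minimal plain-Java sketch that uses the JDK's own glob matcher. It is only an illustration: Spark resolves paths through its own file system layer, whose glob rules may differ in details, and the file names other than the three parts listed above are hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class GlobPatternDemo {

  // Returns, in order, the names accepted by the given glob pattern.
  static List<String> matching(String glob, String... names) {
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
    List<String> result = new ArrayList<>();
    for (String name : names) {
      if (matcher.matches(Paths.get(name))) {
        result.add(name);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical directory listing, used only to exercise the patterns.
    String[] names = { "14zpallagi.csv", "14zpallagi-part1.csv", "14zpallagi-part2.csv",
        "14zpallagi-part3.csv", "14zpallagi-part4.csv", "14zpdoc.doc" };
    System.out.println(matching("14zpallagi*.csv", names));
    System.out.println(matching("14zpallagi-part[1-3].csv", names));
  }
}
```

The first pattern accepts every CSV file whose name starts with 14zpallagi, while the second accepts only parts 1 to 3.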

You can now inspect the loaded data:

    df.printSchema();

First look at the long schema. It is long because the IRS generously shares 127 columns (I have removed a lot of them).

 |-- STATEFIPS: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- agi_stub: integer (nullable = true)
 |-- N1: double (nullable = true)
 |-- mars1: double (nullable = true)
 |-- MARS2: double (nullable = true)
 |-- MARS4: double (nullable = true)
 |-- PREP: double (nullable = true)
 |-- N2: double (nullable = true)
 |-- NUMDEP: double (nullable = true)
 |-- TOTAL_VITA: double (nullable = true)
 |-- VITA: double (nullable = true)
 |-- TCE: double (nullable = true)
 |-- A00100: double (nullable = true)
 |-- N02650: double (nullable = true)

Look at a sample of the data. The sample() method takes up to three parameters: whether to sample with replacement (so the same record can be drawn more than once), the fraction of rows to return, and (optionally) a seed.

    df.sample(true, 0.01, 4589).show(2);
    System.out.println("Dataframe has " + df.count() + " rows and " + df.columns().length
        + " columns.");

I limited the output to two rows!

|STATEFIPS|STATE|zipcode|agi_stub|     N1| mars1|  MARS2|MARS4|   PREP|      N2| NUMDEP|TOTAL_VITA|VITA|TCE|     A00100| N02650|     A02650| N00200|     A00200| N00300|  A00300| N00600|  A00600| N00650|  A00650| N00700| A00700| N00900|  A00900| N01000|   A01000|N01400|  A01400| N01700|  A01700|  SCHF|N02300|A02300| N02500|  A02500| N26270|   A26270| N02900|  A02900|N03220|A03220|N03300| A03300|N03270| A03270|N03150| A03150|N03210|A03210|N03230|A03230|N03240| A03240| N04470|   A04470|     A00101| N18425|  A18425|N18450|A18450| N18500|  A18500| N18300|  A18300| N19300|  A19300| N19700|  A19700| N04800|     A04800| N05800|   A05800| N09600| A09600|N05780|A05780| N07100| A07100| N07300| A07300|N07180|A07180|N07230|A07230|N07240|A07240|N07220|A07220|N07260|A07260| N09400|  A09400|N85770|A85770|N85775|A85775|N09750|A09750| N10600|   A10600|N59660|A59660|N59720|A59720|N11070|A11070|N10960|A10960|N11560|A11560| N06500|   A06500| N10300|   A10300| N85530| A85530| N85300| A85300| N11901|  A11901| N11902|  A11902|
|        1|   AL|      0|       6|51270.0|4160.0|45860.0|910.0|39250.0|148020.0|50820.0|      80.0|80.0|0.0|2.2652783E7|51270.0|2.3073614E7|44660.0|1.1051815E7|40610.0|264670.0|32640.0|791136.0|31530.0|620013.0|29090.0|96411.0|13720.0|855431.0|31500.0|2331256.0|9340.0|444369.0|14820.0|833058.0|2630.0| 520.0|1842.0|11220.0|292923.0|22620.0|4633716.0|22380.0|420831.0|1080.0| 267.0|2770.0|86011.0|8520.0|92498.0|1140.0|10827.0|   0.0|   0.0|   0.0|   0.0|1660.0|61679.0|47840.0|2499208.0|2.1437993E7|45930.0|761404.0|1440.0|4867.0|45630.0|168515.0|47820.0|962132.0|36310.0|456194.0|45490.0|920293.0|51240.0|1.9692443E7|51230.0|5274998.0|15590.0|77387.0|   0.0|   0.0|21020.0|54852.0|15290.0|25542.0|3230.0|1747.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|1410.0| 459.0|14770.0|124954.0|   0.0|   0.0|   0.0|   0.0| 100.0| 219.0|50690.0|5185930.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|51170.0|5204318.0|51230.0|5472274.0|19090.0|33930.0|25450.0|89886.0|28340.0|774418.0|15560.0|243494.0|
|        1|   AL|  35004|       5|  590.0|  40.0|  530.0|  0.0|  300.0|  1660.0|  550.0|       0.0| 0.0|0.0|    74554.0|  590.0|    75493.0|  560.0|    64835.0|  260.0|   150.0|  140.0|   236.0|  130.0|   149.0|  350.0|  310.0|  100.0|  1671.0|  100.0|    364.0|  60.0|  1459.0|  150.0|  3991.0|   0.0|   0.0|   0.0|   90.0|  1796.0|   40.0|   1408.0|  240.0|   939.0|  30.0|   8.0|   0.0|    0.0|   0.0|    0.0|   0.0|    0.0| 130.0| 132.0|   0.0|   0.0|   0.0|    0.0|  450.0|   9296.0|    57663.0|  430.0|  2268.0|   0.0|   0.0|  420.0|   353.0|  450.0|  2766.0|  400.0|  2900.0|  420.0|  2321.0|  590.0|    56931.0|  590.0|   9612.0|    0.0|    0.0|   0.0|   0.0|  310.0|  448.0|   40.0|    3.0| 120.0|  66.0|  70.0|  99.0|   0.0|   0.0| 190.0| 249.0|   0.0|   0.0|   60.0|   228.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|  590.0|  10176.0|   0.0|   0.0|   0.0|   0.0|   0.0|   0.0|  60.0|  59.0|   0.0|   0.0|  580.0|   9179.0|  580.0|   9419.0|    0.0|    0.0|    0.0|    0.0|  200.0|   625.0|  380.0|  1553.0|
only showing top 2 rows
The dataframe has 166719 rows and 127 columns.

You do not have to clean your dataset, but a cleaned dataset is more readable. To clean your dataset, filter on the ZIP code and drop the extra columns.

    Dataset<Row> df2 = df.filter(df.col("zipcode").equalTo(zip));
    String[] colsToDrop = { "STATEFIPS", "mars1", "MARS2", "MARS4", "PREP", "N2",
        "NUMDEP", "TOTAL_VITA", "VITA", "TCE", "A00100", "N02650", "N00200", "A00200",
        "N00300", "A00300", "N00600", "A00600", "N00650", "A00650", "N00700", "A00700",
        "N00900", "A00900", "N01000", "A01000", "N01400", "A01400", "N01700", "A01700",
        "SCHF", "N02300", "A02300", "N02500", "A02500", "N26270", "A26270", "N02900",
        "A02900", "N03220", "A03220", "N03300", "A03300", "N03270", "A03270", "N03150",
        "A03150", "N03210", "A03210", "N03230", "A03230", "N03240", "A03240", "N04470",
        "A04470", "A00101", "N18425", "A18425", "N18450", "A18450", "N18500", "A18500",
        "N18300", "A18300", "N19300", "A19300", "N19700", "A19700", "N04800", "A04800",
        "N05800", "A05800", "N09600", "A09600", "N05780", "A05780", "N07100", "A07100",
        "N07300", "A07300", "N07180", "A07180", "N07230", "A07230", "N07240", "A07240",
        "N07220", "A07220", "N07260", "A07260", "N09400", "A09400", "N85770", "A85770",
        "N85775", "A85775", "N09750", "A09750", "N10600", "A10600", "N59660", "A59660",
        "N59720", "A59720", "N11070", "A11070", "N10960", "A10960", "N11560", "A11560",
        "N06500", "A06500", "N10300", "A10300", "N85530", "A85530", "N85300", "A85300",
        "N11901", "A11901", "N11902", "A11902" };
    for (String colName : colsToDrop) {
      df2 = df2.drop(colName);
    }
    df2.printSchema();
    df2.show();
    System.out.println("Dataframe has " + df2.count() + " rows and " + df2
        .columns().length + " columns.");

Now look at the results. You get something a lot more intelligible.

 |-- STATE: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- agi_stub: integer (nullable = true)
 |-- N1: double (nullable = true)
 |-- A02650: double (nullable = true)

|STATE|zipcode|agi_stub|    N1|  A02650|
|   NC|  27514|       1|3590.0| 42542.0|
|   NC|  27514|       2|2030.0| 74332.0|
|   NC|  27514|       3|1040.0| 65651.0|
|   NC|  27514|       4| 800.0| 71410.0|
|   NC|  27514|       5|1690.0|249042.0|
|   NC|  27514|       6|1650.0|843353.0|
Dataframe has 6 rows and 5 columns.

With this smaller dataset, you can see that the records of interest are those where agi_stub is greater than 3. Group them by ZIP code and sum the number of returns in column N1. In Spark code, it gives:

    Dataset<Row> df3 = df2.filter(df2.col("agi_stub").gt(3));
    df3 = df3.groupBy("zipcode").sum("N1").withColumnRenamed("sum(N1)", "households");
    df3.show();
  }
}

And you get:

|zipcode|households|
|  27514|    4140.0|

What do these results mean? In this ZIP code, 4,140 tax returns were filed by people making $75,000 or more. A tax return does not map one-to-one to a household, but the figure gives you a good idea of the potential.
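
The figure can be checked by hand. The following plain-Java sketch (no Spark involved; the class and method names are made up for this illustration) applies the same filter-and-sum logic to the six rows shown above for ZIP code 27514.

```java
public class HouseholdsSketch {

  // Sums the number of returns (N1) over the AGI groups strictly above minStub.
  static double returnsAbove(int minStub, int[] agiStub, double[] n1) {
    double total = 0;
    for (int i = 0; i < agiStub.length; i++) {
      if (agiStub[i] > minStub) {
        total += n1[i];
      }
    }
    return total;
  }

  public static void main(String[] args) {
    // agi_stub and N1 values copied from the cleaned dataframe for ZIP code 27514.
    int[] agiStub = { 1, 2, 3, 4, 5, 6 };
    double[] n1 = { 3590, 2030, 1040, 800, 1690, 1650 };
    System.out.println(returnsAbove(3, agiStub, n1)); // prints 4140.0
  }
}
```

Groups 4, 5, and 6 contribute 800, 1,690, and 1,650 returns, which add up to the 4,140 reported by Spark.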

Did it Spark?

The revenue dataset is almost 200MB, of which you will use just a fraction. The curated census data is only 400KB. Does it make sense to onboard this dataset in your Informix® database, which you primarily use for your sales and warehouse transactions? Probably not. Don't get me wrong, Informix is perfectly capable of handling these datasets, but is it its role?

As you move through your data scientist path, you will add more and more datasets, experiment with them (and maybe use dedicated tools like IBM Watson Studio). However, you probably do not want all these datasets in your production database for each experiment.

Back to business

After showing these first results to your sales folks, you worked out together a good index for potential revenue.

Basically, the idea is:

  • Find the best sales area.
  • Consider the key figures as reference: population (ref_pop), current revenue you are doing in this area (ref_rev), and income (ref_inc).
  • Adjust the potential revenues for each ZIP code, comparing the population (pop), the revenue (rev), and the average income (inc).

Note: "Index" (plural: indices) is used in the rest of this article in the sense of an indicator. Do not confuse this index with a database index.

For each area or ZIP code, apply:
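
One formulation that reproduces the index values in the result table at the end of this article is sketched here. It is a reconstruction, so the notation is an assumption; the revenue term compares revenue per inhabitant, that is, ref_rev / ref_pop against rev / pop.

```latex
\[
\mathit{index} =
  \frac{\mathit{ref\_rev} / \mathit{ref\_pop}}{\mathit{rev} / \mathit{pop}}
  \times \frac{\mathit{pop}}{\mathit{ref\_pop}}
  \times \frac{\mathit{inc}}{\mathit{ref\_inc}},
\qquad
\mathit{potential\_revenue} = \mathit{rev} \times \mathit{index}
\]
```

For the reference area itself, each ratio equals 1, so its index is 1 and its potential revenue equals its current revenue.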

So, this is nice, but how do you do this in Spark?

From business to development

You can find this example in the net.jgp.labs.informix2spark.l420 package, the application called SalesTargetApp.


Walk through the code, starting with the imports and then the initialization.

package net.jgp.labs.informix2spark.l420;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import java.math.BigDecimal;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;
import scala.collection.Seq;

public class SalesTargetApp {

  SparkSession spark;

  public SalesTargetApp() {
    init();
  }

  private void init() {
    this.spark = SparkSession
        .builder()
        .appName("Sales Target")
        .master("local")
        .getOrCreate();
  }

  public static void main(String[] args) {
    SalesTargetApp app = new SalesTargetApp();
    app.start();
  }

As with all the previous examples, everything begins with the start() method.

  private void start() {
    Dataset<Row> householdDf = getHouseholdDataframe();
    Dataset<Row> populationDf = getPopulationDataframe();
    Dataset<Row> indexDf = joinHouseholdPopulation(householdDf, populationDf);
    Dataset<Row> salesDf = getSalesData();

You are building four dataframes by calling methods. Let's dig into them.

Household data

This method is very similar to the first experiment in this tutorial with the IRS data. Start by reading the CSV files.

  private Dataset<Row> getHouseholdDataframe() {
    String filename = "data/14zpallagi*.csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true").load(filename);

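Like in SQL, select the columns that interest you. This is an alternative to dropping all the columns you do not want.

    // The exact list of selected columns is assumed; the last one is the product.
    df = df.select(
        df.col("zipcode"),
        df.col("agi_stub"),
        df.col("N1"),
        df.col("A02650"),

In the last column, you want the total revenue on all tax returns for each group. Remember, the IRS splits the data into six groups.

        df.col("N1").multiply(df.col("A02650")));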

This operation creates a column at the end of the dataframe, named (N1 * A02650). That is not a descriptive column name, so you decide to rename it income. df.columns() returns the list of column names, df.columns()[df.columns().length - 1] provides the name of the last column.

    df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "income");

Because you are not interested in analyzing each AGI group per ZIP code, you need to group the different AGI categories by ZIP code. While doing this, you can also add all the incomes into a column renamed total_income.

    df = df.groupBy("zipcode").sum("income");
    df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "total_income");

    return df;
  }

If you displayed the dataframe at this point, you would see:

|zipcode|total_income|
|  35071| 4.7763307E8|
|  36525|   6306850.0|
|  36538|    201690.0|
|  85253|9.45181039E9|
|  85321|  1.290294E7|
only showing top 5 rows
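
The select, multiply, and group-by-sum steps above can be mimicked in plain Java to see what happens to each row. This is a sketch only: the class and method names are made up, and the rows are the ones printed earlier in this tutorial (all six groups for ZIP code 27514, but only group 5 for ZIP code 35004, so the second total is partial).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TotalIncomeSketch {

  // One IRS row reduced to the columns used in this step.
  static final class IrsRow {
    final int zipcode;
    final double n1;
    final double a02650;

    IrsRow(int zipcode, double n1, double a02650) {
      this.zipcode = zipcode;
      this.n1 = n1;
      this.a02650 = a02650;
    }

    // Same product as the (N1 * A02650) column.
    double income() {
      return n1 * a02650;
    }
  }

  // Groups the rows by ZIP code and sums the income column of each group.
  static Map<Integer, Double> totalIncomeByZip(List<IrsRow> rows) {
    Map<Integer, Double> totals = new TreeMap<>();
    for (IrsRow row : rows) {
      totals.merge(row.zipcode, row.income(), Double::sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    List<IrsRow> rows = Arrays.asList(
        new IrsRow(27514, 3590, 42542), new IrsRow(27514, 2030, 74332),
        new IrsRow(27514, 1040, 65651), new IrsRow(27514, 800, 71410),
        new IrsRow(27514, 1690, 249042), new IrsRow(27514, 1650, 843353),
        new IrsRow(35004, 590, 75493));
    System.out.println(totalIncomeByZip(rows));
  }
}
```

Each ZIP code ends up with a single total, which is what groupBy("zipcode").sum("income") produces in the dataframe.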


Population data

Loading the population dataframe is straightforward with all the experience you have gained so far.

  private Dataset<Row> getPopulationDataframe() {
    String filename = "data/2010+Census+Population+By+Zipcode+(ZCTA).csv";
    Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(filename);

There is another way to rename columns, which requires knowing the column names (and here you do).

    df = df.withColumnRenamed("Zip Code ZCTA", "zipcode");
    df = df.withColumnRenamed("2010 Census Population", "pop");
    return df;
  }

And your dataframe looks like:

|zipcode|  pop|
|   1001|16769|
|   1002|29049|
|   1003|10372|
|   1005| 5079|
|   1007|14649|
only showing top 5 rows

Joining household income and population

As you learned in Part 3, you join the two datasets on the ZIP code. This is an outer join, which means that some values may be null.

  private Dataset<Row> joinHouseholdPopulation(
      Dataset<Row> householdDf,
      Dataset<Row> populationDf) {
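    Dataset<Row> df = householdDf
        .join(populationDf,
            householdDf.col("zipcode").equalTo(populationDf.col("zipcode")), "outer")
        .drop(populationDf.col("zipcode"));

Now, create a new column named income_per_inh. This column contains the result of the division of the total income by the population. This gives you an estimate of the income per inhabitant.

    df = df.withColumn("income_per_inh", df.col("total_income").divide(df.col("pop")));
    return df;
  }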

withColumn() allows you to create a new column in your dataframe, using one or multiple columns as well as functions. withColumn() might become your favorite method as you do more and more transformations. It is easier to use than trying to find the last column to rename.

Here's how this new dataframe looks now:

|zipcode|total_income|  pop|    income_per_inh|
|   1088|   1144910.0|  670|1708.8208955223881|
|   1238|  7.228838E7| 6047|11954.420373739043|
|   1342|   4992920.0| 1492| 3346.461126005362|
|   2122|1.09356174E9|23479| 46576.16338004174|
|   2142| 1.0935586E8| 3141| 34815.61922954473|
only showing top 5 rows
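
To see why an outer join can leave null values, here is a small plain-Java sketch of a full outer join on two ZIP-keyed maps. The names are made up for this illustration, and the figures are taken from the outputs above. ZIP codes 35071 and 1001 each appear on one side only because the sample is tiny, not because the real datasets lack them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

public class OuterJoinSketch {

  // Joins two ZIP-keyed maps. Each result value is
  // {total_income, pop, income_per_inh}; an entry is null when a side is missing.
  static Map<Integer, Double[]> fullOuterJoin(
      Map<Integer, Double> income, Map<Integer, Integer> population) {
    TreeSet<Integer> zips = new TreeSet<>(income.keySet());
    zips.addAll(population.keySet());
    Map<Integer, Double[]> joined = new LinkedHashMap<>();
    for (Integer zip : zips) {
      Double totalIncome = income.get(zip);
      Integer pop = population.get(zip);
      Double popValue = null;
      Double perInhabitant = null;
      if (pop != null) {
        popValue = pop.doubleValue();
        if (totalIncome != null) {
          // Only computable when both sides matched.
          perInhabitant = totalIncome / pop;
        }
      }
      joined.put(zip, new Double[] { totalIncome, popValue, perInhabitant });
    }
    return joined;
  }

  public static void main(String[] args) {
    Map<Integer, Double> income = new HashMap<>();
    income.put(1088, 1144910.0);
    income.put(35071, 4.7763307E8);
    Map<Integer, Integer> population = new HashMap<>();
    population.put(1088, 670);
    population.put(1001, 16769);
    for (Map.Entry<Integer, Double[]> e : fullOuterJoin(income, population).entrySet()) {
      System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
    }
  }
}
```

Only ZIP code 1088 is present on both sides, so it is the only row with an income per inhabitant; the two other rows keep a null in that position.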

Sales data

For the sales data, reuse the code you wrote for Part 3 of this tutorial series. You just need to put it in a method.

  private Dataset<Row> getSalesData() {
    // Build salesDf here with the code from Part 3 (see the GitHub repository).
    return salesDf;
  }

Remember, all the code is available on GitHub.

Your sales dataframe now contains the ZIP code and the revenues you are making.

|zipcode|revenue|
|  94062|1390.00|
|  94040| 562.00|
|  94022| 448.00|
|  19898|1131.00|
|  74006|1614.00|
only showing top 5 rows

Combining all the datasets, forecasting sales

You now have all the datasets and are ready to build the indices you defined with the sales team.

    Dataset<Row> salesIndexDf = salesDf
        .join(indexDf, salesDf.col("zipcode").equalTo(indexDf.col("zipcode")), "left")
        .drop(indexDf.col("zipcode"));

Look for the revenue per inhabitant.

    salesIndexDf = salesIndexDf.withColumn("revenue_by_inh", salesIndexDf.col("revenue")
        .divide(salesIndexDf.col("pop")));

Now sort to identify the best sales area.

    salesIndexDf = salesIndexDf.orderBy(col("revenue_by_inh").desc());

Extract the "best row." The best row contains all the values to use as a reference from the sales team's best sector.

    Row bestRow = salesIndexDf.first();
    double bestRevenuePerInhabitant = ((BigDecimal) bestRow.getAs("revenue_by_inh"))
        .doubleValue();
    int populationOfBestRevenuePerInhabitant = bestRow.getAs("pop");
    double incomeOfBestRevenuePerInhabitant = bestRow.getAs("income_per_inh");

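Next, create a column in your dataframe. You know you can use the withColumn() method. But what if you want to add a column with a specific value? You can do this by taking a numeric column, dividing it by itself (so you have 1), and then multiplying it by the value.

    // Intermediate column names are assumed here; the final output only
    // shows index and potential_revenue.
    salesIndexDf = salesIndexDf.withColumn(
        "best_revenue_per_inh",
        salesIndexDf.col("pop").divide(salesIndexDf.col("pop"))
            .multiply(bestRevenuePerInhabitant));

Or you can use the lit() static function:

    salesIndexDf = salesIndexDf.withColumn(
        "pop_of_best", lit(populationOfBestRevenuePerInhabitant));
    salesIndexDf = salesIndexDf.withColumn(
        "income_of_best", lit(incomeOfBestRevenuePerInhabitant));

Now you are ready to create the three indices.

    salesIndexDf = salesIndexDf.withColumn(
        "idx_revenue",
        salesIndexDf.col("best_revenue_per_inh").divide(salesIndexDf.col("revenue_by_inh")));
    salesIndexDf = salesIndexDf.withColumn(
        "idx_pop", salesIndexDf.col("pop").divide(salesIndexDf.col("pop_of_best")));
    salesIndexDf = salesIndexDf.withColumn(
        "idx_income",
        salesIndexDf.col("income_per_inh").divide(salesIndexDf.col("income_of_best")));

It is now time to create the final index for each area, which is the product of the three indices.

    salesIndexDf = salesIndexDf.withColumn(
        "index",
        salesIndexDf.col("idx_revenue").multiply(salesIndexDf.col("idx_pop"))
            .multiply(salesIndexDf.col("idx_income")));

And apply the index to the existing revenue.

    salesIndexDf = salesIndexDf.withColumn(
        "potential_revenue", salesIndexDf.col("revenue").multiply(salesIndexDf.col("index")));

You can drop a few columns to enhance the output. You could have dropped more, but you also need to have your report look scientific enough. Finally, make sure you order by the potential revenue, in descending order.

    salesIndexDf = salesIndexDf
        .drop("total_income", "revenue_by_inh", "best_revenue_per_inh", "pop_of_best",
            "income_of_best", "idx_revenue", "idx_pop", "idx_income")
        .orderBy(col("potential_revenue").desc());
    salesIndexDf.show();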

You can look at the resulting data:
|zipcode|revenue|  pop|    income_per_inh|             index| potential_revenue|
|  94025|  84.00|40526| 840368.1256477323|1610.5247342457083|135284.07767663948|
|  08540|1499.97|47115|469565.43117903004| 68.11481294046366|102170.17596630729|
|  94086|1200.00|45697| 244836.9227739239| 41.76194133319635| 50114.32959983562|
|  94062|1390.00|25876| 738260.2450146854| 34.85768977158333| 48452.18878250083|
|  80219| 232.00|61296|104358.72308144088| 165.6588621914614|38432.856028419046|
|  94022| 448.00|18500|1081220.6994594594| 80.96352858991159|36271.660808280394|
|  94040| 562.00|32996|257082.76791126197|  48.8167513518355| 27435.01425973155|
|  32256| 438.00|38483|142462.90881688017| 47.21447575076384|20679.940378834563|
|  85016| 584.00|33896| 94057.98914326174| 18.13799917867292|10592.591520344986|
|  94063|5592.00|30949| 59561.31635917154|               1.0|            5592.0|
|  94085| 450.00|21247| 95544.70842942533| 9.395047814603645|  4227.77151657164|
|  74006|1614.00|25750| 63162.90718446602|2.5434469314609442| 4105.123347377964|
|  08002| 654.00|22274|59319.770584538026| 4.410905752434176| 2884.732362091951|
|  60406| 824.00|25460| 36702.44422623723|2.8300506455361445| 2331.961731921783|
|  94026|1451.80| null|              null|              null|              null|
|  19898|1131.00| null|              null|              null|              null|

Results and verification

Your reference was the sales in ZIP code 94063, where you sold for $5,592. This area has an index of 1.
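
As a sanity check, the index can be recomputed by hand from the figures in the table. The sketch below is plain Java with made-up names; it assumes the index is the product of three ratios (reference revenue per inhabitant over local revenue per inhabitant, local population over reference population, and local income over reference income), which is the reading that matches the table.

```java
public class SalesIndexSketch {

  // Product of the three ratios described above.
  static double index(double rev, double pop, double inc,
      double refRev, double refPop, double refInc) {
    double idxRevenue = (refRev / refPop) / (rev / pop);
    double idxPop = pop / refPop;
    double idxIncome = inc / refInc;
    return idxRevenue * idxPop * idxIncome;
  }

  public static void main(String[] args) {
    // Reference area: ZIP code 94063 (values from the result table).
    double refRev = 5592.00;
    double refPop = 30949;
    double refInc = 59561.31635917154;

    // Area to evaluate: ZIP code 94025 (values from the result table).
    double rev = 84.00;
    double pop = 40526;
    double inc = 840368.1256477323;

    double idx = index(rev, pop, inc, refRev, refPop, refInc);
    System.out.println("index = " + idx);
    System.out.println("potential revenue = " + rev * idx);
  }
}
```

For ZIP code 94025, this gives an index of about 1,610.5 and a potential revenue of about $135,284, in line with the first row of the table.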

Based on the index you built, you can see that the best potential is in ZIP code 94025, which is Menlo Park, Calif. (funny, that's the birthplace of Informix). The data shows that the revenue per household is among the highest in the nation here. Therefore, it makes sense that a higher sales potential is in ZIP code 94025.

map of San Francisco Bay area

The area with the least potential is 60406, Blue Island, Ill., a less populated district, south of Chicago.

Map of Southern Illinois

You can look at the five ZIP codes with the most potential: Menlo Park, Calif.; Princeton, N.J.; Sunnyvale, Calif.; Redwood City, Calif.; and Denver, Colo. Your sales manager will probably conclude that it makes sense to increase the sales effort in those areas.

What you learned

In Part 4 of this tutorial series, you learned:

  • How to perform advanced analytics. Even though your enterprise might not have all the data internally, you can download external datasets from government agencies or Open Data portals.
  • Spark performs equally well on data coming from an RDBMS or from external files.
  • With all APIs or languages, there are different ways to obtain the same thing. It's best to work toward consistency and clarity above all; this will help your code maintenance.
  • How to verify your results. Your findings are in line with the gut feeling of California being where people have more money to spend on sports equipment.
