Offloading your Informix data in Spark, Part 1

Collecting the data

A little background

The literature about Spark is so abundant these days that I feel I need to spend a little more time talking about this old lady called Informix®. Informix was born as Relational Database Systems (RDS) back in 1980 and quickly became a reference relational database management system (RDBMS) on UNIX systems. IBM acquired Informix in two phases (2001 and 2005), adding the eponymous database to its impressive data management product portfolio. Fortunately, and some would say thanks to its highly loyal and motivated user base and user groups, Informix has stayed quite active in the IBM portfolio, bringing a lot of innovations to the product, such as XML and JSON support and NoSQL integration, which made it the first enterprise hybrid database. Informix is a great database for transactional applications and is still in use at companies like Walmart, Cisco, The Home Depot, DHL, and more. Every time you buy something at the largest supermarket chain in the world or at your favorite orange-themed home improvement store, the transaction is logged in that location's Informix database. At regular intervals, the data is brought back for consolidation in Arkansas and Georgia.

Although Informix added in-memory support in recent years through the Informix Warehouse Accelerator (IWA), the world is moving more toward heterogeneous environments and data lake-oriented architectures. This is a sweet spot for Apache Spark, and one that might make you wonder, "How do I offload the data in my Informix databases into Spark?"

In this tutorial, you will learn how to collect the data from Informix. In the second part of this series, you will learn how to add other sources of data and analyze the data.

To follow along, you need:

  • Spark 2.1.1
  • Informix 12.10.FC8
  • Java™ 1.8.0_60-b27
  • Informix JDBC driver 4.10.8.1
  • macOS Sierra 10.12.5

Note: All the code is on GitHub.

Get the customers into my dataframe

Figure 1. Adding customer to dataframe

In this first section, you will connect to the customer table of stores_demo, the well-known Informix sample database.

The syntax is pretty straightforward, but there is a catch: remember to set DELIMIDENT=Y in your connection URL to ensure that the SQL queries Spark generates, which wrap identifiers in double quotes, are parsed correctly by Informix. You can learn more about DELIMIDENT in the Informix documentation.

Here is an excerpt from BasicCustomerLoader.java.

First, create a Spark session by connecting to local mode (or your cluster).

  SparkSession spark = SparkSession
      .builder()
      .appName("Stores Customer")
      .master("local")
      .getOrCreate();

Read the data from the table and store it in the dataframe.

  Dataset<Row> df = spark
    .read()
    .format("jdbc")
    .option(
      "url",
      "jdbc:informix-sqli://[::1]:33378/stores_demo:IFXHOST=lo_informix1210;DELIMIDENT=Y")
    .option("dbtable", "customer")
    .option("user", "informix")
    .option("password", "in4mix")
    .load();
  df.printSchema();
  df.show(5);

You get the following schema:

root
 |-- customer_num: long (nullable = false)
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- company: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- phone: string (nullable = true)

Then the data (only showing top five rows):

+------------+---------------+---------------+--------------------+-------...
|customer_num|          fname|          lname|             company|       ...
+------------+---------------+---------------+--------------------+-------...
|         101|Ludwig         |Pauli          |All Sports Supplies |213 Ers…
|         102|Carole         |Sadler         |Sports Spot         |785 Gea…
|         103|Philip         |Currie         |Phil's Sports       |654 Pop…
|         104|Anthony        |Higgins        |Play Ball!          |East Sh…
|         105|Raymond        |Vector         |Los Altos Sports    |1899 La…
+------------+---------------+---------------+--------------------+-------...
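
Before going further, you can poke at this dataframe a bit. Here is a minimal, purely illustrative sketch (reusing the df variable from the code above; the column names come from the schema you just printed) that filters the customers and runs a quick aggregation:

  // A quick, informal exploration of the customer dataframe loaded above.
  // Keep only the customers located in California and show a few columns.
  df.filter(df.col("state").equalTo("CA"))
    .select("customer_num", "fname", "lname", "city")
    .show();

  // Count the customers per state.
  df.groupBy("state").count().show();

Nothing here is Informix-specific: once the data is in a dataframe, it is plain Spark.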

Dump the entire database into Spark

Figure 2. Entire database in Spark

The JDBC metadata API lists all the tables; then, one by one, you load them into Spark. Load only the tables and views, and reject system tables, synonyms, aliases, and so on.

However, as you import the data, Spark will not recognize some of the opaque data types present in Informix. To avoid that, you need to create a specific dialect for Informix. A JDBC dialect tells Spark how to map data types as data comes in (and out), and lets you tune a few parameters.

Building a translation dialect

Excerpt from InformixJdbcDialect.java in the net.jgp.labs.informix2spark.utils package.

You need a few packages from Spark and Scala.

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;

import scala.Option;

The dialect class inherits from JdbcDialect, but you will not override all the methods.

public class InformixJdbcDialect extends JdbcDialect {

The main method is canHandle, which, based on the JDBC URL, determines whether this is the right dialect to use. In this case, you check that the URL starts with jdbc:informix-sqli, which is a good indicator that you are connecting to an Informix database.

  @Override
  public boolean canHandle(String url) {
    return url.startsWith("jdbc:informix-sqli");
  }

The second method is getCatalystType, which returns the data type that Catalyst, Spark's optimizer, will understand, based on the type names retrieved from the JDBC driver. This list covers all the opaque data types found in stores_demo. If your application uses more data types, you will have to add them here.

  @Override
  public Option<DataType> getCatalystType(int sqlType,
      String typeName, int size, MetadataBuilder md) {
    if (typeName.toLowerCase().compareTo("calendar") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "calendarpattern") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "se_metadata") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo(
        "sysbldsqltext") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().startsWith("timeseries")) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo("st_point") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    if (typeName.toLowerCase().compareTo("tspartitiondesc_t") == 0) {
      return Option.apply(DataTypes.BinaryType);
    }
    return Option.empty();
  }

Note that this method returns an Option, which comes from Scala.
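
If you have not used Scala's Option type from Java before, here is a tiny, purely illustrative snippet (not part of the project) showing the two cases getCatalystType returns: a defined mapping, and an empty one that tells Spark to fall back to its default type mapping.

  // Illustration only: scala.Option as seen from Java.
  Option<DataType> mapped = Option.apply(DataTypes.BinaryType); // a defined value
  Option<DataType> nothing = Option.empty();                    // no value

  System.out.println(mapped.isDefined()); // prints true
  System.out.println(nothing.isEmpty());  // prints true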

Getting the list of tables

It's time to go through all the tables.

Excerpt from DatabaseLoader.java in the net.jgp.labs.informix2spark.l100 package. For readability, I removed the exception handling.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;

public class DatabaseLoader {

  private List<String> getTables(Connection connection) {
    List<String> tables = new ArrayList<>();

Get the connection's metadata.

    DatabaseMetaData md;
    md = connection.getMetaData();

From there, query the tables. Passing null for the catalog, the schema pattern, and the table types, and "%" as the table name pattern, returns all the tables.

    ResultSet rs;
    rs = md.getTables(null, null, "%", null);

You can now browse through the metadata result set like a normal result set.

    while (rs.next()) {

The table name is the third column.

      String tableName = rs.getString(3);

The table type is the fourth column.

      String tableType = rs.getString(4).toLowerCase();
      System.out.print("Table [" + tableName + "] ... ");

Keep only the tables and views. The other types are system table, global temporary, local temporary, alias, and synonym.

      if (tableType.compareTo("table") == 0
          || tableType.compareTo("view") == 0) {
        tables.add(tableName);
        System.out.println("is in (" + tableType + ").");
      } else {
        System.out.println("is out (" + tableType + ").");
      }
    }

    return tables;
  }
}

Consume CPU cycles and memory

Now that you have the list of tables you want, you can put all the pieces together and create a map of dataframes.

Excerpt from DatabaseLoader.java in the net.jgp.labs.informix2spark.l100 package. For readability, I removed the exception handling and some condition testing.

  private void start() {

Connect to Spark in local mode.

    SparkSession spark = SparkSession
        .builder()
        .appName("Stores Data")
        .master("local")
        .getOrCreate();

This small configuration object, which comes with a builder, eases the management of the connection.

    Config config = ConfigManager.getConfig(K.INFORMIX);
    Connection connection = config.getConnection();
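
The Config and ConfigManager classes live in the net.jgp.labs.informix2spark.utils package of the GitHub repository and are not reproduced here. As a rough sketch only (the field names and the omitted builder are assumptions; check the repository for the real code), Config simply carries the connection parameters that the rest of the program asks for:

// Hypothetical sketch of Config; the real class (with its builder) is in the repository.
public class Config {
  private String jdbcUrl;  // for example jdbc:informix-sqli://host:port/stores_demo:DELIMIDENT=Y
  private String user;
  private String password;
  private String driver;   // for example com.informix.jdbc.IfxDriver

  public String getJdbcUrl() { return jdbcUrl; }
  public String getUser() { return user; }
  public String getPassword() { return password; }
  public String getDriver() { return driver; }

  // Opens a plain JDBC connection, used here to read the database metadata.
  public java.sql.Connection getConnection() throws java.sql.SQLException {
    return java.sql.DriverManager.getConnection(jdbcUrl, user, password);
  }
}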

Get all the tables.

    List<String> tables = getTables(connection);
    if (tables.isEmpty()) {
      return;
    }

Define the dialect and register it within Spark.

    JdbcDialect dialect = new InformixJdbcDialect();
    JdbcDialects.registerDialect(dialect);

Prepare your map. The map is indexed by the table name; each value is the corresponding dataframe.

    Map<String, Dataset<Row>> database = new HashMap<>();

Go through all the tables.

    for (String table : tables) {
      System.out.print("Loading table [" + table
          + "] ... ");

Follow the same principle as before, just with a different table every time.

      Dataset<Row> df = spark
          .read()
          .format("jdbc")
          .option("url", config.getJdbcUrl())
          .option("dbtable", table)
          .option("user", config.getUser())
          .option("password", config.getPassword())
          .option("driver", config.getDriver())
          .load();
      database.put(table, df);
      System.out.println("done");
    }

Pick a random table (state is shown here), analyze it, and print the first five rows.

    System.out.println("We have " + database.size()
        + " table(s) in our database");
    Dataset<Row> df = database.get("state");

    df.printSchema();
    System.out.println("Number of rows in state: " + df
        .count());
    df.show(5);
  }

Execute the program to get the following.

Table [sysaggregates] ... is out (system table).
Table [sysams] ... is out (system table).
…
Table [call_type] ... is in (table).
Table [catalog] ... is in (table).
Table [classes] ... is in (table).
Table [cust_calls] ... is in (table).
Table [customer] ... is in (table).
Table [customer_ts_data] ... is in (table).
Table [employee] ... is in (table).
Table [ext_customer] ... is in (table).
Table [items] ... is in (table).
Table [manufact] ... is in (table).
Table [orders] ... is in (table).
Table [se_metadatatable] ... is in (table).
Table [se_views] ... is in (table).
…
Loading table [customer] ... done
Loading table [customer_ts_data] ... done
Loading table [employee] ... done
Loading table [ext_customer] ... done
Loading table [items] ... done
Loading table [manufact] ... done
Loading table [orders] ... done
Loading table [se_metadatatable] ... done
Loading table [se_views] ... done
Loading table [state] ... done
Loading table [stock] ... done
…
We have 45 table(s) in our database

root
 |-- code: string (nullable = true)
 |-- sname: string (nullable = true)

Only showing the top five rows of the 'state' table:

+----+---------------+
|code|          sname|
+----+---------------+
|  AK|Alaska         |
|  HI|Hawaii         |
|  CA|California     |
|  OR|Oregon         |
|  WA|Washington     |
+----+---------------+

Going forward

You are now ready to perform analytics on this data, but that is for another lesson.

In this tutorial, we used standard Java code as well as standard JDBC methods. We designed this code for Informix; however, adapting it to another great database, like Db2®, is a matter of minutes.
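
As a purely illustrative sketch (only the URL prefix check in canHandle is shown; any Db2-specific type mappings would depend on your schema), a Db2 flavor of the dialect could start like this:

import org.apache.spark.sql.jdbc.JdbcDialect;

// Hypothetical sketch of a Db2 dialect: same skeleton as the Informix one,
// with the JDBC URL prefix adjusted; add getCatalystType mappings as needed.
public class Db2JdbcDialect extends JdbcDialect {
  @Override
  public boolean canHandle(String url) {
    return url.startsWith("jdbc:db2");
  }
}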

Thanks to Pradeep Natarajan, now with HCL, but still with Informix, who is always there when I have weird questions.
