In this blog, we discuss how to work with BigSQL data from Spark using the Spark JDBC data source. The JDBC data source lets you execute BigSQL queries from Spark and consume the results as DataFrames. In this example, we read data from a simple BigSQL table into a Spark DataFrame that can then be queried and processed with the DataFrame API and Spark SQL. Spark 2.0 or later is required for this example.
1. Create and populate a simple BigSQL table. (You can also use any existing BigSQL table of your choice.) Copy the following three rows into a file named storedata, upload the file to HDFS, then verify that the table is created and populated correctly.
item1,400,1999-12-31,0000.1234
item2,634,2007-02-04,0000.4567
item3,005,2006-09-24,0000.1234

[bigsql@mmkusum100-master-2 blog]$ hadoop fs -put storedata /tmp
[bigsql@mmkusum100-master-2 blog]$ hadoop fs -ls /tmp
-rw-r--r--   3 bigsql hdfs        126 2017-05-19 10:53 /tmp/storedata

[bigsql@mmkusum100-master-2 blog]$ db2 "create hadoop table store(item_name varchar(40), item_quantity int, sales_date date, tax_percentage double) stored as textfile"
[bigsql@mmkusum100-master-2 blog]$ db2 "begin execute immediate 'load hadoop using file url ''/tmp/storedata'' with source properties (''field.delimiter''='','', ''ignore.extra.fields''=''true'') into table store append'; end"
[bigsql@mmkusum100-master-2 blog]$ db2 "select * from store"

ITEM_NAME                                ITEM_QUANTITY SALES_DATE TAX_PERCENTAGE
---------------------------------------- ------------- ---------- ------------------------
item1                                              400 12/31/1999   +1.23400000000000E-001
item2                                              634 02/04/2007   +4.56700000000000E-001
item3                                                5 09/24/2006   +1.23400000000000E-001

  3 record(s) selected.
2. Launch the Spark shell, using the --jars option to point to the location of the BigSQL JDBC driver. You will need to adjust the path for your environment.
[bigsql@mmkusum100-master-2 root]$ spark-shell --jars /usr/ibmpacks/bigsql/4.x.0.0/db2/java/db2jcc.jar
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://172.16.193.56:4040
Spark context available as 'sc' (master = local[*], app id = local-1494446400147).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
3. Import the classes. Starting with Spark 2.0, SparkSession is the entry point to Spark SQL; a sketch of building the session in a standalone application follows the imports below.
scala> import org.apache.spark.sql
import org.apache.spark.sql

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
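In the shell, the session is pre-created and exposed as spark. If you later move this code into a standalone application, you build the session yourself. A minimal sketch, with an illustrative application name:

import org.apache.spark.sql.SparkSession

// Build a SparkSession for a standalone application; in spark-shell this
// object already exists as 'spark'.
val spark = SparkSession.builder()
  .appName("BigSQLJdbcExample")   // hypothetical application name
  .getOrCreate()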
4. Load data from your BigSQL table into a DataFrame named df. You will need to modify the JDBC URL (host name and port) for your environment. An alternate form of the same call is sketched after the transcript below.
scala> val df = spark.read.format("jdbc").option("url", "jdbc:db2://mmkusum100-master-2.domain.xxx.com:32051/BIGSQL").option("driver", "com.ibm.db2.jcc.DB2Driver").option("dbtable", "BIGSQL.STORE").option("user", "bigsql").option("password", "bigsql").load()
df: org.apache.spark.sql.DataFrame = [ITEM_NAME: string, ITEM_QUANTITY: int ... 2 more fields]
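The same load can also be expressed with the read.jdbc() shortcut and a java.util.Properties object; this is just an alternate form of the call above, using the same connection values:

import java.util.Properties

// Connection properties mirror the option() calls above.
val props = new Properties()
props.setProperty("user", "bigsql")
props.setProperty("password", "bigsql")
props.setProperty("driver", "com.ibm.db2.jcc.DB2Driver")

// Load the same table through the jdbc() shortcut.
val df2 = spark.read.jdbc(
  "jdbc:db2://mmkusum100-master-2.domain.xxx.com:32051/BIGSQL",
  "BIGSQL.STORE",
  props)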
5. Verify that the data was loaded correctly from the BigSQL table into the Spark DataFrame; a quick row-count check is sketched below.
scala> df.show()
+---------+-------------+----------+--------------+
|ITEM_NAME|ITEM_QUANTITY|SALES_DATE|TAX_PERCENTAGE|
+---------+-------------+----------+--------------+
|    item1|          400|1999-12-31|        0.1234|
|    item2|          634|2007-02-04|        0.4567|
|    item3|            5|2006-09-24|        0.1234|
+---------+-------------+----------+--------------+
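As an additional sanity check, you can compare the DataFrame row count against the "3 record(s) selected" reported by db2 in step 1:

// Should print 3, matching the BigSQL query result above.
println(df.count())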
6. Print the schema of DataFrame df. A programmatic way to inspect the schema follows the output below.
scala> df.printSchema()
root
 |-- ITEM_NAME: string (nullable = true)
 |-- ITEM_QUANTITY: integer (nullable = true)
 |-- SALES_DATE: date (nullable = true)
 |-- TAX_PERCENTAGE: double (nullable = true)
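If you need to inspect column names and types in code rather than on the console, the schema is also available programmatically; for example:

// Iterate over the StructFields that make up the DataFrame's schema.
df.schema.fields.foreach { f =>
  println(s"${f.name}: ${f.dataType} (nullable = ${f.nullable})")
}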
7. Select the desired columns from DataFrame df; a filtering and aggregation sketch follows the output.
scala> df.select("SALES_DATE").show()
+----------+
|SALES_DATE|
+----------+
|1999-12-31|
|2007-02-04|
|2006-09-24|
+----------+
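Beyond simple projection, the DataFrame API also supports filtering and aggregation on the same data; a short sketch:

// Rows with ITEM_QUANTITY greater than 400, via the DataFrame API.
df.filter(df("ITEM_QUANTITY") > 400)
  .select("ITEM_NAME", "ITEM_QUANTITY")
  .show()

// Total quantity across all rows.
df.groupBy().sum("ITEM_QUANTITY").show()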
8. Create a temporary view named spark_item from DataFrame df. Spark SQL can then be used to query the view. Experiment with variations of these Spark SQL queries to explore the BigSQL Spark connector further; a query-pushdown sketch follows the transcript below.
scala> df.createOrReplaceTempView("spark_item")

scala> spark.sql("select ITEM_NAME, SALES_DATE from spark_item")
res4: org.apache.spark.sql.DataFrame = [ITEM_NAME: string, SALES_DATE: date]

scala> spark.sql("select ITEM_NAME, SALES_DATE from spark_item").show()
+---------+----------+
|ITEM_NAME|SALES_DATE|
+---------+----------+
|    item1|1999-12-31|
|    item2|2007-02-04|
|    item3|2006-09-24|
+---------+----------+

scala> spark.sql("select ITEM_NAME, ITEM_QUANTITY from spark_item where ITEM_QUANTITY > 400").show()
+---------+-------------+
|ITEM_NAME|ITEM_QUANTITY|
+---------+-------------+
|    item2|          634|
+---------+-------------+
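The dbtable option is not limited to a table name: Spark's JDBC data source accepts any expression that is valid in a FROM clause, so you can also push a query down to BigSQL itself rather than querying the view in Spark. A minimal sketch using the same connection values as step 4:

// Wrap the query as a derived table so BigSQL evaluates the WHERE clause.
val dfPushed = spark.read.format("jdbc")
  .option("url", "jdbc:db2://mmkusum100-master-2.domain.xxx.com:32051/BIGSQL")
  .option("driver", "com.ibm.db2.jcc.DB2Driver")
  .option("dbtable",
    "(select ITEM_NAME, ITEM_QUANTITY from BIGSQL.STORE where ITEM_QUANTITY > 400) as t")
  .option("user", "bigsql")
  .option("password", "bigsql")
  .load()

dfPushed.show()   // should return the same single row as the Spark SQL query above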
I hope this blog helps you get started with simple interactions between Spark and BigSQL. Stay tuned for more interesting and complex query examples in upcoming posts.