SQL to Hadoop and back again, Part 3: Direct transfer and live data exchange

Big data is a term that has been used regularly for almost a decade now, and it, along with technologies like NoSQL, is often seen as a replacement for the long-successful RDBMS solutions that use SQL. Today, DB2®, Oracle, Microsoft® SQL Server, MySQL, and PostgreSQL dominate the SQL space and still make up a considerable proportion of the overall market. In this final article of the series, we will look at more automated solutions for migrating data to and from Hadoop. In the previous articles, we concentrated on methods that take exports or otherwise formatted and extracted data from your SQL source, load it into Hadoop in some way, and then process or parse it. But if you want to analyze big data, you probably don't want to wait while the data is exported. Here, we're going to look at some methods and tools that enable a live transfer of data between your SQL and Hadoop environments.

Martin C. Brown, Director of Documentation

A professional writer for over 15 years, Martin (MC) Brown is the author of, and a contributor to, more than 26 books covering an array of topics, including the recently published Getting Started with CouchDB. His expertise spans myriad development languages and platforms: Perl, Python, Java, JavaScript, Basic, Pascal, Modula-2, C, C++, Rebol, Gawk, Shellscript, Windows, Solaris, Linux, BeOS, Microsoft WP, Mac OS, and more. He currently works as the director of documentation for Continuent.



22 October 2013

Also available in Japanese

Using Sqoop

Like some solutions we've seen earlier, Sqoop is all about taking data, usually wholesale, from a database and inserting it into Hadoop in the format required for your desired use. For example, Sqoop can take raw tabular data (a whole database, table, view, or query) and insert it into Hadoop as comma-separated or otherwise delimited text, as binary SequenceFiles, or in a format suitable for using the data directly in Hive or HBase.

The elegance of Sqoop is that it handles the entire extraction, transfer, data type translation, and insertion process for you, in either direction. Sqoop also handles — providing you have organized your data appropriately — the incremental transfer of information. This means you can perform a load and 24 hours later, you can perform an additional load that imports only the rows that have changed.

InfoSphere BigInsights

InfoSphere® BigInsights™ makes integrating between Hadoop and SQL databases much simpler as it provides the necessary tools and mechanics to export and import data between different databases. Using InfoSphere BigInsights you can define database sources, views, queries and other selection criteria, and then automatically convert that into a variety of formats before importing that collection directly into Hadoop (see Resources for more information).

For example, you can create a query that extracts the data and populates a JSON array with the record data. Once the data is exported, a job can be created to process and crunch it before either displaying it or exporting the processed data back into DB2.

Download BigInsights Quick Start Edition, a complimentary, downloadable version of InfoSphere BigInsights.

Sqoop is an efficient method of swapping data, since it uses multithreaded transfers to extract, convert, and insert the information among databases. This approach can be more efficient for data transfer than the export/import methods previously shown. The limitation of Sqoop is that it automates aspects of data exchange that, if made configurable, could be better tailored to your data and expected uses.

Importing to Hadoop using Sqoop

Sqoop works very simply: it takes all the data in a table (effectively SELECT * FROM tablename), or the output of a query you supply, and submits it as a MapReduce job that writes the content out into HDFS within Hadoop.

The basic Sqoop tool accepts a command, import, then a series of options that define the JDBC interface, along with configuration information, such as the JDBC driver, authentication information, and table data. For example, here's how to import the Chicago Bus data from a MySQL source: $ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root --table chicago.
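This example assumes the MySQL root account will accept the connection without a password; in practice, you will normally need to supply credentials. A minimal sketch using the same example host and database, with -P prompting for the password so it does not end up in your shell history:

$ sqoop import \
    --connect jdbc:mysql://192.168.0.240/hadoop \
    --username root -P \
    --table chicago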

Sqoop prefers to use the table's primary key as an identifier for the information, both because each row of data is written into HDFS as a line of CSV in a file and because the key gives Sqoop a column on which to split the import across parallel map tasks. The primary key also works well for append-only data, such as logs, and it is handy when performing incremental imports because it identifies which rows have already been imported.

The output of the import command goes a long way toward describing the underlying process (see Listing 1).

Listing 1. Output of the sqoop import command
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop 
    --username root --table chicago
13/08/20 18:45:46 INFO manager.MySQLManager: Preparing to use a MySQL 
    streaming resultset.
13/08/20 18:45:46 INFO tool.CodeGenTool: Beginning code generation
13/08/20 18:45:47 INFO manager.SqlManager: Executing SQL statement: 
    SELECT t.* FROM `chicago` AS t LIMIT 1
13/08/20 18:45:47 INFO manager.SqlManager: Executing SQL statement: 
   SELECT t.* FROM `chicago` AS t LIMIT 1
13/08/20 18:45:47 INFO orm.CompilationManager: HADOOP_MAPRED_HOME 
    is /usr/lib/hadoop-mapreduce
13/08/20 18:45:47 INFO orm.CompilationManager: Found hadoop core jar at: 
/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-core.jar
Note: /tmp/sqoop-cloudera/compile/2a66b88e152785acb3688bb530daa957/chicago.java 
    uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/08/20 18:45:49 INFO orm.CompilationManager: Writing jar file:
 /tmp/sqoop-cloudera/compile/2a66b88e152785acb3688bb530daa957/chicago.jar
13/08/20 18:45:49 WARN manager.MySQLManager: It looks like you are 
    importing from mysql.
13/08/20 18:45:49 WARN manager.MySQLManager: This transfer can be faster! 
    Use the --direct
13/08/20 18:45:49 WARN manager.MySQLManager: option to exercise a 
    MySQL-specific fast path.
13/08/20 18:45:49 INFO manager.MySQLManager: Setting zero DATETIME 
    behavior to convertToNull (mysql)
13/08/20 18:45:49 ERROR tool.ImportTool: Error during import: No primary key 
could  be found for table chicago. Please specify one with --split-by or perform 
a sequential import with '-m 1'.
[cloudera@localhost ~]$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop 
    --username root --table chicago
13/08/20 18:48:55 INFO manager.MySQLManager: Preparing to use a MySQL 
    streaming resultset.
13/08/20 18:48:55 INFO tool.CodeGenTool: Beginning code generation
13/08/20 18:48:55 INFO manager.SqlManager: Executing SQL statement: 
    SELECT t.* FROM `chicago` AS t LIMIT 1
13/08/20 18:48:55 INFO manager.SqlManager: Executing SQL statement: 
    SELECT t.* FROM `chicago` AS t LIMIT 1
13/08/20 18:48:55 INFO orm.CompilationManager: HADOOP_MAPRED_HOME 
    is /usr/lib/hadoop-mapreduce
13/08/20 18:48:55 INFO orm.CompilationManager: Found hadoop core jar at:
   /usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-core.jar
Note: /tmp/sqoop-cloudera/compile/3002dc39075aa6746a99e5a4b27240ac/chicago.
    java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/08/20 18:48:58 INFO orm.CompilationManager: Writing jar file:
 /tmp/sqoop-cloudera/compile/3002dc39075aa6746a99e5a4b27240ac/chicago.jar
13/08/20 18:48:58 WARN manager.MySQLManager: It looks like you are importing 
    from mysql.
13/08/20 18:48:58 WARN manager.MySQLManager: This transfer can be faster! 
    Use the --direct
13/08/20 18:48:58 WARN manager.MySQLManager: option to exercise a 
    MySQL-specific fast path.
13/08/20 18:48:58 INFO manager.MySQLManager: Setting zero DATETIME 
    behavior to convertToNull (mysql)
13/08/20 18:48:58 INFO mapreduce.ImportJobBase: Beginning import of chicago
13/08/20 18:48:59 WARN mapred.JobClient: Use GenericOptionsParser for parsing 
    the arguments. Applications should implement Tool for the same.
13/08/20 18:49:00 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: 
    SELECT MIN(`id`), MAX(`id`) FROM `chicago`
13/08/20 18:49:00 INFO mapred.JobClient: Running job: 
    job_201308151105_0012
13/08/20 18:49:01 INFO mapred.JobClient:  map 0% reduce 0%
13/08/20 18:49:51 INFO mapred.JobClient:  map 100% reduce 0%
13/08/20 18:49:53 INFO mapred.JobClient: Job complete: 
    job_201308151105_0012
13/08/20 18:49:53 INFO mapred.JobClient: Counters: 23
13/08/20 18:49:53 INFO mapred.JobClient:   File System Counters
13/08/20 18:49:53 INFO mapred.JobClient:     FILE: Number of bytes read=0
13/08/20 18:49:53 INFO mapred.JobClient:     FILE: Number of bytes 
    written=695444
13/08/20 18:49:53 INFO mapred.JobClient:     FILE: Number of read operations=0
13/08/20 18:49:53 INFO mapred.JobClient:     FILE: Number of large read 
    operations=0
13/08/20 18:49:53 INFO mapred.JobClient:     FILE: Number of write operations=0
13/08/20 18:49:53 INFO mapred.JobClient:     HDFS: Number of bytes read=433
13/08/20 18:49:53 INFO mapred.JobClient:     HDFS: Number of bytes written=97157691
13/08/20 18:49:53 INFO mapred.JobClient:     HDFS: Number of read operations=4
13/08/20 18:49:53 INFO mapred.JobClient:     HDFS: Number of large 
    read operations=0
13/08/20 18:49:53 INFO mapred.JobClient:     HDFS: Number of write operations=4
13/08/20 18:49:53 INFO mapred.JobClient:   Job Counters 
13/08/20 18:49:53 INFO mapred.JobClient:     Launched map tasks=4
13/08/20 18:49:53 INFO mapred.JobClient:     Total time spent by all maps in 
    occupied slots (ms)=173233
13/08/20 18:49:53 INFO mapred.JobClient:     Total time spent by all reduces in 
    occupied slots (ms)=0
13/08/20 18:49:53 INFO mapred.JobClient:     Total time spent by all maps waiting 
    after reserving slots (ms)=0
13/08/20 18:49:53 INFO mapred.JobClient:     Total time spent by all reduces 
    waiting  after reserving slots (ms)=0
13/08/20 18:49:53 INFO mapred.JobClient:   Map-Reduce Framework
13/08/20 18:49:53 INFO mapred.JobClient:     Map input records=2168224
13/08/20 18:49:53 INFO mapred.JobClient:     Map output records=2168224
13/08/20 18:49:53 INFO mapred.JobClient:     Input split bytes=433
13/08/20 18:49:53 INFO mapred.JobClient:     Spilled Records=0
13/08/20 18:49:53 INFO mapred.JobClient:     CPU time spent (ms)=24790
13/08/20 18:49:53 INFO mapred.JobClient:     Physical memory (bytes) 
    snapshot=415637504
13/08/20 18:49:53 INFO mapred.JobClient:     Virtual memory (bytes) 
    snapshot=2777317376
13/08/20 18:49:53 INFO mapred.JobClient:     Total committed heap usage 
    (bytes)=251133952
13/08/20 18:49:53 INFO mapreduce.ImportJobBase: Transferred 92.6568 MB 
   in 54.4288 seconds (1.7023 MB/sec)
13/08/20 18:49:53 INFO mapreduce.ImportJobBase: Retrieved 2168224 records.
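Note that the first attempt in Listing 1 fails because the chicago table had no primary key at that point. The error message itself suggests the two workarounds; here is a minimal sketch of each, reusing the id column that the successful run ends up splitting on:

# Split the parallel import on an explicit column
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago --split-by id

# Or fall back to a single, sequential map task
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago -m 1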

Once transferred, the data is stored, by default, as comma-separated values and added to a directory named after the imported table, with the data split across multiple part files, one per map task (see Listing 2).

Listing 2. Storing the data
$ hdfs dfs -ls chicago
Found 6 items
-rw-r--r--   3 cloudera cloudera          0 2013-08-20 18:49 chicago/_SUCCESS
drwxr-xr-x   - cloudera cloudera          0 2013-08-20 18:49 chicago/_logs
-rw-r--r--   3 cloudera cloudera   23904178 2013-08-20 18:49 chicago/part-m-00000
-rw-r--r--   3 cloudera cloudera   24104937 2013-08-20 18:49 chicago/part-m-00001
-rw-r--r--   3 cloudera cloudera   24566127 2013-08-20 18:49 chicago/part-m-00002
-rw-r--r--   3 cloudera cloudera   24582449 2013-08-20 18:49 chicago/part-m-00003

To change the directory where the information is stored, use --target-dir to specify the directory location within HDFS.

The file format can be explicitly modified using command-line arguments, but the options are limited. For example, you can't migrate tabular data into a JSON record with Sqoop.
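For example, here is a hedged sketch that writes tab-delimited output into an explicit HDFS directory (the directory name is arbitrary):

$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago \
    --target-dir /user/cloudera/chicago_tsv \
    --fields-terminated-by '\t' --lines-terminated-by '\n'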

A more complex alternative is to use the SequenceFile format, which translates the raw data into a binary format that can be reconstituted within the Java™ environment of Hadoop as a Java class, with each column of the table data as a property of each instantiated class record. As an alternative, you can use Sqoop to import data directly into an HBase- or Hive-compatible table.
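Rough sketches of those alternatives follow; the Hive and HBase table names and the column family are arbitrary choices, and they assume Hive and HBase are installed alongside Sqoop:

# Store the import as binary SequenceFiles instead of delimited text
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago --as-sequencefile

# Import directly into a Hive table
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago --hive-import --hive-table chicago

# Import into an HBase table, placing all columns in one column family
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago --hbase-table chicago --column-family d --hbase-create-table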

Importing using a query

Wholesale table transfers are useful, but one of the primary benefits of the SQL environment is the ability to join and reformat the input into a more meaningful stream of columnar data.

By using a query, you can extract entire tables, table fragments, or complex table joins. I tend to use queries when the source data is from multiple SQL tables and I want to crunch the data as a single source table within Hadoop.

To use a query, the --query argument must be specified on the command line. The query must include a WHERE clause containing the token $CONDITIONS, which Sqoop replaces with the conditions it uses when splitting the source content across map tasks (quote the query so your shell does not expand the token itself; see Listing 3).

Listing 3. Using the --query argument
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
  --query 'SELECT log.id,log.daterec,sensor.logtype,sensor.value FROM log 
  JOIN sensor ON (sensor.logid = log.id) WHERE $CONDITIONS'

The basic process is the same; we are just limiting the data being exchanged. Internally, Sqoop simply executes the query and takes the tabular output.

A useful rule of thumb here is that the size of the data being transferred is (comparatively) unimportant. Also bear in mind that during processing within Hadoop, you will only have access to the information in the files that were imported; you won't be able to run a join or other lookup to pull in information that would normally live in other tables of a multi-table SQL environment. Therefore, duplicating information on a row basis (for example, repeating a string, ID, or date column) that you might ordinarily normalize away is perfectly safe.
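As a sketch of that, the query from Listing 3 could deliberately repeat a descriptive column on every row rather than leaving it in a lookup table (the sensor.description column is hypothetical). Note that free-form query imports also need a target directory and either a split column or -m 1:

$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --query 'SELECT log.id, log.daterec, sensor.logtype, sensor.description,
             sensor.value FROM log JOIN sensor ON (sensor.logid = log.id)
             WHERE $CONDITIONS' \
    --split-by log.id --target-dir /user/cloudera/log_sensor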

Incremental imports

The incremental import is an attempt by Sqoop to handle the fact that source data is unlikely to be static. The process is not automatic, and you must be prepared to keep a record of the last data that was imported.

The incremental system operates in two ways, either using a lastmodified approach, or using an append approach:

  • The lastmodified approach requires changing your SQL table structure and application, because it compares a date column to determine which records have changed since the last import was made. It is best suited to SQL-side data that is updated in place, but you must adapt your application and database structure to include a column containing the date and time when each record was inserted or updated. Most databases include a timestamp data type for exactly this purpose (a sketch of this approach follows this list).
  • The append approach uses a simpler check field. This can be used in a number of ways, but the most obvious is an auto-incremented column, which makes the approach better suited to data that is only ever appended rather than created and then updated. Another option is to use a column that is set to a new value on every insert or update, but that requires more hoop jumping than simply relying on the auto_increment value.
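As a sketch of the lastmodified approach, assume the table has been given a changed timestamp column (the column name and the cut-off value here are hypothetical):

$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago --check-column changed \
    --incremental lastmodified --last-value '2013-08-20 00:00:00'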

For either system, the fundamental approach is the same: tell Sqoop which column contains the data to be checked and the check value, then import as normal. For example, to import all the data since the original Chicago bus data import, we specify the auto_increment ID of the last row imported (see Listing 4).

Listing 4. Specifying the auto_increment ID
$ sqoop import --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago --check-column id --incremental append --last-value=2168224
...

13/08/20 19:39:01 INFO tool.ImportTool: Incremental import complete! 
To run another incremental import of all data following this import, supply the 
following arguments:
13/08/20 19:39:01 INFO tool.ImportTool:  --incremental append
13/08/20 19:39:01 INFO tool.ImportTool:   --check-column id
13/08/20 19:39:01 INFO tool.ImportTool:   --last-value 4336573
13/08/20 19:39:01 INFO tool.ImportTool: (Consider saving this with 
'sqoop job --create')

One useful feature of the incremental process is that the job outputs the command-line values you would need to use for the next import, but it's easier to save this as a job (see Listing 5).

Listing 5. Saving output as a job
$ sqoop job --create nextimport -- import \
    --connect jdbc:mysql://192.168.0.240/hadoop \
    --username root --table chicago \
    --incremental append --check-column id --last-value 4336573

Now you can run the next import by running $ sqoop job --exec nextimport.

The incremental import process is great for jobs that aggregate data from your SQL store into Hadoop over time, allowing the transferred data to be deleted from the active SQL store. The same basic approach can also be used to push near-live updates from an SQL source into Hadoop, since the information is so easy to transfer across.
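If near-live is close enough, the saved job shown in Listing 5 can simply be run on a schedule; Sqoop updates the stored last value after each successful run. A minimal cron sketch (the interval and log path are arbitrary):

# Pull newly appended rows from MySQL into HDFS every 15 minutes
*/15 * * * *  sqoop job --exec nextimport >> /var/log/sqoop-nextimport.log 2>&1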

Exporting to SQL using Sqoop

The export process simply converts the data in Hadoop back into rows of a table. Sqoop does this by running a MapReduce job that writes the records into the target table (an optional staging table can be used to keep a failed export from half-populating the target). The target table has to exist, and its structure has to match the information being exported from Hadoop (see Listing 6).

Listing 6. Sqoop export command
$ sqoop export --connect jdbc:mysql://192.168.0.240/hadoop --username root 
    --table chicago2 --export-dir=chicago
13/08/20 20:08:44 INFO manager.MySQLManager: Preparing to use a MySQL 
    streaming resultset.
13/08/20 20:08:44 INFO tool.CodeGenTool: Beginning code generation
13/08/20 20:08:46 INFO manager.SqlManager: Executing SQL statement: 
    SELECT t.* FROM `chicago2` AS t LIMIT 1
13/08/20 20:08:47 INFO manager.SqlManager: Executing SQL statement: 
    SELECT t.* FROM `chicago2` AS t LIMIT 1
13/08/20 20:08:47 INFO orm.CompilationManager: HADOOP_MAPRED_HOME 
    is /usr/lib/hadoop-mapreduce
13/08/20 20:08:47 INFO orm.CompilationManager: Found hadoop core jar at:
 /usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-core.jar
Note: /tmp/sqoop-cloudera/compile/5f6d818f5d78c0e4349b5fc3924f87da/
    chicago2.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
13/08/20 20:08:49 INFO orm.CompilationManager: Writing jar file:
 /tmp/sqoop-cloudera/compile/5f6d818f5d78c0e4349b5fc3924f87da/chicago2.jar
13/08/20 20:08:50 INFO mapreduce.ExportJobBase: Beginning export of chicago2
13/08/20 20:08:54 WARN mapred.JobClient: Use GenericOptionsParser for 
parsing the arguments. Applications should implement Tool for the same.
13/08/20 20:08:57 INFO input.FileInputFormat: Total input paths to process : 8
13/08/20 20:08:57 INFO input.FileInputFormat: Total input paths to process : 8
13/08/20 20:08:58 INFO mapred.JobClient: Running job: job_201308151105_0015
13/08/20 20:08:59 INFO mapred.JobClient:  map 0% reduce 0%
13/08/20 20:09:31 INFO mapred.JobClient:  map 1% reduce 0%
13/08/20 20:09:33 INFO mapred.JobClient:  map 2% reduce 0%
13/08/20 20:09:34 INFO mapred.JobClient:  map 3% reduce 0%
13/08/20 20:09:36 INFO mapred.JobClient:  map 4% reduce 0%
13/08/20 20:09:37 INFO mapred.JobClient:  map 5% reduce 0%
13/08/20 20:09:40 INFO mapred.JobClient:  map 7% reduce 0%
13/08/20 20:09:43 INFO mapred.JobClient:  map 9% reduce 0%
13/08/20 20:09:46 INFO mapred.JobClient:  map 12% reduce 0%
13/08/20 20:09:49 INFO mapred.JobClient:  map 13% reduce 0%
13/08/20 20:09:52 INFO mapred.JobClient:  map 15% reduce 0%
13/08/20 20:09:55 INFO mapred.JobClient:  map 20% reduce 0%
13/08/20 20:09:58 INFO mapred.JobClient:  map 22% reduce 0%
13/08/20 20:09:59 INFO mapred.JobClient:  map 23% reduce 0%
13/08/20 20:10:01 INFO mapred.JobClient:  map 25% reduce 0%
13/08/20 20:10:02 INFO mapred.JobClient:  map 27% reduce 0%
13/08/20 20:10:04 INFO mapred.JobClient:  map 29% reduce 0%
13/08/20 20:10:05 INFO mapred.JobClient:  map 33% reduce 0%
13/08/20 20:10:08 INFO mapred.JobClient:  map 34% reduce 0%
13/08/20 20:10:09 INFO mapred.JobClient:  map 39% reduce 0%
13/08/20 20:10:11 INFO mapred.JobClient:  map 40% reduce 0%
13/08/20 20:10:12 INFO mapred.JobClient:  map 43% reduce 0%
13/08/20 20:10:13 INFO mapred.JobClient:  map 46% reduce 0%
13/08/20 20:10:15 INFO mapred.JobClient:  map 49% reduce 0%
13/08/20 20:10:16 INFO mapred.JobClient:  map 52% reduce 0%
13/08/20 20:10:18 INFO mapred.JobClient:  map 55% reduce 0%
13/08/20 20:10:19 INFO mapred.JobClient:  map 58% reduce 0%
13/08/20 20:10:21 INFO mapred.JobClient:  map 62% reduce 0%
13/08/20 20:10:22 INFO mapred.JobClient:  map 64% reduce 0%
13/08/20 20:10:24 INFO mapred.JobClient:  map 67% reduce 0%
13/08/20 20:10:25 INFO mapred.JobClient:  map 70% reduce 0%
13/08/20 20:10:27 INFO mapred.JobClient:  map 73% reduce 0%
13/08/20 20:10:28 INFO mapred.JobClient:  map 77% reduce 0%
13/08/20 20:10:30 INFO mapred.JobClient:  map 79% reduce 0%
13/08/20 20:10:31 INFO mapred.JobClient:  map 83% reduce 0%
13/08/20 20:10:33 INFO mapred.JobClient:  map 85% reduce 0%
13/08/20 20:10:34 INFO mapred.JobClient:  map 87% reduce 0%
13/08/20 20:10:36 INFO mapred.JobClient:  map 90% reduce 0%
13/08/20 20:10:37 INFO mapred.JobClient:  map 92% reduce 0%
13/08/20 20:10:39 INFO mapred.JobClient:  map 94% reduce 0%
13/08/20 20:10:40 INFO mapred.JobClient:  map 95% reduce 0%
13/08/20 20:10:41 INFO mapred.JobClient:  map 97% reduce 0%
13/08/20 20:10:43 INFO mapred.JobClient:  map 99% reduce 0%
13/08/20 20:10:44 INFO mapred.JobClient:  map 100% reduce 0%
13/08/20 20:10:45 INFO mapred.JobClient: Job complete: job_201308151105_0015
13/08/20 20:10:45 INFO mapred.JobClient: Counters: 24
13/08/20 20:10:45 INFO mapred.JobClient:   File System Counters
13/08/20 20:10:45 INFO mapred.JobClient:     FILE: Number of bytes read=0
13/08/20 20:10:45 INFO mapred.JobClient:     FILE: Number of bytes written=690336
13/08/20 20:10:45 INFO mapred.JobClient:     FILE: Number of read operations=0
13/08/20 20:10:45 INFO mapred.JobClient:     FILE: Number of large read operations=0
13/08/20 20:10:45 INFO mapred.JobClient:     FILE: Number of write operations=0
13/08/20 20:10:45 INFO mapred.JobClient:     HDFS: Number of bytes read=195427182
13/08/20 20:10:45 INFO mapred.JobClient:     HDFS: Number of bytes written=0
13/08/20 20:10:45 INFO mapred.JobClient:     HDFS: Number of read operations=28
13/08/20 20:10:45 INFO mapred.JobClient:     HDFS: Number of large read operations=0
13/08/20 20:10:45 INFO mapred.JobClient:     HDFS: Number of write operations=0
13/08/20 20:10:45 INFO mapred.JobClient:   Job Counters 
13/08/20 20:10:45 INFO mapred.JobClient:     Launched map tasks=4
13/08/20 20:10:45 INFO mapred.JobClient:     Data-local map tasks=4
13/08/20 20:10:45 INFO mapred.JobClient:     Total time spent by all maps in 
    occupied slots (ms)=288054
13/08/20 20:10:45 INFO mapred.JobClient:     Total time spent by all reduces in 
    occupied slots (ms)=8603
13/08/20 20:10:45 INFO mapred.JobClient:     Total time spent by all maps waiting 
    after reserving slots (ms)=0
13/08/20 20:10:45 INFO mapred.JobClient:     Total time spent by all reduces waiting 
    after reserving slots (ms)=0
13/08/20 20:10:45 INFO mapred.JobClient:   Map-Reduce Framework
13/08/20 20:10:45 INFO mapred.JobClient:     Map input records=4336448
13/08/20 20:10:45 INFO mapred.JobClient:     Map output records=4336448
13/08/20 20:10:45 INFO mapred.JobClient:     Input split bytes=672
13/08/20 20:10:45 INFO mapred.JobClient:     Spilled Records=0
13/08/20 20:10:45 INFO mapred.JobClient:     CPU time spent (ms)=55930
13/08/20 20:10:45 INFO mapred.JobClient:     Physical memory (bytes) 
    snapshot=445665280
13/08/20 20:10:45 INFO mapred.JobClient:     Virtual memory (bytes) 
    snapshot=2768896000
13/08/20 20:10:45 INFO mapred.JobClient:     Total committed heap usage 
    (bytes)=251133952
13/08/20 20:10:45 INFO mapreduce.ExportJobBase: Transferred 186.3739 MB in 
    111.7007 seconds (1.6685 MB/sec)
13/08/20 20:10:45 INFO mapreduce.ExportJobBase: Exported 4336448 records.

The export works in a similar way to the import. A MapReduce job is created that dumps the stored data from Hadoop and writes the individual records into your target SQL store.
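Two hedged variations on the basic export are worth knowing about: the staging-table option mentioned earlier, which empties and fills an intermediate table so a failed run does not leave the target partially populated, and turning the export into updates matched on a key column. The chicago2_stage table and the id key below are assumptions for this example:

# Export via a staging table that is cleared before the run begins
$ sqoop export --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago2 --staging-table chicago2_stage --clear-staging-table \
    --export-dir chicago

# Update existing rows in place, matched on the id column
$ sqoop export --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago2 --update-key id --export-dir chicago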


Automating data exchange

Although all of the solutions described so far could easily be scripted to make the process easier and more efficient, they all share a problem of immediacy. Post-processing (or even offline processing) of data is not uncommon, but there are situations where you want to exchange data live between the two systems, so you can run the larger, longer queries in Hadoop alongside the faster, more immediate access to the data through the SQL interface.

The big problem at the moment is that there is no simple way to do this live without changing your application. Ideally, there would be an application layer that sits between Hadoop and your SQL data source that automatically handles the exchange of information.

For this automatic exchange to work, the system needs to know what has changed and how to identify it. Going from SQL to Hadoop is comparatively easy if you tie into a change mechanism, such as CDC in DB2 and Oracle or the binary log in MySQL. Going from Hadoop back to SQL is more difficult, because it relies on knowing when the data processed by Hadoop has been stored reliably and when the MapReduce job has completed. For incremental processing, it is even more complex.

Using Sqoop2

Sqoop2 is a complete rewrite of the Sqoop code that doesn't rely on an explicit JDBC interface for communicating with other databases. Instead, it can work with any data source that can provide a stream of data. This flexibility helps primarily with the many NoSQL data sources, such as MongoDB, and makes it easier to work with a wider range of SQL data sources where the underlying dialect of the SQL interface would otherwise be a limitation to exchanging and swapping data.

Behind the scenes, Sqoop2 operates as a server process that provides a direct interface between the source database and Hadoop, working much as just described. Unlike Sqoop, it lets you predefine jobs, structures, and how the data is mapped between the two systems, then use those definitions to run explicit transfers of the data. Sqoop2 is still in its early stages (the first stable version was made available in March 2013), and convenience features like Hive and HBase support are not complete. The real benefit will come with a forthcoming UI, which is expected toward the end of 2013.

Incorporating Hadoop in your application

For performing more regular, measured transfers of data, the best approach is to make the data exchange part of your application framework. This is the only way to be sure that the exchange of information is wired correctly into your application workflow. One solution is to use Hive, since it allows SQL queries to be used directly on both systems.

However, this kind of integration is risky because of the potential for bad data. The main benefit of using SQL is its transactional nature, which guarantees that the data has been written to disk. Exporting the data (with Sqoop, CSV, and so on) is much safer, and it works much better with the append-only nature of HDFS and Hadoop in general.

The exact method is dependent on your application, but writing the same data to both systems is not without its pitfalls. In particular:

  • Writing data to two databases is complicated. To guarantee that no data is lost, you would need to confirm both writes and, if either the SQL or the Hadoop write failed, reject the operation so the application can resubmit it.
  • Updates are more complicated in Hadoop. HDFS is append-only by nature, so you would have to handle the process by writing a correction record into HDFS, then using MapReduce to collapse the insert and update operations during processing (a sketch follows this list).
  • Deletes are fundamentally the same as updates: we simply drop the affected data during MapReduce processing.
  • Any changes (updates or deletes) may cause problems if you are doing streaming updates and compression of the data in Hadoop.
  • If you are reading the data back into SQL in a summarized form for near-line processing, you'll need to plan the process (as described in Part 2) very carefully.
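As a sketch of the correction-record idea from the second and third points above, one (entirely hypothetical) convention is to append small CSV files that carry the row ID, an operation flag, and a timestamp, and let the processing job resolve them:

# 'U' marks an update, 'D' a delete; the trailing '...' stands in for the
# replacement column values. The MapReduce job keeps only the newest record
# per id and drops any id whose newest record is a 'D'.
$ echo "1042,U,2013-08-21 09:15:00,..." > correction-1042.csv
$ hdfs dfs -mkdir chicago/corrections
$ hdfs dfs -put correction-1042.csv chicago/corrections/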

The export, process, import life cycle between SQL and Hadoop is generally much easier to handle (disk space issues aside). If you need near-live transfer, Sqoop's incremental imports are much more efficient.
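Pulled together, that life cycle is little more than three commands chained in a script. A minimal sketch, assuming a processing job packaged as crunch.jar and a chicago_summary target table (both hypothetical), plus the saved nextimport job from Listing 5:

#!/bin/sh
# 1. Pull the latest rows from MySQL into HDFS (saved incremental job)
sqoop job --exec nextimport

# 2. Crunch the imported data with a MapReduce job (placeholder jar and class)
hadoop jar crunch.jar com.example.Crunch chicago chicago_out

# 3. Push the summarized results back into MySQL
sqoop export --connect jdbc:mysql://192.168.0.240/hadoop --username root \
    --table chicago_summary --export-dir chicago_out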


Conclusion

Truly live data transfer between SQL and Hadoop is not a sensible option, but with Sqoop we can do the next best thing: use incremental updates to load the most recent data into Hadoop on a regular basis. The alternative, live updates through your application, is risky enough from a data quality and reliability standpoint that it should be avoided. Regular swapping of information between SQL and Hadoop is safer and allows the data to be managed more effectively during the transfer.

Throughout this "SQL to Hadoop and back again" series, the focus has been on trying to understand that life cycle and how the transfer and exchange of information operates. The format of the data is relatively simple, but knowing how and when to effectively exchange the information and process it in a way that matches your data needs is the challenge. There are lots of solutions out there to move the data, but it is still up to your application to understand the best way to make use of that exchange.

Resources
