Parallel processing of unstructured data, Part 3: Extend the sashyReader

This series explores how to process unstructured data in parallel fashion — within a machine and across a series of machines — using the power of IBM® DB2® for Linux®, UNIX® and Windows® (LUW) and GPFS™ shared-nothing cluster (SNC) to provide efficient, scalable access to unstructured data through a standard SQL interface. In this article, see how the Java-based sashyReader framework leverages the architectural features in DB2 LUW. The sashyReader provides for parallel and scalable processing of unstructured data stored locally or on a cloud via an SQL interface. This is useful for data ingest, data cleansing, data aggregation, and other tasks requiring the scanning, processing, and aggregation of large unstructured data sets. You also learn how to extend the sashyReader framework to read arbitrary unstructured text data by using dynamically pluggable Python classes.


Steve Raspudic (stevera@ca.ibm.com), PureData Provisioning Architect, IBM

Steven Raspudic leads the provisioning team for the PureData for Transactions appliance and has played a key technical role in the delivery of numerous DB2 technologies, including the DB2 High Availability infrastructure and the DB2 High Availability and Disaster Recovery (HADR) technology. He holds several patents in the areas of relational databases and database availability.



Alexander Abrashkevich (aabrashk@ca.ibm.com), PureData Provisioning Development, IBM

Alexander Abrashkevich joined the IBM Toronto Lab in 2000. Since then, he has worked on various projects on the DB2 UNIX Porting, DB2 RASPD (Reliability, Availability, Serviceability, and Problem Determination), DB2 Health Advisor, and DB2 DPS (Data Protection Services) teams. Since 2011, he has been working on IBM cloud solutions, first as a member of the DBaaS (Database as a Service) team and then as a lead developer on the PureData System for Transactions provisioning team. He holds several patents and has numerous publications.



Toni Kunic (tonik@ca.ibm.com), PureData Provisioning Development, IBM

Toni Kunic joined the PureData team in May 2013. Since then, he has been writing core SAN provisioning code for the appliance.



22 May 2014

Overview

Part 1 and Part 2 of this series discussed DB2 LUW-driven SQL access to unstructured data via sashyReader. The unstructured data can reside on a variety of file systems in a variety of serialization formats. The SQL interface is provided by the DB2 LUW Generic Table Function feature. The parallel and scalable parsing of unstructured data is performed using a Java user defined function (UDF) provided with this series. Various standard unstructured formats can be read, and the reader can be extended to read arbitrary unstructured text data using extensions written in Python.


This article assumes that you're familiar with the following material discussed previously in this series: fundamentals of parallel processing, unstructured data, some serialization format concepts, the general parallel file system (GPFS) shared-nothing cluster (SNC) file system, interaction with cloud hosted data, and the DB2 LUW Generic Table Function feature and its capabilities to provide an SQL interface to unstructured data.

In this third and final part of the series, we'll demonstrate the Java-based framework sashyReader, which taps into the architectural features of DB2 LUW. The framework contains native support for comma-separated values (CSV), JavaScript Object Notation (JSON), and eXtensible Markup Language (XML). You can extend it by using dynamically pluggable Python classes, allowing parallel, SQL-based parsing of arbitrary row-based unstructured text formats. You can download the sample code used in this article.

Figure 1 shows a high-level view of the sashyReader.

Figure 1. High-level diagram of the sashyReader

Input data

A key feature of the sashyReader architecture is the variety and extensibility of the data sources it supports, in terms of both file systems and file serialization formats. Depending on the file system, data may be distributed to the readers with block locality taken into account for better performance. The following file systems, which are discussed in this article, are currently supported:

  • Local file system
  • GPFS shared-nothing cluster (SNC)
  • Amazon S3
  • Hadoop Distributed File System (HDFS)
  • Microsoft Azure storage

sashyReader also has built-in support for the following file formats:

  • Comma-separated values (CSV)
  • JavaScript Object Notation (JSON)
  • eXtensible Markup Language (XML)


Using sashyReader

The installation procedure for sashyReader is in the README.md file in the sashyReader archive accompanying this article (download).

After you set up sashyReader, you're ready to run some queries. Briefly, to specify and configure input files:

  • Input parameters to the sashyReader are provided dynamically as a standard JSON structure whose entries are key-value pairs; for example, the path to the input file or the delimiter to use when parsing a CSV file. When constructing the input JSON, use a JSON validator to make sure the input map is valid (a quick local check is sketched after this list).
  • The most important field you must specify is the inputURI. All input files to the sashyReader are specified as URIs, and wildcard characters (such as *.csv) can be used to specify multiple files at once. An example inputURI is file:///tmp/data.csv, which points to a local file stored at /tmp/data.csv.
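
As an alternative to an online validator, the following minimal Python sketch (a convenience only, not part of sashyReader) uses the standard json module to catch malformed input maps before you pass them to the table function:

import json

# A hypothetical input map; substitute your own paths and options.
input_map = '''
{
  "inputURI": "file:///tmp/data.csv",
  "inputFormatSettings": {"columnSeparator": ",", "recordDelimiter": "\\n"}
}
'''

try:
    json.loads(input_map)  # raises ValueError if the JSON is malformed
    print("input JSON is well formed")
except ValueError as err:
    print("invalid input JSON: %s" % err)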

The next section examines inputJSON construction by file system.

Supported file systems

This section covers the file systems that sashyReader supports.

Local file system

For any local file on a file system mounted to the OS, you can use the "file://" URI scheme. This ensures that sashyReader distributes one contiguous block of the file to each database partition. For example:

{"inputURI": "file:///tmp/data.csv"}

Keep in mind that the example above has three slashes in a row; two are part of the URI scheme, and one is part of the path.

GPFS

For a file stored on GPFS, use the gpfs:// scheme. This ensures that sashyReader distributes blocks to readers based on block locality for optimal I/O access. For example:

{"inputURI": "gpfs:///gpfs1/data.csv"}

Amazon S3

Data stored in S3 is described by a URI of the form s3://bucketName/objectPath. To access data stored in S3, the input JSON also requires access credentials, provided as a standard AwsCredentials.properties file that contains your AWS access key ID and secret key (Resources has more details and the exact format). The JSON would look something like this:

{"inputURI": "s3://sashybucket/l2_10k.csv",
"credentialsFile": "/home/username/AwsCredentials.properties"}

The example above describes data stored in the cloud on S3: the file l2_10k.csv in the bucket sashybucket, with the access credentials stored in the file /home/username/AwsCredentials.properties.

HDFS

HDFS URIs have the general form hdfs://authority/path, where authority is the NameNode host, including the port number if necessary. For example:

{"inputURI": "hdfs://my.hdfs.node:54310/user/hduser/airports.csv"}

More information on HDFS URIs is in Resources.

Microsoft Azure

For Azure, the input JSON is very similar to S3:

{"inputURI":"azure://sashybucket/airports.csv",
"credentialsFile": "/home/db2inst1/AzureCredentials.properties"}

The structure of AzureCredentials.properties is the same as AwsCredentials.properties.

Supported file formats

This article uses views to simplify the interaction with the table function. For all of the examples, we'll define various views of the form:

create view <viewname> as
select * from table(<table_function_invocation>)
as <typed_correlation_clause>

Then, all subsequent references to the table function can be done via the defined view.

The following sections provide an overview of the file formats that sashyReader supports and show how to write your own via Python extensions. The examples use local files, but the URI can point to any of the file systems noted above. For instance, to try these examples on S3, simply change the URI to point to the input data location on S3 and supply a credentials file for AWS.

CSV example

You can download the CSV data we use from the Our Airports data downloads page. Place the downloaded airports.csv file in a location accessible to the instance owner; the example below assumes /tmp/airports.csv.

Create a file named test_csv.clp with the following contents, then execute it using the db2 -tvf test_csv.clp command:

> cat test_csv.clp
connect to sample;
create view airview as
select * from table(SASHY('
{
"inputURI": "file:///tmp/airports.csv",
"viewName":"airview",
"inputFormatSettings": {"columnSeparator":",", "recordDelimiter": "\n"}
}
'))
as TX(
id int, ident varchar(100), type varchar(100),
name varchar(100), latitude_deg double,
longitude double, elevation_ft int, continent varchar(100),
iso_country varchar(100), iso_region varchar(100),
municipality varchar(100), scheduled_service varchar(100),
gps_code varchar(100), iata_code varchar(100), local_code varchar(100),
home_link varchar(50), wikipedia_link varchar(200),
keywords varchar(50));

The inputJSON contains the URI location of the input data, an optional view name to make querying more convenient, and an optional inputFormatSettings key-value map. The inputFormatSettings map contains options specific to input formats. In the case of CSV, they are columnSeparator and recordDelimiter. Refer to the README file for more options.

After executing the clp file, you can run queries against the created view. For example, to find all airports with elevation greater than 10000 feet, issue the query:

select id,name,elevation_ft from airview where elevation_ft > 10000

Note that this example works whether the DB2 LUW instance has a single partition or many partitions. As partitions are added to the instance, parsing times drop because of the inherent parallelism of the sashyReader.

JSON example

For an example of JSON processing, we'll parse raw Twitter data. (See the link in Resources to a good tutorial on how to use Python and the tweepy library to pull down raw Twitter data.)

Let's assume the raw Twitter data has landed in a file named /tmp/twitter_feed.json. You can parse the Twitter data with the sashyReader using the following:

> cat test_json.clp
connect to sample;
create view twtview as
select * from table(SASHY('
{"inputURI": "file:///tmp/twitter_feed.json",
"viewName":"twtview"
}
')) as TX
(created_at varchar(48), id varchar(20), id_str varchar(20),
text varchar(1024), retweeted varchar(16), source varchar(1024), lang varchar(16)
);

Create the view with:

db2 -tvf test_json.clp

Now, queries can be issued against the view. For example, to count the number of tweets in English (en), issue the query:

select count(*) from twtview where lang like 'en'

To count the number of tweets in your set not in English (en), issue the query:

select count(*) from twtview where lang not like 'en'

The data set can also be stored in the cloud (as shown in the earlier file system examples) rather than on local disk. To do so, modify the inputURI to point to the location of the data set in cloud storage and supply the location of the cloud credentials file.

XML example

For this example, supply your own XML file or generate one (Resources has a link to generate randomized test data). Assuming the data is in /tmp/people.xml, and that the records represent people, you can parse it with the following code:

> cat test_xml.clp
connect to sample;
create view xmlview as
select * from table(SASHY('
{"inputURI": "file:///tmp/people.xml",
"viewName":"xmlview",
"inputFormatSettings": {"startTag":"<record>", "endTag":"</record>"}
}
')) as TX
(NAME varchar(96), FIRST_NAME varchar(48), LAST_NAME varchar(48),
 AGE smallint
 );

Note, however, that because XML is a hierarchical data format, it cannot be easily mapped to a relational table. Therefore, we process the XML by supplying the record begin and end tags as input arguments; when querying the data, the tags inside each record that we're interested in must correspond to the column names.

For example, with the following record structure:

<record>
	<name>
		<first_name>Bob</first_name>
		<last_name>Boberton</last_name>
	</name>
	<age>44</age>
</record>

  • The column NAME contains Bob Boberton.
  • The column FIRST_NAME contains Bob.
  • The column LAST_NAME contains Boberton.
  • The column AGE contains 44.

Create the view with:

db2 -tvf test_xml.clp

Now you can issue queries against the view:

select age from xmlview where age > 30

Python InputFormat extensions

You can extend sashyReader with new file formats by supplying Python modules that implement specific interfaces. These interfaces are similar to the common Hadoop classes, so developers familiar with the Hadoop paradigm and Hadoop-focused Java programming can quickly and easily pick up the sashyReader platform.

It is recommended that you begin by creating a copy of one of the provided example modules (LinePyReader.py, RegexDelimPyReader.py, or NoOpPyReader.py) to serve as a starting point. Each of those Python modules provides three classes: InputFormat, RecordReader, and RecordFormatter.

The InputFormat class represents the input to sashyReader. It needs to implement block cleanup functionality. Block cleanup is defined in the cleanBlock() function, which takes a SashyInputSplit object (a block representing a byte-level segment of an input file) and advances the block's beginning and end so they match the beginning and end of a record. For example, in LinePyInputFormat, this means ensuring that the resulting clean block begins and ends on a newline character.
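
To make the cleanBlock() contract concrete, here is a minimal, self-contained sketch of the boundary-adjustment logic for a newline-delimited format. It operates on a plain byte string and a pair of offsets rather than on a real SashyInputSplit, so treat it purely as an illustration of the idea; see LinePyReader.py in the download for the actual split object and method signatures:

def clean_block(data, start, end):
    """Illustrative stand-in for cleanBlock(): shift [start, end) so the
    block begins at the start of a record and ends at the end of one."""
    # If the block begins neither at the start of the file nor just after a
    # newline, skip forward past the next newline; the partial record at the
    # front belongs to the previous block.
    if start != 0 and data[start - 1:start] != b"\n":
        nl = data.find(b"\n", start)
        start = nl + 1 if nl != -1 else len(data)
    # If the block does not end just after a newline, extend it to the next
    # newline so the final record is not cut in half.
    if end != len(data) and data[end - 1:end] != b"\n":
        nl = data.find(b"\n", end)
        end = nl + 1 if nl != -1 else len(data)
    return start, end

# A 10-byte block boundary falls in the middle of the second line.
text = b"id,name\n1,YYZ\n2,YVR\n"
print(clean_block(text, 0, 10))   # (0, 14)  -- first block absorbs "1,YYZ\n"
print(clean_block(text, 10, 20))  # (14, 20) -- second block starts at "2,YVR"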

In the RecordReader class, you must implement the nextKeyValue() function. The contract of that function is to read a record from a stream created from a clean block and to advance the key to a long representing the byte offset of the beginning of the record and the value to a character string representing the record. The key and value should then be accessible via getCurrentKey() and getCurrentValue(). For a simple format like CSV, this comes down to reading a new line out of the block.

The RecordFormatter class must implement the format() function. Given a record as a character string, this function should format the record and return it as a list of strings. For the CSV example, this means splitting the record string (a line) at occurrences of the delimiter character and returning the result. Keep in mind that this list is returned to DB2 as a String[], so the ordering of the fields in the list must match the ordering you specify in the AS TX clause of the SQL.
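
The following self-contained sketch illustrates the RecordReader and RecordFormatter contracts for a CSV-like format. It works on an in-memory clean block so that it runs on its own; the class names, the constructor, and the way a real reader obtains its stream from the framework are assumptions made for illustration, so use LinePyReader.py as the authoritative pattern:

class CsvLikeRecordReader(object):
    """Iterates over newline-delimited records in one clean block."""

    def __init__(self, block_bytes, block_start):
        self._data = block_bytes         # the bytes of one clean block
        self._offset = 0                 # current position within the block
        self._block_start = block_start  # byte offset of the block in the file
        self._key = None                 # byte offset of the current record
        self._value = None               # current record as a character string

    def nextKeyValue(self):
        """Advance to the next record; return True if one was read."""
        if self._offset >= len(self._data):
            return False
        nl = self._data.find(b"\n", self._offset)
        end = nl if nl != -1 else len(self._data)
        self._key = self._block_start + self._offset
        self._value = self._data[self._offset:end].decode("utf-8")
        self._offset = end + 1
        return True

    def getCurrentKey(self):
        return self._key

    def getCurrentValue(self):
        return self._value

class CsvLikeRecordFormatter(object):
    """Turns one record string into the list of column values for DB2."""

    def __init__(self, column_separator=","):
        self._sep = column_separator

    def format(self, record):
        # The returned list reaches DB2 as a String[]; its ordering must
        # match the column ordering in the AS TX clause.
        return record.split(self._sep)

# Stand-alone demo on a tiny clean block.
block = b"1,YYZ,Toronto\n2,YVR,Vancouver\n"
reader = CsvLikeRecordReader(block, block_start=0)
formatter = CsvLikeRecordFormatter(",")
while reader.nextKeyValue():
    print("%d %s" % (reader.getCurrentKey(), formatter.format(reader.getCurrentValue())))
# prints:
# 0 ['1', 'YYZ', 'Toronto']
# 14 ['2', 'YVR', 'Vancouver']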

After ensuring that the InputFormat instantiates and initializes the correct RecordReader and RecordFormatter in createRecordReader() and getRecordFormatter(), you can start using your new reader with sashyReader. Simply put the Python module in ~/sqllib/function and add the following to your input JSON:

"inputFormat": "<module_name>.<input_format_class_name>"

For example, if you've created a module named MyPyReader that contains the input format class MyPyInputFormatClass, use the following inputFormat key for the inputJSON structure:

"inputFormat": "MyPyReader.MyPyInputFormatClass"

The sashyReader archive includes several Python readers that you can use as templates to quickly get started writing your own. The rest of this section walks through some of these bundled readers.

db2diag.log example

Place a standard db2diag.log file in /tmp/db2diag.log.

Generate a view of the following form:

> cat test_diag.clp
connect to sample;
create view diagvw as
select * from table(SASHY('
{"inputURI": "file:///tmp/db2diag.log",
"inputFormat":"DB2DiagPyReader.DB2DiagPyInputFormat",
"viewName":"diagvw"
}
')) as TX
(TIMESTAMP TIMESTAMP, RECORDID varchar(25), LEVEL varchar(15),
PID varchar(25),TID varchar(50),PROC varchar(50), INSTANCE varchar(50),
NODE varchar(10), DB varchar(256), AUTHID varchar(50), HOSTNAME varchar(256),
FUNCTION varchar(150), EDUID varchar(50),
EDUNAME varchar(50), APPHDL varchar(50), APPID varchar(150),
MESSAGE varchar(512), START varchar(256), STOP varchar(256), CHANGE varchar(256),
CALLSTCK varchar(2048)
);

Create the view using the following command:

db2 -tvf test_diag.clp

You can now perform relational operations against the db2diag.log raw data.

For example, to get all db2diag.log messages logged at level ERROR, issue the following query:

select * from diagvw where LEVEL like '%ERR%'

WAS system out example

Place a standard WebSphere Application Server (WAS) system out file in the location /tmp/WAS_SystemOut.log.

The goal is to return this data as a relational table.

Run the following:

> cat test_was.clp
connect to sample;
create view wasview as
select * from table(SASHY('
{"inputURI": "file:///tmp/WAS_SystemOut.log",
"inputFormat":"WASSystemOutPyReader.WASSystemOutPyInputFormat",
"viewName":"wasview"
}
')) as TX
(TIMESTAMP TIMESTAMP, col2 varchar(32), col3 varchar(32),col4 varchar(32),
col5 varchar(1024),col6 varchar(3500));

Execute the view definition with the following command:

db2 -tvf test_was.clp

You can now select from the wasview view and perform relational operations against the raw WAS SystemOut log data.

For example:

db2 "select * from wasview"

DB2 latch trace example

Use the following clp file as a template for queries against DB2 latch trace files.

Note: The column ordering in the AS TX clause matters and shouldn't be changed when using this specific reader.

The query is:

> cat test_latch.clp
connect to sample;
select count(*)  from table(SASHY(' {
"inputURI": "file:///tmp/small.perf.xml",
"inputFormat": "DB2LatchTracePyReader.DB2LatchTracePyInputFormat"
} '))
as TX(
        addr VARCHAR(32),
        avg_atomic_retries_counter DOUBLE,
        avg_nonwait_time DOUBLE,
        avg_tries_counter DOUBLE,
        avg_waitqueue DOUBLE,
        avg_waits_counter DOUBLE,
        avg_wait_time DOUBLE,
        avg_wastes_counter DOUBLE,
        conflicts INT,
        id1 VARCHAR(64),
        id2 VARCHAR(32),
        max_atomic_retries_counter INT,
        max_nonwait_time DOUBLE,
        max_tries_counter INT,
        max_waitqueue INT,
        max_waits_counter INT,
        max_wait_time DOUBLE,
        max_wastes_counter INT,
        min_atomic_retries_counter INT,
        min_nonwait_time DOUBLE,
        min_tries_counter INT,
        min_waitqueue INT,
        min_waits_counter INT,
        min_wait_time DOUBLE,
        min_wastes_counter INT,
        occurrences INT,
        offset VARCHAR(32),
        op VARCHAR(32),
        total_atomic_retries_counter INT,
        total_nonwait_time DOUBLE,
        total_tries_counter INT,
        total_waitqueue INT,
        total_waits_counter INT,
        total_wait_time DOUBLE,
        total_wastes_counter INT,
        type VARCHAR(32),
        waits INT,
        stacktraceback VARCHAR(32000));

Execute the clp file to run the query:

db2 -tvf test_latch.clp

Be aware that the "id" attribute in the DB2 latch trace log is labeled "id1" in the SQL statement and should be queried as "id1." Furthermore, the stacktraceback column contains a string representation of the stack (without <frame> tags, with the attributes parsed into a human-readable form).


Conclusion

This article demonstrated how you can process unstructured data in massively parallel fashion by using the features in DB2 LUW 10.1 or greater to provide efficient, highly scalable access to unstructured data through a standard SQL interface. You can now apply the expressive power of SQL to large unstructured data sets, and join, aggregate, and ingest that data into a DB2 LUW data warehouse in an efficient manner.

You also learned how to extend the sashyReader framework by using Python code. The extension allows arbitrary, row-based text data to be parsed in a highly parallel fashion within the DB2 LUW engine. It also allows further processing and ingestion of the unstructured data within a relational context.


Download

Description     Name                Size
Sample code     sashyReader2.tgz    150KB

Resources

