Pinned topic: Using JSONToTuple operator with HDFS sink
3 replies | Latest post: 2013-09-06T15:39:16Z by Kevin_Foster

3YAS_Sami_Abed
2013-09-03T13:43:19Z

Hi team,

I borrowed the logic from the sample project 049_json_to_tuple_to_json_using_java in order to use the JSONToTuple operator to convert a JSON string into a Streams tuple and then write the result to HDFS using HDFSFileSink. (The full script is further below.)

However, when I run the app, the following error is issued:

> ./output/bin/standalone

Exception in thread "Thread-11" java.lang.NoClassDefFoundError: org.apache.hadoop.conf.Configuration
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
        at java.net.URLClassLoader.findClass(URLClassLoader.java:434)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:660)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:358)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:626)
Can't construct instance of class org.apache.hadoop.conf.Configuration
03 Sep 2013 14:29:03.055 [12554] ERROR #splapplog,J[0],P[0],GenericSink,HdfsCommon M[GenericSink.cpp:getHdfsPtr:300]  - Could not access HDFS file system on host db2data01.mul.ie.ibm.com on  port 9,000.
03 Sep 2013 14:29:03.057 [12554] ERROR #splapplog,J[0],P[0],GenericSink,spl_pe M[PEImpl.cpp:logTerminatingException:1172]  - CDISR5033E: An exception occurred during the execution of the GenericSink operator. Processing element number 0 is terminating.
03 Sep 2013 14:29:03.058 [12554] ERROR #splapptrc,J[0],P[0],GenericSink,spl_operator M[PEImpl.cpp:instantiateOperators:422]  - CDISR5030E: An exception occurred during the execution of the GenericSink operator. The exception is Could not connect to HDFS.
03 Sep 2013 14:29:03.058 [12554] ERROR #splapptrc,J[0],P[0],GenericSink,spl_pe M[PEImpl.cpp:process:633]  - CDISR5079E: An exception occurred during the processing of the processing element. The error is: Could not connect to HDFS.
03 Sep 2013 14:29:03.058 [12554] ERROR #splapptrc,J[0],P[0],GenericSink,spl_operator M[PEImpl.cpp:process:654]  - CDISR5053E: Runtime failures occurred in the following operators: GenericSink.

I copied all of the required folders and files from project 049 (as instructed): com.ibm.ssb.parsers.json, impl, lib, build.xml, and Makefile.

If I leave out JSONToTuple and only use HDFSFileSink, then the message is successfully written to HDFS.
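For reference, the working variant is just the sink wired directly to the Beacon output (same parameters as in the full script below):

        // Works on its own: HDFSFileSink consuming the Beacon output directly,
        // with no JSONToTuple anywhere in the graph.
        () as GenericSink = HDFSFileSink(GeneratedData) {
                param
                        bufferSize: 1u;
                        numBuffers: 3u;
                        format: txt;
                        file: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/outputHI.txt";
                        hdfsConfigFile: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/etc/hdfsconfig.txt";
        }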

If I define the JSONToTuple operator after HDFSFileSink, the error is different:

> ./output/bin/standalone
Fatal Error: Missing forwarder for JVM_Startup()

Could you provide any hints or references?

In the meantime, to work around the issue, I will define the JSON-to-tuple logic and the HDFS sink in two separate composites, but I am not sure whether that will resolve the problem.
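Roughly along these lines (just a sketch, in the same .spl file as the existing use statements; the sub-composite names are my own placeholders, and the tuple types are repeated inside the parser composite because SPL matches tuple types structurally):

        // Sketch only: placeholder composite names.
        composite JsonParsePart(output ProfileTuple) {
                type
                        MyAddressType = rstring country;
                        MyProfileType = rstring name, uint64 age, MyAddressType address;

                graph
                        stream<rstring message> GeneratedData = Beacon() {
                                param
                                        iterations: 5u;
                                output
                                        GeneratedData: message = "{\"name\": \"Jane Doe\", \"age\": 20, \"address\":{\"country\": \"USA\"}}";
                        }

                        stream<MyProfileType> ProfileTuple = JSONToTuple(GeneratedData) {}
        }

        composite HdfsSinkPart(input ProfileTuple) {
                graph
                        () as GenericSink = HDFSFileSink(ProfileTuple) {
                                param
                                        format: txt;
                                        file: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/outputHI.txt";
                                        hdfsConfigFile: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/etc/hdfsconfig.txt";
                        }
        }

        composite HDFSTweetsToTuple2 {
                type
                        MyAddressType = rstring country;
                        MyProfileType = rstring name, uint64 age, MyAddressType address;

                graph
                        stream<MyProfileType> ProfileTuple = JsonParsePart() {}
                        () as Sink = HdfsSinkPart(ProfileTuple) {}
        }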

Thank you

Sami Abed

Script:

--------

use com.ibm.ssb.parsers.json::*;
use com.ibm.streams.bigdata.hdfs::*;

composite HDFSTweetsToTuple2 {
        type
                MyJsonType = rstring jsonData;
                MyAddressType = rstring country;
                MyProfileType = rstring name, uint64 age, MyAddressType address;

        graph

                stream <rstring message> GeneratedData = Beacon() {
                        param
                                iterations: 5u;

                        output
                                GeneratedData: message = "{\"name\": \"Jane Doe\", \"age\": 20, \"address\":{\"country\": \"USA\"}}";
                } // End of Beacon.

                // Convert the JSON formatted string into a SPL tuple.
                stream<MyProfileType> ProfileTuple = JSONToTuple(GeneratedData) {}

                () as GenericSink = HDFSFileSink(ProfileTuple)   {
                        param
                               bufferSize : 1u;
                               numBuffers: 3u;
                               format: txt;
                               file: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/outputHI.txt";
                               hdfsConfigFile: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/etc/hdfsconfig.txt";
                }

} // End of HDFSTweetsToTuple2 composite.

Makefile:

-----------

all:
        ant
        $(STREAMS_INSTALL)/bin/spl-make-toolkit -i .
        $(STREAMS_INSTALL)/bin/sc -T -M HDFSTweetsToTuple2 -t /home/streamsadmin1/InfoSphereStreams/toolkits/com.ibm.streams.bigdata:/home/streamsadmin1/InfoSphereStreams/toolkits/com.ibm.ssb.parsers.json

clean:
        ant clean
        $(STREAMS_INSTALL)/bin/sc -C -M HDFSTweetsToTuple2
        $(STREAMS_INSTALL)/bin/spl-make-toolkit -i . -c
        rm -rf output

hdfsconfig.txt

------------------

hdfshost=db2data01.mul.ie.ibm.com
hdfsport=9000
hdfsuser=streamsadmin1
hdfsgroup=biadm1
hadoopdir=results
hadoopDir=results
file=outputHI.txt
format=txt

  • Kevin_Foster

    Re: Using JSONToTuple operator with HDFS sink

    2013-09-03T14:52:47Z, in response to 3YAS_Sami_Abed

    It's possible that you have a conflict between operators when they are running in the same process.

    Can you try a Distributed build to see what happens when they are each in their own PE?
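
    For what it's worth, a distributed build here just means dropping the -T (standalone) flag from the sc line in your Makefile and then submitting the resulting ADL file to a running instance, roughly like this (the instance name is a placeholder):

        $(STREAMS_INSTALL)/bin/sc -M HDFSTweetsToTuple2 -t <toolkit paths as before>
        streamtool submitjob -i myInstance output/HDFSTweetsToTuple2.adl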

    -Kevin

    • 3YAS_Sami_Abed

      Re: Using JSONToTuple operator with HDFS sink

      2013-09-06T13:53:30Z, in response to Kevin_Foster

      That has solved the problem. Thank you very much.

      Is it expected that certain operators conflict?

      Is this a known issue in Streams?

      • Kevin_Foster

        Re: Using JSONToTuple operator with HDFS sink

        2013-09-06T15:39:16Z, in response to 3YAS_Sami_Abed

        It depends on the code that is embedded in the operator.

        If that code is designed to work in (or at least does not conflict with) a multi-threaded process, then it is fine to let Streams fuse multiple operators into a Standalone build or into the same PE in a Distributed build. Otherwise you need to separate them into different PEs using a Distributed build.

        As a recommendation, you might try adding a line like this to the Big Data toolkit operators that you use:

        config placement : partitionExlocation("HDFS_toolkit");  // or any quoted string you prefer

        This will preserve the PE separation later on, even if someone tries to optimize the build with profiling.
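
        For example, in the sink invocation from the original post it would sit like this (just a sketch, with the params trimmed to the essentials):

                () as GenericSink = HDFSFileSink(ProfileTuple) {
                        param
                                format: txt;
                                file: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/outputHI.txt";
                                hdfsConfigFile: "/home/streamsadmin1/SAbed_dev/HDFSTweetsToTuple2/etc/hdfsconfig.txt";
                        config
                                placement : partitionExlocation("HDFS_toolkit");
                }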

        -Kevin