ekarpov@ec-leasing.ru

Problem with writing file in HDFS

2013-03-12T13:50:50Z
I'm trying to write a stream to HDFS. Streams and BigInsights are installed on the same VM. The application runs without errors, but the file in HDFS is empty.

Program text:

namespace namesp_1 ;

use com.ibm.streams.bigdata.hdfs::HDFSFileSink ;
use com.ibm.streams.inet::InetSource ;

composite Main
{
    type
        StockQuote = tuple<rstring name> ;
    graph
        () as HDFSFileSink_1 = HDFSFileSink(InetSource_1_out0)
        {
            param
                format : txt ;
                useVersionOneApi : false ;
                hdfsConfigFile : "/etc/hdfsconfig.txt" ;
                file : "/StreamsTest/Test1.txt" ;
        }

        (stream<rstring varName> InetSource_1_out0) as InetSource_1 = InetSource()
        {
            param
                URIList : ;
                fetchInterval : 60.0 ;
                incrementalFetch : true ;
                initDelay : 5.0 ;
        }
}

/etc/hdfsconfig.txt contains this:

hdfsport=9000
hdfshost=192.168.1.175
hdfsuser=biadmin
hdfsgroup=supergroup
  • MikeBranson

    Re: Problem with writing file in HDFS

    2013-03-12T14:03:29Z in response to ekarpov@ec-leasing.ru
    The first potential issue I see with the code you posted is that the URIList parameter on your InetSource operator is empty. It doesn't identify any URIs to retrieve data from, so it won't produce any data. Since that operator feeds the HDFSFileSink, no data will flow to the file.

    Mike B
    • ekarpov@ec-leasing.ru

      Re: Problem with writing file in HDFS

      2013-03-13T05:59:46Z in response to MikeBranson
      URIList has one link: "http://www.aari.nw.ru/clgmi/gts/buoy/buoy.current.n.txt"
      Sorry, I copied an older version of my code. So, what other causes could there be?
      • SystemAdmin

        Re: Problem with writing file in HDFS

        2013-03-19T16:35:19Z in response to ekarpov@ec-leasing.ru
        The HDFSFileSink doesn't write tuples as they arrive at the sink.
        There is a buffer (128 MB by default).

        The write happens when the buffer is full or when a punctuation arrives.

        You could try adding this:
        param
        bufferSize: 1u;
        .........

        If you start your application and then stop it, you should see the output in the file, because the sink flushes its buffer before it stops.
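
        Applied to the sink from the original post, the bufferSize suggestion might look like the sketch below. The other parameter values are simply copied from the question. The unit of bufferSize is not stated in this thread (a later reply reads 1u as 1 MB), so treat the value as something to check against the toolkit documentation rather than a confirmed setting.

        ```spl
        () as HDFSFileSink_1 = HDFSFileSink(InetSource_1_out0)
        {
            param
                format : txt ;
                useVersionOneApi : false ;
                hdfsConfigFile : "/etc/hdfsconfig.txt" ;
                file : "/StreamsTest/Test1.txt" ;
                // Small buffer so data reaches HDFS sooner; value taken from the
                // suggestion above, unit assumed rather than confirmed.
                bufferSize : 1u ;
        }
        ```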
        • SystemAdmin

          Re: Problem with writing file in HDFS

          2013-03-20T11:02:00Z in response to SystemAdmin
          The comment above is correct. The HDFSFileSink operator opens a buffer, and writes go into that buffer. The buffer is only written to disk when adding the next tuple would exceed the size of the buffer (as specified in the bufferSize parameter), when the operator receives punctuation (including final punctuation), or when the operator shuts down. The operator keeps a number of buffers so that it can write into one while others wait to be written to disk.

          One correction to that comment: the default buffer size is 64 MB, not 128 MB.

          You can also first try attaching a FileSink operator to see whether the InetSource is producing any data.
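
          A minimal sketch of that check, assuming the stream name from the original post. The operator alias DebugFileSink and the path /tmp/inet_debug.txt are made up for illustration; any local path the Streams user can write to should do. It goes inside the graph clause next to the existing operators.

          ```spl
          // Hypothetical debug sink: writes every InetSource tuple to a local file.
          () as DebugFileSink = FileSink(InetSource_1_out0)
          {
              param
                  file : "/tmp/inet_debug.txt" ; // assumed path, not from the thread
                  format : txt ;
                  flush : 1u ; // flush after each tuple so output shows up right away
          }
          ```

          If this file stays empty as well, the problem is upstream of the HDFS sink (for example the URIList or the fetch itself) rather than in the buffering.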
      • SystemAdmin

        Re: Problem with writing file in HDFS

        2013-03-19T16:38:54Z in response to ekarpov@ec-leasing.ru
        If your tuples are small and you don't have time to wait until 1 MB has accumulated,
        try this to generate a punctuation mark and so force the sink to write to HDFS:

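        // Output schema is taken from the input stream so tuples can be forwarded unchanged.
        stream<MyInputStream> GeneratePunctuationForFlushNumber = Custom(MyInputStream)
        {
            logic
                state : { mutable int32 val = 0 ; }
                onTuple MyInputStream :
                {
                    // Forward the tuple, then count it.
                    submit(MyInputStream, GeneratePunctuationForFlushNumber) ;
                    val++ ;
                    // After every 10 tuples, emit a window marker so the sink flushes.
                    if(val >= 10)
                    {
                        submit(Sys.WindowMarker, GeneratePunctuationForFlushNumber) ;
                        val = 0 ;
                    }
                }
                onPunct MyInputStream :
                {
                    // Pass upstream window markers through; they also trigger a flush.
                    if(currentPunct() == Sys.WindowMarker)
                    {
                        submit(Sys.WindowMarker, GeneratePunctuationForFlushNumber) ;
                        val = 0 ;
                    }
                }
        }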

        () as HDFSFileSink_1 = HDFSFileSink(GeneratePunctuationForFlushNumber)