Topic
  • 4 replies
  • Latest Post - ‏2013-10-23T20:16:10Z by david.cyr
david.cyr
20 Posts

Pinned topic Can I avoid the initial write of a 0 byte file when using timed punctuation flushes with HDFSFileSink operator?

‏2013-10-17T15:44:55Z |

I have a use case where I need to use the HDFSFileSink operator to write data for BigInsights, and need to ensure that no data sits in the buffer for too long, even in periods of slow data arrival (when the buffer isn't constantly filling up and flushing). This is easy enough to accomplish with a Beacon and punctuations.

We have noticed, however, that when using this approach we often end up with an initial 0-byte file, and then the files of data. The HDFSFileSink operator isn't continuously generating 0-byte files (for example, in periods of no data, the heartbeats aren't causing files to be created with no data). The pattern I've seen so far seems limited to an initial 0-byte file, and then files with the data.

The 0-byte file seems to be causing them some problems processing information in the directory on the BigInsights side (I'm not sure of the details), so I'm trying to see if I can find a way to avoid producing them.

I'm including a quick sample program that I can use to duplicate this problem in my environment... The behavior of this program is that it will produce one 0-byte file (e.g. test.4815.0.0) and then 5 or 6 files containing the rows of data (labeled 0-29, per the IterationCount). It does not continue to produce 0-byte files, even though the beacon keeps chirping for a few minutes after the data has stopped.

Thanks in advance for any help you can provide in eliminating this pesky 0-byte file. I'll continue looking on my side as well, now that I have a simple test case.

-------------------------------

namespace application ;

use com.ibm.streams.bigdata.hdfs::HDFSFileSink ;

composite ScratchHDFSAndTimer
{
 graph
  // Data source: 30 tuples, one every 5 seconds, starting 60 seconds after launch.
  (stream<uint64 iterCount> DataChirp) as DataOriginatingBeacon = Beacon()
  {
   param
    initDelay : 60.0 ;
    iterations : 30 ;
    period : 5.0 ;
   output
    DataChirp : iterCount = IterationCount() ;
  }

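  // Heartbeat source: 20 tuples, one every 30 seconds, starting 3 seconds after launch,
  // which is well before the first data tuple.
  (stream<uint64 iterCount> BufferFlushChirp) as BufferFlushHeartbeatsBeacon =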
   Beacon()
  {
   param
    initDelay : 3.0 ;
    iterations : 20 ;
    period : 30.0 ;
   output
    BufferFlushChirp : iterCount = IterationCount() ;
  }

  // Turns each heartbeat tuple into a window punctuation, used here to trigger a timed flush.
  (stream<uint64 iterCount> BufferFlushHeartbeat) as Custom_3 =
   Custom(BufferFlushChirp as inPort0Alias)
  {
   logic
    onTuple BufferFlushChirp : submit(Sys.WindowMarker, BufferFlushHeartbeat) ;
  }

  // Both streams feed the sink's single input port: data tuples plus flush punctuations.
  () as HDFSFileSink_4 = HDFSFileSink(BufferFlushHeartbeat, DataChirp)
  {
   param
    file :
     '/user/biadmin/junk/test.%PROCID.%PELAUNCHNUM.%FILENUM' ;
    format : txt ;
    hdfsConfigFile : 'hdfsconfig.txt' ;
  }

}

 

 

  • KrisWH
    13 Posts
    ACCEPTED ANSWER

    Re: Can I avoid the initial write of a 0 byte file when using timed punctuation flushes with HDFSFileSink operator?

    ‏2013-10-23T18:30:21Z  

    Increasing the initDelay on your heartbeat so that you're sure some data has arrived (say to 90 seconds in this example) would keep you from running into the problem.  Or can the BigInsights side exclude the file pattern that is the zero-byte file?

    If you were willing to modify the toolkit code, in HDFSFileSink_cpp.cgt, near line 258, you could change the curBuf=buffers[0]; to curBuf=NULL; and then no file would be written until at least some data had arrived.
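
As a rough illustration of the first suggestion, the heartbeat Beacon from the sample program could be changed as sketched below. Only the initDelay value differs from the original post; the 90.0 comes from the "say to 90 seconds" suggestion and assumes the data Beacon keeps its 60-second initDelay, so several data tuples arrive before the first punctuation is sent. This is a fragment for the graph clause of the sample composite, not a tested program.

```spl
  // Fragment: replaces the heartbeat Beacon inside the graph clause of
  // ScratchHDFSAndTimer. The first heartbeat now fires at 90 seconds,
  // after the data Beacon (initDelay 60.0, period 5.0) has emitted tuples.
  (stream<uint64 iterCount> BufferFlushChirp) as BufferFlushHeartbeatsBeacon =
   Beacon()
  {
   param
    initDelay : 90.0 ;   // was 3.0 in the original sample
    iterations : 20 ;
    period : 30.0 ;
   output
    BufferFlushChirp : iterCount = IterationCount() ;
  }
```

The drawback, raised later in the thread, is that the heartbeat delay has to be kept longer than every data source's delay by hand.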

     

     

  • KrisWH
    13 Posts
    ACCEPTED ANSWER

    Re: Can I avoid the initial write of a 0 byte file when using timed punctuation flushes with HDFSFileSink operator?

    ‏2013-10-23T20:05:53Z  
    • david.cyr
    • ‏2013-10-23T19:01:26Z

    Thanks for the good and detailed answer (especially the part about the _cpp, in case I ever need to know that).

    To let you know how I finally resolved this (essentially in line with your first recommendation about the heartbeat), I actually have a Custom task now just prior to the HDFSFileSink in my 'real' code, since I also need to add in CSV-style headers whenever I am first making a file (either by heartbeat or by amount of rows [as a proxy for figuring out if I'm in the buffer size]). In that Custom logic, I'm also checking to see if I've sent at least one row of data through, and if not I'm suppressing my punctuations.

    I had considered leaving it up to an init delay, but since in my real code the delays for my different data contributors are customizable, I didn't want to leave it up to people to "remember" to always make this delay longer than the others, hence the check in the Custom logic (along with the custom logic for handling spitting out headers when appropriate).
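
The punctuation-suppressing check described above might look roughly like the following. This is a minimal sketch, not the poster's actual code: the composite name GatedFlushSketch, the operator name FlushGate, the stream name GatedData, and the state variable seenData are all hypothetical, and the CSV-header logic mentioned in the post is left out. The two Beacons and the sink parameters are copied from the sample program in the question.

```spl
namespace application ;

use com.ibm.streams.bigdata.hdfs::HDFSFileSink ;

composite GatedFlushSketch
{
 graph
  (stream<uint64 iterCount> DataChirp) as DataOriginatingBeacon = Beacon()
  {
   param
    initDelay : 60.0 ;
    iterations : 30 ;
    period : 5.0 ;
   output
    DataChirp : iterCount = IterationCount() ;
  }

  (stream<uint64 iterCount> BufferFlushChirp) as BufferFlushHeartbeatsBeacon =
   Beacon()
  {
   param
    initDelay : 3.0 ;
    iterations : 20 ;
    period : 30.0 ;
   output
    BufferFlushChirp : iterCount = IterationCount() ;
  }

  // Hypothetical gate: input port 0 carries data, input port 1 carries heartbeats.
  (stream<uint64 iterCount> GatedData) as FlushGate =
   Custom(DataChirp ; BufferFlushChirp)
  {
   logic
    state : mutable boolean seenData = false ;
    onTuple DataChirp :
    {
     // Remember that at least one data row has passed, then forward it.
     seenData = true ;
     submit(DataChirp, GatedData) ;
    }
    onTuple BufferFlushChirp :
    {
     // Drop heartbeats until some data has gone through to the sink.
     if(seenData)
     {
      submit(Sys.WindowMarker, GatedData) ;
     }
    }
  }

  () as HDFSFileSink_4 = HDFSFileSink(GatedData)
  {
   param
    file :
     '/user/biadmin/junk/test.%PROCID.%PELAUNCHNUM.%FILENUM' ;
    format : txt ;
    hdfsConfigFile : 'hdfsconfig.txt' ;
  }

}
```

With this arrangement the heartbeat initDelay no longer has to be longer than the data delays, because early heartbeats are simply discarded. The flag is never reset in this sketch, so it only guards against punctuations that arrive before the first data row.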

    Thanks again for the detailed response!

    d

     

     

    Glad to help!

    It sounds like your custom operator approach makes sense, especially since you are already adding header rows.  However, if you want to go back to the initDelay approach,  it may help to use expressions when specifying parameters.   So if you have:

    composite ScratchHDFSAndTimer
    {
    param

    expression<float64> $dataInitDelay: 60.0;
    expression<float64> $otherInitDelay: 5.0;

    at the top of your file, then in your data beacon, you specify the delay using that variable:

    initDelay : $dataInitDelay ;

    for the heartbeat delay, you can do (for example):

    initDelay : max($dataInitDelay,$otherInitDelay) + 5.0;

    Then the heartbeat delay will always be greater than either of the two other delays.
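
Put together with the sample program from the question, the fragments above might be assembled as follows. This is a sketch under assumptions: $otherInitDelay stands in for a second data contributor that is not shown, and everything other than the param clause and the two initDelay lines is copied from the original sample.

```spl
namespace application ;

use com.ibm.streams.bigdata.hdfs::HDFSFileSink ;

composite ScratchHDFSAndTimer
{
 param
  expression<float64> $dataInitDelay : 60.0 ;
  expression<float64> $otherInitDelay : 5.0 ;  // placeholder for another source

 graph
  (stream<uint64 iterCount> DataChirp) as DataOriginatingBeacon = Beacon()
  {
   param
    initDelay : $dataInitDelay ;
    iterations : 30 ;
    period : 5.0 ;
   output
    DataChirp : iterCount = IterationCount() ;
  }

  (stream<uint64 iterCount> BufferFlushChirp) as BufferFlushHeartbeatsBeacon =
   Beacon()
  {
   param
    // Always 5 seconds later than the slowest data source.
    initDelay : max($dataInitDelay, $otherInitDelay) + 5.0 ;
    iterations : 20 ;
    period : 30.0 ;
   output
    BufferFlushChirp : iterCount = IterationCount() ;
  }

  (stream<uint64 iterCount> BufferFlushHeartbeat) as Custom_3 =
   Custom(BufferFlushChirp as inPort0Alias)
  {
   logic
    onTuple BufferFlushChirp : submit(Sys.WindowMarker, BufferFlushHeartbeat) ;
  }

  () as HDFSFileSink_4 = HDFSFileSink(BufferFlushHeartbeat, DataChirp)
  {
   param
    file :
     '/user/biadmin/junk/test.%PROCID.%PELAUNCHNUM.%FILENUM' ;
    format : txt ;
    hdfsConfigFile : 'hdfsconfig.txt' ;
  }

}
```

With the default values shown, the first heartbeat would fire at 65 seconds, just after the first data tuple at 60 seconds.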

  • david.cyr
    20 Posts

    Re: Can I avoid the initial write of a 0 byte file when using timed punctuation flushes with HDFSFileSink operator?

    ‏2013-10-23T20:16:10Z  
    That's really cool. I'll remember that trick :)

     

    I do have my externalized/submission parms expressed as expressions, so I could do that. I really like the max +5.0 trick.

     

    thanks again

    d
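
For the submission-time case mentioned in this last post, the same max + 5.0 idea could be sketched as below. The submission parameter names "dataInitDelay" and "otherInitDelay", their defaults, and the composite name SubmissionDelaySketch are assumptions for illustration; the poster's real parameter names are not given in the thread. getSubmissionTimeValue returns an rstring, so the value is cast to float64.

```spl
namespace application ;

composite SubmissionDelaySketch
{
 param
  // Hypothetical submission-time parameters with string defaults.
  expression<float64> $dataInitDelay :
   (float64) getSubmissionTimeValue("dataInitDelay", "60.0") ;
  expression<float64> $otherInitDelay :
   (float64) getSubmissionTimeValue("otherInitDelay", "5.0") ;

 graph
  (stream<uint64 iterCount> BufferFlushChirp) as BufferFlushHeartbeatsBeacon =
   Beacon()
  {
   param
    // Heartbeats start 5 seconds after the slowest configured data source.
    initDelay : max($dataInitDelay, $otherInitDelay) + 5.0 ;
    iterations : 20 ;
    period : 30.0 ;
   output
    BufferFlushChirp : iterCount = IterationCount() ;
  }

}
```

Whoever submits the job can then change the data delays without having to remember to lengthen the heartbeat delay as well.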