Topic: Sticky situation with Sort
  • 5 replies
  • Latest Post - 2012-10-15T12:36:50Z by hnasgaard
LeBuiHung

2012-10-11T11:36:22Z
Here is my code:

stream<smsSchema> sortedStream = Sort(inputStream as inTuple)
{
    window
        inTuple : sliding, count(10);
    param
        sortBy : SERIALNO;
}

Let's say inputStream carries 100 tuples. I only get at most 90 tuples out of sortedStream, because the last 10 tuples stay queued in the count(10) window.

Are there any suggestions for this case?

Many thanks in advance.

Hung.
  • hnasgaard

    Re: Sticky situation with Sort

    2012-10-11T12:06:37Z
    Since your window is sliding, the Sort should flush it when final punctuation arrives on the input stream, and the tuples still held in the window are emitted at that point. Is your input stream one that will provide final punctuation?
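    To make the 100-in, 90-out numbers concrete, here is a small Python model. It is not SPL and not the Sort operator's real implementation; it assumes that a sliding count(N) Sort does a progressive sort: it buffers tuples, emits the smallest buffered tuple on each arrival once more than N are held, and emits everything still buffered, in order, when final punctuation arrives. The SERIALNO values are made up.

```python
import heapq
from typing import List


class ProgressiveSort:
    """Toy model (assumption) of a Sort with a sliding count(n) window."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.heap: List[int] = []  # buffered tuples, keyed by the sortBy attribute

    def on_tuple(self, key: int) -> List[int]:
        # Buffer the new tuple; once more than n are held, emit the smallest one.
        heapq.heappush(self.heap, key)
        if len(self.heap) > self.n:
            return [heapq.heappop(self.heap)]
        return []

    def on_final_punct(self) -> List[int]:
        # Final punctuation flushes everything still buffered, in sort order.
        out = []
        while self.heap:
            out.append(heapq.heappop(self.heap))
        return out


# 100 hypothetical SERIALNO values (a permutation of 0..99), window count(10).
serials = [(i * 37) % 100 for i in range(100)]
op = ProgressiveSort(10)
before_final = [t for s in serials for t in op.on_tuple(s)]
print(len(before_final))  # 90: the last 10 tuples are still in the window
flushed = op.on_final_punct()
print(len(flushed))       # 10: released only by final punctuation
```

    If on_final_punct() is never called, which is what a source that never finishes amounts to, those last 10 tuples never leave the window.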
  • LeBuiHung

    Re: Sticky situation with Sort

    2012-10-12T01:27:13Z
    (quoting hnasgaard, 2012-10-11T12:06:37Z)
    This is the code for my source:

    stream<smsRawSchema> smsRawStr = FileSource(smsFilename)
    {
        param
            format  : txt;
            parsing : permissive;
    }

    stream<smsSchema> smsSorted = Sort(smsRawStr as inTuple)
    {
        window
            inTuple : sliding, count(10);
        param
            sortBy : SERIALNO;
    }
    Since my FileSource has an input port, it does not generate final punctuation (source: http://publib.boulder.ibm.com/infocenter/streams/v2r0/index.jsp?topic=%2Fcom.ibm.swg.im.infosphere.streams.spl-bp-operator-development.doc%2Fstr_punct.html).
  • hnasgaard

    Re: Sticky situation with Sort

    2012-10-12T11:53:51Z
    (quoting LeBuiHung, 2012-10-12T01:27:13Z)
    The FileSource will generate final punctuation if it gets final punct on its input port. For example, in the following code:
    
    composite Main()
    {
      graph
        stream<rstring file> Beat = Beacon() {
          param iterations : 1;
          output Beat : file = "data.in";
        }
        /* stream<rstring file> Beat = DirectoryScan() {
             param directory : "/homes/hny9/nasgaard/forum/sort/data";
           } */
        stream<rstring name, int32 age> In = FileSource(Beat) {
        }
        stream<rstring name, int32 age> Sorted = Sort(In) {
          window In : sliding, count(5);
          param sortBy : name;
        }
        () as O = Custom(Sorted) {
          logic onTuple Sorted : println(name);
        }
    }
    

    When a Beacon drives it, the Beacon generates final punctuation when it's done (here, after its single iteration). This ripples through the FileSource and causes the Sort's window to flush. On the other hand, if you use DirectoryScan, it will not generate final punct because it never finishes scanning. Do you have a finite set of files to read? What is driving the FileSource?
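    The difference between the two drivers can be sketched with a toy Python model (again not SPL). Here `beacon` and `directory_scan` are simplified, hypothetical stand-ins for the Beacon and DirectoryScan operators, `run` plays the role of FileSource plus a Sort with a sliding count(5) window, and the file contents are made up (only the name attribute is kept). The endless scan is cut off with itertools.islice, since otherwise it would never return.

```python
import heapq
import itertools
from typing import Dict, Iterable, Iterator, List, Optional

FINAL = "<final>"  # stand-in for SPL final punctuation (model assumption)


def beacon(iterations: int) -> Iterator[str]:
    # Finite driver: one file name per iteration, then final punctuation.
    for _ in range(iterations):
        yield "data.in"
    yield FINAL


def directory_scan() -> Iterator[Optional[str]]:
    # Endless driver: reports the file once, then polls forever (None = nothing new).
    yield "data.in"
    while True:
        yield None


def run(driver: Iterable[Optional[str]], contents: Dict[str, List[str]], n: int) -> List[str]:
    # FileSource reads each reported file; Sort keeps a sliding count(n) window.
    out: List[str] = []
    window: List[str] = []
    for event in driver:
        if event == FINAL:
            # FileSource forwards final punctuation, so Sort flushes its window.
            while window:
                out.append(heapq.heappop(window))
            break
        if event is None:
            continue
        for name in contents[event]:
            heapq.heappush(window, name)
            if len(window) > n:
                out.append(heapq.heappop(window))
    return out


contents = {"data.in": ["dave", "amy", "carl", "bob", "eve", "fay", "gus"]}
print(run(beacon(1), contents, 5))
# ['amy', 'bob', 'carl', 'dave', 'eve', 'fay', 'gus']
print(run(itertools.islice(directory_scan(), 100), contents, 5))
# ['amy', 'bob']
```

    With the Beacon driver every name comes out; with the scan driver five names stay in the window no matter how long it polls.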
  • LeBuiHung

    Re: Sticky situation with Sort

    2012-10-15T00:43:01Z
    (quoting hnasgaard, 2012-10-12T11:53:51Z)
    I don't have a finite set of files to read; that's why I have to use the DirectoryScan operator. Without the FinalMarker, the Join does not work either :(

    composite Main {
        graph
            stream<rstring HOUR, rstring CALLINGPARTYNUMBER, int32 COUNT, int32 CHARGEFROMPREPAID, rstring RESULTCODE> smsHourly =
                FileSource()
            {
                param
                    file    : "/home/stream/workspace/CDR/data/smsHourly";
                    format  : csv;
                    parsing : permissive;
            }

            stream<rstring name> dynamicCallingNumberWatchlistFile = DirectoryScan()
            {
                param
                    directory : getSubmissionTimeValue("CDRfiles");
                    pattern   : "^dynamicCallingNumberWatchlist";
                    sleepTime : 5.0;
                    initDelay : 1.0;
            }

            stream<rstring PHONENUMBER> dynamicCallingNumberWatchlist =
                FileSource(dynamicCallingNumberWatchlistFile)
            {
                param
                    //file : "/home/stream/workspace/CDR/data/dynamicCallingNumberWatchlist";
                    format  : csv;
                    parsing : permissive;
            }

            stream<rstring PHONENUMBER, rstring HOUR, int32 COUNT>
                callerWatch = Join(dynamicCallingNumberWatchlist as L; smsHourly as R)
            {
                window
                    L : sliding, count(100);
                    R : sliding, count(0);
                param
                    equalityLHS : PHONENUMBER;
                    equalityRHS : CALLINGPARTYNUMBER;
            }

            () as callerWatchSink = FileSink(callerWatch)
            {
                param
                    file  : "callerWatchlist";
                    flush : 1u;
            }
    }
  • hnasgaard

    Re: Sticky situation with Sort

    2012-10-15T12:36:50Z
    (quoting LeBuiHung, 2012-10-15T00:43:01Z)
    These operators are working as they are intended to work. The problem seems to be that they are being driven by the DirectoryScan operator, which will keep looking for new files forever, yet you seem to want the processing to terminate and so flush the windows in the Sort/Join operators. How do you know what your terminating condition is? If there is an infinite set of files, then there is no terminating condition and final punctuation will never be sent. On the other hand, if you can determine when there are no more files, perhaps you could write an operator similar to DirectoryScan that detects the terminating condition and then sends final punctuation.

    In a normal streaming app final punct doesn't occur, because streaming apps are intended to process indefinitely. Once final punctuation has been sent, no more tuples will be processed and the job has effectively terminated.
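    A rough sketch of the kind of operator suggested here, written as a Python model rather than SPL: a DirectoryScan-like driver that reports each new file once and sends final punctuation once a terminating condition is met. The condition used below, a sentinel file named END appearing in the directory, is purely hypothetical; DirectoryScan has no such feature, and in SPL this logic would have to live in your own operator. Sleeping between polls is omitted, and `max_polls` only keeps the demo bounded.

```python
from typing import Callable, Iterator, List, Set

FINAL = "<final>"  # stand-in for SPL final punctuation (model assumption)


def finite_scan(list_dir: Callable[[], List[str]], sentinel: str = "END",
                max_polls: int = 1000) -> Iterator[str]:
    """DirectoryScan-like driver with a terminating condition (hypothetical)."""
    seen: Set[str] = set()
    for _ in range(max_polls):
        listing = list_dir()
        # Report each file the first time it is seen, ignoring the sentinel itself.
        for name in sorted(set(listing) - seen - {sentinel}):
            seen.add(name)
            yield name
        if sentinel in listing:
            # Terminating condition met: downstream windows can now flush.
            yield FINAL
            return
    # No terminating condition observed: no final punctuation is ever sent.


# Three hypothetical directory listings, one per poll.
snapshots = iter([["a.csv"], ["a.csv", "b.csv"], ["a.csv", "b.csv", "c.csv", "END"]])
events = list(finite_scan(lambda: next(snapshots)))
print(events)  # ['a.csv', 'b.csv', 'c.csv', '<final>']
```

    The hard part in practice is the one raised above: choosing a terminating condition that really means "no more files". If no such condition exists, the scanner never emits FINAL and the windows are never flushed.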