4 replies. Latest post 2013-02-06T16:40:54Z by Jim Sharpe
Jim Sharpe

Aggregate CountDistinct with a time based trigger?

2013-02-05T23:20:46Z
I've bumped into an issue with the CountDistinct function of the Aggregate operator when using a time-based sliding window trigger.

Specifically, it doesn't output anything until it has received its first tuple (after that it behaves as expected). I can understand the Aggregate operator producing no output until a tuple arrives if the trigger were count-based, but with a time-based trigger I expected it to output zeros as soon as the first trigger time was reached, even before any tuples had been received. After all, the window is empty, so the number of distinct values it contains is correctly described as zero, and that stays true each time the trigger fires.

If the existing behavior is as designed (which IMO wouldn't make sense), what are my options for working around it? In my actual use case, downstream operators need to receive a count of how many distinct values have been produced on this stream, even if none have been produced yet (the event that produces tuples on this stream is relatively rare). I thought about combining other standard operators to generate zeros until a tuple eventually arrives on the target stream, but that seemed a convoluted way to work around behavior that, at least on the surface, appears to be a bug.

The small test application below demonstrates the behavior. Nothing is printed until the initDelay period has elapsed, and after that both print statements produce output. (If it matters, this is 64-bit Streams v3.0 on RHEL 6.)


namespace application;

composite countTest {
    graph
        stream<uint32 val> values = Beacon() {
            param
                iterations : 2000 ;
                period : 1.0 ;
                initDelay : 30.0 ;
            output
                values : val = (uint32)(random() * 20.0) ;
        }

        stream<uint32 v2> otherValues = Functor(values) {
            logic
                onTuple values :
                {
                    println("beacon output:" + (rstring)val) ;
                }
            output
                otherValues : v2 = val / 2u ;
        }

        stream<int32 valCount> distinctValues = Aggregate(otherValues)
        {
            window
                otherValues : sliding, time(1000), time(1) ;
            output
                distinctValues : valCount = CountDistinct(v2) ;
        }
}
  • hnasgaard

    Re: Aggregate CountDistinct with a time based trigger?

    2013-02-06T12:08:28Z, in response to Jim Sharpe
    Try specifying the aggregateIncompleteWindows parameter.
    • Jim Sharpe

      Re: Aggregate CountDistinct with a time based trigger?

      2013-02-06T13:25:37Z, in response to hnasgaard
      Hi Howard,

      Good catch. The version of the test program I posted was slightly older and didn't include that setting, which I had since added (as below). Unfortunately, it doesn't change the behavior: even with the value set to true, nothing comes out until the first tuple arrives.

      stream<int32 valCount> distinctValues = Aggregate(otherValues)
      {
          window
              otherValues : sliding, time(1000), time(1) ;
          param
              aggregateIncompleteWindows : true ;
          output
              distinctValues : valCount = CountDistinct(v2) ;
      }
      • hnasgaard

        Re: Aggregate CountDistinct with a time based trigger?

        2013-02-06T14:09:34Z, in response to Jim Sharpe
        I dug a little deeper. It is actually behaving as documented. Try the following:
        
        composite Main
        {
            graph
                stream<uint32 val> values = Beacon()
                {
                    param iterations : 2000 ; period : 1.0 ; initDelay : 30.0 ;
                    output values : val = (uint32)(random() * 20.0) ;
                }

                stream<uint32 v2> otherValues = Functor(values)
                {
                    logic onTuple values : { println("beacon output:" + (rstring)val) ; }
                    output otherValues : v2 = val / 2u ;
                }

                stream<int32 valCount> distinctValues = Aggregate(otherValues)
                {
                    window otherValues : sliding, time(1000), time(1) ;
                    param aggregateIncompleteWindows : true ;
                    output distinctValues : valCount = CountDistinct(v2) ;
                }

                () as O = Custom(distinctValues)
                {
                    logic
                        onTuple distinctValues : println("aggregate output: " + (rstring)valCount) ;
                        onPunct distinctValues : println("aggregate output: punct received") ;
                }
        }
        

        Note that I've added the aggregateIncompleteWindows parameter, plus a Custom operator after the Aggregate that prints on both tuples and punctuation. You will see a window punctuation on every trigger, starting immediately. The standard toolkit reference,
        http://pic.dhe.ibm.com/infocenter/streams/v3r0/index.jsp?topic=%2Fcom.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc%2Fdoc%2Faggregate.html, describes this behavior: "Both for tumbling and sliding windows, when a time-based window with no tuples in it fires, just a window marker punctuation is output."
        Hope that helps.
        • Jim Sharpe

          Re: Aggregate CountDistinct with a time based trigger?

          2013-02-06T16:40:54Z, in response to hnasgaard
          So it turns out the answer was in the documentation all along. In hindsight I should have paid more attention to a passage I swear I had reread at least a dozen times. I think what threw me was the statement "This rule does not apply to sliding windows with time-based trigger policies. Such windows are assumed to be full when they start out." My mind pretty much shut down after accepting that description, because it makes it sound as if there will never be such a thing as an incomplete window for a sliding window with a time-based trigger, and therefore that aggregateIncompleteWindows shouldn't really be necessary... except that the very next paragraph says it is. On top of that confusion, what it outputs isn't a tuple whose attribute holds the aggregate count, but a window punctuation.

          While I still think it's a bit odd to receive a punctuation instead of a zero when aggregateIncompleteWindows is set to true, at least that behavior is indeed documented and allows the simple workaround of a single extra operator as you illustrated.

          Using your pattern, I modified the original example as shown below, and it now behaves as I need: zeros come out each time the empty window is triggered, and after the first tuple is received the counts reflect the distinct values. Thanks for your help. I've marked the question answered.



          stream<int32 valCount> distinctValues = Aggregate(otherValues)
          {
              window
                  otherValues : sliding, time(1000), time(1) ;
              param
                  aggregateIncompleteWindows : true ;
              output
                  distinctValues : valCount = CountDistinct(v2) ;
          }

          stream<int32 c> counts = Custom(distinctValues)
          {
              logic
                  onTuple distinctValues : submit({ c = valCount }, counts) ;
                  onPunct distinctValues : submit({ c = 0 }, counts) ;
          }
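A possible refinement of this workaround, offered as a sketch rather than a verified fix: the documentation's wording ("just a window marker punctuation is output" when an empty window fires) suggests that Aggregate also emits a window marker after the tuples of a non-empty trigger, and an onPunct clause also runs for the final punctuation at the end of the stream. If both hold, the Custom above would submit an extra zero after every real count. The version below assumes those semantics and emits a zero only for a window marker that was not preceded by a count. The flag name sawCount is hypothetical, and the check relies on the currentPunct() function and the Sys.WindowMarker value available in a Custom operator's onPunct clause; it has not been tested against Streams v3.0.

```spl
// Sketch only: assumes Aggregate emits a window marker after every trigger,
// whether or not that trigger produced a tuple.
stream<int32 c> counts = Custom(distinctValues)
{
    logic
        // Hypothetical flag: true once a count has been seen for the
        // current trigger.
        state : mutable boolean sawCount = false ;

        onTuple distinctValues :
        {
            sawCount = true ;
            submit({ c = valCount }, counts) ;
        }

        onPunct distinctValues :
        {
            // Only window markers delimit triggers; the final marker is ignored.
            if (currentPunct() == Sys.WindowMarker)
            {
                if (!sawCount)
                {
                    submit({ c = 0 }, counts) ;  // empty window: report zero
                }
                sawCount = false ;  // reset for the next trigger
            }
        }
}
```

It is meant as a drop-in replacement for the counts operator above, so downstream operators still receive one int32 c per trigger.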