Topic
  • 5 replies
  • Latest Post - ‏2013-05-09T20:02:35Z by lakshgupta
lakshgupta
lakshgupta
34 Posts

Pinned topic Operate on tuples across lists in a window

‏2013-05-07T18:20:18Z |

Hi

Assuming I have 10 lists in a window, how to check for tuples over all the 10 lists based on two column values? The lists can be unequal in size.

Basically I am trying to find the average distance between a pair among many entities over a t unit sliding window. Hence for each instance of time I am thinking of :

1. collecting all the tuples for an instance of time.

2. then take a self join and find the distance. More filtering to tackle:

- association with itself <A,A>, which is not required.

- association for both the pairs <A,B> and <B,A>, which is also not required

3. Collect the list in a t  unit sliding window.

4. find the average distance of each pair considering all the lists over t unit sliding window and filter those with distance less than threshold.

 

I am running IBM InfoSphere Streams v2. Any pointers will be helpful.

  • wbratton
    wbratton
    76 Posts

    Re: Operate on tuples across lists in a window

    ‏2013-05-07T19:14:25Z  

    I will look into your questions and get back to you.

    Thanks

  • KanatT.
    KanatT.
    13 Posts

    Re: Operate on tuples across lists in a window

    ‏2013-05-08T15:48:58Z  

    First, let me make sure I understand your problem correctly:  You want to keep a window some size, say t, and you will compute the average pair-wise distance between the entries, where the distance is given by some function.  That is, if you have a window of size 3 containing

             a, b, c

    you will want to compute dist(a, b); dist(a, c); dist(b,c) -- and find the average.

    In Streams v2, the easiest solution is probably to use the Custom operator, where you will maintain your own window (using e.g. List<T> where T will be the type of each tuple, which in your case is probably also a List). When a new tuple arrives (which will activate your onTuple code), you can decide what to evict from the window and perform your pairwise distance calculation on it.   This is assuming the window is count-based (that is, your window contains the latest t entries). Handling a time-base window (i.e. everything that arrived in the past t seconds) is trickier.  In this case, you can set up a Beacon that sends you a "trigger" event on a different port.  

     

    I am happy to elaborate more once you confirm the setting you need.  You can, of course, implement this in C++ or Java, which will give you access to the windowing library, but I believe using the Custom operator is an easier route. 

    I will also note that in Streams v3, there is an easier solution: the Aggregate operator will handle the window aspect for you; all you need to do is to provide a few functions for the Custom feature. 

     

  • lakshgupta
    lakshgupta
    34 Posts

    Re: Operate on tuples across lists in a window

    ‏2013-05-09T03:20:51Z  
    • KanatT.
    • ‏2013-05-08T15:48:58Z

    First, let me make sure I understand your problem correctly:  You want to keep a window some size, say t, and you will compute the average pair-wise distance between the entries, where the distance is given by some function.  That is, if you have a window of size 3 containing

             a, b, c

    you will want to compute dist(a, b); dist(a, c); dist(b,c) -- and find the average.

    In Streams v2, the easiest solution is probably to use the Custom operator, where you will maintain your own window (using e.g. List<T> where T will be the type of each tuple, which in your case is probably also a List). When a new tuple arrives (which will activate your onTuple code), you can decide what to evict from the window and perform your pairwise distance calculation on it.   This is assuming the window is count-based (that is, your window contains the latest t entries). Handling a time-base window (i.e. everything that arrived in the past t seconds) is trickier.  In this case, you can set up a Beacon that sends you a "trigger" event on a different port.  

     

    I am happy to elaborate more once you confirm the setting you need.  You can, of course, implement this in C++ or Java, which will give you access to the windowing library, but I believe using the Custom operator is an easier route. 

    I will also note that in Streams v3, there is an easier solution: the Aggregate operator will handle the window aspect for you; all you need to do is to provide a few functions for the Custom feature. 

     

    Thank you Kanat for the reply. So considering a window of size 3 time units, assume we have:

    list at time t1=t for incoming elements a,b,c: {dist(a, b); dist(a, c); dist(b,c)}

    list at time t2=t+1for incoming elements a,c:{ dist(a, c))}

    list at time t3=t+2 for incoming elements b,c:{ dist(b, c)}

    then in the end I would like to compute:

    avg(distance between pair a and b) = dist(a,b) at t1 + dist(a,b) at t2 + dist(a,b) at t3 ... similarly for pair (a,c) and (b,c).

    Hence in the window I need to remove the list at t1 to make room for new incoming elements as well as output the list of pairs whose average distance is greater than a threshold.

    For the window I was thinking of using time-based eviction policy based on time difference where on trigger I want to calculate the average distance. Hence I need to access the same pair in 3 different list, which I am looking as a major problem.

    I guess the aggregate operator you mentioned are also present in v2:

    http://pic.dhe.ibm.com/infocenter/streams/v2r0/topic/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/aggregate.html?resultof=%22%73%6c%69%64%69%6e%67%22%20%22%73%6c%69%64%65%22%20%22%77%69%6e%64%6f%77%22%20

     

  • KanatT.
    KanatT.
    13 Posts

    Re: Operate on tuples across lists in a window

    ‏2013-05-09T15:21:32Z  

    Thank you Kanat for the reply. So considering a window of size 3 time units, assume we have:

    list at time t1=t for incoming elements a,b,c: {dist(a, b); dist(a, c); dist(b,c)}

    list at time t2=t+1for incoming elements a,c:{ dist(a, c))}

    list at time t3=t+2 for incoming elements b,c:{ dist(b, c)}

    then in the end I would like to compute:

    avg(distance between pair a and b) = dist(a,b) at t1 + dist(a,b) at t2 + dist(a,b) at t3 ... similarly for pair (a,c) and (b,c).

    Hence in the window I need to remove the list at t1 to make room for new incoming elements as well as output the list of pairs whose average distance is greater than a threshold.

    For the window I was thinking of using time-based eviction policy based on time difference where on trigger I want to calculate the average distance. Hence I need to access the same pair in 3 different list, which I am looking as a major problem.

    I guess the aggregate operator you mentioned are also present in v2:

    http://pic.dhe.ibm.com/infocenter/streams/v2r0/topic/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/aggregate.html?resultof=%22%73%6c%69%64%69%6e%67%22%20%22%73%6c%69%64%65%22%20%22%77%69%6e%64%6f%77%22%20

     

    A couple of questions to help us understand your scenario better:

    - In your example, you seem to indicate that at each time step, several elements can arrive simultaneously. What is the tuple type you're dealing with? More specifically, when you say that elements a, b, and c arrive at time t1, what do you actually see in the stream at t1 --- a tuple which itself is a List of 3 elements OR 3 tuples OR something else. 

    - What would like to keep in your window? (Everything from the past t time units; the latest t tuples that arrived; or something else)  When do you want to trigger the generation of an output tuple? (This can be different from--and even out of sync with--the eviction policy).

    You're right that the Aggregate operator is already present in v2. Though, I believe the specific feature where we can extend the Aggregate operator with user-supplied functions was added later in v3. 

  • lakshgupta
    lakshgupta
    34 Posts

    Re: Operate on tuples across lists in a window

    ‏2013-05-09T20:02:35Z  
    • KanatT.
    • ‏2013-05-09T15:21:32Z

    A couple of questions to help us understand your scenario better:

    - In your example, you seem to indicate that at each time step, several elements can arrive simultaneously. What is the tuple type you're dealing with? More specifically, when you say that elements a, b, and c arrive at time t1, what do you actually see in the stream at t1 --- a tuple which itself is a List of 3 elements OR 3 tuples OR something else. 

    - What would like to keep in your window? (Everything from the past t time units; the latest t tuples that arrived; or something else)  When do you want to trigger the generation of an output tuple? (This can be different from--and even out of sync with--the eviction policy).

    You're right that the Aggregate operator is already present in v2. Though, I believe the specific feature where we can extend the Aggregate operator with user-supplied functions was added later in v3. 

    In the start I'll receive the object ID, their longitude-latitude information along with the timestamp. I am thinking of collecting them on a delta-based window, to observe difference in time as eviction policy. As an output this 1 unit time delta-based window will produce list of pairs ,by taking a join, with the information: IDs of both the objects, timestamp, distance. I'll be sending this list to the t unit count-based (time-based?) window we have been discussing above. The size of each list may differ in this window. This t unit time window will keep the list of tuples for past t units and will generate a trigger for every new list entering the window. The output would be the list of tuple containing: both object IDs, avg distance, time.