Topic
  • 9 replies
  • Latest Post - 2013-10-25T09:42:24Z by RajeshKumarMandal
RajeshKumarMandal
21 Posts

Pinned topic Extract data from subsequent records using Join or Map

2013-10-24T10:22:50Z

Hello,

I have 2 types of streaming input data as below:

Start message schema: MSISDN, IMEI

Stop message schema: MSISDN, IMSI

I want to process both streams and extract the output as MSISDN, IMEI, IMSI.

I have found this can be done using the Join operator, since its output port is mutating, or by using a Custom operator with a map.

Case 1:

In case of the Join we are planning to use the window configuration below:

Start message : port 1 : sliding, time(1800)

Stop message : port 2 : sliding, count(1)

Using the above we shall keep Start messages for some time, and once we get a Stop message the join will be performed and the output sent, as sketched below.
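
For illustration, a minimal SPL sketch of this configuration (StartMsgs, StopMsgs and the attribute names are placeholders, not our real streams):

    // Sketch only: placeholder stream names and schemas.
    stream<rstring msisdn, rstring imei, rstring imsi> Joined =
        Join(StartMsgs as S; StopMsgs as P)
    {
        window
            S : sliding, time(1800); // keep Start messages for 30 minutes
            P : sliding, count(1);   // hold only the latest Stop message
        param
            match : S.msisdn == P.msisdn;
        output
            Joined : msisdn = S.msisdn, imei = S.imei, imsi = P.imsi;
    }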

Case 2:

In case of the Custom operator we shall use 2 input ports:

Start message : port 1

Stop message : port 2

We shall insert all tuples from port 1 into a map. Once we have a tuple on port 2 we shall look it up in the map, send an output tuple, and remove the matching key from the map, as sketched below.
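
A minimal SPL sketch of that map logic (same placeholder streams as above):

    // Sketch only: Start tuples wait in the map, keyed by MSISDN, until their Stop arrives.
    stream<rstring msisdn, rstring imei, rstring imsi> Matched =
        Custom(StartMsgs as S; StopMsgs as P)
    {
        logic
            state : {
                mutable map<rstring, rstring> pending = {}; // msisdn -> imei
            }
            onTuple S : {
                insertM(pending, S.msisdn, S.imei); // remember the Start
            }
            onTuple P : {
                if (has(pending, P.msisdn)) {
                    // emit the combined record, then forget the matched Start
                    submit({msisdn = P.msisdn, imei = pending[P.msisdn], imsi = P.imsi}, Matched);
                    removeM(pending, P.msisdn);
                }
            }
    }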

Between the two approaches, Join or Custom with a map, which one will give better performance?

Or is there any other good way to process this?

Thanks,

Rajesh

  • wbratton
    76 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-24T15:58:21Z

    We will research your question and get back to you.

  • Kevin_Foster
    98 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-24T17:37:27Z
    • wbratton
    • 2013-10-24T15:58:21Z

    We will research your question and get back to you.

    Some initial thoughts:

    Drop any tuple attributes that you don't need before the Join or Custom. Smaller tuples use less memory.
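
    For example, a Functor can project away the unneeded attributes up front (a sketch; RawStart and the attribute list are assumptions):

        // Sketch only: pass through just the attributes the matching needs.
        stream<rstring msisdn, rstring imei> SlimStart = Functor(RawStart)
        {
            output
                SlimStart : msisdn = RawStart.msisdn, imei = RawStart.imei;
        }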

    Joins:

    A Join should be fine if you are sure that the start data will always arrive before the stop data. Are you positive?

    If so, then you can use count(0) on the second stream of stop data for what is called a "one-sided join"; the window clause is sketched below.
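
    In SPL terms, the only change from a two-sided configuration is the window clause on the stop port (a sketch; S is the start port, P the stop port):

        window
            S : sliding, time(1800); // Start data is retained for matching
            P : sliding, count(0);   // Stop tuples probe the Start window but are never stored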

    I assume that your call data records are all 30 minutes or less, so I would increase your time duration on the start data stream to ensure that you retain the start data for long enough to match with any delays of stop data. (If you have enough memory then maybe double the duration.)

    If there is any chance that your stop data could arrive before your start data (file creation timing for example) then you should not use a one-sided join, and instead need a time duration on both streams.

    Custom:

    A Custom operator is still an option of course, and would allow you to highly customize the retention and removal of start and stop entries in corresponding start and stop entry maps. I would use this method if I could not always predict the arrival time of either start or stop data, or if the memory requirements of the Join grew too large and I needed to reduce my memory footprint.

    -Kevin

  • RajeshKumarMandal
    21 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-24T18:30:07Z
    • Kevin_Foster
    • 2013-10-24T17:37:27Z

    Hello Kevin,

    Thanks for your reply.

    The input data we are expecting is AAA data from a RADIUS server, with Accounting-Start and Accounting-Stop messages.

    I agree with count(0) on the second stream if we go with the Join.

    The 30-minute eviction policy in my question is just arbitrary and shall be modified accordingly.

    In case of using the Custom operator with a map, how many records can be saved in a map object?

    Assuming we can have more than 300 million events from RADIUS per day (4-5 events per second), and we have to provide results every 15 minutes (4500 events in 15 minutes), can the map hold this much data in memory?

    What are the map's limitations for such huge data volumes?

    Thanks,

    Rajesh


  • Kevin_Foster
    98 Posts
    ACCEPTED ANSWER

    Re: Extract data from subsequent records using Join or Map

    2013-10-24T20:17:44Z
    • RajeshKumarMandal
    • 2013-10-24T18:30:07Z

    You would only store the start tuple in the map until the corresponding stop tuple arrives. When the stop tuple arrives you remove the start tuple from the map and also submit a result tuple from your Custom.

    You will probably also want to periodically scan through the entire map looking for older start tuples where you never received a stop tuple, and then delete them. I would do that once per day, triggered by a Beacon whose output is sent to an added input port on the Custom, as sketched below. I'd pick a time of day with least activity, but that's not required.
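
    A minimal sketch of that cleanup, assuming each map entry also records when its Start arrived (Daily, pending, and seen are assumed names):

        // Sketch only: a once-a-day tick feeding an extra input port on the Custom.
        stream<boolean tick> Daily = Beacon()
        {
            param
                period : 86400.0; // one tuple per day
        }

        // In the Custom's logic clause, with state:
        //     mutable map<rstring, tuple<rstring imei, timestamp seen>> pending = {};
        onTuple Daily : {
            mutable list<rstring> stale = [];
            for (rstring k in keys(pending)) {
                if (diffAsSecs(getTimestamp(), pending[k].seen) > 86400.0) {
                    appendM(stale, k); // collect first; don't remove while iterating
                }
            }
            for (rstring k in stale) {
                removeM(pending, k);
            }
        }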

    You can store millions of entries in a map just fine. Total size will depend on the size of your tuple, but as a rough estimate for a handful of attributes per tuple you can assume 200 bytes per stored tuple. Then measure the actual memory usage of the corresponding PE when you do your load/stress testing.
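
    As a rough worked example: if, say, 5 million Start tuples were pending at the busiest moment, that would be about 5,000,000 x 200 bytes = 1 GB of map memory. The number that matters is the peak count of unmatched Starts, not the daily event total, because matched entries are removed as soon as their Stop arrives.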

    -Kevin

  • RajeshKumarMandal
    21 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-25T03:49:47Z
    • Kevin_Foster
    • 2013-10-24T20:17:44Z

    Hello Kevin,

    Thanks for the reply again.

    I will go ahead with the development, check the performance of both the Join and the Custom operator on our input data as suggested, and choose the one that performs better.

    I will post further queries in case of any issues.

    Thanks,

    Rajesh

  • AnuragDubey
    7 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-25T04:59:20Z
    • Kevin_Foster
    • 2013-10-24T20:17:44Z

    Kevin,

    Just a question: can we use the DynamicFilter operator in such a scenario? Here the key will be from stream1, and then we keep on using addKey from stream2. And in the logic of stream1 and stream2 we can send out a concatenated stream.

    -Anurag

  • Kevin_Foster
    98 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-25T05:21:47Z
    • AnuragDubey
    • 2013-10-25T04:59:20Z

    I've never seen filtering used that way, so I'm not sure. Try it maybe, but be sure to use realistic volumes of data and normal application data rates.

    And please post the resulting code if it works for you.  :-)

    -Kevin

  • RajeshKumarMandal
    21 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-25T09:41:07Z
    • AnuragDubey
    • 2013-10-25T04:59:20Z

    Hello Anurag,

    The problem with DynamicFilter is that its output port is not mutating.

    So the output port schema must be the same as the first input port's schema.

    Thanks,

    Rajesh

  • RajeshKumarMandal
    21 Posts

    Re: Extract data from subsequent records using Join or Map

    2013-10-25T09:42:24Z
    • RajeshKumarMandal
    • 2013-10-25T09:41:07Z

    Hello,

    If the schema is not a concern, then DynamicFilter is faster than a Join.

    Thanks,

    Rajesh