Operator DeDuplicate

The DeDuplicate operator suppresses duplicate tuples that are seen within a specified time period.

Checkpointed data

When the DeDuplicate operator is checkpointed, it saves the most recently seen tuples, as determined by the 'timeOut', 'count', or 'delta' value. Logic state variables (if present) are also included in the checkpoint.

Behavior in a consistent region

The DeDuplicate operator can be an operator within the reachability graph of a consistent region, but it cannot be the start of a consistent region. The operator checkpoints and resets the most recently seen tuples according to the given 'timeOut', 'count', or 'delta' value. Any logic state variables (if present) are also automatically checkpointed and reset. The behavior of the 'timeOut' parameter does not change: the time spent draining, checkpointing, and resetting the consistent region counts toward the elapsed time that is used to evict tuples from the operator.
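
As an illustration of the placement rule above, the following sketch puts DeDuplicate downstream of a source that starts the consistent region. The period values and stream names are illustrative, and the sketch assumes that a Beacon source can start a periodic consistent region and that the application declares a JobControlPlane operator.

```spl
composite Main {
  graph
    // Assumption: one JobControlPlane invocation per application
    // that uses consistent regions.
    () as JCP = JobControlPlane() {}

    // The source, not DeDuplicate, starts the consistent region.
    @consistent(trigger = periodic, period = 30.0)
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 0.1;
      output Data : name = "test", age = (uint32)(random() * 80.0);
    }

    // DeDuplicate is reachable from the start operator, so it takes part
    // in the region's checkpoint and reset.
    stream<rstring name, uint32 age> Out = DeDuplicate(Data) {
      param
        timeOut : 60.0; // drain and reset time still counts toward these 60 seconds
        key : age;
    }
}
```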

Checkpointing behavior in an autonomous region

When the DeDuplicate operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the DeDuplicate operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator returns to its initial state.

Such checkpointing behavior is subject to change in the future.
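
A minimal sketch of the periodic case follows. It is a fragment for a composite's graph clause and assumes an input stream Data with the attributes shown; the 30-second checkpoint period is an arbitrary illustrative value.

```spl
// Assumed input: stream<rstring name, uint32 age> Data
stream<rstring name, uint32 age> Out = DeDuplicate(Data) {
  param
    timeOut : 60.0;
    key : age;
  config
    // A background thread checkpoints the remembered tuples every 30 seconds.
    checkpoint : periodic(30.0);
}
```

With checkpoint : operatorDriven in place of periodic(30.0), the text above implies that the operator would restart with an empty history.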

Examples

This example uses the DeDuplicate operator.


composite Main {
  graph
    stream<rstring name, uint32 age> Data = Beacon () {
        param period : 0.1;
        output
            Data : name = "test", age = (uint32)(random()*80.0);
    }

    // only pass tuples whose key (age / 3u, a three-year age bucket)
    // has not been seen in the last minute
    stream<rstring name, uint32 age> Out = DeDuplicate(Data)
    {
      param
        timeOut : 60.0;  // only remember for 60 seconds
        key : age / 3u;  // expression to be used for checking duplicates
    }
}

// Detect duplicates based on a pair of attribute values.
// Data is assumed here to carry rstring name and rstring country attributes.
// Duplicate tuples go to the Duplicates stream.
(stream<rstring name, rstring country> Out; stream<Data> Duplicates) = DeDuplicate(Data) {
  param
    timeOut : 60.0;
    key : name, country; // duplicate checking uses both name and country:
                         // "Harvey", "Canada" is not a duplicate of "Harvey", "USA"
}

Summary

Ports
This operator has 1 input port and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 7 parameters.

Optional: count, delta, deltaAttribute, flushOnPunctuation, key, resetOnDuplicate, timeOut

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The DeDuplicate operator has one input port, which ingests the possibly duplicated tuples.

Output Ports

Ports (0)

The DeDuplicate operator has one or two output ports. The first output port receives the tuples that are not duplicates for the criteria.

The first output port allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the operator expects all output tuple attributes to be completely assigned.

Assignments
This port set allows any SPL expression of the correct type to be assigned to output attributes. Attributes not assigned in the output clause will be automatically assigned from the attributes of the input ports that have the same name and type. If there is no such input attribute, an error is reported at compile-time.
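
The forwarding rule can be sketched as follows. The fragment assumes an input stream Data with name and age attributes; the extra output attribute tag is hypothetical and exists only to show an explicit assignment.

```spl
// Assumed input: stream<rstring name, uint32 age> Data
stream<rstring name, uint32 age, rstring tag> Out = DeDuplicate(Data) {
  param
    key : name;
  output
    // name and age are forwarded automatically from the input tuple;
    // tag has no matching input attribute, so it must be assigned here.
    Out : tag = "unique";
}
```

Omitting the assignment to tag would leave an output attribute without a value, which the text above says is reported as a compile-time error.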

Ports (1)

The second output port (if present) receives the tuples that are duplicates for the criteria. The stream type of the second output port must match that of the input port.

Assignments
This port set does not allow assignments to output attributes.

Parameters

This operator supports 7 parameters.

Optional: count, delta, deltaAttribute, flushOnPunctuation, key, resetOnDuplicate, timeOut

count

Specifies that identical tuples are suppressed within the next count tuples that are received. This parameter cannot be used with the timeOut or deltaAttribute parameters.
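
A count-based invocation might look like the sketch below. The stream and attribute names are illustrative, and the unsigned literal 1000u is an assumption about the parameter's type, which this page does not state.

```spl
// Assumed input: stream<rstring id, float64 reading> Readings
stream<rstring id, float64 reading> UniqueReadings = DeDuplicate(Readings) {
  param
    // Suppress a repeated id if it reappears within the next 1000 tuples.
    // Assumption: count accepts an unsigned integer literal.
    count : 1000u;
    key : id;
}
```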

delta

Specifies the threshold for the difference between the value of the deltaAttribute attribute on the remembered (non-duplicate) tuple and its value on the current tuple. If the difference is greater than the delta parameter value, the tuple is emitted.

deltaAttribute

Specifies the input attribute that is used to control the suppression of duplicates. This parameter is analogous to the delta function for window clauses. Together with the delta parameter, it causes duplicate tuples to be suppressed unless the attribute value has increased by more than the delta value since the original tuple. The attribute must be of an integral, floating-point, enum, or timestamp type.

This parameter cannot be used with the count or timeOut parameters.
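
The interplay of delta and deltaAttribute can be sketched as follows. The stream and attribute names are hypothetical, and the sketch assumes that the delta value has the same type as the attribute (float64 here).

```spl
// Assumed input: stream<rstring id, float64 eventTime> Events
stream<rstring id, float64 eventTime> UniqueEvents = DeDuplicate(Events) {
  param
    key : id;
    deltaAttribute : eventTime;
    // A tuple with an already-seen id is suppressed unless its eventTime
    // exceeds the remembered eventTime by more than 30.0.
    delta : 30.0;
}
```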

flushOnPunctuation

Specifies whether punctuation causes the operator to forget all history of remembered tuples. If this parameter is not specified, the default value is false. If the parameter value is true, all remembered keys are erased when punctuation is received.
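
A short sketch of this parameter in use, assuming an upstream stream Batches (a hypothetical name) in which punctuation marks the end of each batch:

```spl
// Assumed input: stream<rstring id, int32 value> Batches
stream<rstring id, int32 value> UniquePerBatch = DeDuplicate(Batches) {
  param
    key : id;
    // Forget all remembered keys whenever punctuation arrives, so the same
    // id can pass again in the next batch.
    flushOnPunctuation : true;
}
```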

key

Specifies a list of expressions that are used to determine whether a tuple is a duplicate. If this parameter is omitted, the whole tuple is used as the key.

resetOnDuplicate

Specifies whether a suppressed duplicate tuple resets the timeOut, count, or deltaAttribute reference that is saved for the original tuple. When a reset occurs, timeOut restarts from the current time, count restarts from the current tuple number, and deltaAttribute takes the current value of the attribute.

If this parameter is not specified, the default value is true.
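
The effect is easiest to see on a timeline. The sketch below disables the reset; the arrival times in the comments are a worked example derived from the description above, not measured output, and the input stream Data is assumed.

```spl
// Assumed input: stream<rstring name, uint32 age> Data
stream<rstring name, uint32 age> Out = DeDuplicate(Data) {
  param
    timeOut : 60.0;
    key : name;
    // Three tuples with the same key arrive at 0 s, 50 s, and 70 s.
    // resetOnDuplicate : true (default): 0 s is emitted; 50 s is suppressed
    //   and restarts the timer; 70 s is suppressed (only 20 s since the reset).
    // resetOnDuplicate : false: 0 s is emitted; 50 s is suppressed without a
    //   reset; 70 s is emitted (more than 60 s since the original tuple).
    resetOnDuplicate : false;
}
```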

timeOut

Specifies the number of seconds during which no duplicate of a tuple is emitted. If this parameter is not specified, the default value is 600.0 seconds (10 minutes). Identical tuples that are separated by more than timeOut seconds are both emitted on the output port.

This parameter cannot be used with the count or deltaAttribute parameters.

Note: Tuples are retained by the DeDuplicate operator until timeOut seconds elapse, count tuples are processed, or until delta is large enough. If the rate of incoming unique tuples is large, large values of these parameters might cause the operator to occupy a huge amount of memory.

Code Templates

DeDuplicate

stream<${schema}> ${outputStream} = DeDuplicate(${inputStream}) {
    param
        ${parameter} : ${parameterExpression};
    output
        ${outputStream} : ${outputExpression};
}

DeDuplicate with Suppressed Tuples

(stream<${schema}> ${outputStream}; stream<${inputStream}> ${outputStream2}) = DeDuplicate(${inputStream}) {
    param
        ${parameter} : ${parameterExpression};
    output
        ${outputStream} : ${outputExpression};
}