IBM Streams 4.2.1

Operator Join

Primitive operator image not displayed. Problem loading file: ../../image/tk$spl/op$spl.relational$Join.svg

The Join operator is used to correlate tuples from two streams that are based on user-specified match predicates and window configurations. When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple is produced for each match. If at least one output was generated, a window punctuation is generated after all the outputs.

If you specify equalityRHS and equalityLHS parameters, the matching is done by using a hash table. Otherwise, a scan of the tuples in the window is done to find the matches.

In an outer join configuration, if a tuple does not get involved in a match during its stay in the join window, then it is sent out to an output port right before its eviction from the window. See the algorithm parameter for details.

Partitioning can be used to split the tuples into partitioned windows.

Checkpointed data

When the Join operator is checkpointed, the current state of the operator's windows is saved in checkpoint. The windows are checkpointed incrementally to reduce checkpoint size and latency. For count and delta trigger and eviction policies, the checkpoint includes the correct information to honor the policies. For example, for count, the number of tuples until an eviction or trigger is checkpointed. For an outer join, the checkpoint includes an indication of which tuples were matched. Logic state variables (if present) are also included in checkpoint.

Behavior in a consistent region

The Join operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region.

When in a consistent region, the current state of the operator is checkpointed. When the operator is reset, the current state of the windows is discarded and reloaded from the previous checkpoint. When a window with time-based trigger and eviction policies is checkpointed or reset, timers continue to trigger at the moments that are indicated by the policy. Trigger and eviction policies are disabled until the next tuple is inserted. For example, take a Join operator with a sliding window with a policy of time(10) where timers are triggered at every 10 seconds. If the operator is checkpointed at second 18 and the next tuple is inserted at second 22, the time trigger at second 20 is ignored, and the first window eviction occurs at second 30.

Checkpointing behavior in an autonomous region

When the Join operator is in an autonomous region and configured with config checkpoint : periodic(T) clause, a background thread in SPL Runtime checkpoints the operator every T seconds, and such periodic checkpointing activity is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Join operator is in an autonomous region and configured with config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.

Windowing

The Join operator supports the following window configurations for an input port:

sliding, (count | delta | time)-based eviction, count-based trigger of 1 (, partitioned (, partitionEvictionSpec)? )?

All window configurations have a count-based trigger of 1. This means that every time a tuple is received on a port, it is inserted into its window, which triggers the join processing. The newly inserted tuple is matched against the tuples resident in the window that is defined over the other input port. In case of matches, a result is output for each match and a window marker punctuation is output at the end.

For the partitioned variants, the window specification applies to individual subwindows identified by the partitionBy parameter corresponding to the port. The left input port of the join cannot have a partitioned window that is defined unless a partitionByLHS parameter is specified. Similarly, the right input port of the join cannot have a partitioned window that is defined unless a partitionByRHS parameter is specified.

Assignments

The Join operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the Join operator expects all output tuple attributes to be completely assigned.

Examples

This example uses the Join operator.

composite Main {                                                                           
  graph                                                                                    
    stream<rstring firstName, rstring lastName, uint64 salary> BeatL = Beacon() {}         
    stream<rstring name, rstring manager, rstring department> BeatR = Beacon() {}          
    // join with a match condition                                                         
    stream<BeatL, BeatR> Join1 = Join(BeatL; BeatR) {                                      
      window                                                                               
        BeatL : sliding, count(100);                                                       
        BeatR : sliding, time(10);                                                         
      param                                                                                
        match : BeatR.name == BeatL.firstName + " " + BeatL.lastName &&                    
                department == "HR";                                                        
      output                                                                               
        Join1 : salary = salary * 2ul;                                                     
    }                                                                                      
    // equi-join with an additional match condition                                        
    stream<BeatL, BeatR> Join2 = Join(BeatL; BeatR) {                                      
      window                                                                               
        BeatL : sliding, count(100);                                                       
        BeatR : sliding, time(10);                                                         
      param                                                                                
        match       : department == "HR";                                                  
        equalityLHS : BeatL.firstName + " " + BeatL.lastName;                              
        equalityRHS : name;                                                                
      output                                                                               
        Join2 : salary = salary * 2ul;                                                     
    }                                                                                      
    // equi-join with multiple equality expressions                                        
    stream<BeatL, BeatR> Join3 = Join(BeatL; BeatR) {                                      
      window                                                                               
        BeatL : sliding, count(100);                                                       
        BeatR : sliding, time(10);                                                         
      param                                                                                
        equalityLHS : BeatL.firstName + " " + BeatL.lastName, "HR";                        
        equalityRHS : name, department;                                                    
      output                                                                               
        Join3 : salary = salary * 2ul;                                                     
    }                                                                                      
    // single-sided partitioned join with a 0 sized window on the right hand side          
    // and a partitioned window of 1 on the left hand side                                 
    stream<rstring ticker, decimal64 vwap> VWAP = Beacon() {}                              
    stream<rstring ticker, decimal64 askprice, decimal64 asksize> Quote = Beacon() {}      
    stream<rstring ticker, decimal64 bargainIndex>                                         
        Bargain = Join(VWAP; Quote)                                                        
    {                                                                                      
      window                                                                               
        VWAP  : sliding, count(1), partitioned;                                            
        Quote : sliding, count(0);                                                         
      param                                                                                
        match           : vwap > askprice*100.0d;                                          
        partitionByLHS  : VWAP.ticker;                                                     
        equalityLHS     : VWAP.ticker;                                                     
        equalityRHS     : Quote.ticker;                                                    
      output                                                                               
        Bargain : bargainIndex = exp(vwap-askprice*100.0d)*asksize;                        
    }                                                                                      
    // a left outer join with single output                                                
    stream<rstring message, uint32 kind> MsgLHS = Beacon() {}                              
    stream<rstring message, uint32 kind, uint64 tm> MsgRHS = Beacon() {}                   
    stream<rstring message1, rstring message2>                                             
        Msgs1 = Join(MsgLHS as L; MsgRHS as R)                                             
    {                                                                                      
      window                                                                               
        L : sliding, count(0);                                                             
        R : sliding, delta(tm, 10ul), partitioned;                                         
      param                                                                                
        algorithm       : leftOuter;                                                       
        partitionByRHS  : R.kind;                                                          
        defaultTupleRHS : { message = "N/A", kind = 0u, tm = 0ul};                         
        equalityLHS     : L.message, L.kind;                                               
        equalityRHS     : R.message, R.kind;                                               
      output                                                                               
        Msgs1 : message1 = L.message, message2 = R.message;                                
    }                                                                                      
    // a right outer join with two outputs                                                 
    (stream<rstring message1, rstring message2> Msgs2;                                     
     stream<rstring message, uint32 kind, uint64 tm> MsgsRHS2)                             
        = Join(MsgLHS as L; MsgRHS as R)                                                   
    {                                                                                      
      window                                                                               
        L : sliding, count(0);                                                             
        R : sliding, delta(tm, 10ul), partitioned;                                         
      param                                                                                
        algorithm       : rightOuter;                                                      
        partitionByRHS  : R.kind;                                                          
        equalityLHS     : L.message;                                                       
        equalityRHS     : R.message;                                                       
      output                                                                               
        Msgs2 : message1 = L.message, message2 = R.message;                                
    }                                                                                      
    // an outer join with three outputs                                                    
    (stream<rstring message1, rstring message2> Msgs3;                                     
     stream<rstring message, uint32 kind> MsgsLHS3;                                        
     stream<rstring message, uint32 kind, uint64 tm> MsgsRHS3)                             
        = Join(MsgLHS as L; MsgRHS as R)                                                   
    {                                                                                      
       window                                                                              
        L : sliding, count(0);                                                             
        R : sliding, delta(tm, 10ul), partitioned;                                          
      param                                                                                
        algorithm       : outer;                                                           
        partitionByRHS  : R.kind;                                                          
        equalityLHS     : L.message;                                                       
        equalityRHS     : R.message;                                                       
      output                                                                               
        Msgs3 : message1 = L.message, message2 = R.message;                                
    }                                                                                      
    // an outer join with a single output.                                                 
    //Discard unreferenced partitions after 60 seconds.                                     
    stream<rstring message1, rstring message2>                                             
        Msgs4 = Join(MsgLHS as L; MsgRHS as R)                                             
    {                                                                                      
      window                                                                               
        L : sliding, count(0);                                                             
        R : sliding, delta(tm, 10ul), partitioned, partitionAge(60.0);                      
      param                                                                                
        algorithm       : outer;                                                           
        partitionByRHS  : R.kind;                                                          
        equalityLHS     : L.message;                                                       
        equalityRHS     : R.message;                                                       
      output                                                                               
        Msgs4 : message1 = L.message, message2 = R.message;                                
    }                                                                                      
}

Summary

Ports
This operator has 2 input ports and 3 output ports.
Windowing
This operator requires a windowing configuration.
Parameters
This operator supports 8 parameters.

Optional: algorithm, defaultTupleLHS, defaultTupleRHS, equalityLHS, equalityRHS, match, partitionByLHS, partitionByRHS

Metrics
This operator reports 2 metrics.

Properties

Implementation
C++
Threading
WindowEvictionBound - Operator provides a single threaded execution context only if a time-based window eviction policy is not used.

Input Ports

Ports (0...1)

The Join operator is configurable with two input ports. These ports (LHS and RHS) ingest tuples to be joined.

Properties

Output Ports

Ports (0)

The Join operator is configurable with a single output port in the case of an inner join, one or two output ports in the case of a rightOuter or leftOuter join, and one or three output ports in the case of an outer join. The Join operator generates a punctuation after each batch of joined tuples it outputs on its first output port. For all of the outer join algorithms, the batch includes evicted tuples that were never involved in a match earlier. Evicted tuples will be in separate batches for windows with a time-based eviction policy, as evictions and tuple insertions happen asynchronously.

Assignments
This port set allows any SPL expression of the correct type to be assigned to output attributes. Attributes not assigned in the output clause will be automatically assigned from the attributes of the input ports that have the same name and type. If there is no such input attribute, an error is reported at compile-time.

Properties

Ports (1...2)

These optional ports (LHS and RHS) produce the evicted tuples that were not joined.

Assignments
This port set does not allow assignments to output attributes.

Properties

Parameters

This operator supports 8 parameters.

Optional: algorithm, defaultTupleLHS, defaultTupleRHS, equalityLHS, equalityRHS, match, partitionByLHS, partitionByRHS

algorithm

Specifies the join algorithm. The valid options are leftOuter, rightOuter, outer, and inner.

In a left outer join, a tuple that is being evicted from the left port's window and was never involved in a match earlier is paired with a default initialized tuple (whose attributes are default-constructed) from the right port and output. If a defaultTupleRHS parameter is specified, its value is used instead of the default constructed tuple.

A right outer join is similar, but applies to tuples that are being evicted from the right port's window and employs the defaultTupleLHS parameter if present.

An outer join is a combination of left and right outer joins.

The default for this parameter is the inner join option, which does not perform any action upon eviction of tuples.

For leftOuter and rightOuter joins, an optional second output port can be specified. In this case, the evicted tuples that have no matches are output on the second output port and are not joined with an empty tuple from the opposite window. The schema of the second output port must match that of the left input port in the case of a leftOuter join and the right input port in the case of a rightOuter join.

For an outer join, optional second and third output ports can be specified. This means that the outer join can have either one output port or three output ports. When specified, the second port is used to output evicted tuples from the left input port that have no matches and the third port is used to output the ones from the right input port. The schemas of the second and third output ports must match the schemas of the first and second input ports.

Properties

defaultTupleLHS

Specifies the tuple to be used from the left stream, for matching an expiring tuple from the right window that needs to be output as part of a right outer join or outer join algorithm. It is only valid for join operators with a single output port and those operators that have rightOuter or outer as the join algorithm. It can take a single value of tuple type, which must match the type of the tuples from the left stream.

Properties

defaultTupleRHS

Specifies the tuple to be used from the right stream for matching an expiring tuple from the left window that needs to be output as part of a left outer join or outer join algorithm. It is only valid for join operators with a single output port and those that have leftOuter or outer as the join algorithm. It can take a single value of tuple type, which must match the type of the tuples from the right stream.

Properties

equalityLHS

Specifies the equality condition expressions from the left port. The number of expressions and their types must match those specified in the equalityRHS parameter. The expressions can refer to attributes from the left input port only.

Properties

equalityRHS

Specifies equality condition expressions from the right port. The number of expressions and their types must match those specified in the equalityLHS parameter. The expressions can refer to attributes from the right input port only.

The equalityLHS and equalityRHS parameters can be used to specify equijoin match predicates, which result in using a hash-based join implementation, rather than a nested-loop one. They are not mutually exclusive with the match parameter and can be used together.

Properties

match

Specifies an expression to be used for matching the tuples. The expression might refer to attributes from both input ports. When this parameter is omitted, the default value of true is used.

Properties

partitionByLHS

Specifies one or more expressions to be used for partitioning the input tuples from the left port into subwindows, where all window and parameter configurations apply to the subwindows, independently. It can be used only if a partitioned window is defined for the left port. The expressions can refer to attributes from the left input port only.

Properties

partitionByRHS

Specifies one or more expressions to be used for partitioning the input tuples from the right port into subwindows, where all window and parameter configurations apply to the subwindows, independently. It can be only used if a partitioned window is defined for the right port. The expressions can refer to attributes from the right input port only.

Properties

Code Templates

Join
stream <${schema}> ${outputStream} as O = Join (${inputStream1} as I1; ${inputStream2} as I2) {
            window
                I1: sliding, ${windowMode1};
                I2: sliding, ${windowMode2};
            param
                match : ${matchExpression};
            output
                O: ${outputExpression};
        }
      

Join with Equality Expression
stream <${schema}> ${outputStream} as O = Join (${lhsStream} as L; ${rhsStream} as R) {
            window
                L: sliding, ${lhsWindow};
                R: sliding, ${rhsWindow};
            param
                equalityLHS : ${lhsQuery};
                equalityRHS : ${rhsQuery};
            output
                O: ${outputExpression};
        }
      

Join with One-sided Lookup
stream <${schema}> ${outputStream} as O = Join(${queryStream} as Q; ${dataStream} as D) {
            window
                Q : sliding, count(0);
                D : sliding, count(${windowSize});
            param
                equalityLHS : ${queryMatchAttributes};
                equalityRHS : ${dataMatchAttributes};
            output
                O : ${outputExpressions};
        }
      

Metrics

nCurrentPartitionsLHS - Gauge

Returns the number of partitions in the left input port window.

nCurrentPartitionsRHS - Gauge

Returns the number of partitions in the right input port window.