Implementing tuple load balancing

A ThreadedSplit operator can be used to split a stream and process tuples in parallel. This method works well for many applications. However, the processing time of a tuple can vary considerably with the tuple data. In that case, a tuple with a long processing time can make the tuples behind it back up in the stream while they wait to be processed, even though another thread might be available and idle. The problem is aggravated when tuples are also queued in a TCP/IP connection to another processing element (PE).
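
To make the backlog concrete, the following Python sketch models it outside of Streams. It is a toy model, not SPL and not how ThreadedSplit is implemented. It assumes that all tuples arrive at time 0 and are pre-assigned round-robin to a per-thread FIFO queue. The function name `queued_dispatch` is hypothetical.

```python
# Toy model (not SPL): each worker thread owns a FIFO queue, and tuples are
# assigned round-robin before their processing times are known.
# Round-robin assignment is an assumption made for illustration only.


def queued_dispatch(costs, workers=2):
    """Return (worker, start, end) per tuple when tuples are pre-assigned to queues."""
    free_at = [0] * workers  # time at which each worker becomes idle
    schedule = []
    for n, cost in enumerate(costs):
        w = n % workers  # tuple n is queued on worker n % workers
        start = free_at[w]  # it cannot start until that worker is idle
        free_at[w] = start + cost
        schedule.append((w, start, free_at[w]))
    return schedule


if __name__ == "__main__":
    costs = [10, 1, 1, 1, 1, 1]  # tuple 0 is slow; the rest are cheap
    for n, (w, start, end) in enumerate(queued_dispatch(costs)):
        print(f"tuple {n}: worker {w}, start {start}, end {end}")
```

With these costs, tuples 2 and 4 start at times 10 and 11, behind the slow tuple 0, although worker 1 is idle from time 3 onward.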

To balance the load, a ThreadedSplit operator with a buffer size of 1 can be tied to two or more Gate operators, each with a maximum unacknowledged tuple count of 1 or more. The ThreadedSplit and Gate operators must run in the same PE so that no tuples are queued in the connections between PEs. To place operators in the same PE, use the partitionColocation placement config.

The Gate operator passes only the specified number of tuples at a time. Before it passes more tuples, it waits until a downstream operator acknowledges them by sending a tuple to the Gate's control input port. With a maximum unacknowledged tuple count of 1, no tuple is ever queued behind another tuple that is still waiting to be processed.
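
The gating rule can be sketched in Python as a credit counter. This is a hypothetical model, and the `Gate` class and `gated_dispatch` function are illustrative names, not the SPL operator's API. In the model, each worker sits behind its own gate and a tuple goes to the first worker whose gate is open. A worker acknowledges a tuple as soon as it finishes processing it, and all tuples are assumed to arrive at time 0.

```python
import heapq


class Gate:
    """Toy credit counter: passes at most max_unacked tuples until they are acknowledged."""

    def __init__(self, max_unacked=1):
        self.max_unacked = max_unacked
        self.unacked = 0

    def is_open(self):
        return self.unacked < self.max_unacked

    def pass_tuple(self):
        if not self.is_open():
            raise RuntimeError("gate is closed")
        self.unacked += 1

    def ack(self, count=1):
        self.unacked = max(0, self.unacked - count)


def gated_dispatch(costs, workers=2, max_unacked=1):
    """Return (worker, start, end) per tuple when every worker sits behind a Gate."""
    gates = [Gate(max_unacked) for _ in range(workers)]
    free_at = [0] * workers  # time at which each worker becomes idle
    pending_acks = []  # min-heap of (ack time, worker)
    now = 0
    schedule = []
    for cost in costs:
        # Block until some gate is open, delivering acknowledgements in time order.
        while not any(g.is_open() for g in gates):
            now, w = heapq.heappop(pending_acks)
            gates[w].ack()
        w = next(i for i, g in enumerate(gates) if g.is_open())
        gates[w].pass_tuple()
        # With max_unacked > 1, a tuple can still wait behind earlier ones on this worker.
        start = max(now, free_at[w])
        free_at[w] = start + cost
        heapq.heappush(pending_acks, (free_at[w], w))
        schedule.append((w, start, free_at[w]))
    return schedule


if __name__ == "__main__":
    costs = [10, 1, 1, 1, 1, 1]  # tuple 0 is slow; the rest are cheap
    for limit in (1, 3):
        print(f"max_unacked={limit}: {gated_dispatch(costs, max_unacked=limit)}")
```

With a maximum of 1, the five cheap tuples all go to worker 1 and finish by time 5 instead of waiting behind the slow tuple. With a maximum of 3, tuples 1 and 2 are admitted to worker 0 and cannot start until times 10 and 11.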

The following example demonstrates balancing the load:


composite Main {
  graph
    // Generate a stream of data to process
    stream<uint32 i> I = Beacon() {
      logic
        state : mutable uint32 c = 0u;
      output
        I : i = c++;
    }
                                                                                           
    // Split the stream into 2 streams. Use a following Gate to ensure load balancing
    (stream<I> X; stream<I> Y) = ThreadedSplit(I) {
      param
        bufferSize : 1u;
      config
        placement : partitionColocation("Split"), // ensure same PE as the Gates
                    partitionExlocation("Process"); // different PE from each processing operator
    }
                                                                                           
    stream<I> O0 = Gate(X; Control0) {
      param
        maxUnackedTupleCount : 1u;
      config
        placement : partitionColocation("Split"); // ensure same PE as ThreadedSplit
    }

    stream<I> O1 = Gate(Y; Control1) {
      param
        maxUnackedTupleCount : 1u;
      config
        placement : partitionColocation("Split"); // ensure same PE as ThreadedSplit
    }
                                                                                           
    (stream<I> R0 as out; stream<uint32 i> Control0 as control) = Custom(O0 as In) {
      logic
        onTuple In : {
          // do some processing
          submit(In, out); // forward tuple
          submit({ i = 1u }, control); // acknowledge the tuple so Gate O0 passes the next one
        }
      config
        // Place in a different PE from the Gates and from the other processing operator
        placement : partitionExlocation("Process");
    }
                                                                                           
    (stream<I> R1 as out; stream<uint32 i> Control1 as control) = Custom(O1 as In) {
      logic
        onTuple In : {
          // do some processing
          submit(In, out); // forward tuple
          submit({ i = 1u }, control); // acknowledge the tuple so Gate O1 passes the next one
        }
      config
        // Place in a different PE from the Gates and from the other processing operator
        placement : partitionExlocation("Process");
    }
}