Operator UDPSource

The UDPSource operator reads data from a UDP socket and creates tuples and punctuation out of it. Each tuple must fit into a single UDP packet and a single UDP packet must contain only a single tuple or punctuation.

The UDPSource operator accepts an optional output clause. Any SPL expression and supported custom output functions can be used in the output clause. Output attributes can be assigned values on the output clause. If they have an assignment, the expression value is assigned to the attribute and the attribute is not part of the value that is read from the source. For the line and block formats, there must be only one attribute that is not assigned by an output assignment and this attribute must have type rstring (for line format) or blob (for block format). For all other formats, any attributes that are not assigned in the output clause are read from the input by using csv, txt, or bin format.
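
The output clause rules for the block format can be sketched as follows. This is a minimal illustration, not a definitive configuration: the composite name, stream name, attribute names, and the "udp-feed" literal are hypothetical, and the port number is borrowed from the examples in this document. The payload attribute is the single unassigned blob attribute, so it receives the packet contents, while origin is assigned in the output clause and is therefore not read from the packet.

```spl
composite OutputClauseSketch {
  graph
    // block format: exactly one unassigned attribute, of type blob
    stream<blob payload, rstring origin> Packets = UDPSource()
    {
      param
        port   : 23145u;
        format : block;
      output
        // origin is set by an SPL expression, not read from the packet
        Packets : origin = "udp-feed";
    }
}
```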

Checkpointed data

When the UDPSource operator is checkpointed, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The UDPSource operator is not supported in a consistent region.

Checkpointing behavior in an autonomous region

When the UDPSource operator is in an autonomous region and configured with the config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the UDPSource operator is in an autonomous region and configured with the config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
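
As a sketch of the periodic configuration described above, the following invocation asks the runtime to checkpoint the operator every 30 seconds. The composite name and the 30-second interval are assumptions chosen for illustration; the schema and port are borrowed from the examples in this document.

```spl
composite CheckpointSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = UDPSource()
    {
      param
        port : 23145u;
      config
        // checkpoint every 30 seconds, asynchronously to tuple processing
        checkpoint : periodic(30.0);
    }
}
```

Replacing periodic(30.0) with operatorDriven selects the second behavior: no checkpoint is taken, and the operator restarts from its initial state.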

Exceptions

The UDPSource operator throws an exception in the following cases:
  • The host cannot be resolved.
  • The name cannot be located.
  • The operator cannot bind to the port.
  • The interface cannot be located.

Examples

The following example shows several invocations of the UDPSource operator.


composite Main {
  graph
    // udp socket on local host, using port for "ftp"
    stream<rstring name, uint32 age, uint64 salary> Beat = UDPSource()
    {
      param
        port : "ftp";
    }
    // udp on local host, accepts packets from "some.node.some.host" on port 23145
    stream<Beat> Beat1 = UDPSource()
    {
      param
        address : "some.node.some.host";
        port    : 23145u;
    }
    // same as above, but using a receive buffer size, and waiting 5 secs at start-up
    stream<Beat> Beat2 = UDPSource()
    {
      param
        address           : "some.node.some.host";
        port              : 23145u;
        receiveBufferSize : 10240u;
        initDelay         : 5.0;
    }
    // same as above but also registers its location to the name server
    stream<Beat> Beat3 = UDPSource()
    {
      param
        port : 23145u;
        name : "my_server";
    }
    // udp on local host, uses any port, registers its location to the name server
    stream<Beat> Beat4 = UDPSource()
    {
      param
        name : "my_server";
    }
    // udp on local host, uses port "ftp", registers its location to the name server
    stream<Beat> Beat5 = UDPSource()
    {
      param
        port : "ftp";
        name : "my_server";
    }
    // udp on local host, accepts packets sent to a multicast address, on port 23145
    stream<Beat> Beat6 = UDPSource()
    {
      param
        address : "some.multicast.address";
        port    : 23145u;
    }
    // same as above but also registers its location to the name server
    stream<Beat> Beat7 = UDPSource()
    {
      param
        port : 23145u;
        name : "my_server";
    }
    // same as above but uses a specific interface for receiving packets
    stream<Beat> Beat8 = UDPSource()
    {
      param
        address   : "some.multicast.address";
        port      : 23145u;
        interface : "ib1";
    }
}

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 15 parameters.

Optional: address, compression, defaultTuple, encoding, format, hasDelayField, ignoreExtraCSVValues, initDelay, interface, name, parsing, port, readPunctuations, receiveBufferSize, separator

Metrics
This operator reports 1 metric.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
OutputFunctions
rstring RemoteIP()

Returns the remote address of the current UDP packet. The value is a host name, or a dotted IP address if the name cannot be resolved.

uint32 RemotePort()

Returns the port number of the remote UDP packet.

uint32 ServerPort()

Returns the port number that the UDP server is listening on.

int64 TupleNumber()

Returns the number of tuples that are generated by this operator since the processing element (PE) was last started. The first tuple that is generated has number 0.

<any T> T AsIs(T)

Returns the input value.
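
The output functions can be combined in a single output clause, as in the sketch below. The composite, stream, and attribute names are hypothetical. The attributes name and age are left unassigned, so they are read from each packet in csv format, while the remaining attributes are filled in by the output functions.

```spl
composite OutputFunctionsSketch {
  graph
    stream<rstring name, uint32 age,
           rstring senderHost, uint32 senderPort,
           uint32 listenPort, int64 seq> Tagged = UDPSource()
    {
      param
        port   : 23145u;
        format : csv;
      output
        Tagged : senderHost = RemoteIP(),    // host name or dotted IP address
                 senderPort = RemotePort(),
                 listenPort = ServerPort(),
                 seq        = TupleNumber(); // 0 for the first tuple
    }
}
```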

Ports (0)

The UDPSource operator is configurable with a single output port, which produces tuples read from UDP packets.

Properties

Parameters

This operator supports 15 parameters.

Optional: address, compression, defaultTuple, encoding, format, hasDelayField, ignoreExtraCSVValues, initDelay, interface, name, parsing, port, readPunctuations, receiveBufferSize, separator

address

Specifies the address of the sender whose UDP packets are accepted.

If the address is a multicast address, then all UDP packets that are destined for that multicast address at the specified port are accepted regardless of the address of the sender.

If this parameter is not specified, all UDP packets that are destined for the port are accepted.

The parameter value can be a host name or an IP address.

Properties

compression

Specifies the compression mode. For more information, see the compression parameter in the FileSource operator. The compression applies to each packet, independently.

Properties

defaultTuple

Specifies the default tuple to use for missing fields. For more information, see the defaultTuple parameter in the FileSource operator.

Properties

encoding

Specifies the character set encoding of the input. For more information, see the encoding parameter in the FileSource operator. The encoding applies to each packet, independently.

Properties

format

Specifies the format of the data. For more information, see the format parameter in the FileSource operator.

The formatting applies to each packet individually. For the csv, txt, and bin formats, each UDP packet is a single tuple in the corresponding format. For the line format, each packet contains a single line and there is no eolMarker parameter. Trailing carriage returns ("\r") and new lines ("\n") are removed from the string. For the block format, each packet contains a single block and there is no blockSize parameter.
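
A minimal sketch of the line format, assuming hypothetical composite, stream, and attribute names: each received packet becomes one tuple whose single rstring attribute holds the packet text with any trailing "\r" and "\n" removed.

```spl
composite LineFormatSketch {
  graph
    // line format: one rstring attribute, one packet per tuple
    stream<rstring line> Lines = UDPSource()
    {
      param
        port   : 23145u;
        format : line;
    }
}
```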

Properties

hasDelayField

Specifies whether the format contains inter-arrival delays as the first field. For more information, see the hasDelayField parameter in the FileSource operator.

Properties

ignoreExtraCSVValues

Specifies whether to skip any extra fields before the end of line when reading in CSV format. For more information, see the ignoreExtraCSVValues parameter in the FileSource operator.
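
As an illustration, the following sketch reads csv packets that may carry more fields than the three attributes in the schema. The composite name is hypothetical, and the parameter is assumed to take a boolean value as it does in the FileSource operator.

```spl
composite ExtraValuesSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = UDPSource()
    {
      param
        port                 : 23145u;
        format               : csv;
        // fields after the third one in a packet are skipped
        ignoreExtraCSVValues : true;
    }
}
```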

Properties

initDelay

Specifies the number of seconds to delay before starting to produce tuples. For more information, see the initDelay parameter in the FileSource operator.

Properties

interface

Specifies the network interface to use for UDP multicast packets or to register by using the name parameter. This parameter is valid only when the address or name parameter is specified. Using the interface parameter with the name parameter ensures that UDPSink operators with the same name parameter use the intended interface.

Properties

name

Specifies the name under which this operator registers its address and port pair (the address of the node that hosts the operator and the port that it listens on) with the name service. The corresponding UDPSink operator can look up the name and send its UDP packets to the registered address at the registered port.

These names are automatically prefixed by the application scope, so applications with differing scopes cannot communicate through the same name. The application scope can be set by using config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators keep trying periodically to register the name, with an error message for each failure.
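
The name-based pairing can be sketched as follows. The composite name, stream names, the Beacon generator, the sample attribute values, and the "demoScope" scope are all assumptions made for illustration; "my_server" and "ib1" are borrowed from the examples in this document. The UDPSource registers itself under the name, and a UDPSink with the same name in the same application scope looks up the registration and sends to it.

```spl
composite NameSketch {
  graph
    // receiver: uses any port and registers as "my_server" on interface "ib1"
    stream<rstring name, uint32 age> Received = UDPSource()
    {
      param
        name      : "my_server";
        interface : "ib1";
    }

    // hypothetical generator that feeds the sender once per second
    stream<rstring name, uint32 age> Generated = Beacon()
    {
      param
        period : 1.0;
      output
        Generated : name = "sample", age = 42u;
    }

    // sender: looks up "my_server" and sends to the registered address and port
    () as Sender = UDPSink(Generated)
    {
      param
        name : "my_server";
    }

  config
    // names are prefixed by this scope, so both operators must share it
    applicationScope : "demoScope";
}
```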

Properties

parsing

Specifies the parsing behavior of the UDPSource operator. There are three valid values: strict, permissive, and fast.

When the parameter value is strict, incorrectly formatted tuples result in a runtime error and termination of the operator.

When the parameter value is permissive, incorrectly formatted tuples result in the creation of a runtime log entry and the parser continues with the next packet. This parameter value can be used with only txt, csv, and bin formats.

When the parameter value is fast, the input packet is assumed to be formatted correctly and no runtime checks are performed. Incorrect input in fast mode causes undefined behavior.

The default parsing mode is strict.
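
A sketch of the permissive mode, with a hypothetical composite name: a malformed csv packet produces a log entry and is skipped instead of terminating the operator.

```spl
composite ParsingSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = UDPSource()
    {
      param
        port    : 23145u;
        format  : csv;
        // log incorrectly formatted packets and continue with the next one
        parsing : permissive;
    }
}
```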

Properties

port

Specifies the port on which the UDP packets are accepted. It takes a single value of type rstring or type uint32. The parameter value can be a well-known port alias, such as http or ftp, or a plain port number, such as 45134u.

This parameter must be specified when the name parameter is not specified; it is optional otherwise.

Properties

readPunctuations

Specifies whether to read punctuations from 'bin' format input. For more information, see the readPunctuations parameter in the FileSource operator.

Properties

receiveBufferSize

Specifies the kernel receive buffer size for the UDP socket.

Properties

separator

Specifies the separator character for the csv format. For more information, see the separator parameter in the FileSource operator.

Properties

Code Templates

UDPSource with Address

stream<${schema}> ${outputStream} = UDPSource() {
            param
                address : "${hostOrIp}";
                port : "${sourcePort}";
        }
      

UDPSource with Name

stream<${schema}> ${outputStream} = UDPSource() {
            param
                name : "${name}";
        }
      

Metrics

nInvalidTuples - Counter

The number of tuples that failed to read correctly in csv or txt format.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime, streams_boost_iostreams, streams_boost_system
Include Path: ../../../impl/include