Operator TCPSource

The TCPSource operator reads data from a TCP socket and creates tuples out of it. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes, it handles a single connection at a time. It works with both IPv4 and IPv6 addresses.

The TCPSource operator accepts an optional output clause. Any SPL expression and supported custom output functions can be used in the output clause. Output attributes can be assigned values in the output clause. If an attribute has an assignment, the expression value is assigned to the attribute and the attribute is not part of the value that is read from the source. For the line and block formats, exactly one attribute must be left without an output assignment, and this attribute must have type rstring (for line format) or blob (for block format). For all other formats, any attributes that are not assigned in the output clause are read from the input by using csv, txt, or bin format.
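
As a minimal sketch of the output clause with the line format, the following invocation assigns one attribute from an SPL expression and leaves a single rstring attribute to be filled from the connection. The composite name, stream name, attribute names, port number, and literal value are illustrative assumptions.

```spl
composite OutputClauseSketch {
  graph
    // 'line' has no output assignment, so it receives each line read from the socket.
    // 'origin' is assigned in the output clause and is not read from the input.
    stream<rstring line, rstring origin> Lines = TCPSource()
    {
      param
        role   : server;
        port   : 23145u;
        format : line;
      output
        Lines : origin = "tcp-feed";
    }
}
```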

Checkpointed data

When the TCPSource operator is checkpointed, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The TCPSource operator is not supported in a consistent region.

Checkpointing behavior in an autonomous region

When the TCPSource operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the TCPSource operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
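
The periodic configuration described above might look like the following sketch. The composite name, stream name, port number, and the 30-second interval are illustrative assumptions.

```spl
composite CheckpointSketch {
  graph
    stream<rstring line> Lines = TCPSource()
    {
      param
        role   : server;
        port   : 23145u;
        format : line;
      config
        // Checkpoint the operator every 30 seconds, asynchronously to tuple processing.
        // With "checkpoint : operatorDriven;" instead, no checkpoint is taken at runtime.
        checkpoint : periodic(30.0);
    }
}
```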

Exceptions

The TCPSource operator throws an exception in the following cases:
  • The host cannot be resolved.
  • The name cannot be located.
  • Unable to set SO_REUSEADDR on TCP socket.
  • Unable to bind to port.

Examples

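The following composite shows TCPSource invocations in the server and client roles with different port, name, keepalive, and reconnection settings.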


composite Main {                                                                
  graph                                                                         
    // server source with an alias string as port                               
    stream<rstring name, uint32 age, uint64 salary> Beat = TCPSource()          
    {                                                                           
      param                                                                     
        role : server;                                                          
        port : "ftp";                                                           
    }                                                                           
    // server source with a numeric port and keepalive settings
    stream<Beat> Beat1 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role : server;                                                          
        port : 23145u;                                                          
        keepAlive : {time=7200, probes=9, interval=75};
    }                                                                           
    // server source with a name, registering interface eth1                     
    stream<Beat> Beat2 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role  : server;                                                         
        name : "my_server";                                                     
        interface : "eth1";                                                     
    }                                                                           
    // server source with a name and port                                       
    stream<Beat> Beat3 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role  : server;                                                         
        port  : 23145u;                                                         
        name : "my_server";                                                     
    }                                                                           
    // server source with a port and infinite reconnection                      
    stream<Beat> Beat4 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role              : server;                                             
        port              : "ftp";                                              
        reconnectionPolicy : InfiniteRetry;                                     
    }                                                                           
    // server source with a port and reconnection (5 times)                     
    stream<Beat> Beat4r = TCPSource()                                           
    {                                                                           
      param                                                                     
        role              : server;                                             
        port              : "ftp";                                              
        reconnectionPolicy : BoundedRetry;                                      
        reconnectionBound  : 5u;                                                
    }                                                                           
    // client source with an IP address and port                                
    stream<Beat> Beat5 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role    : client;                                                       
        address : "99.2.45.67";                                                 
        port    : "ftp";                                                        
    }                                                                           
    // client source with a host name as the address
    stream<Beat> Beat6 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role    : client;                                                       
        address : "mynode.mydomain";                                            
        port    : 23145u;                                                       
    }                                                                           
    // client source with name                                                  
    stream<Beat> Beat7 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role  : client;                                                          
        name : "my_server";                                                     
    }                                                                           
    // client source with reconnection                                          
    stream<Beat> Beat8 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role              : client;                                             
        address           : "mynode.mydomain";                                  
        port              : "ftp";                                              
        reconnectionPolicy : InfiniteRetry;                                     
    }                                                                           
    // client source with bounded reconnection (up to 10 connections)
    // Wait 5 seconds before starting
    stream<Beat> Beat9 = TCPSource()
    {                                                                           
      param                                                                     
        role                : client;                                           
        address             : "mynode.mydomain";                                
        port                : "ftp";                                            
        reconnectionPolicy   : BoundedRetry;                                    
        reconnectionBound    : 10u;                                             
        initDelay           : 5.0;                                              
    }                                                                           
}                                                                                  

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 29 parameters.

Required: role

Optional: address, blockSize, compression, confirmWireFormat, defaultTuple, encoding, eolMarker, format, hasDelayField, ignoreExtraCSVValues, initDelay, interface, keepAlive, name, parsing, password, port, readPunctuations, receiveBufferSize, reconnectionBound, reconnectionPolicy, separator, sslAuthenticateClient, sslCertificateAuthorityFile, sslCertificateFile, sslCipherSuites, sslConfigName, sslProtocol

Metrics
This operator reports 4 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
OutputFunctions
rstring RemoteIP()

Returns the address of the remote end of the TCP connection. This address is a host name, or a dotted IP address if the name cannot be resolved.

uint32 RemotePort()

Returns the port number of the remote end of the TCP connection.

uint32 LocalPort()

Returns the port number of the local end of the TCP connection.

uint32 ServerPort()

Returns the port number that the TCP server is listening on. This custom output function is valid only when the role parameter value is server.

int64 TupleNumber()

Returns the number of tuples that are generated by this operator since the start of the current TCP connection. The first tuple that is generated has number 0.

<any T> T AsIs(T)

Returns the input value.
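
The custom output functions listed above could be combined as in the following sketch. The composite name, attribute names, and port number are illustrative assumptions. The name and age attributes have no assignment, so they are read from the connection in csv format.

```spl
composite OutputFunctionsSketch {
  graph
    stream<rstring name, uint32 age, rstring peer, uint32 peerPort,
           uint32 listenPort, int64 seq> Beat = TCPSource()
    {
      param
        role   : server;   // ServerPort() is valid only in the server role
        port   : 23145u;
        format : csv;
      output
        Beat : peer       = RemoteIP(),
               peerPort   = RemotePort(),
               listenPort = ServerPort(),
               seq        = TupleNumber();  // 0 for the first tuple of each connection
    }
}
```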

Ports (0)

The TCPSource operator is configurable with a single output port, which produces tuples read from the TCP connection. The TCPSource operator outputs a window marker punctuation when a TCP connection terminates unless readPunctuations is true.
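
A downstream operator can react to these punctuations. The following sketch uses a Custom operator to log when a window marker or final marker arrives. The composite, stream, and operator names, the port number, and the log messages are illustrative assumptions.

```spl
composite PunctuationSketch {
  graph
    stream<rstring line> Lines = TCPSource()
    {
      param
        role   : server;
        port   : 23145u;
        format : line;
    }

    () as ConnectionWatcher = Custom(Lines)
    {
      logic
        onPunct Lines : {
          if (currentPunct() == Sys.WindowMarker) {
            // Emitted by TCPSource when a TCP connection terminates
            printStringLn("window marker: TCP connection terminated");
          } else if (currentPunct() == Sys.FinalMarker) {
            printStringLn("final marker: no more tuples will arrive");
          }
        }
    }
}
```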

Parameters

address

In the case of a client-based TCPSource operator, this parameter specifies the destination server address of the TCP connection.

The address parameter must be specified when the role parameter value is client and the name parameter is not specified. In all other cases, it cannot be specified.

The parameter value can be a host name or an IP address. The address parameter is not used for a server-based TCPSource operator, because a server always uses an address on the current host.

blockSize

Specifies the block size for the block format. For more information, see the blockSize parameter in the FileSource operator.
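
As a minimal sketch of the block format, the following invocation reads the connection into a single blob attribute. The composite name, stream name, port number, and the 4096-byte block size are illustrative assumptions, and the blockSize value is assumed to be a uint32 as in the FileSource operator.

```spl
composite BlockFormatSketch {
  graph
    // For the block format, the one unassigned attribute must have type blob.
    stream<blob data> Blocks = TCPSource()
    {
      param
        role      : client;
        address   : "mynode.mydomain";
        port      : 23145u;
        format    : block;
        blockSize : 4096u;
    }
}
```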

compression

Specifies the compression mode. For more information, see the compression parameter in the FileSource operator.

confirmWireFormat

Specifies whether an exchange of the wire format description is required to confirm that the client and server are passing compatible data. If this parameter is not specified, the default value is false. The wire format is defined in etc/xsd/SPL/wireFormatModel.xsd.

defaultTuple

Specifies the default tuple to use for missing fields. For more information, see the defaultTuple parameter in the FileSource operator.

encoding

Specifies the character set encoding of the input. For more information, see the encoding parameter in the FileSource operator.

eolMarker

Specifies the end of line marker. For more information, see the eolMarker parameter in the FileSource operator.

format

Specifies the format of the data. For more information, see the format parameter in the FileSource operator.

hasDelayField

Specifies whether the format contains inter-arrival delays as the first field. For more information, see the hasDelayField parameter in the FileSource operator.

ignoreExtraCSVValues

Specifies whether to skip any extra fields before the end of line when reading in CSV format. For more information, see the ignoreExtraCSVValues parameter in the FileSource operator.

initDelay

Specifies the number of seconds to delay before starting to produce tuples. For more information, see the initDelay parameter in the FileSource operator.

interface

Specifies the network interface to register when the name parameter is specified. This parameter is valid only when the role parameter value is server and the name parameter is specified.

Using this parameter with the name parameter ensures that a matching operator with a role parameter value of client and the same name parameter uses the intended interface.

keepAlive

Specifies that the TCP keepalive timer is used to check whether the connection is still alive. This parameter applies only to TCPSource operators for which the role parameter value is server. It prevents a server from hanging on a connection that has dropped, and it keeps a connection alive when there is no normal activity.

The value of the parameter is a tuple literal that specifies the configurable attributes. If any attribute is set to zero, the system default value for that attribute is used. This parameter has the following configurable attributes:
  • int32 time: The number of seconds that the connection sits idle before a keepalive probe is sent.
  • int32 probes: The maximum number of probes to send to establish the state of the connection.
  • int32 interval: The time interval in seconds between each probe.

name

In the case of a server-based TCPSource operator, this parameter specifies the name that is used to register the address and port pair for the server with the name service that is part of the Streams instance. This name can be used by a corresponding client-based TCPSink operator to connect to this operator by just specifying the name.

These names are automatically prefixed by the application scope, so applications with differing scopes cannot communicate through the same name. The application scope can be set by using config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators keep trying periodically to register the name, with an error message for each failure.

In the case of a client-based TCPSource, this parameter specifies the name to be used to look up the address and port pair for the destination server from the name service that is part of the Streams instance.

You can use the streamtool getnsentry command to query server-based TCPSource addresses. The Value field contains host:port.

When the name parameter is specified in client mode, the port and address parameters cannot be specified.
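
The name-based pairing described above might be sketched as follows: a server-based TCPSource registers a name, and a client-based TCPSink in the same application scope connects by that name. The composite name, stream names, the Beacon feed, and the scope name demoScope are illustrative assumptions.

```spl
composite NameServiceSketch {
  graph
    // Hypothetical data generator feeding the sink
    stream<rstring name, uint32 age> Gen = Beacon()
    {
      param
        iterations : 10u;
      output
        Gen : name = "sample", age = 30u;
    }

    // Client side: looks up the address and port pair registered under "my_server"
    () as Sender = TCPSink(Gen)
    {
      param
        role   : client;
        name   : "my_server";
        format : csv;
    }

    // Server side: registers its address and port pair under "my_server"
    stream<rstring name, uint32 age> Received = TCPSource()
    {
      param
        role   : server;
        name   : "my_server";
        format : csv;
    }

  config
    // Names are prefixed by this scope, so both operators must share it
    applicationScope : "demoScope";
}
```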

parsing

Specifies the parsing behavior of the TCPSource operator. There are three valid values: strict, permissive, and fast.

When the parameter value is strict, incorrectly formatted tuples result in a runtime error and termination of the operator.

When the parameter value is permissive, incorrectly formatted tuples result in the creation of a runtime log entry and the parser makes an effort to skip to the next tuple (formats txt and csv) and continue. If the format is bin, the parser closes the current connection and starts reading the next connection (if the reconnectionPolicy permits). The permissive parameter value can be used with only txt, csv, and bin formats.

When the parameter value is fast, the input is assumed to be formatted correctly and no runtime checks are performed. Incorrect input in fast mode causes undefined behavior.

The default parsing mode is strict.
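
As a minimal sketch, the following invocation selects permissive parsing for csv input, so that a badly formatted tuple is logged and skipped rather than terminating the operator. The composite name and port number are illustrative assumptions.

```spl
composite PermissiveParsingSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = TCPSource()
    {
      param
        role    : server;
        port    : 23145u;
        format  : csv;
        parsing : permissive;  // valid only with the txt, csv, and bin formats
    }
}
```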

password

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the password or passphrase required for an encrypted private key in sslCertificateFile. It is strongly recommended that the password be supplied in the application configuration named by sslConfigName rather than as a hard-coded parameter, so that the password is not stored in clear text in the SAB.

port

In the case of a server-based TCPSource operator, this parameter specifies the port address on which the connections are accepted. In the case of a client-based TCPSource operator, it specifies the destination server port address.

The parameter value can be a well-known port alias, such as http or ftp, as specified in /etc/services. It can also be a plain port number, such as 45134u.

It is an optional parameter for server-based TCPSource operators. Its default value is 0, which picks any available port.

For client-based TCPSource operators, the port parameter must be specified when the name parameter is not specified and it cannot be specified otherwise.

readPunctuations

Specifies whether to read punctuations from 'bin' format input. For more information, see the readPunctuations parameter in the FileSource operator.

receiveBufferSize

Specifies the kernel receive buffer size for the TCP socket.

reconnectionBound

Specifies the number of successive connections that are attempted for a client-based TCPSource operator or accepted for a server-based TCPSource operator. This parameter must be specified when the reconnectionPolicy parameter value is BoundedRetry and cannot be used otherwise.

reconnectionPolicy

Specifies the reconnection policy. In the case of a server-based TCPSource operator, this parameter specifies whether more connections are allowed when the initial connection terminates. In the case of a client-based TCPSource operator, this parameter specifies whether more connection attempts are made when the initial connection to the server terminates.

The valid values are: NoRetry, InfiniteRetry, and BoundedRetry. If this parameter is not specified, it is set to InfiniteRetry.

When this parameter value is NoRetry, the TCPSource operator sends a window marker punctuation when the initial connection terminates and then immediately produces a final marker punctuation.

role

Specifies whether the TCPSource operator is server-based or client-based. It takes one of the following two values: server and client.

separator

Specifies the separator character for the csv format. For more information, see the separator parameter in the FileSource operator.

sslAuthenticateClient

If SSL / TLS is being used (sslConfigName or sslProtocol is set) and the role is server, this parameter controls whether the server requests a client certificate and, if so, whether the client is required to send one. When the role is client, this parameter is ignored and the client always validates the server certificate. If the parameter is not provided, the default value is request. The valid values are:
  • none: The server does not request a client certificate. This behavior is equivalent to using setWantClientAuth(false) and setNeedClientAuth(false) with the Java SSL API, or SSL_VERIFY_NONE with the OpenSSL API.
  • request: The server requests a client certificate and sends the list of certificate authorities from sslCertificateAuthorityFile as trusted CAs. The client is not required to send a certificate, but if it does, the certificate is validated and a validation failure causes the connection to be rejected. This behavior is equivalent to using setWantClientAuth(true) but setNeedClientAuth(false) with the Java SSL API, or using SSL_VERIFY_PEER with the OpenSSL API.
  • require: The server requests a client certificate and sends the list of certificate authorities from sslCertificateAuthorityFile as trusted CAs. The client must send a certificate that the server can successfully validate, or the connection is rejected. This behavior is equivalent to using setNeedClientAuth(true) with the Java SSL API, or using both SSL_VERIFY_PEER and SSL_VERIFY_FAIL_IF_NO_PEER_CERT with the OpenSSL API.

sslCertificateAuthorityFile

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the path to a file containing trusted Certificate Authority certificates in PEM format. Relative paths are relative to the data directory. When validating certificates, these are checked before the default trusted certificate authorities. If you have a Java truststore named trust.jks, a file in the correct format for TCPSource named ca_certs.pem can be created by running the command:

keytool -list -rfc -keystore trust.jks > ca_certs.pem

sslCertificateFile

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the path to a PEM format file that includes a certificate chain and private key. Relative paths are relative to the data directory. When the role is server, the parameter is required and the certificate is offered to clients. When the role is client, the certificate is given to the server if a client certificate is requested during negotiation.

If the private key is encrypted (recommended), the password must be given either by an application configuration that is named by sslConfigName and contains a property password (recommended), or directly with the password parameter. Using sslConfigName is recommended, since it prevents the password from being included in the SAB.

If you have a Java keystore named keys.jks with an entry alias mycert, a file in the correct format for TCPSource named cert.pem can be created by running the command:

keytool -list -rfc -keystore keys.jks -alias mycert > cert.pem

If the key was encrypted in the Java keystore, it is also encrypted in cert.pem and a password is required.

sslCipherSuites

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the list of cipher suites in OpenSSL format that will be available during negotiation.

sslConfigName

Instructs the TCPSource operator to configure SSL / TLS with the properties from the named application configuration. The keys in the application configuration can be any of the other parameters related to SSL / TLS. Values from the application configuration supersede the equivalent parameters. If this parameter or sslProtocol is given, SSL / TLS will be available even if the sslProtocol is none at runtime. Since this may have performance and memory overhead, if you are certain you do not require SSL / TLS, leave this and sslProtocol unset. Using the password parameter is less secure than using an application configuration with the password as a key, because the parameter may be stored as clear text in the SAB file. Using the application configuration is strongly recommended. This parameter may be supplied at runtime to allow the application to use different configurations.

sslProtocol

Specifies the minimum SSL / TLS protocol version, if any, that the TCPSource operator uses. The currently recognized values are: none, SSLv3, TLSv1, TLSv1.1, TLSv1.2. If this parameter or sslConfigName is given, SSL / TLS will be available even if the sslProtocol is none at runtime. Since this may have performance and memory overhead, if you are certain you do not require SSL / TLS, leave this and sslConfigName unset. If the value is none, SSL / TLS is not used. For other values, it represents the minimum allowed SSL / TLS protocol version. The actual protocol used is the highest version mutually supported by both ends of the connection if one can be negotiated; otherwise the connection fails.
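
The SSL / TLS parameters might be combined as in the following sketch of a server that requires client certificates. It assumes that the ssl* parameters take rstring values, which this page does not confirm. The composite name, stream name, port number, and the application configuration name tcpsource-tls are hypothetical; cert.pem and ca_certs.pem are the file names used in the keytool commands on this page.

```spl
composite TlsServerSketch {
  graph
    stream<rstring line> SecureLines = TCPSource()
    {
      param
        role                        : server;
        port                        : 23145u;
        format                      : line;
        // Assumption: the ssl* parameters are passed as rstring values
        sslProtocol                 : "TLSv1.2";       // minimum protocol version
        sslCertificateFile          : "cert.pem";      // relative to the data directory
        sslCertificateAuthorityFile : "ca_certs.pem";  // trusted CAs sent to clients
        sslAuthenticateClient       : "require";       // reject clients without a valid certificate
        // Hypothetical application configuration that holds the "password" property,
        // so that the key password is not stored in the SAB
        sslConfigName               : "tcpsource-tls";
    }
}
```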

Code Templates

TCPSource (client)

stream<${schema}> ${outputStream} = TCPSource() {
  param
    role    : client;
    address : "${hostOrIp}";
    port    : "${sourcePort}";
}

TCPSource (server)

stream<${schema}> ${outputStream} = TCPSource() {
  param
    role : server;
    port : "${sourcePort}";
}

TCPSource with Name (client or server)

stream<${schema}> ${outputStream} = TCPSource() {
  param
    role : ${serverOrClient};
    name : "${name}";
}

Metrics

nConfirmWireFormatFailures - Counter

The number of times the input connection wire format handshake failed.

nConnections - Gauge

The number of currently active TCP/IP connections. If the TCPSource operator is waiting for a connection or a reconnection, the value is 0. If the operator is connected, the value is 1.

nInvalidTuples - Counter

The number of tuples that failed to read correctly in csv or txt format.

nReconnections - Counter

The number of times the input connection was re-established.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime, streams_boost_iostreams, streams_boost_system
Include Path: ../../../impl/include