Operator TCPSink

The TCPSink operator writes data to a TCP socket in the form of tuples. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes, it handles a single connection at a time.

Checkpointed data

When the TCPSink operator is checkpointed, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The TCPSink operator is not supported in a consistent region.

Checkpointing behavior in an autonomous region

When the TCPSink operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL Runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the TCPSink operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator is restored to its initial state.

Such checkpointing behavior is subject to change in the future.
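
As an illustration of the periodic case, the following sketch checkpoints a server-mode sink every 30 seconds. The stream name Data, the alias CheckpointedSink, the state variable sent, the port number, and the 30-second period are all assumptions chosen for the example.

composite Main {
  graph
    stream<rstring id, uint32 value> Data = Beacon() {}
    () as CheckpointedSink = TCPSink(Data) {
      logic
        state   : { mutable uint64 sent = 0ul; } // logic state saved in each checkpoint
        onTuple Data : { sent++; }
      param
        role : server;
        port : 23145u;
      config
        checkpoint : periodic(30.0); // background checkpoint every 30 seconds
    }
}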

Exceptions

The TCPSink operator throws an exception in the following cases:
  • The host cannot be resolved.
  • The name cannot be located.

Examples

These examples use the TCPSink operator.

The following example sets the closeMode parameter to dynamic to send tuples to different TCP servers. The suppress parameter is used to omit the host and port tuple attributes from the output sent to the TCP server.


// Function to compute desired port for each tuple                                                       
uint32 getPort (int32 count) {                                                                           
  return (count % 3) == 0 ? 8888u : 8889u;                                                               
}                                                                                                        
                                                                                                         
composite Main {                                                                                         
  graph                                                                                                  
    stream<rstring host, uint32 port, uint32 value> A = Beacon() {                                       
      param iterations : 100;                                                                            
            initDelay  : 4.0;                                                                            
      output A         : host = "localhost", port = getPort ((int32) IterationCount()),                  
                         value = (uint32) IterationCount();                                              
    }                                                                                                    
    () as TCPSink1 = TCPSink(A) {                                                                        
      param                                                                                              
        closeMode        : dynamic;                                                                      
        suppress         : host, port; // Do not send host and port information on TCP/IP connection     
        address          : host;                                                                         
        port             : port;                                                                         
        role             : client;                                                                       
        retryFailedSends : true;                                                                         
    }                                                                                                    
}                                                                                                         

The following example uses various common parameters in the TCPSink operator:


composite Main {                                                                     
  graph                                                                              
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}               
    // server sink with an alias string as port                                      
    () as Beat1 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role : server;                                                               
        port : "ftp";                                                                
    }                                                                                
    // server sink with a numeric port
    () as Beat2 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role : server;                                                               
        port : 23145u;                                                               
    }                                                                                
    // server sink with a name                                                       
    () as Beat3 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role  : server;                                                              
        name  : "my_server";                                                         
    }                                                                                
    // server sink with a name and port                                              
    () as Beat4 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role  : server;                                                              
        port  : 23145u;                                                              
        name  : "my_server";                                                         
    }                                                                                
    // server sink with a port and infinite reconnection                             
    () as Beat5 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role               : server;                                                 
        port               : "ftp";                                                  
        reconnectionPolicy : InfiniteRetry;                                          
    }                                                                                
    // client sink with an IP address and port                                       
    () as Beat6 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role    : client;                                                            
        address : "99.2.45.67";                                                      
        port    : "ftp";                                                             
    }                                                                                
    // client sink with a host name as the address
    () as Beat7 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role    : client;                                                            
        address : "mynode.mydomain";                                                 
        port    : 23145u;                                                            
    }                                                                                
    // client sink with name                                                         
    () as Beat8 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role  : client;                                                              
        name  : "my_server";                                                         
    }                                                                                
    // client sink with reconnection (25 connections)                                
    () as Beat9 = TCPSink(Beat)                                                      
    {                                                                                
      param                                                                          
        role               : client;                                                 
        address            : "mynode.mydomain";                                      
        port               : "ftp";                                                  
        reconnectionPolicy : BoundedRetry;                                           
        reconnectionBound  : 25u;                                                    
    }                                                                                
}                                                                                     

Summary

Ports
This operator has 1 input port and 0 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 30 parameters.

Required: role

Optional: address, closeMode, compression, confirmWireFormat, detectDisconnectsBeforeSends, encoding, eolMarker, flush, flushOnPunctuation, format, hasDelayField, interface, name, password, port, quoteStrings, reconnectionBound, reconnectionPolicy, retryFailedSends, sendBufferSize, separator, sslAuthenticateClient, sslCertificateAuthorityFile, sslCertificateFile, sslCipherSuites, sslConfigName, sslProtocol, suppress, writePunctuations

Metrics
This operator reports 3 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The TCPSink operator is configurable with a single input port, which ingests tuples to be written to the TCP connection.

Properties

Parameters

This operator supports 30 parameters.

Required: role

Optional: address, closeMode, compression, confirmWireFormat, detectDisconnectsBeforeSends, encoding, eolMarker, flush, flushOnPunctuation, format, hasDelayField, interface, name, password, port, quoteStrings, reconnectionBound, reconnectionPolicy, retryFailedSends, sendBufferSize, separator, sslAuthenticateClient, sslCertificateAuthorityFile, sslCertificateFile, sslCipherSuites, sslConfigName, sslProtocol, suppress, writePunctuations

address

Specifies the destination address of the TCP connection (client-mode). For more information, see the address parameter in the TCPSource operator.

Properties

closeMode

Specifies when a client connection is closed and a new one opened. This parameter has type enum {dynamic, never}. The default value is never.

If the parameter value is dynamic:
  • The retryFailedSends parameter value must be true.
  • The role parameter value must be client.
  • The address and port parameters must be provided. The address and port parameter values might reference attributes from the input tuple. Each time a tuple is received, the address and port parameter values are compared to the previous values. If either changes, then the TCP connection to the current server is closed and a new TCP connection is opened to the new address and port parameter values. The tuple is sent to the new connection.

If the closeMode parameter value is never, the address and port parameter values must not reference input attributes from the input tuple.

Properties

compression

Specifies the compression mode. For more information, see the compression parameter in the FileSink operator.

Properties

confirmWireFormat

Specifies whether an exchange of the wire format description is required to confirm that the client and server are passing compatible data. If this parameter is not specified, the default value is false. The wire format is defined in etc/xsd/SPL/wireFormatModel.xsd.
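
A minimal sketch of enabling the check on the sink side, assuming the receiving operator is configured to take part in the same exchange. The input stream Data, the alias, the port number, and the choice of the bin format are illustrative.

() as CheckedSink = TCPSink(Data) {
  param
    role              : server;
    port              : 23145u;
    format            : bin;
    confirmWireFormat : true; // require the wire format exchange with the peer
}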

Properties

detectDisconnectsBeforeSends

Instructs the TCPSink operator to check at each tuple or punctuation to see whether the TCP/IP connection is disconnected. If the role parameter value is server and another client is already connected, it switches immediately to the new client connection.

Properties

encoding

Specifies the character set encoding of the output. For more information, see the encoding parameter in the FileSink operator.

Properties

eolMarker

Specifies the end of line marker. For more information, see the eolMarker parameter in the FileSink operator.

Properties

flush

Specifies the number of tuples after which to flush the output. For more information, see the flush parameter in the FileSink operator.

Properties

flushOnPunctuation

Specifies to flush the output when a punctuation is received. For more information, see the flushOnPunctuation parameter in the FileSink operator.
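
The two flush parameters can be combined, as in the following sketch. The input stream Data, the alias, the address, the port, and the threshold of 10 tuples are assumptions made for the example.

() as FlushedSink = TCPSink(Data) {
  param
    role               : client;
    address            : "mynode.mydomain";
    port               : 23145u;
    flush              : 10u;  // flush the output after every 10 tuples
    flushOnPunctuation : true; // also flush when a punctuation is received
}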

Properties

format

Specifies the format of the data. For more information, see the format parameter in the FileSink operator.

Properties

hasDelayField

Specifies that the format contains inter-arrival delays as the first field. For more information, see the hasDelayField parameter in the FileSink operator.

Properties

interface

Specifies the network interface to register when the name parameter is specified. This parameter is valid only when the role parameter value is server and the name parameter is specified. Using the interface parameter with the name parameter ensures that a matching operator with a role parameter value of client and the same name parameter connects through the intended interface.
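
For example, a named server sink might be pinned to one interface as sketched below. The interface name eth1 is hypothetical and depends on the host; the input stream Data and the alias are likewise illustrative.

() as PinnedSink = TCPSink(Data) {
  param
    role      : server;
    name      : "my_server";
    interface : "eth1"; // hypothetical interface name registered with the name
}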

Properties

name

In the case of a server-based TCPSink operator, this parameter specifies the name that is used to register the address and port pair for the server with the name service that is part of the Streams instance. This name can be used by a corresponding client-based TCPSource operator to connect to this operator by just specifying the name, without the need for an address or port number.

These names are automatically prefixed by the application scope, so applications with differing scopes cannot communicate through the same name. The application scope can be set by using config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators keep trying periodically to register the name, with an error message for each failure.

In the case of a client-based TCPSink, this parameter specifies the name that is used to look up the address and port pair for the destination server from the name service that is part of the Streams instance.

When the name parameter is specified in client mode, the port and address parameters cannot be specified.
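
A minimal sketch of the server-side pairing described above, assuming both operators run in the same instance and application scope. The stream names, the attribute names, and the scope name myScope are illustrative assumptions.

composite Main {
  graph
    stream<rstring id, uint32 value> Data = Beacon() {}
    // server side: registers its address and port under "my_server"
    () as NamedSink = TCPSink(Data) {
      param
        role : server;
        name : "my_server";
    }
    // client side: looks up "my_server", so no address or port is given
    stream<rstring id, uint32 value> Received = TCPSource() {
      param
        role : client;
        name : "my_server";
    }
  config
    applicationScope : "myScope"; // hypothetical scope; prefixes the registered name
}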

Properties

password

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the password or passphrase required for an encrypted private key in sslCertificateFile. It is strongly recommended that this be supplied in the application configuration given by sslConfigName instead of a hard-coded parameter to prevent the password from being stored in clear-text in the SAB.

Properties

port

Specifies the listening port address (server-mode) or the destination port address (client-mode). For more information, see the port parameter in the TCPSource operator.

Properties

quoteStrings

Specifies whether to generate quoted strings for csv format. For more information, see the quoteStrings parameter in the FileSink operator.

Properties

reconnectionBound

Specifies the number of successive connections that are accepted (server-mode) or attempted (client-mode). For more information, see the reconnectionBound parameter in the TCPSource operator.

Properties

reconnectionPolicy

In server mode, it specifies whether additional connections are allowed once the initial connection terminates. In client mode, it specifies whether additional connection attempts are made once the initial connection to the server terminates. For more information, see the reconnectionPolicy parameter in the TCPSource operator.

Properties

retryFailedSends

Specifies whether to retry until a connection is established and the send succeeds.
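
The following sketch combines retryFailedSends with the related connection parameters described in this section. The input stream Data, the alias, the address, and the port are assumptions made for the example.

() as ResilientSink = TCPSink(Data) {
  param
    role                         : client;
    address                      : "mynode.mydomain";
    port                         : 23145u;
    reconnectionPolicy           : InfiniteRetry;
    retryFailedSends             : true; // retry until connected and the send succeeds
    detectDisconnectsBeforeSends : true; // check the connection at each tuple or punctuation
}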

Properties

role

Specifies whether the operator acts as a TCP server or client. For more information, see the role parameter in the TCPSource operator.

Properties

sendBufferSize

Specifies the kernel send buffer size for the TCP socket.

Properties

separator

Specifies the separator character for the csv format. For more information, see the separator parameter in the FileSource operator.

Properties

sslAuthenticateClient

If SSL / TLS is being used (sslConfigName or sslProtocol is set) and the role is server, this parameter controls whether the server requests a client certificate and, if so, whether the client is required to send one. When the role is client, this parameter is ignored and the client always validates the server certificate. If the parameter is not provided, the default value is request. The supported values are:
  • none: The server does not request a client certificate. This behavior is equivalent to using setWantClientAuth(false) and setNeedClientAuth(false) with the Java SSL API, or SSL_VERIFY_NONE with the OpenSSL API.
  • request: The server requests a client certificate and sends the list of certificate authorities from sslCertificateAuthorityFile as trusted CAs. The client is not required to send a certificate, but if it does, the certificate is validated and a validation failure causes the connection to be rejected. This behavior is equivalent to using setWantClientAuth(true) but setNeedClientAuth(false) with the Java SSL API, or SSL_VERIFY_PEER with the OpenSSL API.
  • require: The server requests a client certificate and sends the list of certificate authorities from sslCertificateAuthorityFile as trusted CAs. The client must send a certificate that the server can successfully validate, or the connection is rejected. This behavior is equivalent to using setNeedClientAuth(true) with the Java SSL API, or using both SSL_VERIFY_PEER and SSL_VERIFY_FAIL_IF_NO_PEER_CERT with the OpenSSL API.
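
A sketch of a server sink that requires client certificates follows. It assumes that the SSL-related parameter values are passed as strings, which this page does not state, and that the private key in cert.pem is not encrypted (otherwise a password is also needed). The input stream Data, the alias, the port, and the file locations are illustrative.

() as SecureSink = TCPSink(Data) {
  param
    role                        : server;
    port                        : 23145u;
    sslProtocol                 : "TLSv1.2";      // assumed string value; minimum protocol version
    sslCertificateFile          : "cert.pem";     // hypothetical path, relative to the data directory
    sslCertificateAuthorityFile : "ca_certs.pem"; // hypothetical path, relative to the data directory
    sslAuthenticateClient       : "require";      // assumed string value; reject clients without a valid certificate
}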

Properties

sslCertificateAuthorityFile

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the path to a file containing trusted Certificate Authority certificates in PEM format. Relative paths are relative to the data directory. When validating certificates, these are checked before the default trusted certificate authorities. If you have a Java truststore named trust.jks, you can create a file in the correct format for TCPSink, named ca_certs.pem, by running the following command:

keytool -list -rfc -keystore trust.jks > ca_certs.pem

Properties

sslCertificateFile

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the path to a PEM format file that includes a certificate chain and private key. Relative paths are relative to the data directory. When the role is server, the parameter is required and the certificate is offered to clients. When the role is client, the certificate is given to the server if a client certificate is requested during negotiation.

If the private key is encrypted (recommended), the password must be given either by an application configuration named by sslConfigName and containing a property password (recommended), or directly with the password parameter. Using sslConfigName is recommended because it prevents the password from being included in the SAB.

If you have a Java keystore named keys.jks with an entry alias mycert, you can create a file in the correct format for TCPSink, named cert.pem, by running the following command:

keytool -list -rfc -keystore keys.jks -alias mycert > cert.pem

If the key was encrypted in the Java keystore, it is also encrypted in cert.pem and a password is required.

Properties

sslCipherSuites

If SSL / TLS is being used (sslConfigName or sslProtocol is set), this is the list of cipher suites in OpenSSL format that will be available during negotiation.

Properties

sslConfigName

Instructs the TCPSink operator to configure SSL / TLS with the properties from the named application configuration. The keys in the application configuration can be any of the other parameters related to SSL / TLS. Values from the application configuration supersede the equivalent parameters. If this parameter or sslProtocol is given, SSL / TLS will be available even if the sslProtocol is none at runtime. Because this may have performance and memory overhead, leave this parameter and sslProtocol unset if you are certain you do not require SSL / TLS. Using the password parameter is less secure than using an application configuration with the password as a key, because the parameter may be stored as clear text in the SAB file. Using the application configuration is strongly recommended. This parameter may be supplied at runtime to allow the application to use different configurations.
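
A minimal sketch of delegating the SSL / TLS settings to an application configuration follows. The configuration name tcpSinkSsl is hypothetical and is assumed to already exist in the instance with keys such as sslProtocol, sslCertificateFile, and password. The input stream Data, the alias, the address, and the port are also illustrative.

() as ConfiguredSink = TCPSink(Data) {
  param
    role          : client;
    address       : "mynode.mydomain";
    port          : 23145u;
    sslConfigName : "tcpSinkSsl"; // hypothetical application configuration holding the SSL / TLS keys
}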

Properties

sslProtocol

Instructs the TCPSink operator which, if any, SSL / TLS protocol to use at a minimum. The currently recognized values are: none, SSLv3, TLSv1, TLSv1.1, TLSv1.2. If this parameter or sslConfigName is given SSL / TLS will be available even if the sslProtocol is none at runtime. Since this may have performance and memory overhead, if you are certain you do not require SSL / TLS then leave this and sslConfigName unset. If the value is none, SSL / TLS will not be used. For other values it represents the minimum allowed SSL / TLS protocol version. The actual protocol used will be the highest version mutually supported by both ends of the connection if one can be negotiated, otherwise the connection will fail.

Properties

suppress

Specifies a list of input attributes that are not output to the TCP connection.

Properties

writePunctuations

Specifies whether to write punctuations to output. For more information, see the writePunctuations parameter in the FileSink operator.

Properties

Code Templates

TCPSink (client)

() as ${sinkPrefix}Sink = TCPSink(${inputStream}) {
  param
    role    : client;
    address : "${hostOrIp}";
    port    : "${sinkPort}";
}


TCPSink (server)

() as ${sinkPrefix}Sink = TCPSink(${inputStream}) {
  param
    role : server;
    port : "${sinkPort}";
}


TCPSink with Name (client or server)

() as ${sinkPrefix}Sink = TCPSink(${inputStream}) {
  param
    role : ${serverOrClient};
    name : "${name}";
}


Metrics

nConfirmWireFormatFailures - Counter

The number of times the output connection wire format handshake failed.

nConnections - Gauge

The number of current TCP/IP connections. If the TCPSink operator is waiting for a connection or a reconnection, the value is 0. If the operator is currently connected, the value is 1.

nReconnections - Counter

The number of times the TCP connection was re-established.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime, streams_boost_iostreams, streams_boost_system
Include Path: ../../../impl/include