Operator FileSink

The FileSink operator writes tuples to a file.

Checkpointed data

When the FileSink operator is checkpointed in a consistent region, the name of the file that the operator is writing to is saved in the checkpoint. When the truncateOnReset parameter is set, the current position of the file stream is also saved in the checkpoint. Logic state variables (if present) are included in the checkpoint. When the FileSink operator is checkpointed in an autonomous region, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The FileSink operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region. On a drain, the operator flushes tuples to the output file and synchronizes the file with the storage device.

When the closeMode parameter is never and the file expression does not contain attributes from the input stream, the file is created on operator startup, and the file name is stored in the checkpoint. If the operator is reset before the first checkpoint, a new file may be opened if the file name was not deterministic because modifiers were present in the file name. If the operator is reset after the first checkpoint, the original file is reopened.

When the closeMode parameter is never and the file expression uses attributes from the input stream, the file is not created until the first tuple arrives at the operator. If the operator is reset before the first checkpoint, a new file may be opened if the computed file name expression (including modifiers) differs from the original file name. If the operator is reset after the file name has been saved to a checkpoint, the original file is reopened.

On a drain, the operator flushes tuples to the output file. Logic state variables (if present) are automatically checkpointed and reset, as needed to maintain consistency.

When the truncateOnReset parameter is set, the operator also checkpoints the current position of the file stream. On reset, the file is truncated to the checkpointed position. If truncateOnReset is not set, replayed tuples are written to the end of the reopened file.

When the closeMode parameter is set, the operator closes the file according to the specified policy and also whenever there is a drain. When the moveFileToDirectory parameter is set, the file is moved immediately after it is closed. As a result, a file can be rewritten and moved again on a replay. If file names are deterministic, previously moved files are overwritten. The origin and destination directories are not synchronized on a drain.
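The following is a minimal sketch of a FileSink operator in a consistent region that relies on truncateOnReset. It assumes that a JobControlPlane operator is present and that the Beacon operator starts the region; the composite name, the file path, and the 30-second period are illustrative values, not taken from this page.

```spl
composite ConsistentSinkSketch {
  graph
    // Assumption: an application that uses a consistent region also
    // needs a JobControlPlane operator.
    () as JCP = spl.control::JobControlPlane() {}

    // Assumption: Beacon starts a periodic consistent region.
    // The 30-second period is an arbitrary illustrative value.
    @consistent(trigger = periodic, period = 30.0)
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    // closeMode is left at its default (never), which truncateOnReset requires.
    // append and compression are omitted because they are not supported
    // in a consistent region.
    () as Sink = FileSink(Beat)
    {
      param
        file            : "/tmp/People.dat";
        format          : csv;
        truncateOnReset : true;
    }
}
```

With this configuration, a reset truncates /tmp/People.dat to the position saved at the last checkpoint, so replayed tuples do not produce duplicate lines.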

Checkpointing behavior in an autonomous region

When the FileSink operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL Runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its internal state to its initial state, and restores logic state variables (if present) from the last checkpoint.

When the FileSink operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
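As an illustration of periodic checkpointing in an autonomous region, the sketch below adds a logic state variable to a FileSink invocation and checkpoints it every 10 seconds. The composite name, the variable name written, and the 10-second interval are hypothetical choices for this example.

```spl
composite PeriodicCheckpointSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    () as Sink = FileSink(Beat)
    {
      logic
        // Hypothetical logic state variable; it is restored from the
        // last checkpoint when the operator restarts.
        state   : mutable uint64 written = 0ul;
        onTuple Beat : { written++; }
      param
        file : "People.dat";
      config
        // Checkpoint every 10 seconds, asynchronously to tuple processing.
        checkpoint : periodic(10.0);
    }
}
```

Only the logic state is restored on restart; the file itself is handled as on a fresh operator startup.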

Exceptions

The FileSink operator throws an exception in the following case:
  • The output file cannot be opened for writing.

Examples

These examples use the FileSink operator.


composite Main {                                                                           
  graph                                                                                    
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}                     
    // sink operator with the hasDelayField option, and fields separated by ":"
    // rstrings will not be printed with double quotes                                     
    () as Sink1 = FileSink(Beat)                                                           
    {                                                                                      
      param                                                                                
        file   : "/tmp/People.dat";                                                        
        format : csv;
        separator : ":";                                                                   
        hasDelayField : true;                                                              
        quoteStrings: false;                                                               
    }                                                                                      
    // sink operator with a txt format specifier and compression                           
    () as Sink2 = FileSink(Beat)                                                           
    {                                                                                      
      param                                                                                
        file        : "People.dat";                                                        
        format      : txt;                                                                 
        compression : zlib;                                                                
    }                                                                                      
    // sink operator with a bin format specifier and flush option                          
    () as Sink3 = FileSink(Beat)                                                           
    {                                                                                      
      param                                                                                
        file   : "People.dat";                                                             
        format : bin;                                                                      
        flush  : 1u;                                                                       
    }                                                                                      
    // sink operator with a writePunctuations option and no flushing on punctuation        
    () as Sink4 = FileSink(Beat)                                                           
    {                                                                                         
      param                                                                                
        file              : "People.dat";                                                  
        writePunctuations : true;                                                          
        flushOnPunctuation: false;                                                         
    }                                                                                      
}                                                                                           

The following example creates a series of files that have the current time embedded in the filename:


() as Sink5 = FileSink(Beat)                                                                      
{                                                                                                 
  param                                                                                           
    file        : "F_{localtime:%m%d_%H%M%S}_{id}.out";
    closeMode   : punct;                                                                          
}                                                                                                  

The following example appends multiple tuples with the same Beat.name value to the same file:


() as Sink6 = FileSink(Beat)                                                                
{                                                                                           
  param                                                                                     
    file      : Beat.name + ".out"; // Use the name field to generate the filename.         
    closeMode : dynamic;                                                                    
    append    : true;   
}                                                                                            

Summary

Ports
This operator has 1 input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 21 parameters.

Required: file

Optional: append, bytesPerFile, closeMode, compression, encoding, eolMarker, flush, flushOnPunctuation, format, hasDelayField, moveFileToDirectory, quoteStrings, separator, suppress, timePerFile, truncateOnReset, tuplesPerFile, writeFailureAction, writePunctuations, writeStateHandlerCallbacks

Metrics
This operator reports 2 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The FileSink operator is configurable with a single input port, which ingests tuples to be written to a file.

Properties

Output Ports

Assignments
This operator does not allow assignments to output attributes.
Ports (0)

The FileSink operator is configurable with an optional output stream of type stream<rstring>, which carries the name of each file that was closed. If the file is moved, the destination file name is emitted on the output stream.
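A minimal sketch of the optional output port follows. The stream name Closed, the attribute name closedFile, and the use of a Custom operator to print the names are assumptions made for illustration; the 100u literal assumes that tuplesPerFile takes a uint32 value.

```spl
composite ClosedFileNamesSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    // Each time a file is closed (here, after 100 tuples), its name is
    // emitted as a tuple on the Closed stream.
    stream<rstring closedFile> Closed = FileSink(Beat)
    {
      param
        file          : "people_{id}.dat";
        closeMode     : count;
        tuplesPerFile : 100u;
    }

    // Hypothetical consumer that prints each closed file name.
    () as Printer = Custom(Closed)
    {
      logic
        onTuple Closed : { println(closedFile); }
    }
}
```

A downstream operator can use these names to trigger follow-up processing once a file is complete.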

Properties

Parameters

This operator supports 21 parameters.

Required: file

Optional: append, bytesPerFile, closeMode, compression, encoding, eolMarker, flush, flushOnPunctuation, format, hasDelayField, moveFileToDirectory, quoteStrings, separator, suppress, timePerFile, truncateOnReset, tuplesPerFile, writeFailureAction, writePunctuations, writeStateHandlerCallbacks

append

Specifies that the generated tuples are appended to the output file. If this parameter is false or not specified, the output file is truncated before the tuples are generated.

The append parameter is not supported when FileSink is in a consistent region.

Properties

bytesPerFile

Specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. This parameter must be specified when the closeMode parameter is set to size.

Properties

closeMode

Specifies when the file closes and a new one opens. This parameter has type enum {punct, count, size, time, dynamic, never}. The default value is never. For any value other than never or dynamic, when the specified condition is satisfied, the current output file is closed and a new file is opened for writing.

The parameter value:
  • punct specifies to close the file when a window or final punctuation is received.
  • count is used with the tuplesPerFile parameter to close the file when the specified number of tuples have been received.
  • size is used with the bytesPerFile parameter to close the file when the specified number of bytes have been received.
  • time is used with the timePerFile parameter to close the file when the specified time has elapsed.

If this parameter value is dynamic, the file parameter can reference input attributes and is evaluated at each input tuple to compute the file name. If the file name is different from the previous value, the output file closes, and a new file opens.

In all cases, the file parameter can contain modifiers that are used to generate the file name to be used. The supported modifiers are:
  • {id}: Each {id} in the file name is replaced with the file number created by the FileSink operator. It has value 0 for the first file, 1 for the second file, and so on.
  • {localtime:strftimeString}: The contents are replaced by the current local time, formatted as if by the strftime library call.
  • {gmtime:strftimeString}: The contents are replaced by the current time in the GMT timezone. They are formatted as if by the strftime library call.

The modifiers can be repeated in the string, and are all replaced with their values. If closeMode is dynamic, the file names are compared after the modifiers are substituted.

Using param append: true is often useful for dynamic file names.

The dynamic and time parameter values are not supported in a consistent region.
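To show how a closeMode value combines with file name modifiers, here is a sketch that rolls over to a new file every 1000 tuples. The file name pattern and the count are illustrative, and the 1000u literal assumes that tuplesPerFile takes a uint32 value.

```spl
composite RollingFilesSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    () as Sink = FileSink(Beat)
    {
      param
        // {gmtime:...} expands to the current GMT date and {id} to the
        // file number (0, 1, 2, ...) each time a new file is opened.
        file          : "events_{gmtime:%Y-%m-%d}_{id}.csv";
        format        : csv;
        closeMode     : count;
        tuplesPerFile : 1000u;
    }
}
```

Because {id} increases with every file, each rollover produces a distinct name even when the date portion is unchanged.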

Properties

compression

Specifies the compression mode. For more information, see the compression parameter in the FileSource operator.

This parameter is not supported when FileSink is in a consistent region.

Properties

encoding

Specifies the character set encoding that is used in the output file. Data that is written to the output file is converted from the UTF-8 character set to the specified character set before any compression is performed. The encoding parameter is not valid with formats bin or block.

Properties

eolMarker

Specifies the end of line marker. For more information, see the eolMarker parameter in the FileSource operator.

Properties

file

Specifies the name of the output file. See the corresponding parameter in the FileSource operator for details. Only the last component of the path name is created if it does not exist. All directories in the path name up to the last component must exist. For example, in file: "/a/b/c", /a and /a/b must exist and be directories. The file is created as an empty file, discarding any previous contents. The user ID and the umask of the instance owner are used. The tuples that are written to the file are flushed to disk according to the flush and flushOnPunctuation parameters.

The file name may be an SPL expression containing input attributes. If the value of the closeMode parameter is dynamic, the file is not opened until the first tuple arrives, and the file name expression is evaluated on every tuple. When the resulting file name changes, the file is closed and a new file is opened with the new name.

If the closeMode is not dynamic and the file expression references input attributes, the file is created when the first tuple arrives. The expression is evaluated using that tuple to give the filename. The expression is not re-evaluated until the file has been closed and a new file needs to be opened. If the file expression does not reference input attributes, the file is created on operator startup.

In all the cases where an expression is allowed, the file expression is examined for modifiers, which are expanded when the expression is evaluated. See the supported modifiers of the closeMode parameter. Any punctuations that are received before the file is opened or between the file closing and the next tuple are logged and dropped because there is no open file to write them to. An example of a file expression is:


param file : "{localtime:%m.%d}_" + Input.CompanyName + ".txt"

This example generates a file name that consists of the current month number (%m), a period, the current day of the month (%d), an underscore, the company name, and then .txt.

Properties

flush

Specifies the number of tuples after which to flush the output file. By default, the file is not flushed based on the number of tuples written.

Note: If an application expects low volumes of data, use the flush parameter to ensure that the output file is written to disk.

Properties

flushOnPunctuation

Specifies to flush the output file when a window or final punctuation is received. This parameter defaults to true.

Properties

format

Specifies the format of the data. For more information, see the format parameter in the FileSource operator.

Properties

hasDelayField

Specifies whether to output an extra attribute per tuple, which specifies the inter-arrival delays between the input tuples. For more information, see the hasDelayField parameter in the FileSource operator.

Properties

moveFileToDirectory

Specifies that the file is moved to the named directory after the file is closed. Any existing file with the same name is removed before the file is moved to the moveFileToDirectory directory.

If the target directory is on a different file system, a .rename subdirectory might be created in the target directory. This subdirectory is used to ensure that the files appear atomically in the target directory.
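A short sketch of moveFileToDirectory follows. The directory names /tmp/staging and /tmp/ready are hypothetical and are assumed to exist before the application starts.

```spl
composite MoveOnCloseSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    () as Sink = FileSink(Beat)
    {
      param
        // Files are written under /tmp/staging and moved to /tmp/ready
        // each time a window or final punctuation closes the current file.
        file                : "/tmp/staging/batch_{id}.csv";
        format              : csv;
        closeMode           : punct;
        moveFileToDirectory : "/tmp/ready";
    }
}
```

A consumer that watches /tmp/ready then sees only complete files, never partially written ones.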

Properties

quoteStrings

Controls the quoting of top-level rstrings. This parameter can be used only with the csv format. This parameter value is true by default.

If this parameter value is true, rstrings in the tuple are generated with a leading and trailing double quotation mark ("), and control characters are escaped. If this parameter value is false, rstrings in the tuple are written as is.

Properties

separator

Specifies the separator character for the csv format. For more information, see the separator parameter in the FileSource operator.

Properties

suppress

Specifies input attributes to be omitted from the output file. This parameter accepts one or more input attributes as values. Those named attributes are not output to the file. For example, you can use this parameter to omit file name inputs from the output file.
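For example, the sketch below routes tuples to per-name files and keeps the routing attribute out of the file contents. The Functor operator, the stream name Routed, and the attribute name fileName are assumptions introduced for illustration.

```spl
composite SuppressSketch {
  graph
    stream<rstring name, uint32 age> Beat = Beacon() {}

    // Hypothetical upstream step that computes the target file name.
    // The name and age attributes are forwarded automatically.
    stream<rstring fileName, rstring name, uint32 age> Routed = Functor(Beat)
    {
      output
        Routed : fileName = name + ".out";
    }

    () as Sink = FileSink(Routed)
    {
      param
        file      : Routed.fileName;
        closeMode : dynamic;
        append    : true;
        // fileName selects the file but is not written to it.
        suppress  : fileName;
    }
}
```

Only name and age are written to each file.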

Properties

timePerFile

Specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened. If the closeMode parameter is set to time, this parameter must be specified.
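A minimal sketch of time-based rollover follows. The 60-second interval and the file name pattern are illustrative, and the 60.0 literal assumes that timePerFile takes a floating-point number of seconds.

```spl
composite TimedFilesSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    () as Sink = FileSink(Beat)
    {
      param
        // A new file, named after its opening time, is started
        // approximately every 60 seconds.
        file        : "log_{localtime:%Y%m%d_%H%M%S}.txt";
        format      : txt;
        closeMode   : time;
        timePerFile : 60.0;
    }
}
```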

Properties

truncateOnReset

Specifies to truncate the file when a consistent region is reset. The file is truncated to the position the file was in when the last successful drain was processed. If the file is not truncated, replayed tuples are appended at the end of the file.

This parameter is valid only when the operator is in a consistent region. By default, the value of this parameter is true.

This parameter is valid only when the closeMode parameter value is never.

Properties

tuplesPerFile

Specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples are received, the current output file is closed and a new file is opened for writing. This parameter must be specified when the closeMode parameter is set to count.

Properties

writeFailureAction

Specifies the action to take when a file write fails. This parameter has values of ignore, log, and terminate. If this parameter is not specified, ignore is assumed.

If the parameter value is ignore, no action is taken on a write failure, and all future writes fail as well.

If the parameter value is log, the error is logged, and the error condition is cleared. If the underlying cause is not corrected, further writes might again cause failures. Even if the underlying cause is corrected, there are gaps in the file due to the failed writes.

If the parameter value is terminate, the error is logged, and the operator terminates.

When a file is closed, the error condition is reset. For more information, see the closeMode parameter. If the underlying problem has been corrected, future writes should succeed.
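As a sketch of how these settings might be combined, the following invocation logs write failures and flushes after every tuple so that failures surface quickly. The composite name and file name are illustrative.

```spl
composite WriteFailureSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    () as Sink = FileSink(Beat)
    {
      param
        file               : "People.dat";
        // Flushing after every tuple detects failures sooner, at a
        // possibly large performance cost.
        flush              : 1u;
        // Log the error and clear the error condition so that later
        // writes are attempted again.
        writeFailureAction : log;
    }
}
```

With log, the output file can contain gaps where writes failed, so this choice suits data where occasional loss is acceptable.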

Properties

writePunctuations

Specifies to write punctuations to the output file. It is false by default. This parameter can be used only with txt, csv, and bin formats.

If the format is bin, then a byte is used to represent tuple or window punctuation or final punctuation. For more information about the bin format, see the FileSource operator.

Properties

writeStateHandlerCallbacks

Specifies to write to the output file a commented-out line that contains the name of each invoked StateHandler callback. This parameter is valid only when the file is in csv format.

Properties

Code Templates

FileSink

() as ${sinkPrefix}Sink = FileSink(${inputStream}) {
  param
    file : "${filename}";
}

Metrics

nFilesOpened - Counter

The number of files opened by the FileSink operator.

nTupleWriteErrors - Counter

The number of tuple writes that had an error on the file stream after the write completed. Due to buffering, write failures might not be detected immediately. You can use param flush : 1u; to ensure quicker detection, but with a (possibly large) performance penalty. After a failure is detected, all future writes fail unless the error condition is cleared or the file is closed.

For more information, see the writeFailureAction parameter.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime, streams_boost_iostreams, streams_boost_filesystem, streams_boost_system
Include Path: ../../../impl/include