Operator DirectoryScan

The DirectoryScan operator watches a directory, and generates file names on the output, one for each file that is found in the directory. The absolute path name of the file is generated. The file name is only generated the first time that the file is seen during a directory scan until it is re-created. The change time (ctime) is used to detect if a file was re-created. Output clause and custom output functions can be used to specify additional information about a file. Subdirectories and all non-regular files that are found in the directory are ignored during the scan.

Note: Because the change time of the file is used to detect if a file was re-created, it is possible that large files are still being written when a directory is being scanned. In this case, the same file name can be generated multiple times, if the time between scans is less than the time to write the file. To avoid this situation, write the file into a different directory on the same file system as the directory that is being scanned, and then rename to the target directory when complete. If the files are on the same file system, /bin/mv does this. If a regular expression pattern is being used to match only certain files, creating the new files under a name that fails to match the pattern, and then renaming, also works.
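A minimal sketch of the pattern-based variant of this workaround follows. The directory name and the ready_ and partial_ file name prefixes are assumptions chosen for illustration: a writer would create each file as partial_<name> and rename it to ready_<name> only after the last byte is written, so the scan never matches a file that is still growing.

```spl
composite Main {
  graph
    // Only names that start with "ready_" match the pattern. A file that is
    // still being written under the hypothetical "partial_" prefix is ignored.
    stream<rstring name> ReadyFiles = DirectoryScan()
    {
      param
        directory : "/tmp/incoming"; // hypothetical directory
        pattern : "^ready_.*";
    }
}
```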

Before it submits the file name to the output stream, the DirectoryScan operator can optionally move processed files to a different directory, which is named by the moveToDirectory parameter. If the moveToDirectory parameter is specified, the file (or symbolic link) is moved to the moveToDirectory directory before the output tuple is generated.

When moveToDirectory is specified, it is valid to have multiple DirectoryScan operators that are reading the same directory. The DirectoryScan operator ensures that each file is submitted by only one operator by creating a temporary .rename subdirectory in the directory and moveToDirectory directories.
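As an illustration, two DirectoryScan operators that read the same directory might be written as follows. This is a sketch only: the stream names and directory names are hypothetical, and it assumes that both operators use the same moveToDirectory value and run in an autonomous region (the directory parameter description advises against sharing a directory in a consistent region).

```spl
composite Main {
  graph
    // Two scanners share one source directory. Because moveToDirectory is
    // specified, each file is expected to be submitted by only one of them.
    stream<rstring name> ScanA = DirectoryScan()
    {
      param
        directory : "/tmp/work";
        moveToDirectory : "/tmp/active";
    }
    stream<rstring name> ScanB = DirectoryScan()
    {
      param
        directory : "/tmp/work";
        moveToDirectory : "/tmp/active";
    }
}
```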

Checkpointed data

When the DirectoryScan operator is checkpointed in a consistent region, the list of files processed (or ignored), the file change time (ctime) of each file when it was processed, and logic state variables (if present) are saved in the checkpoint. When the DirectoryScan operator is checkpointed in an autonomous region, logic state variables (if present) are saved in the checkpoint.

Behavior in a consistent region

The DirectoryScan operator can be the start operator or an operator within the reachability graph of a consistent region. When the trigger is operatorDriven, drain processing is triggered after each submitted tuple. The persisted state includes the list of files processed (or ignored), and the file change time (ctime) of each file when it was processed. Logic state variables (if present) are also automatically checkpointed and reset.

If the sleepTime parameter is specified, the elapsed time since the last scan is not part of the checkpoint state of the operator. If the DirectoryScan operator is reset without a processing element (PE) restart, the time since last scan is not updated. If the reset is due to a PE restart, the next scan is performed when the PE is restarted and the initDelay (if any) is complete.

Tip: Use the DirectoryScan operator as the start of a consistent region when used with a FileSource operator. It is recommended for the DirectoryScan and the FileSource operators to be fused and without threaded ports when in a consistent region.
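The tip above might be applied as in the following sketch. It is not taken from the original examples: the partition name scanAndRead is hypothetical, the JobControlPlane invocation is included on the assumption that the application needs one for any consistent region, and fusion is requested through the partitionColocation placement config. No threadedPort config is declared on either operator.

```spl
composite Main {
  graph
    // Assumed to be required once per application that uses consistent regions.
    () as JCP = spl.control::JobControlPlane() {}

    // DirectoryScan is the start operator of the consistent region.
    @consistent(trigger = operatorDriven)
    stream<rstring name> Files = DirectoryScan()
    {
      param
        directory : "/tmp/work";
      config
        placement : partitionColocation("scanAndRead"); // hypothetical partition name
    }

    // The same partitionColocation value asks for both operators to be fused.
    stream<rstring line> Lines = FileSource(Files)
    {
      param
        format : line;
      config
        placement : partitionColocation("scanAndRead");
    }
}
```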

Checkpointing behavior in an autonomous region

When the DirectoryScan operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its internal state to its initial state, and restores logic state variables (if present) from the last checkpoint.

When the DirectoryScan operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.
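For reference, a periodic checkpoint clause on a DirectoryScan operator in an autonomous region could look like the following sketch. The 30-second interval and the directory name are illustrative values only. As described above, only logic state variables are restored from such a checkpoint, so the operator starts again from its initial scan state after a restart.

```spl
composite Main {
  graph
    stream<rstring name> Files = DirectoryScan()
    {
      param
        directory : "/tmp/work";
      config
        checkpoint : periodic(30.0); // illustrative interval, in seconds
    }
}
```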

Exceptions

The DirectoryScan operator throws an exception in the following cases:
  • The directory or moveToDirectory does not exist.
  • The directory or moveToDirectory is not a directory.
  • The pattern is not a valid regular expression.
  • The .rename directories cannot be created when moveToDirectory is specified.

Examples

These examples use the DirectoryScan operator.


composite Main {                                                 
  graph                                                          
    // DirectoryScan operator with a relative directory argument 
    stream<rstring name> Dir1 = DirectoryScan()            
    {                                                            
      param                                                      
        directory : "People.dir";                                
        initDelay: 10.0;                                         
    }                                                 
    // DirectoryScan operator with an absolute directory argument and a file name pattern
    stream<rstring name> Dir2 = DirectoryScan()                                        
    {                                                                                        
      param                                                                                  
        directory : "/tmp/work";                                                             
        pattern : "^work.*";                                                                 
    }                                                                                        
    // use a FileSource operator to process the file names                                   
    stream<rstring line> Beat6 = FileSource(Dir2)                                      
    {                                                                                        
      param // note: param file is not specified                                             
        format : line;                                                                         
        deleteFile : true; // delete the file when processing is finished                      
    }                                                                                          
    // Use DirectoryScan operator to move files to a different directory.                      
    // Move the scanned files to the /tmp/active directory. Generate a tuple containing        
    // the original filename in /tmp/work (sourceFile), and the moved filename                 
    // in /tmp/active (movedFile).                                                             
    // Generate the size of the file (fileSize).                                               
    stream<rstring sourceFile, rstring movedFile, uint64 fileSize> Dir3 = DirectoryScan()
    {                                                                                          
      param                                                                                    
        directory : "/tmp/work";                                                               
        moveToDirectory : "/tmp/active";                                                       
        output Dir3 : sourceFile = FilePath(), movedFile = DestinationFilePath(),              
                      fileSize = Size();                                                       
    }                                                                                          
}                                                                                               

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 9 parameters.

Required: directory

Optional: ignoreDotFiles, ignoreExistingFilesAtStartup, initDelay, moveToDirectory, order, pattern, sleepTime, sortBy

Metrics
This operator reports 1 metric.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
DirectoryScanFunctions
<any T> T AsIs(T)

Returns the argument unchanged.

rstring FilePath()

Returns the relative pathname to the file in the source directory.

rstring FullPath()

Returns the absolute pathname to the file in the source directory.

rstring FileName()

Returns the basename of the file.

rstring Directory()

Returns the pathname to the source directory.

rstring DestinationDirectory()

Returns the pathname to the destination directory that contains the file.

rstring DestinationFilePath()

Returns the relative pathname to the file in the destination directory.

rstring DestinationFullPath()

Returns the absolute pathname to the file in the destination directory.

uint64 Size()

Returns the size of the file in bytes.

uint64 Atime()

Returns the access time (atime) of the file in seconds since the epoch. Note: The atime field is set from the original file in the source directory.

uint64 Ctime()

Returns the change time (ctime) of the file in seconds since the epoch. Note: The ctime field is set from the original file in the source directory.

uint64 Mtime()

Returns the modification time (mtime) of the file in seconds since the epoch. Note: The mtime field is set from the original file in the source directory.
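The following sketch shows how several of these output functions might be combined in one output clause. The stream name, attribute names, and directory are hypothetical; the functions and their return types are the ones listed above.

```spl
composite Main {
  graph
    stream<rstring dir, rstring base, uint64 bytes, uint64 modified> FileInfo = DirectoryScan()
    {
      param
        directory : "/tmp/work";
      output FileInfo :
        dir = Directory(),   // pathname of the source directory
        base = FileName(),   // basename of the file
        bytes = Size(),      // file size in bytes
        modified = Mtime();  // modification time, in seconds since the epoch
    }
}
```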

Ports (0)

The DirectoryScan operator has a single output port, which produces tuples that contain the names of the scanned files. The output schema for the DirectoryScan operator is a tuple. The generated tuple is populated by using the output clause. If there is no output clause, or an attribute in the tuple is not assigned by using an output clause, then the attribute must be of type rstring.

Parameters

directory

Specifies the name of the directory to be scanned. In a consistent region, do not include more than one instance of a DirectoryScan operator that is configured with the same directory parameter.

ignoreDotFiles

Specifies whether the DirectoryScan operator ignores files with a leading period (.) in the directory. By default, the value is set to false and files with a leading period are processed.

ignoreExistingFilesAtStartup

Specifies whether the DirectoryScan operator ignores pre-existing files in the directory. By default, the value is set to false and all files are processed as usual. If set to true, any files present in the directory are marked as already processed, and not submitted.

If initDelay is specified, this check is done before the DirectoryScan operator delays.
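A sketch that combines these startup-related parameters is shown below; the directory and the delay value are illustrative. Because the check for existing files is done before the delay, a file that arrives during the 5-second delay would presumably not be marked as already processed and would still be submitted once scanning starts.

```spl
composite Main {
  graph
    stream<rstring name> NewFiles = DirectoryScan()
    {
      param
        directory : "/tmp/work";
        ignoreDotFiles : true;                // skip names with a leading period
        ignoreExistingFilesAtStartup : true;  // skip files already present at startup
        initDelay : 5.0;                      // seconds to wait before producing tuples
    }
}
```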

initDelay

Specifies the number of seconds that the DirectoryScan operator delays before it starts to produce tuples.

moveToDirectory

Specifies the name of the directory to which files are moved before the output tuple is generated.

If the moveToDirectory parameter is specified for an operator in a consistent region, the output tuple contains the file name before the file is moved. The file is moved after the file name is submitted and a new consistent state is successfully established. Write downstream operators so that they consume the output tuples from the DirectoryScan operator before the new consistent state is established. This parameter is not supported in periodic consistent regions.

order

Controls how the sortBy parameter sorts the files. The valid values are ascending and descending. If the order parameter is not specified, the default value is set to ascending.

If sortBy is set to date, the file with the oldest change time (ctime) is generated first for ascending order. If sortBy is set to name, the file name that is lexically smallest is generated first for ascending order.
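As a sketch, the order and sortBy parameters might be used together as follows. The directory is illustrative, and the values are written as bare identifiers on the assumption that these parameters take enumerated values rather than strings. By symmetry with the ascending case, this configuration would presumably generate the lexically largest file name first within each scan.

```spl
composite Main {
  graph
    stream<rstring name> SortedFiles = DirectoryScan()
    {
      param
        directory : "/tmp/work";
        sortBy : name;       // sort by file name instead of the default, date
        order : descending;  // reverse of the default, ascending
    }
}
```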

pattern

Instructs the DirectoryScan operator to ignore file names that do not match the regular expression pattern, using the same matching method as regexMatch(rstring, rstring).

sleepTime

Specifies the minimum time between scans of the directory, in seconds. If this parameter is not specified, the default is 5.0 seconds. If the time difference between the start of the last scan and the current time is less than sleepTime seconds, the DirectoryScan operator sleeps until the time since the last scan is sleepTime seconds. If more than sleepTime seconds have already passed, the next scan begins immediately.
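For example, with the illustrative setting below, a scan that takes 0.3 seconds would be followed by a sleep of about 0.7 seconds, because the interval is measured from the start of the last scan. A scan that takes 1.5 seconds would be followed immediately by the next scan.

```spl
composite Main {
  graph
    stream<rstring name> Files = DirectoryScan()
    {
      param
        directory : "/tmp/work";
        sleepTime : 1.0; // at least 1 second between the starts of consecutive scans
    }
}
```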

sortBy

Determines the order in which file names are generated during a single scan of the directory when there are multiple valid files at the same time. The valid values are date and name. If the sortBy parameter is not specified, the default sort order is set to date.

Code Templates

DirectoryScan

stream<rstring name> ${outputStream} = DirectoryScan() {
  param
    directory : "${directoryToScan}";
}

DirectoryScan with FileSource

stream<rstring name> ${outputStream} = DirectoryScan() {
  param
    directory : "${directoryToScan}";
}

stream<${schema}> ${fileSourceStream} = FileSource(${outputStream}) {
  param
    ${cursor}
}

Metrics

nScans - Counter

The number of times the directory has been scanned for files.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime, streams_boost_filesystem, streams_boost_system
Include Path: ../../../impl/include