IBM Streams 4.2.1

Operator DirectoryWatch

The DirectoryWatch operator monitors configured directories and reports changes in them. The operator does not regularly scan the directories, as the DirectoryScan operator does, but instead uses the Linux kernel subsystem called 'inotify'. This kernel subsystem generates events for file changes in the directories it watches (see https://en.wikipedia.org/wiki/Inotify).

Instead of scanning all its configured directories on a regular basis, the DirectoryWatch operator creates an inotify instance and adds watches for the configured directories. Then, triggered by inotify events, the operator reports file names via its output port. You can configure the events the operator reacts to: you choose from new/renewed/moved-in, deleted/moved-out, or both kinds of file events.

You also decide whether already existing files are reported during start-up or when another directory is added. At the end of such a scan, the operator sends a WindowMarker punctuation.

All file events are sent as they occur, without a delay or polling interval, so no file event is missed while the operator is processing events. If a file is updated more than once within a short period of time, every file close event is reported.

The DirectoryWatch operator also has an optional control port. You use the control port to stop, start, and resume processing of inotify events, and to add or remove directories.

Limitations

  • The operator cannot monitor directories on a shared or mounted file system that does not generate inotify events.
  • If you rename a directory to a name that is already in your watch list and this directory has files in it, these files are not reported as existing files. Subsequent file changes are reported.
  • Sub-directories are not automatically included. You need to add them to the watch list separately.
  • Files that appeared and disappeared during a checkpoint reset phase are not reported.

Operator features in detail:

  • add a watch for a single directory or a list of directories
  • report file events like added/renewed or removed files in each watched directory
  • pattern to filter files to report
  • add and remove directories at run time via the control port
  • start/stop/resume the operator via the control port
  • flag switch to report existing files at start
  • resilient against non-existing directories to be watched
  • running or stopped at application start
  • start with variable delay

Note: No file event is reported twice. However, a single file action can generate more than one event. For example, an editor might write a temporary file, delete the original file, and rename the temporary file. Subsequent operators are responsible for processing these events in a reasonable way.
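
As a rough illustration of what such downstream processing could look like, the following Python sketch folds a sequence of reported events into the last known state of each file. The function name fold_events, the grouping of action names into "present" and "absent", and the sample paths are assumptions made for this example; they are not part of the operator.

```python
# Action names reported by the operator, grouped by their effect on a file.
PRESENT = {"CLOSE_WRITE", "MOVED_TO", "EXISTING"}
ABSENT = {"DELETE", "MOVED_FROM", "DELETED"}


def fold_events(events):
    """Reduce (event, path) pairs, in arrival order, to the last known state per path.

    Returns a dict that maps each path to True (file present) or False (file gone).
    """
    state = {}
    for event, path in events:
        if event in PRESENT:
            state[path] = True
        elif event in ABSENT:
            state[path] = False
    return state


# Hypothetical event sequence for an editor that saves via a temporary file.
editor_save = [
    ("CLOSE_WRITE", "/data/in/report.txt.tmp"),
    ("DELETE", "/data/in/report.txt"),
    ("MOVED_FROM", "/data/in/report.txt.tmp"),
    ("MOVED_TO", "/data/in/report.txt"),
]
final_state = fold_events(editor_save)
```

With this policy, the four events of the editor save collapse to one present file (report.txt) and one absent file (the temporary file). Other policies, such as reacting only to MOVED_TO and CLOSE_WRITE, may suit an application better.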

Reported action names are:
  • CLOSE_WRITE - A file open for writing was closed
  • MOVED_FROM - A file was moved from the watched folder
  • MOVED_TO - A file was moved to the watched folder
  • DELETE - A sub-directory or file was deleted
  • EXISTING - New files appeared in configured directories while event processing was not running
  • DELETED - Known files disappeared from configured directories while event processing was not running
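
The first four action names mirror inotify event mask bits. As an illustration only, the following Python sketch decodes a raw inotify read buffer into (watch descriptor, action name, file name) triples. The mask values and the record layout are taken from the Linux <sys/inotify.h> header. The function name decode_events and the mapping table are assumptions made for this example and do not describe the operator's C++ implementation.

```python
import struct

# Event mask bits as defined in <sys/inotify.h>.
IN_CLOSE_WRITE = 0x00000008
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
IN_DELETE = 0x00000200

# Assumed mapping from mask bit to the action name reported by the operator.
ACTION_NAMES = {
    IN_CLOSE_WRITE: "CLOSE_WRITE",
    IN_MOVED_FROM: "MOVED_FROM",
    IN_MOVED_TO: "MOVED_TO",
    IN_DELETE: "DELETE",
}

# Header of struct inotify_event: int wd; uint32_t mask; uint32_t cookie; uint32_t len;
HEADER = struct.Struct("iIII")


def decode_events(buffer):
    """Return a list of (wd, action, name) for each record in a raw inotify buffer."""
    events = []
    offset = 0
    while offset + HEADER.size <= len(buffer):
        wd, mask, _cookie, name_len = HEADER.unpack_from(buffer, offset)
        offset += HEADER.size
        # The name field is NUL-padded to name_len bytes.
        raw_name = buffer[offset:offset + name_len]
        offset += name_len
        name = raw_name.split(b"\0", 1)[0].decode()
        for bit, action in ACTION_NAMES.items():
            if mask & bit:
                events.append((wd, action, name))
    return events
```

EXISTING and DELETED have no inotify counterpart. According to the descriptions above, they result from comparing the directory content with the files known before event processing was interrupted.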

When should this operator be used

The DirectoryWatch operator is an alternative to the DirectoryScan operator. Its advantages are:
  • It reduces the CPU load - the operator needs CPU time only when files change. This helps especially with:
    • directories with many unchanged files
    • short scan periods
  • It misses no file change - the operator reports each file event, so no change event is lost for fast-changing files.
  • It avoids (wrong) multiple file reports - for instance, when copying a large file takes longer than the scan period, the DirectoryScan operator reports an extra file event before the file is closed. The DirectoryWatch operator reports only one event, after the copy is complete.
  • It avoids delay - the DirectoryWatch operator generates a tuple immediately after the file event occurs, without the delay caused by the scan period of the DirectoryScan operator.

Behavior in a consistent region

The DirectoryWatch operator can be an operator within the reachability graph of a consistent region, and it can be the start of a consistent region. The Streams system may apply restrictions when the operator participates in a consistent region and also uses a control port. On a checkpoint, the operator stores its internal state. On reset, the internal state is restored, and files that appeared in configured directories during the checkpoint and reset phase are reported as existing files to downstream operators. If no checkpoint is stored, the operator is reset to its initial state, as if it had been newly constructed.

Checkpointing

When the DirectoryWatch operator is in an autonomous region it may be configured with a config checkpoint : periodic(T) clause. The operator does not support the config checkpoint : operatorDriven clause.

Summary

Ports
This operator has 1 input port and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 10 parameters.

Required: directories

Optional: commandAttribute, eventsToWatch, getNanoseconds, initDelay, initialScan, pathAttribute, pattern, runAtStart, watchedDirectoriesSyncInterval

Metrics
This operator reports 2 metrics.

Properties

Implementation
C++
Threading
Never - Operator never provides a single threaded execution context.

Input Ports

Ports (0)

This port is an optional control port of the DirectoryWatch operator. The input port schema must contain a command and a path attribute. The names of these attributes are configured with the 'commandAttribute' and 'pathAttribute' parameters. The command attribute must have the type 'DirectoryWatch.Command' while the path attribute is of type 'rstring'.

Each tuple represents a command used to control the behavior of the operator. The path attribute is only relevant for 'addDirectory' and 'removeDirectory' commands.

Accepted commands are:
  • start - starts processing of watch events, existing files are reported (if configured)
  • resume - resumes processing of watch events, existing files are NOT reported
  • stop - stops processing of watch events
  • addDirectory - adds a new directory to the list
  • removeDirectory - removes a directory from the list
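
The difference between 'start' and 'resume' can be pictured with a small state model. The following Python sketch is an assumption-laden illustration, not the operator's logic: the class name WatchControl, its fields, and the choice to scan a newly added directory only while the operator is running are all hypothetical. The method returns the directories whose existing files would be reported as a result of the command.

```python
class WatchControl:
    """Toy model of the run state driven by control port commands."""

    def __init__(self, directories, run_at_start=True, initial_scan=True):
        self.directories = list(directories)
        self.running = run_at_start
        self.initial_scan = initial_scan

    def handle(self, command, path=None):
        """Apply one command and return the directories to scan for existing files."""
        if command == "start":
            self.running = True
            # 'start' reports existing files, if configured.
            return list(self.directories) if self.initial_scan else []
        if command == "resume":
            self.running = True
            # 'resume' never reports existing files.
            return []
        if command == "stop":
            self.running = False
            return []
        if command == "addDirectory":
            if path not in self.directories:
                self.directories.append(path)
            # Assumption: a new directory is scanned only while running.
            return [path] if self.initial_scan and self.running else []
        if command == "removeDirectory":
            if path in self.directories:
                self.directories.remove(path)
            return []
        raise ValueError("unknown command: " + command)
```

For example, a model created with run_at_start=False and one directory returns that directory for 'start' but an empty list for 'resume'.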

Output Ports

Output Functions
DirectoryWatchDataPortFunctions
rstring FilePath()

The absolute path name of a file in a watched directory.

rstring FileName()

The basename of the file.

rstring Directory()

The path name of the file's directory.

rstring Event()
The name of the file event. Possible values are:
  • CLOSE_WRITE - if the file was closed after writing
  • MOVED_TO - the file was moved into the watched directory
  • DELETE - the file was deleted
  • MOVED_FROM - the file was moved out of the watched directory
  • EXISTING - the file exists in configured directories when the operator starts
  • DELETED - a previously known file disappeared from the configured directories
Remark: existing files are sorted by the last modification time in ascending order.

uint64 Size()

The size of the file in bytes.

timestamp FileTime()

The modification time (mtime) of the file.

<any T> T AsIs(T)

Returns the argument unchanged.

uint64 RequestedWatches()

Returns the number of directories that are configured either by parameter or over the control port.

uint64 AvailableWatches()

Returns the number of directories for which watches have been successfully added to the inotify instance. The value can be less than the value of RequestedWatches(), for example, if some directories do not exist. The value is 0 if the operator is stopped.

DirectoryWatchResponsePortFunctions
DirectoryWatch.CommandResult Result()

The result of performing the command received on the control port.

rstring Message()

An informational text message providing more information regarding the command if applicable.

rstring MessageId()

The unique identifier of the informational text message.

<any T> T AsIs(T)

Returns the argument unchanged.

Ports (0)

This mandatory port emits the file data. You assign values to the attributes of your output tuple by using the custom output functions in the operator's 'output' clause. If you omit an attribute in the 'output' clause, its type must be rstring, and it is assigned the complete path of the reported file by default. If you omit the 'output' clause entirely, the operator expects an rstring attribute in the output tuple and assigns the file path to it.

Assignments
This port set requires that assignments made to output attributes cannot reference input stream attributes.

Ports (1)
This optional port produces the response for a received command. Use the 'output' clause to fill the attributes of the output tuple. Attributes that have the same name and type as control port attributes are copied to the output tuple automatically. You can explicitly assign input attributes to output attributes if their names differ. Additionally, the following custom output functions are provided:
  • Result()
  • MessageId()
  • Message()
Assignments
This port set allows any SPL expression of the correct type to be assigned to output attributes.

Parameters

This operator supports 10 parameters.

Required: directories

Optional: commandAttribute, eventsToWatch, getNanoseconds, initDelay, initialScan, pathAttribute, pattern, runAtStart, watchedDirectoriesSyncInterval

commandAttribute

The name of the control port's tuple attribute that holds the command for the operator. The attribute must be of type 'DirectoryWatch.Command'. The parameter is optional, but it is required when the control port is used.

directories

Defines the list of directory paths to be watched.

eventsToWatch

Defines which groups of file events are watched. The default is added.

getNanoseconds

This parameter specifies whether the nanosecond information of the file time stamp is considered when sorting existing files and in the FileTime() function. The default is false.
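
To make the effect on the ordering concrete, the following Python sketch lists the regular files of a directory, oldest modification first. It is a minimal sketch under assumptions: the function name existing_files and the flag use_nanoseconds are hypothetical, and truncating to whole seconds when the flag is false is a guess at what "not considered" means. Ties are broken by path here, which the operator documentation does not specify.

```python
import os
import tempfile


def existing_files(directory, use_nanoseconds=False):
    """Return paths of regular files in `directory`, oldest modification first."""
    entries = []
    with os.scandir(directory) as it:
        for entry in it:
            if entry.is_file():
                st = entry.stat()
                # Assumption: without nanoseconds, only whole seconds are compared.
                key = st.st_mtime_ns if use_nanoseconds else int(st.st_mtime)
                entries.append((key, entry.path))
    entries.sort()
    return [path for _key, path in entries]


# Usage: three files with explicit modification times (seconds since the epoch).
demo_dir = tempfile.mkdtemp()
for name, mtime in [("new.txt", 2000), ("old.txt", 1000), ("mid.txt", 1500)]:
    path = os.path.join(demo_dir, name)
    open(path, "w").close()
    os.utime(path, (mtime, mtime))

ordered = [os.path.basename(p) for p in existing_files(demo_dir)]
print(ordered)  # ['old.txt', 'mid.txt', 'new.txt']
```

Two files modified within the same second compare as equal when the flag is false, so their relative order then depends on the tie-breaker only.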

initDelay

Parameter to specify the number of seconds that the DirectoryWatch operator waits before it starts to produce tuples.

initialScan

This parameter specifies whether existing files are reported at application start, after a 'start' command, and after adding a new directory. The default is true.

pathAttribute

The name of the control port's tuple attribute that holds the path of the directory to add or remove. The parameter is optional, but it is required when the control port is used.

pattern

A regular expression pattern. The DirectoryWatch operator ignores files with names that do not match the pattern.
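
The filtering idea can be sketched in a few lines of Python. This is an illustration under assumptions: it matches the whole base name of the file, whereas the operator documentation does not say whether the match is anchored or which regular expression dialect is used. The function name filter_names is hypothetical.

```python
import re


def filter_names(names, pattern):
    """Keep only the file names that match `pattern` as a whole."""
    regex = re.compile(pattern)
    return [name for name in names if regex.fullmatch(name)]


# Hypothetical file names; only complete .csv files pass the filter.
kept = filter_names(["a.csv", "b.tmp", "c.csv.part"], r".*\.csv")
```

A pattern of this kind is a common way to skip temporary or partially written files, but verify the matching rules against the operator's actual behavior before relying on them.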

runAtStart

This parameter sets the run state of the operator after start. Its default value is true. If you do not use the control port, the parameter must have the value true; otherwise, the operator does not produce data. If you use the control port, you use the start, resume, and stop commands to start or halt the processing.

watchedDirectoriesSyncInterval

Some of the configured directories may not exist. The operator cannot create watches for non-existing directories, but it checks regularly whether they have been created in the meantime. This parameter specifies the interval in seconds between these checks. The default is 10.0 seconds. Set this parameter to 0.0 to disable the synchronization, which means that directories that are not available at operator start are never monitored.
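
One synchronization pass can be pictured as follows. The Python sketch below is a simplified model, not the operator's implementation: the function name sync_watches is hypothetical, and a plain set stands in for the watches that the operator would register with inotify.

```python
import os
import tempfile


def sync_watches(requested, watched):
    """Watch every requested directory that exists by now; return the newly added ones."""
    added = []
    for directory in requested:
        if directory not in watched and os.path.isdir(directory):
            watched.add(directory)
            added.append(directory)
    return added


# Usage: one directory exists at start, the other is created later.
base = tempfile.mkdtemp()
present = os.path.join(base, "in")
late = os.path.join(base, "late")
os.mkdir(present)

requested = [present, late]
watched = set()
first = sync_watches(requested, watched)   # only "in" exists so far
os.mkdir(late)                             # the missing directory appears
second = sync_watches(requested, watched)  # the next pass picks it up
```

In this model, len(requested) corresponds to the number of requested watches and len(watched) to the number of available watches: 2 and 1 after the first pass, 2 and 2 after the second.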

Metrics

nRequestedWatches - Gauge

Number of directories that are configured either by parameter or over the control port.

nAvailableWatches - Gauge

Number of directories for which watches have been successfully added to the inotify instance. The value can be less than nRequestedWatches, for example, if some directories do not exist. The value is 0 if the operator is stopped.