Operator HDFS2DirectoryScan

The HDFS2DirectoryScan operator scans a Hadoop Distributed File System directory for new or modified files.

The HDFS2DirectoryScan operator is similar to the DirectoryScan operator. It repeatedly scans an HDFS directory and writes the names of new or modified files that it finds in the directory to the output port. The operator sleeps between scans.

Behavior in a consistent region

The HDFS2DirectoryScan operator can participate in a consistent region. The operator can be at the start of a consistent region if there is no input port. The operator supports periodic and operator-driven consistent region policies.

If the consistent region policy is set to operator driven, the operator initiates a drain after each tuple is submitted, which allows a consistent state to be established after each file is fully processed. If the consistent region policy is set to periodic, the operator respects the period setting and establishes consistent states accordingly, which means that multiple files can be processed before a consistent state is established.

At checkpoint, the operator saves the last submitted filename and its modification timestamp to the checkpoint. Upon application failures, the operator resubmits all files that are newer than the last submitted file at checkpoint.
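For example, the operator can be placed at the start of an operator-driven consistent region as the following sketch shows (the stream name and directory path are illustrative):

@consistent(trigger = operatorDriven)
stream<rstring fileName> Scanned = HDFS2DirectoryScan() {
    param
        directory: "/user/streams/incoming";
}

With trigger = operatorDriven, a drain is initiated after each submitted file name; with trigger = periodic and a period value, multiple file names can be submitted between consistent states.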

Exceptions

The operator terminates in the following cases:
  • The operator cannot connect to HDFS.
  • The strictMode parameter is true but the directory is not found.
  • The path that is given by the directory name exists, but is an ordinary file and not a directory.
  • HDFS failed to give a list of files in the directory.
  • The pattern that is specified in the pattern parameter fails to compile.
Summary

Ports
This operator has 1 optional input port and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 19 parameters.

Optional: authKeytab, authPrincipal, configPath, credFile, directory, hdfsPassword, hdfsUri, hdfsUser, initDelay, keyStorePassword, keyStorePath, libPath, pattern, policyFilePath, reconnectionBound, reconnectionInterval, reconnectionPolicy, sleepTime, strictMode

Metrics
This operator reports 1 metric.

Implementation
Java

Input Ports

Ports (0)

The HDFS2DirectoryScan operator has an optional control input port. You can use this port to change the directory that the operator scans at run time, without restarting or recompiling the application. The expected schema for the input port is tuple<rstring directory>, a schema that contains a single attribute of type rstring. If a directory scan is in progress when a tuple is received, the scan completes, and a new scan starts immediately afterward by using the new directory that was specified. If the operator is sleeping, it starts scanning the new directory immediately after it receives an input tuple.
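A minimal sketch of driving the control port from a Custom operator (stream names and directory paths are illustrative):

stream<rstring directory> DirControl = Custom() {
    logic
        onProcess: {
            // Hypothetical trigger: redirect the scan to a new directory at run time.
            submit({directory = "/user/streams/newInput"}, DirControl);
        }
}

stream<rstring fileName> Scanned = HDFS2DirectoryScan(DirControl) {
    param
        directory: "/user/streams/initialInput";
}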

Windowing

This operator does not accept any windowing configurations.

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

The HDFS2DirectoryScan operator has one output port. This port submits tuples of type rstring, encoded in UTF-8, that represent the names of the files that are found in the directory, one file name per tuple. The file names are not submitted in any particular order. The port is non-mutating and punctuation-free.

Parameters

Optional: authKeytab, authPrincipal, configPath, credFile, directory, hdfsPassword, hdfsUri, hdfsUser, initDelay, keyStorePassword, keyStorePath, libPath, pattern, policyFilePath, reconnectionBound, reconnectionInterval, reconnectionPolicy, sleepTime, strictMode

authKeytab

This parameter specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.

authPrincipal

This optional parameter specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.
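When Kerberos authentication is used, the authPrincipal and authKeytab parameters are specified together, as in the following sketch (the principal and keytab values are illustrative):

stream<rstring fileName> Scanned = HDFS2DirectoryScan() {
    param
        directory: "/user/streams/incoming";
        authPrincipal: "streamsadmin@EXAMPLE.COM";
        authKeytab: "etc/streamsadmin.keytab";
}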

configPath
This optional parameter specifies the absolute path to the directory that contains the HDFS configuration files. If this parameter is not specified, the operator looks for the core-site.xml file in the following default locations:
  • $HADOOP_HOME/../hadoop-conf
  • $HADOOP_HOME/etc/hadoop
  • $HADOOP_HOME/conf
  • $HADOOP_HOME/share/hadoop/hdfs/*
  • $HADOOP_HOME/share/hadoop/common/*
  • $HADOOP_HOME/share/hadoop/common/lib/*
  • $HADOOP_HOME/lib/*
  • $HADOOP_HOME/*

You can use the hdfsUri parameter to override the value that is specified for the fs.defaultFS or fs.default.name option in the core-site.xml configuration file. Note: For connections to Hadoop instances that are deployed on IBM Cloud, the $HADOOP_HOME environment variable is not supported and should not be used.

credFile

This optional parameter contains the login credentials to use when you connect to GPFS remotely by using the webhdfs://hdfshost:webhdfsport schema. The credentials file must contain information on how to authenticate with IBM Analytics Engine when using the webhdfs schema. For example, the file must contain the user name and password for an IBM Analytics Engine user. When you connect to HDFS instances that are deployed on IBM Cloud, provide the credentials by using the hdfsUser and hdfsPassword parameters instead.

directory

This optional parameter specifies the name of the directory to be scanned. If the name starts with a slash, it is considered an absolute directory that you want to scan. If it does not start with a slash, it is considered a relative directory, relative to the /user/userid directory. This parameter is mandatory if the input port is not specified.
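For example, the following parameter fragments show the difference for a user ID of userid (the paths are illustrative):

// Absolute path: scans /data/incoming
directory: "/data/incoming";

// Relative path: scans /user/userid/incoming
directory: "incoming";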

hdfsPassword

This optional parameter specifies the password to use when you connect to a Hadoop instance that is deployed on IBM Cloud. If this parameter is not specified, attempts to connect to a Hadoop instance on IBM Cloud cause an exception.

hdfsUri
This optional parameter of type rstring specifies the uniform resource identifier (URI) that you can use to connect to HDFS. The URI defines the host and the port that you can use to connect to HDFS. The URI has one of the following formats:
  • To access HDFS locally or remotely, use hdfs://hdfshost:hdfsport
  • To access GPFS locally, use gpfs:///
  • To access GPFS remotely, use webhdfs://hdfshost:webhdfsport
  • To access HDFS via a web connection for HDFS deployed on IBM Cloud, use webhdfs://webhdfshost:webhdfsport.
If this parameter is not specified, the operator finds the HDFS URI from the fs.defaultFS or fs.default.name option in the core-site.xml HDFS configuration file. The path to the folder containing the core-site.xml can be set using the configPath parameter. Otherwise, the file is expected to be located in one of the following locations:
  • $HADOOP_HOME/../hadoop-conf
  • $HADOOP_HOME/etc/hadoop
  • $HADOOP_HOME/conf
  • $HADOOP_HOME/share/hadoop/hdfs/*
  • $HADOOP_HOME/share/hadoop/common/*
  • $HADOOP_HOME/share/hadoop/common/lib/*
  • $HADOOP_HOME/lib/*
  • $HADOOP_HOME/*
Note: For connections to Hadoop instances that are deployed on IBM Cloud, the $HADOOP_HOME environment variable is not supported, so you must specify either the hdfsUri or the configPath parameter.
hdfsUser

This optional parameter specifies the user ID to use when you connect to HDFS. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS. When you connect to Hadoop instances on IBM Cloud, this parameter must be specified; otherwise, the connection is unsuccessful. When you use Kerberos authentication, the operator authenticates with the Hadoop file system as the instance owner by using the values that are specified by the authPrincipal and authKeytab parameters. After successful authentication, the operator uses the user ID that is specified by the hdfsUser parameter to perform all other operations on the file system.

Note: When using Kerberos authentication, to perform operations as the user specified by the hdfsUser parameter, the IBM Streams instance owner must have super user privileges on HDFS or GPFS.

initDelay

This optional parameter specifies the time to wait in seconds before the operator scans the directory for the first time. The default value is 0.

keyStorePassword

This optional parameter is supported only when connecting to a Hadoop instance that is deployed on IBM Cloud. It specifies the password for the keystore file. Specify this parameter when the keyStorePath parameter is specified and the keystore file is protected by a password. If the keyStorePassword is invalid, the operator terminates.

keyStorePath

This optional parameter is supported only when connecting to a Hadoop instance that is deployed on IBM Cloud. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server to connect to. See the section on "SSL Configuration" in the main page of this toolkit's documentation for information on how to configure the keystore. Note: If this parameter is omitted, invalid certificates for secure connections are accepted. If the keystore file does not exist, or if the certificate it contains is invalid, the operator terminates. The location of the keystore file can be an absolute path on the file system or a path that is relative to the application directory.

libPath

This optional parameter specifies the absolute path to the directory that contains the Hadoop library files. If this parameter is omitted and $HADOOP_HOME is not set, the IBM Analytics Engine specific libraries within the impl/lib/ext folder of the toolkit will be used. When specified, this parameter takes precedence over the $HADOOP_HOME environment variable and the libraries within the folder indicated by $HADOOP_HOME will not be used.

pattern

This optional parameter limits the file names that are listed to the names that match the specified regular expression. The HDFS2DirectoryScan operator ignores file names that do not match the specified regular expression.
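For example, the following sketch limits the scan to file names that end in .csv (the directory path is illustrative; note that the backslash is doubled in the SPL string literal):

stream<rstring fileName> CsvFiles = HDFS2DirectoryScan() {
    param
        directory: "/data/incoming";
        pattern: ".*\\.csv";
}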

policyFilePath

This optional parameter is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar). The policy files enable the Java operators to use encryption with key sizes beyond the limits that are specified by the JDK. See the section on "Policy file configuration" in the main page of this toolkit's documentation for information on how to configure the policy files. If this parameter is omitted, the JVM default policy files are used. When specified, this parameter takes precedence over the JVM default policy files. Note: This parameter changes a JVM property. If you set this property, be sure that it is set to the same value in all HDFS operators that are in the same PE. The location of the policy file directory can be an absolute path on the file system or a path that is relative to the application directory.

reconnectionBound

This optional parameter specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5.

reconnectionInterval

This optional parameter specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; otherwise, it is ignored. The default value is 10.

reconnectionPolicy

This optional parameter specifies the policy that the operator uses to handle HDFS connection failures. The valid values are NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry. If NoRetry is specified and an HDFS connection failure occurs, the operator does not try to connect to HDFS again; the operator shuts down at startup time if the initial connection attempt fails. If BoundedRetry is specified and an HDFS connection failure occurs, the operator tries to connect to HDFS again, up to the maximum number of times that is specified in the reconnectionBound parameter. The sequence of connection attempts occurs at startup time; if a connection does not exist, the sequence of connection attempts also occurs before each operator run. If InfiniteRetry is specified, the operator continues to attempt the connection indefinitely until a connection is made. This behavior blocks all other operator operations while a connection is not successful. For example, if an incorrect connection password is specified in the connection configuration document, the operator remains in an infinite startup loop until a shutdown is requested.
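A sketch that retries the connection up to 10 times at 30-second intervals (the parameter values are illustrative; the reconnectionPolicy value is shown as a string here, so check the parameter's SPL type in your toolkit version):

stream<rstring fileName> Scanned = HDFS2DirectoryScan() {
    param
        directory: "/data/incoming";
        reconnectionPolicy: "BoundedRetry";
        reconnectionBound: 10;
        reconnectionInterval: 30.0;
}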

sleepTime

This optional parameter specifies the minimum time between directory scans. The default value is 5.0 seconds.
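For example, the following sketch combines sleepTime with the initDelay parameter that is described earlier, so that the operator waits 30 seconds before the first scan and then scans at most once per minute (the values and path are illustrative):

stream<rstring fileName> Scanned = HDFS2DirectoryScan() {
    param
        directory: "/data/incoming";
        initDelay: 30.0;
        sleepTime: 60.0;
}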

strictMode

This optional parameter determines whether the operator reports an error if the directory to be scanned does not exist. If you set this parameter to true and the specified directory does not exist or there is a problem accessing the directory, the operator reports an error and terminates. If you set this parameter to false and the specified directory does not exist or there is a problem accessing the directory, the operator treats it as an empty directory and does not report an error.
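For example, with strictMode set to true, the following sketch reports an error and terminates if /data/incoming does not exist or is inaccessible (the path is illustrative):

stream<rstring fileName> Scanned = HDFS2DirectoryScan() {
    param
        directory: "/data/incoming";
        strictMode: true;
}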

Code Templates

HDFS2DirectoryScan

stream<rstring name> ${outputStream} = HDFS2DirectoryScan() {
    param
        directory: "${directoryToScan}";
}

HDFS2DirectoryScan with HDFS2FileSource connecting to IBM Analytics Engine on Cloud

stream<rstring name> ${outputStream} = HDFS2DirectoryScan() {
    param
        directory: "${directoryToScan}";
        hdfsUri: "webhdfs://hdfsServer:8443";
        hdfsPassword: "password";
        hdfsUser: "biuser";
}

stream<${schema}> ${fileSourceStream} = HDFS2FileSource(${outputStream}) {
    param
        hdfsUri: "webhdfs://hdfsServer:8443";
        hdfsPassword: "password";
        hdfsUser: "user";
}

Metrics

nScans - Counter

The number of times the operator scanned the directory.

Libraries

Java operator class library
Library Path: ../../impl/java/bin, ../../impl/lib/BigData.jar