Operator HDFS2FileSource

The HDFS2FileSource operator reads files from a Hadoop Distributed File System (HDFS).

The operator opens a file on HDFS and sends out its contents in tuple format on its output port.

If the optional input port is not specified, the operator reads the HDFS file that is specified in the file parameter and provides the file contents on the output port. If the optional input port is configured, the operator reads the files that are named by the attribute in the tuples that arrive on its input port and places a punctuation marker between each file.

Behavior in a consistent region

The HDFS2FileSource operator can participate in a consistent region. The operator can be at the start of a consistent region if there is no input port.

The operator supports periodic and operator-driven consistent region policies. If the consistent region policy is set as operator driven, the operator initiates a drain after a file is fully read. If the consistent region policy is set as periodic, the operator respects the period setting and establishes consistent states accordingly. This means that multiple consistent states can be established before a file is fully read.

At checkpoint, the operator saves the current file name and file cursor location. If the operator does not have an input port, upon application failures, the operator resets the file cursor back to the checkpointed location, and starts replaying tuples from the cursor location. If the operator has an input port and is in a consistent region, the operator relies on its upstream operators to properly reply the filenames for it to re-read the files from the beginning.


The HDFS2FileSource operator terminates in the following cases:
  • The operator cannot connect to HDFS.
  • The file cannot be opened.
  • The file does not exist.
  • The file becomes unreadable.
  • A tuple cannot be created from the file contents (such as a problem with the file format).


This operator has 1 input port and 1 output port.
This operator does not accept any windowing configurations.
This operator supports 18 parameters.

Optional: authKeytab, authPrincipal, blockSize, configPath, credFile, encoding, file, hdfsPassword, hdfsUri, hdfsUser, initDelay, keyStorePassword, keyStorePath, libPath, policyFilePath, reconnectionBound, reconnectionInterval, reconnectionPolicy

This operator reports 1 metric.



Input Ports

Ports (0)

The HDFS2FileSource operator has one optional input port. If an input port is specified, the operator expects an input tuple with a single attribute of type rstring. The input tuples contain the file names that the operator opens for reading. The input port is non-mutating.


Output Ports

Java operators do not support output assignments.
Ports (0)

The HDFS2FileSource operator has one output port. The tuples on the output port contain the data that is read from the files. The operator supports two modes of reading. To read a file line-by-line, the expected output schema of the output port is tuple<rstring line> or tuple<ustring line>. To read a file as binary, the expected output schema of the output port is tuple<blob data>. Use the blockSize parameter to control how much data to retrieve on each read. The operator includes a punctuation marker at the conclusion of each file. The output port is mutating.



This parameter specifies the file that contains the encrypted keys for the user that is specified by the authPrincipal parameter. The operator uses this keytab file to authenticate the user. The keytab file is generated by the administrator. You must specify this parameter to use Kerberos authentication.


This parameter specifies the Kerberos principal that you use for authentication. This value is set to the principal that is created for the IBM Streams instance owner. You must specify this parameter if you want to use Kerberos authentication.


This parameter specifies the maximum number of bytes to be read at one time when reading a file into binary mode (ie, into a blob); thus, it is the maximum size of the blobs on the output stream. The parameter is optional, and defaults to 4096.

This parameter specifies the absolute path to the directory that contains the core-site.xml file, which is an HDFS configuration file. If this parameter is not specified, by default the operator looks for the core-site.xml file in the following locations:
  • $HADOOP_HOME/../hadoop-conf
  • $HADOOP_HOME/etc/hadoop
  • $HADOOP_HOME/conf
  • $HADOOP_HOME/share/hadoop/hdfs/*
  • $HADOOP_HOME/share/hadoop/common/*
  • $HADOOP_HOME/share/hadoop/common/lib/*
  • $HADOOP_HOME/lib/*
Note: For connections to Hadoop instances deployed on IBM Analytics Engine, the $HADOOP_HOME environment variable is not supported and should not be used.

This parameter specifies a file that contains login credentials. The credentials are used to connect to GPFS remotely by using the webhdfs://hdfshost:webhdfsport schema. The credentials file must contain information about how to authenticate with IBM Analytics Engine when using the webhdfs schema. For example, the file must contain the user name and password for an IBM Analytics Engine user. When connecting to HDFS instances deployed on IBM Analytics Engine, the credentials are provided using the hdfsUser and hdfsPassword parameters.


This parameter specifies the encoding to use when reading files. The default value is UTF-8.


This parameter specifies the name of the file that the operator opens and reads. This parameter must be specified when the optional input port is not configured. If the optional input port is used and the file name is specified, the operator generates an error.


This parameter specifies the password to use when you connecting to a Hadoop instance deployed on IBM Analytics Engine. If this parameter is not specified, attempts to connect to a Hadoop instance deployed on IBM Analytics Engine will cause an exception.

This parameter specifies the uniform resource identifier (URI) that you can use to connect to the HDFS file system. The URI has the following format:
  • To access HDFS locally or remotely, use hdfs://hdfshost:hdfsport
  • To access GPFS locally, use gpfs:///.
  • To access GPFS remotely, use webhdfs://hdfshost:webhdfsport.
  • To access HDFS via a web connection for HDFS deployed on IBM Analytics Engine, use webhdfs://webhdfshost:webhdfsport.

If this parameter is not specified, the operator expects that the HDFS URI is specified as the fs.defaultFS or fs.default.name property in the core-site.xml HDFS configuration file. The operator expects the core-site.xml file to be in $HADOOP_HOME/../hadoop-conf or $HADOOP_HOME/etc/hadoop or in the directory specified by the configPath parameter. Note: For connections to HDFS on IBM Analytics Engine, the $HADOOP_HOME environment variable is not supported and so either hdfsUri or configPath must be specified.


This parameter specifies the user ID to use when you connect to the HDFS file system. If this parameter is not specified, the operator uses the instance owner ID to connect to HDFS. When connecting to Hadoop instances on IBM Analytics Engine, this parameter must be specified otherwise the connection will be unsuccessful. When you use Kerberos authentication, the operator authenticates with the Hadoop file system as the instance owner by using the values that are specified in the authPrincipal and authKeytab parameters. After successful authentication, the operator uses the user ID that is specified by the hdfsUser parameter to perform all other operations on the file system.

NOTE: When using Kerberos authentication, the IBM Streams instance owner must have super user privileges on HDFS or GPFS to perform operations as the user that is specified by the hdfsUser parameter.


This parameter specifies the time to wait in seconds before the operator reads the first file. The default value is 0.


This optional attribute is only supported when connecting to a Hadoop instance deployed on IBM Analytics Engine. It specifies the password for the keystore file. This attribute is specified when the keyStore attribute is specified and the keystore file is protected by a password. If the keyStorePassword is invalid the operator terminates.


This optional attribute is only supported when connecting to a Hadoop instance deployed on IBM Analytics Engine. It specifies the path to the keystore file, which is in PEM format. The keystore file is used when making a secure connection to the HDFS server and must contain the public certificate of the HDFS server that will be connected to. Note: If this parameter is omitted, invalid certificates for secure connections will be accepted. If the keystore file does not exist, or if the certificate it contains is invalid, the operator terminates. The location of the keystore file can be absolute path on the filesystem or a path that is relative to the application directory. See the section on "SSL Configuration" in the main page of this toolkit's documentation for information on how to configure the keystore. The location of the keystore file can be absolute path on the filesystem or a path that is relative to the application directory.


This optional parameter specifies the absolute path to the directory that contains the Hadoop library files. If this parameter is omitted and $HADOOP_HOME is not set, the IBM Analytics Engine specific libraries within the impl/lib/ext folder of the toolkit will be used. When specified, this parameter takes precedence over the $HADOOP_HOME environment variable and the libraries within the folder indicated by $HADOOP_HOME will not be used.


This optional parameter is relevant when connecting to IBM Analytics Engine on IBM Cloud. It specifies the path to the directory that contains the Java Cryptography Extension policy files (US_export_policy.jar and local_policy.jar). The policy files enable the Java operators to use encryption with key sizes beyond the limits specified by the JDK. See the section on "Policy file configuration" in the main page of this toolkit's documentation for information on how to configure the policy files. If this parameter is omitted the JVM default policy files will be used. When specified, this parameter takes precedence over the JVM default policy files. Note: This parameter changes a JVM property. If you set this property, be sure it is set to the same value in all HDFS operators that are in the same PE. The location of the policy file directory can be absolute path on the file system or a path that is relative to the application directory.


This optional parameter specifies the number of successive connection attempts that occur when a connection fails or a disconnect occurs. It is used only when the reconnectionPolicy parameter is set to BoundedRetry; otherwise, it is ignored. The default value is 5.


This optional parameter specifies the amount of time (in seconds) that the operator waits between successive connection attempts. It is used only when the reconnectionPolicy parameter is set to BoundedRetry or InfiniteRetry; othewise, it is ignored. The default value is 10.


This optional parameter specifies the policy that is used by the operator to handle HDFS connection failures. The valid values are: NoRetry, InfiniteRetry, and BoundedRetry. The default value is BoundedRetry. If NoRetry is specified and a HDFS connection failure occurs, the operator does not try to connect to the HDFS again. The operator shuts down at startup time if the initial connection attempt fails. If BoundedRetry is specified and a HDFS connection failure occurs, the operator tries to connect to the HDFS again up to a maximum number of times. The maximum number of connection attempts is specified in the reconnectionBound parameter. The sequence of connection attempts occurs at startup time. If a connection does not exist, the sequence of connection attempts also occurs before each operator is run. If InfiniteRetry is specified, the operator continues to try and connect indefinitely until a connection is made. This behavior blocks all other operator operations while a connection is not successful. For example, if an incorrect connection password is specified in the connection configuration document, the operator remains in an infinite startup loop until a shutdown is requested.


Code Templates


stream<${streamType}> ${streamName} = HDFS2FileSource() {
                 file : "${filename}";

HDFS2FileSource connecting to IBM Analytics Engine

stream<${streamType}> ${streamName} = HDFS2FileSource() {
               	 file: "${filename}";
               	 hdfsUser: "${hdfsUser}";
               	 hdfsPassword: "${hdfsPassword}";
                 hdfsUri: "${hdfsUri}";


nFilesOpened - Counter

The number of files that are opened by the operator for reading data.


