Hadoop FS
The Hadoop FS destination writes data to Hadoop Distributed File System (HDFS). You can also use the destination to write to Azure Blob storage. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
You can write data as flat files or Hadoop sequence files. You can also use the whole file data format to write whole files to HDFS.
When you configure a Hadoop FS destination, you can define a directory template and time basis to determine the output directories that the destination creates and the files where records are written.
As part of the Drift Synchronization Solution for Hive, you can alternatively use record header attributes to perform record-based writes. You can write records to the specified directory, use the defined Avro schema, and roll files based on record header attributes. For more information, see Record Header Attributes for Record-Based Writes.
You can define a file prefix and suffix, the data time zone, and properties that define when the destination closes a file. You can specify the amount of time that a record can be written to its associated directory and what happens to late records.
The destination can generate events for an event stream. For more information about the event framework, see Dataflow Triggers Overview.
When necessary, you can enable Kerberos authentication. You can also specify a Hadoop user to impersonate, define a Hadoop configuration file directory, and add Hadoop configuration properties as needed.
You can use Gzip, Bzip2, Snappy, LZ4, and other compression formats to write output files.
Directory Templates
By default, the Hadoop FS destination uses directory templates to create output and late record directories. Hadoop FS writes records to the directories based on the configured time basis.
You can alternatively write records to directories based on the targetDirectory record header attribute. Using the targetDirectory attribute disables the ability to define directory templates.
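When you define a directory template, you can use a mix of constants, field values, and datetime variables. You can use the every function to create new directories at regular intervals based on hours, minutes, or seconds, starting on the hour. You can also use the record:valueOrDefault function to create new directories from field values or a default in the directory template.
For example, the following directory template creates directories based on the State field of each record, or unknown when the field is missing or null, and on datetime variables with hours as the smallest unit:
/outputfiles/${record:valueOrDefault("/State", "unknown")}/${YY()}-${MM()}-${DD()}-${hh()}
You can use the following elements in a directory template: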
- Constants
- You can use any constant, such as output.
- Datetime Variables
- You can use datetime variables, such as ${YYYY()} or ${DD()}. The destination creates directories as needed, based on the smallest datetime variable that you use. For example, if the smallest variable is hours, then the directories are created for every hour of the day that receives output records.
- every function
- You can use the every function in a directory template to create directories at regular intervals based on hours, minutes, or seconds, beginning on the hour. The intervals should be a submultiple or integer factor of 60. For example, you can create directories every 15 minutes or 30 seconds.
- record:valueOrDefault function
- You can use the record:valueOrDefault function in a directory template to create directories with the value of a field or the specified default value if the field does not exist or if the field is null:
${record:valueOrDefault(<field path>, <default value>)}
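The following Python sketch imitates how the example directory template resolves for a single record. It is an illustration only, not the destination's implementation: the helper names are hypothetical, records are modeled as flat dictionaries, and it assumes that YY() yields a two-digit year. The floor_to_interval helper shows the "beginning on the hour" interval rule described for the every function, without reproducing that function's syntax.

```python
from datetime import datetime


def value_or_default(record, field_path, default):
    """Imitate record:valueOrDefault: use the default when the field is missing or null."""
    value = record.get(field_path.lstrip("/"))
    return default if value is None else value


def resolve_directory(record, ts):
    """Resolve the example template for one record and one timestamp."""
    state = value_or_default(record, "/State", "unknown")
    # Assumption: ${YY()}-${MM()}-${DD()}-${hh()} maps to two-digit year, month, day, hour.
    return f"/outputfiles/{state}/{ts:%y-%m-%d-%H}"


def floor_to_interval(ts, minutes):
    """Round a timestamp down to the start of its interval, counted from the hour."""
    if 60 % minutes != 0:
        raise ValueError("the interval should be an integer factor of 60")
    return ts.replace(minute=ts.minute - ts.minute % minutes, second=0, microsecond=0)
```

With a 15-minute interval, a record stamped 14:47:30 falls into the interval that starts at 14:45.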
Time Basis
When using directory templates, the time basis helps determine when directories are created. It also determines the directory Hadoop FS uses when writing a record, and whether a record is late.
When using the targetDirectory record header attribute to write records, the time basis determines only whether a record is late.
You can use the following times as the time basis:
- Processing Time
- When you use processing time as the time basis, the destination creates directories based on the processing time and the directory template, and writes records to the directories based on when they are processed.
- Record Time
- When you use the time associated with a record as the time basis, you specify a Date field in the record. The destination creates directories based on the datetimes associated with the records and writes the records to the appropriate directories.
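To make the difference between the two time bases concrete, here is a minimal Python sketch. The function name, the "timestamp" field name, and the hourly /tmp/out layout are assumptions chosen for illustration; the destination itself is configured through its properties, not through code like this.

```python
from datetime import datetime


def directory_for(record, time_basis, processed_at, date_field="timestamp"):
    """Pick the hourly output directory for a record under the given time basis."""
    if time_basis == "processing":
        basis = processed_at          # when the destination handles the record
    elif time_basis == "record":
        basis = record[date_field]    # a Date field carried by the record
    else:
        raise ValueError(f"unknown time basis: {time_basis}")
    return f"/tmp/out/{basis:%Y-%m-%d-%H}"
```

A record stamped 02:30 but processed at 09:05 lands in the 09 directory under processing time and in the 02 directory under record time.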
Late Records and Late Record Handling
When you use a record time as the time basis, you can define a time limit for records to be written to their associated output file. When the destination creates a new output file in a new directory, the previous output file is kept open for the specified late record time limit. When records that belong in that file arrive within the time limit, the destination writes the records to the open output file. When the late record time limit is reached, the output file is closed and any record that arrives past this limit is considered late.
You can send late records to a late records file or to the stage for error handling. When you send records to a late records file, you define a late records directory template.
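For example, suppose you use a record time as the time basis, set a one-hour late record time limit, send late records to the stage for error handling, and use the following directory template:
/tmp/out/${YYYY()}-${MM()}-${DD()}-${hh()}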
The first records that arrive have a datetime between the hours of 02:00 and 02:59, and so are written to an output file in the 02 directory. When records with a datetime between the hours of 03:00 and 03:59 arrive, the destination creates a new file in an 03 directory. The destination keeps the file in the 02 directory open for another hour.
If a record with a datetime between the hours of 02:00 and 02:59 arrives before the hour time limit, the destination writes the record to the open file in the 02 directory. After one hour, the destination closes the output file in the 02 directory. Any records with a datetime between the hours of 02:00 and 02:59 that arrive after the one-hour time limit are considered late. The late records are sent to the stage for error handling.
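The scenario above can be modeled with a short Python sketch. This is a simplified model of the described behavior, not the destination's code: the class and method names are hypothetical, and it assumes that a record for a directory that was never opened is treated as late.

```python
from datetime import datetime, timedelta


class LateRecordTracker:
    """Route records to hourly directories or flag them as late."""

    def __init__(self, limit):
        self.limit = limit
        self.current = None    # newest directory seen so far
        self.closes_at = {}    # older directory -> time its file is closed

    def route(self, record_time, arrival_time):
        directory = record_time.strftime("/tmp/out/%Y-%m-%d-%H")
        if self.current is None or directory > self.current:
            # A newer directory appears: keep the previous file open for the limit.
            if self.current is not None:
                self.closes_at[self.current] = arrival_time + self.limit
            self.current = directory
            return directory
        if directory == self.current:
            return directory
        deadline = self.closes_at.get(directory)
        if deadline is not None and arrival_time <= deadline:
            return directory   # arrived while the older file was still open
        return "late"          # would go to error handling or a late records file
```

The zero-padded directory names sort chronologically, which is why a plain string comparison is enough to detect a newer directory here.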
Timeout to Close Idle Files
You can configure the maximum time that an open output file can remain idle. After no records are written to an output file for the specified amount of time, the Hadoop FS destination closes the file.
You might want to configure an idle timeout when output files remain open and idle for too long, thus delaying another system from processing the files. Output files can remain idle for too long in the following situations:
- You configured the maximum number of records to be written to output files or the maximum size of output files, but records have stopped arriving. An output file that has not reached the maximum number of records or the maximum file size stays open until more records arrive.
- You configured a date field in the record as the time basis and have configured a late record time limit, but records arrive in chronological order. When a new directory is created, the output file in the previous directory remains open for the configured late record time limit. However, no records are ever written to the open file in the previous directory.
For example, when a record with a datetime of 03:00 arrives, the destination creates a new file in a new 03 directory. The previous file in the 02 directory is kept open for the late record time limit, which is an hour by default. However, when records arrive in chronological order, no records that belong in the 02 directory arrive after the 03 directory is created.
In either situation, configure an idle timeout so that other systems can process the files sooner, instead of waiting for the configured maximum records, maximum file size, or late records conditions to occur.
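As a rough illustration of how an idle timeout complements the maximum-records condition, consider the following Python sketch. The class and attribute names are hypothetical and the logic is deliberately reduced to two close conditions.

```python
from datetime import datetime, timedelta


class OpenOutputFile:
    """Track when an open output file should be closed."""

    def __init__(self, opened_at, max_records, idle_timeout):
        self.max_records = max_records
        self.idle_timeout = idle_timeout
        self.records = 0
        self.last_write = opened_at

    def write(self, now):
        self.records += 1
        self.last_write = now

    def should_close(self, now):
        # Close when the file is full ...
        if self.records >= self.max_records:
            return True
        # ... or when nothing has been written for the idle timeout.
        return now - self.last_write >= self.idle_timeout
```

Without the second condition, a file that never reaches max_records would stay open until more records arrive.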
Recovery
The Hadoop FS destination supports recovery after an unexpected stop of the pipeline by renaming temporary files when the pipeline restarts.
The destination names the temporary open output file using the following format:
_tmp_<prefix>_<runnerId>
Where <prefix> is the file prefix defined for the destination and <runnerId> is the ID of the pipeline runner performing the pipeline processing. For example, when the destination prefix is defined as sdc and the destination runs from a single-threaded pipeline, the temporary file is named like so:
_tmp_sdc_0
When the destination closes the file, it renames the file to remove the _tmp_ string and to replace the pipeline runner ID with a random unique identifier like so:
<prefix>_e7ce67c5-013d-47a7-9496-8c882ddb28a0
However, when the pipeline stops unexpectedly, the temporary files remain. When the pipeline restarts, the destination scans all subdirectories of the defined directory template to rename any temporary files that match the defined prefix for the destination. After the destination renames the temporary files, it continues writing to new output files.
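Note that the destination renames every temporary file that matches the defined prefix. If another file in those directories also begins with _tmp_<prefix>, the destination renames that file also.
The destination cannot rename temporary files in the following situations:
- The directory template includes an expression with the record:value or record:valueOrDefault function.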
- If the record:value or record:valueOrDefault function evaluates to an empty string or to a subdirectory, the destination cannot determine those locations when the pipeline restarts. As a result, the destination cannot rename any temporary files written to those locations.
- The directory is defined in the targetDirectory record header attribute.
- When the directory is defined in the targetDirectory record header attribute, the destination cannot determine where to look for temporary files when the pipeline restarts. As a result, it cannot rename the temporary files.
In either of these situations, you must manually rename the temporary files.
File recovery can slow down the pipeline as it restarts. If needed, you can configure the destination to skip file recovery.
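The rename step described in this section can be approximated with the following Python sketch, which may also serve as a starting point when you must rename temporary files manually. It is not the destination's implementation: the function name is hypothetical, and it walks a local directory tree with os.walk, so files stored in HDFS would need an HDFS client instead.

```python
import os
import uuid


def rename_temp_files(root, prefix):
    """Rename _tmp_<prefix>_* files under root to <prefix>_<uuid>; return the new paths."""
    marker = f"_tmp_{prefix}_"
    renamed = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if not name.startswith(marker):
                continue  # leave files with other prefixes untouched
            target = os.path.join(dirpath, f"{prefix}_{uuid.uuid4()}")
            os.rename(os.path.join(dirpath, name), target)
            renamed.append(target)
    return renamed
```

Because the match is by prefix only, any file that starts with the same _tmp_<prefix>_ string is renamed, regardless of which process created it.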
Data Formats
- Avro
- The destination writes records based on the Avro schema. You can use one of the following methods to specify the location of the Avro schema definition:
- Binary
- The stage writes binary data to a single field in the record.
- Delimited
- The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
- JSON
- The destination writes records as JSON data. You can use one of the following formats:
- Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
- Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
- Parquet
- The destination writes a Parquet file for each partition and includes the Parquet schema in every file.
- Protobuf
- Writes a batch of messages in each file.
- SDC Record
- The destination writes records in the SDC Record data format.
- Text
- The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
- Whole File
- Streams whole files to the destination system. The destination writes the data to the file and location defined in the stage. If a file of the same name already exists, you can configure the destination to overwrite the existing file or send the current file to error.
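To illustrate the two JSON formats listed above, the following Python sketch serializes the same records both ways. The function names are hypothetical, and the newline separator between objects is an assumption; the source only states that each file includes multiple JSON objects.

```python
import json


def to_json_array(records):
    """Array format: the file holds a single array with one element per record."""
    return json.dumps(records)


def to_multiple_objects(records):
    """Multiple objects format: the file holds one JSON object per record."""
    # Assumption: objects are separated by newlines.
    return "\n".join(json.dumps(record) for record in records)
```

A consumer of the array format must parse the whole file at once, while the multiple objects format can be read one object at a time.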
Writing to Azure Blob Storage
Using the HDP stage library, the Hadoop FS destination can access Azure Blob storage with the WASB protocol. To write to Azure Blob storage, configure the Hadoop FS destination to use the HDP stage library and to connect with the appropriate URI and Azure credentials.
Writing to Azure Blob Storage with HDInsight
If you installed Data Collector on Azure HDInsight, then you can write to Azure Blob storage with HDInsight.
- On the General tab of the Hadoop FS destination, set the Stage Library property to the HDP stage library version 2.4 or later.
- Configure Azure credentials in one of the following ways:
  - If the Azure credentials are defined in the HDFS configuration file core-site.xml, configure the destination to access the file.
    - On the Connection tab, configure the Configuration Files Directory property to point to the directory that includes the file.
  - If the credentials are not defined in the core-site.xml file, configure the destination to pass an additional configuration property that contains the Azure credentials:
    - On the Connection tab, configure the Additional Configuration property to pass the Azure credentials. You can use simple or bulk edit mode to add configuration properties.
    - If necessary, click the Add icon to add a new additional configuration property.
    - In the Name property, enter the Azure storage account name as follows:
      fs.azure.account.key.<storage account name>.blob.core.windows.net
      For example, if the storage account name is sdchd, then enter the following name for the property:
      fs.azure.account.key.sdchd.blob.core.windows.net
      Tip: You can find the Azure storage account name on the Access Keys page in the Microsoft Azure portal. The page shows the storage account name and access keys.
    - In the Value property, enter an access key value for the Azure storage account. You can use any valid key.
      Tip: The account key value also displays on the Access Keys page. You can use either the key1 or key2 value.
- On the Connection tab, configure the File System URI property as follows:
  <wasb[s]>://<container name>@<storage account name>.blob.core.windows.net/<path to files>
  In the URI, <container name> is the Azure container name, and <storage account name> is the Azure storage account name.
  For example, for an sdc-hd container in a storage account named sdchd, with all files in a files directory, you would define the file system URI as follows:
  wasbs://sdc-hd@sdchd.blob.core.windows.net/files
  Tip: You can find the container name and storage account name on the Essentials page in the Microsoft Azure portal. The page shows the container name and storage account name.
  Though the host name for the file system URI is <storage account name>.blob.core.windows.net, you can alternatively use the host name of the Azure blob service endpoint as the host name for the file system URI.
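The property name and file system URI follow fixed patterns, so they can be assembled programmatically. The Python helpers below are a hypothetical convenience for generating the values to paste into the destination; they are not part of Data Collector.

```python
def account_key_property(storage_account):
    """Build the additional configuration property name for the account key."""
    return f"fs.azure.account.key.{storage_account}.blob.core.windows.net"


def wasb_uri(container, storage_account, path, secure=True):
    """Build the file system URI, using wasbs when secure and wasb otherwise."""
    scheme = "wasbs" if secure else "wasb"
    host = f"{storage_account}.blob.core.windows.net"
    return f"{scheme}://{container}@{host}/{path.lstrip('/')}"
```

For the sdchd account and sdc-hd container used in the examples, these helpers reproduce the property name and URI shown in the steps.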
Example
The following image shows how to configure the Hadoop FS destination to write to Azure Blob storage with HDInsight using the Azure account information in the examples above:
Event Generation
The Hadoop FS destination can generate events that you can use in an event stream. When you enable event generation, the destination generates event records each time the destination closes a file or completes streaming a whole file.
You can use Hadoop FS events in any logical way. For example:
- With the HDFS File Metadata executor to move or change permissions on closed files.
For an example, see Managing Output Files.
- With the Hive Query executor to run Hive or Impala queries after closing output files.
For an example, see Automating Impala Metadata Updates for Drift Synchronization for Hive.
- With the MapReduce executor to convert completed Avro files to ORC files or to Parquet.
For an example, see Converting Data to the Parquet Data Format.
- With the Email executor to send a custom email after receiving an event.
For an example, see Sending Email During Pipeline Processing.
- With a destination to store event information.
For an example, see Preserving an Audit Trail of Events.
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Event Records
Hadoop FS event records include the following event-related record header attributes. Record header attributes are stored as String values:
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type. The value depends on the type of event record, such as wholeFileProcessed for whole file events. |
sdc.event.version | Integer that indicates the version of the event record type. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The destination can generate the following types of event records:
- File closure
- The destination generates a file closure event record when it closes an output file.
- Whole file processed
- The destination generates an event record when it completes streaming a whole file. Whole file event records have the sdc.event.type record header attribute set to wholeFileProcessed and have the following fields:
Field | Description |
---|---|
sourceFileInfo | A map of attributes about the original whole file that was processed. The attributes include size, the size of the whole file in bytes. Additional attributes depend on the information provided by the origin system. |
targetFileInfo | A map of attributes about the whole file written to the destination. The attributes include path, an absolute path to the processed whole file. |
checksum | Checksum generated for the written file. Included only when you configure the destination to include checksums in the event record. |
checksumAlgorithm | Algorithm used to generate the checksum. Included only when you configure the destination to include checksums in the event record. |
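As a sketch of how a downstream consumer might read a whole file event, the following Python function takes the header attributes and fields as plain dictionaries. The function name and the dictionary representation are assumptions for illustration; only the attribute and field names come from the descriptions above. Because header attributes are stored as String values, the numeric ones are converted explicitly.

```python
def describe_event(headers, fields):
    """Summarize a Hadoop FS event record given as two dictionaries."""
    summary = {
        "type": headers["sdc.event.type"],
        # Header attributes are strings, so convert the numeric ones.
        "version": int(headers["sdc.event.version"]),
        "created": int(headers["sdc.event.creation_timestamp"]),
    }
    if summary["type"] == "wholeFileProcessed":
        summary["source_size"] = fields["sourceFileInfo"]["size"]
        summary["target_path"] = fields["targetFileInfo"]["path"]
        # checksum is present only when the destination is configured to include it.
        summary["checksum"] = fields.get("checksum")
    return summary
```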
Kerberos Authentication
You can use Kerberos authentication to connect to HDFS. When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to HDFS. By default, Data Collector uses the user account that started it to connect.
The Kerberos principal and keytab are defined in the Data Collector configuration file, $SDC_CONF/sdc.properties. To use Kerberos authentication, configure all Kerberos properties in the Data Collector configuration file, and then enable Kerberos in the Hadoop FS destination.
For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication in the Data Collector documentation.
Impersonation User
Data Collector can either use the currently logged in Data Collector user or a user configured in the destination to write to HDFS.
A Data Collector configuration property can be set that requires using the currently logged in Data Collector user. When this property is not set, you can specify a user in the destination. For more information about Hadoop impersonation and the Data Collector property, see Hadoop Impersonation Mode in the Data Collector documentation.
Note that the destination uses a different user account to connect to HDFS. By default, Data Collector uses the user account that started it to connect to external systems. When using Kerberos, Data Collector uses the Kerberos principal.
To configure a user in the destination, perform the following tasks:
- On Hadoop, configure the user as a proxy user and authorize the user to impersonate a Hadoop user.
For more information, see the Hadoop documentation.
- In the Hadoop FS destination, on the Connection tab, configure the Impersonation User property.
HDFS Properties and Configuration Files
You can configure the Hadoop FS destination to use HDFS configuration files and individual HDFS properties:
- HDFS configuration files
- You can use the following HDFS configuration files with the Hadoop FS destination:
  - core-site.xml
  - hdfs-site.xml
- Individual properties
- You can configure individual HDFS properties in the destination. To add an HDFS property, you specify the exact property name and the value. The Hadoop FS destination does not validate the property names or values.
Note: Individual properties override properties defined in the HDFS configuration file.
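The override rule in the note can be illustrated with a short Python sketch that reads Hadoop-style configuration XML and then applies individual properties on top. The function names are hypothetical, and the sketch only models the precedence; it does not show how the destination loads its configuration.

```python
import xml.etree.ElementTree as ET


def load_site_properties(xml_text):
    """Read name/value pairs from a Hadoop configuration file such as core-site.xml."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}


def effective_properties(site_xml_texts, individual):
    """Merge configuration files, then let individual properties win."""
    merged = {}
    for text in site_xml_texts:
        merged.update(load_site_properties(text))
    merged.update(individual)  # individual properties override file values
    return merged
```

Like the destination, this sketch does not validate property names or values; a misspelled name is simply carried through.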
Configuring a Hadoop FS Destination
Configure a Hadoop FS destination to write data to HDFS.