Directory
The Directory source reads data from files in a local directory. The source can use multiple threads to enable the parallel processing of files.
When you configure the Directory source, you define the directory to use, read order, file name pattern, file name pattern mode, and the first file to process. You can use glob patterns or regular expressions to define the file name pattern that you want to use.
When using the Last Modified Timestamp read order, you can configure the source to read from subdirectories. To use multiple threads for processing, specify the number of threads to use.
You can also enable reading compressed files or files in a late arriving directory. After processing a file, the Directory source can keep, archive, or delete the file.
When the flow stops, the Directory source notes where it stops reading. When the flow starts again, the source continues processing from where it stopped by default. You can reset the offset to process all requested files.
The source generates record header attributes that let you use information about the originating file of a record in flow processing.
The source can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
File name pattern and mode
Use a file name pattern to define the files that the Directory source processes. You can use either a glob pattern or a regular expression to define the file name pattern.
The Directory source processes files based on the file name pattern mode, file name pattern, and specified directory. For example, if you specify a /logs/weblog/ directory, glob mode, and *.json as the file name pattern, the source processes all files with the json extension in the /logs/weblog/ directory.
The source processes files in order based on the specified read order.
For more information about glob syntax, see the Oracle Java documentation. For more information about regular expressions, see Regular expressions overview.
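As a rough illustration of the two pattern modes, the following Python sketch selects file names with a glob pattern and with an equivalent regular expression. It relies on Python's `fnmatch` and `re` modules, whose syntax is close to, but not identical to, the Java glob and regular expression syntax that the source uses. The `select_files` helper and the file names are hypothetical.

```python
import fnmatch
import re


def select_files(names, pattern, mode):
    """Return the names that match the pattern in the given mode ("glob" or "regex")."""
    if mode == "glob":
        return [n for n in names if fnmatch.fnmatchcase(n, pattern)]
    if mode == "regex":
        compiled = re.compile(pattern)
        # In this sketch the whole file name must match the expression.
        return [n for n in names if compiled.fullmatch(n)]
    raise ValueError(f"unknown mode: {mode}")


names = ["access.json", "error.json", "access.log", "notes.txt"]
glob_matches = select_files(names, "*.json", "glob")
regex_matches = select_files(names, r".*\.json", "regex")
```

Both calls select the same two `.json` files, which shows that the glob `*.json` and the regular expression `.*\.json` describe the same set of names here.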
Read order
The Directory source reads files in ascending order based on the timestamp or file name:
- Last Modified Timestamp
- The Directory source can read files in ascending order based on the timestamp associated with the file. The source checks both the last-modified timestamp and the changed timestamp, then uses the later (more recent) of the two when ordering files for processing.
- Lexicographically Ascending File Names
- The Directory source can read files in lexicographically ascending order based on file names. Lexicographically ascending order reads the numbers 1 through 11 as follows:
1, 10, 11, 2, 3, 4, 5, 6, 7, 8, 9
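The effect of lexicographic ordering can be reproduced with a plain string sort. The short Python sketch below is only an illustration of string ordering in general, not of the source's implementation. It sorts the numbers 1 through 11 as strings, then sorts the same numbers with leading zeros, a naming choice assumed here as one way to make lexicographic order match numeric order.

```python
# Numbers 1 through 11 as strings, the way they would appear in file names.
names = [str(n) for n in range(1, 12)]

# A plain string sort compares character by character, so "10" sorts before "2".
lexicographic = sorted(names)

# Zero-padded names sort lexicographically in the same order as their numeric values.
padded = sorted(f"{n:02d}" for n in range(1, 12))
```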
Multithreaded processing
The Directory source can use multiple threads to perform parallel processing based on the Number of Threads property.
When using multiple threads in a flow, each thread reads data from a single file, and each file can have a maximum of one thread read from it at a time. The file read order is based on the configuration for the Read Order property.
As the flow runs, each thread connects to the source system, creates a batch of data, and passes the batch to an available flow runner. A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source.
Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But since batches are processed by different flow runners, the order that batches are written to targets is not ensured.
For example, say you configure the source to read files from a directory using 5 threads and the Last Modified Timestamp read order. When you start the flow, the source creates five threads, and Data Collector creates a matching number of flow runners.
The Directory source assigns a thread to each of the five oldest files in the directory. Each thread processes its assigned file, passing batches of data to the source. Upon receiving data, the source passes a batch to each of the flow runners for processing.
After each thread completes processing a file, it continues to the next file based on the last-modified timestamp, until all files are processed.
For more information about multithreaded flows, see Multithreaded flow overview.
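The rule that each file is read by at most one thread at a time can be sketched in Python as follows. This is a simplified model under stated assumptions, not the source's implementation: files are in-memory lists of lines, the `read_in_parallel` helper is hypothetical, and flow runners are not modeled (batches are simply collected in the order they complete).

```python
import queue
import threading


def read_in_parallel(files, num_threads, batch_size):
    """Read every file with exactly one thread and collect the batches produced.

    `files` maps a file name to its lines, listed in the desired read order.
    """
    pending = queue.Queue()
    for name in files:
        pending.put(name)

    batches = []  # (file name, batch) pairs in the order they were completed
    lock = threading.Lock()

    def reader():
        while True:
            try:
                name = pending.get_nowait()  # claim the next unread file
            except queue.Empty:
                return  # no files left for this thread
            lines = files[name]
            for start in range(0, len(lines), batch_size):
                batch = lines[start:start + batch_size]
                with lock:
                    batches.append((name, batch))

    threads = [threading.Thread(target=reader) for _ in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return batches


files = {
    "a.log": ["a1", "a2", "a3"],
    "b.log": ["b1", "b2"],
    "c.log": ["c1", "c2", "c3", "c4"],
}
batches = read_in_parallel(files, num_threads=2, batch_size=2)
```

Because a file is claimed by a single thread, the records of each file stay in order within and across its own batches, while batches from different files can interleave in any order.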
Reading from subdirectories
When using the Last Modified Timestamp read order, the Directory source can read files in subdirectories of the specified file directory.
When you configure the source to read from subdirectories, it reads files from all subdirectories. It reads files in ascending order based on timestamp, regardless of the location of the file within the directory.
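For example, with /logs/ as the file directory, the source reads files such as the following in the order shown, even though they are located in different subdirectories:

| File Name | Directory | Last Modified Timestamp |
|---|---|---|
| log-1.json | /logs/west/ | APR 24 2016 14:03:35 |
| log-0054.json | /logs/east/ | APR 24 2016 14:05:03 |
| log-0055.json | /logs/west/ | APR 24 2016 14:45:11 |
| log-2.json | /logs/ | APR 24 2016 14:45:11 |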
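Reading across subdirectories in timestamp order can be approximated with a recursive directory walk followed by a sort. The Python sketch below is an illustration under assumptions: it orders by last-modified time only (the source also considers the changed timestamp), it breaks ties by path, and the `files_by_mtime` helper, directory names, and timestamps are hypothetical.

```python
import os
import tempfile


def files_by_mtime(root, suffix=".json"):
    """Return matching files under root, including subdirectories, oldest first."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.endswith(suffix):
                path = os.path.join(dirpath, name)
                found.append((os.path.getmtime(path), path))
    found.sort()  # ascending by timestamp; ties fall back to path order
    return [path for _mtime, path in found]


# Build a small directory tree with controlled timestamps to try the helper.
with tempfile.TemporaryDirectory() as root:
    specs = [
        ("west", "log-1.json", 1000),
        ("east", "log-0054.json", 2000),
        ("west", "log-0055.json", 3000),
    ]
    for subdir, name, mtime in specs:
        os.makedirs(os.path.join(root, subdir), exist_ok=True)
        path = os.path.join(root, subdir, name)
        with open(path, "w") as handle:
            handle.write("{}")
        os.utime(path, (mtime, mtime))  # set access and modification times
    ordered = [os.path.basename(p) for p in files_by_mtime(root)]
```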
Post-processing subdirectories
When the Directory source reads from subdirectories, it uses the subdirectory structure when archiving files during post-processing.
You can archive files when the source completes processing a file or when it cannot fully process a file.
| File Name | Archive Directory |
|---|---|
| log-1.json | /processed/logs/west/ |
| log-0054.json | /processed/logs/east/ |
| log-0055.json | /processed/logs/west/ |
| log-2.json | /processed/logs/ |
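One way to picture how the subdirectory structure carries over to the archive location is the Python sketch below. It assumes an archive directory of `/processed` and assumes that the full source directory path is mirrored beneath it; the source may derive the location differently, and the `archive_directory` helper is hypothetical.

```python
import posixpath


def archive_directory(archive_root, source_file):
    """Return the archive directory that mirrors the source file's directory."""
    source_dir = posixpath.dirname(source_file)  # for example /logs/west
    # Drop the leading slash so the source path nests under the archive root.
    return posixpath.join(archive_root, source_dir.lstrip("/")) + "/"


west = archive_directory("/processed", "/logs/west/log-1.json")
top = archive_directory("/processed", "/logs/log-2.json")
```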
First file for processing
Configure a first file for processing when you want Directory to ignore one or more existing files in the directory.
When you define a first file to process, Directory starts processing with the specified file and continues based on the read order and file name pattern. When you do not specify a first file, Directory processes all files in the directory that match the file name pattern.
For example, say Directory reads files based on last-modified or changed timestamp. To ignore all files older than a particular file, use that file name as the first file to process.
Similarly, say you have Directory reading files based on lexicographically ascending file names, and the file directory includes the following files: web_001.log, web_002.log, web_003.log.
If you configure web_002.log as the first file, Directory reads web_002.log and continues to web_003.log. It skips web_001.log.
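The first-file behavior for lexicographically ascending file names can be sketched as a sort followed by a filter. The Python example below is a minimal illustration, assuming plain string comparison; the `files_to_process` helper is hypothetical.

```python
def files_to_process(names, first_file=None):
    """Return names in lexicographic order, starting at first_file when one is given."""
    ordered = sorted(names)
    if first_file is None:
        return ordered  # no first file: every matching file is processed
    # Keep the first file itself and everything that sorts after it.
    return [name for name in ordered if name >= first_file]


selected = files_to_process(
    ["web_003.log", "web_001.log", "web_002.log"], first_file="web_002.log"
)
```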
Late directory
You can configure the Directory source to read files in a late directory - a directory that appears after the flow starts.
When reading from a late directory, the source does not validate the directory path when you start the flow. If the directory does not exist when the flow starts, the source waits indefinitely for the appearance of the directory and a file to process.
For example, say you read files in the following directory:
/logs/server/
The directory does not exist when you start the flow, so the Directory source waits until the directory and a file matching the file name pattern appear, and then processes the data.
After the /logs/server directory appears, the source can then process the following files that are written to the directory:
/logs/server/log.json
/logs/server/log1.json
/logs/server/log2.json
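Waiting for a late directory amounts to polling until both the directory and a matching file exist. The Python sketch below illustrates that idea under assumptions: the `wait_for_files` helper, the polling interval, and the `max_polls` cap are hypothetical (the source is described as waiting indefinitely, which corresponds to `max_polls=None` here).

```python
import fnmatch
import os
import tempfile
import threading
import time


def wait_for_files(directory, pattern, poll_seconds=1.0, max_polls=None):
    """Poll until the directory exists and holds a file matching the glob pattern."""
    polls = 0
    while max_polls is None or polls < max_polls:
        if os.path.isdir(directory):
            matches = sorted(
                name for name in os.listdir(directory)
                if fnmatch.fnmatchcase(name, pattern)
            )
            if matches:
                return matches
        polls += 1
        time.sleep(poll_seconds)
    return []  # gave up before anything appeared


# Demonstration: the directory and a file appear shortly after polling starts.
with tempfile.TemporaryDirectory() as root:
    late_dir = os.path.join(root, "logs", "server")

    def create_late_directory():
        os.makedirs(late_dir)
        with open(os.path.join(late_dir, "log.json"), "w") as handle:
            handle.write("{}")

    threading.Timer(0.2, create_late_directory).start()
    found = wait_for_files(late_dir, "*.json", poll_seconds=0.05, max_polls=200)
```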
Record header attributes
The Directory source creates record header attributes that include information about the originating file for the record.
When the source processes Avro data, it includes the Avro schema in an avroSchema record header attribute. When the source processes Parquet data and Skip Union Indexes is not enabled, it generates an avro.union.typeIndex./id record header attribute identifying the index number of the element in a union the data is read from.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with header attributes.
- avro.union.typeIndex./id - When processing Parquet data, provides the index number of the element in the union that the data is read from if Skip Union Indexes is not enabled.
- avroSchema - When processing Avro data, provides the Avro schema.
- baseDir - Base directory containing the file where the record originated.
- filename - Provides the name of the file where the record originated.
- file - Provides the file path and file name where the record originated.
- mtime - Provides the last-modified time for the file.
- offset - Provides the file offset in bytes. The file offset is the location in the file where the record originated.
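To make the file-related attributes concrete, the Python sketch below models them as a dictionary and adds a lookup with a fallback value. It is only a model: the attribute names come from the list above, but the value formats, the example path, and both helper functions are assumptions, and `attribute_or_default` is a Python stand-in rather than the record:attributeOrDefault function itself.

```python
import posixpath


def header_attributes(base_dir, file_path, mtime, offset):
    """Build a dictionary modeling the file-related record header attributes."""
    return {
        "baseDir": base_dir,                          # directory being read
        "filename": posixpath.basename(file_path),    # file name only
        "file": file_path,                            # full path and file name
        "mtime": mtime,                               # last-modified time (format assumed)
        "offset": offset,                             # position of the record in bytes
    }


def attribute_or_default(attributes, name, default):
    """Return the named attribute, or the default when it is not present."""
    return attributes.get(name, default)


attrs = header_attributes("/logs/weblog", "/logs/weblog/access.json", 1461506615000, 128)
name = attribute_or_default(attrs, "filename", "unknown")
schema = attribute_or_default(attrs, "avroSchema", "none")
```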
Event generation
The Directory source can generate events that you can use in an event stream. When you enable event generation, the source generates event records each time the source starts or completes reading a file. It can also generate events when it completes processing all available data and the configured batch wait time has elapsed.
You can use Directory events in several ways, for example:
- With the Pipeline Finisher executor to stop the flow and transition the flow to a Finished state when the source completes processing available data.
For an example, see Stopping a flow after processing all available data.
- With the Email executor to send a custom email after receiving an event.
For an example, see Sending email during flow processing.
- With a target to store event information.
For an example, see Preserving an audit trail of events.
For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
Event records
| Record Header Attribute | Description |
|---|---|
| sdc.event.type | Event type. Uses one of the following types: new-file, finished-file, no-more-data. |
| sdc.event.version | Integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The Directory source can generate the following types of event records:
- new-file
- The Directory source generates a new-file event record when it starts processing a new file.
- finished-file
- The Directory source generates a finished-file event record when it finishes processing a file.
- no-more-data
- The Directory source generates a no-more-data event record when the source completes processing all available records and the number of seconds configured for Batch Wait Time elapses without any new files appearing to be processed.
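The sequence of events for a run can be sketched in Python as follows. The sketch is illustrative only: the three header attribute names and the event types come from this section, while the version value, the millisecond timestamp unit, and the helper functions are assumptions. It also ignores the Batch Wait Time delay that precedes a no-more-data event.

```python
import time


def make_event(event_type, version=1):
    """Build the event-related header attributes for one event record."""
    return {
        "sdc.event.type": event_type,
        "sdc.event.version": version,  # assumed value
        # Epoch timestamp; milliseconds are an assumption in this sketch.
        "sdc.event.creation_timestamp": int(time.time() * 1000),
    }


def events_for(files):
    """Return the events produced while reading the given files one after another."""
    events = []
    for _name in files:
        events.append(make_event("new-file"))       # reading of the file starts
        events.append(make_event("finished-file"))  # reading of the file completes
    events.append(make_event("no-more-data"))       # nothing left to process
    return events


event_types = [e["sdc.event.type"] for e in events_for(["a.json", "b.json"])]
```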
Buffer limit and error handling
The Directory source passes each record to a buffer. The size of the buffer determines the maximum size of the record that can be processed. Decrease the buffer limit when memory on the Data Collector machine is limited. Increase the buffer limit to process larger records when memory is available.
When a record is larger than the buffer limit, the source handles the file based on the error handling configured for the stage:
- Discard
- The source discards the record and all remaining records in the file, and then continues processing the next file.
- Send to Error
- With a buffer limit error, the source cannot send the record to the flow for error handling because it is unable to fully process the record.
Instead, the source creates a message stating that a buffer overrun error occurred. The message includes the file and offset where the buffer overrun error occurred. The information displays in the flow history.
If an error directory is configured for the stage, the source moves the file to the error directory and continues processing the next file.
- Stop Flow
- The source stops the flow and creates a message stating that a buffer overrun error occurred. The message includes the file and offset where the buffer overrun error occurred. The information displays in the flow history.
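The three error handling options can be compared with a small Python model. This is a sketch under assumptions rather than the source's logic: records are byte strings, the offset is the running byte count, and the `read_file` helper, the option names, and the message wording are hypothetical.

```python
def read_file(name, records, buffer_limit, on_overrun):
    """Read records until one exceeds the buffer limit.

    Returns (emitted records, message or None, whether the flow should stop).
    """
    if on_overrun not in ("discard", "to_error", "stop"):
        raise ValueError(f"unknown option: {on_overrun}")
    emitted = []
    offset = 0
    for record in records:
        if len(record) > buffer_limit:
            if on_overrun == "discard":
                # Drop this record and the rest of the file without a message.
                return emitted, None, False
            message = f"buffer overrun in {name} at offset {offset}"
            # "to_error" reports and moves on; "stop" reports and halts the flow.
            return emitted, message, on_overrun == "stop"
        emitted.append(record)
        offset += len(record)
    return emitted, None, False


records = [b"aa", b"bbbbbb", b"cc"]
discarded = read_file("f.log", records, 4, "discard")
errored = read_file("f.log", records, 4, "to_error")
stopped = read_file("f.log", records, 4, "stop")
```

In all three cases the records after the oversized one are not read from this file; the options differ only in whether a message is produced and whether processing continues with the next file.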
Data formats
The Directory source processes data differently based on the data format.
- Avro
- Generates a record for every Avro record. The source includes the Avro schema in the avroSchema record header attribute. It also includes a precision and scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Delimited
- Generates a record for each delimited line.
- Excel
- Generates a record for every row in the file. Can process .xls or .xlsx files.
You can configure the source to read from all sheets in a workbook or from particular sheets in a workbook. You can specify whether files include a header row and whether to ignore the header row. You can also configure the source to skip cells that do not have a corresponding header value. A header row must be the first row of a file. Vertical header columns are not recognized.
The source cannot process Excel files with large numbers of rows. You can save such files as CSV files in Excel, and then use the source to process them with the delimited data format.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Parquet
- The source generates records for every Parquet record in the file. The file must contain the Parquet schema. The source uses the Parquet schema to generate records.
The stage includes the Parquet schema in a parquetSchema record header attribute.
When Skip Union Indexes is not enabled, the source generates an avro.union.typeIndex./id record header attribute identifying the index number of the element in the union that the data is read from. If a schema contains many unions and the flow does not depend on index information, you can enable Skip Union Indexes to avoid long processing times associated with storing a large number of indexes.
- Protobuf
- Generates a record for every protobuf message.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector flow using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- Whole File
- Streams whole files from the source system to the target system. You can specify a transfer rate or use all available resources to perform the transfer.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the source treats the XML file as a single record.
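As one example of format-specific handling, the two JSON layouts mentioned above (multiple JSON objects, or a single JSON array) can both be turned into one record per object. The Python sketch below uses the standard `json` module and is only an illustration; the `json_records` helper is hypothetical and does not reflect how the source parses files.

```python
import json


def json_records(text):
    """Return one record per JSON object from either supported layout."""
    text = text.strip()
    if text.startswith("["):
        return list(json.loads(text))  # single array containing the objects
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(text):
        # raw_decode parses one value and reports where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        records.append(obj)
        while pos < len(text) and text[pos].isspace():
            pos += 1  # skip whitespace between consecutive objects
    return records


from_objects = json_records('{"id": 1}\n{"id": 2}')
from_array = json_records('[{"id": 1}, {"id": 2}]')
```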
Configuring a Directory source
Configure a Directory source to read data from files in a directory.