Record header attributes

Record header attributes are named string values stored in record headers that you can use in flow logic, as needed.

Some stages create record header attributes for a particular purpose. For example, CDC-enabled sources include the CRUD operation type in the sdc.operation.type record header attribute. This enables CRUD-enabled targets to determine the operation type to use when processing records. Similarly, the PostgreSQL Metadata processor generates record header attributes that some targets can use as part of the Drift Synchronization Solution for PostgreSQL.

Other stages include processing-related information in record header attributes for general use. For example, event-generating stages include the event type in record header attributes in case you want to process the event based on that information. And several sources include information such as the originating file name, location, or partition for each record.

You can use certain processors to create or update record header attributes. For example, you can use an Expression Evaluator to create attributes for record-based writes.

You are not required to use the attributes that a stage includes in record headers. You can, for example, use the CDC-enabled Salesforce source in a non-CDC flow and ignore the CDC record header attributes that are automatically generated.

When writing data to target systems, record header attributes are preserved with the record only when using the Google Pub/Sub Publisher or JMS Producer target or when using another target with the SDC Record data format. To preserve the information when using other data formats, use the Expression Evaluator to copy information from record header attributes to record fields.
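
The copy step described above can be pictured with a small Python sketch. It is an illustration only, not Data Collector code: the record is modeled as two plain dictionaries, and the function name, the hdr_ prefix, and the sample values are assumptions made up for the example.

```python
def copy_attributes_to_fields(header, fields, names, prefix="hdr_"):
    """Return a copy of `fields` with the selected header attributes added.

    `header` and `fields` are plain dicts standing in for a record's header
    attributes and its fields (a hypothetical model, not the real record API).
    """
    merged = dict(fields)  # leave the caller's fields untouched
    for name in names:
        if name in header:  # skip attributes the source did not generate
            merged[prefix + name] = header[name]
    return merged


# Sample values are made up for illustration.
header = {"file": "/data/in/sales.csv", "offset": "128"}
fields = {"region": "west", "storeID": "17"}
merged = copy_attributes_to_fields(header, fields, ["file", "offset", "missing"])
```

Once the values live in record fields, they are written with the record regardless of the data format that the target uses.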

Working with header attributes

You can use an Expression Evaluator processor or any scripting processor to create or update record header attributes. For example, say a target requires a CRUD operation to be specified in a record header attribute. If the source providing the data does not generate that information automatically, you can set the attribute value in one of these processors.
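
As a rough sketch of that case, the Python below derives an operation code from a record field and stores it in the sdc.operation.type attribute. The record is modeled as plain dictionaries rather than the scripting processor API, the op field name is hypothetical, and the numeric codes (1 for INSERT, 2 for DELETE, 3 for UPDATE, 4 for UPSERT) are an assumption to check against the CRUD operation documentation for your target.

```python
# Assumed mapping of operation names to sdc.operation.type codes; verify it
# against the documentation for the target you use.
OPERATION_CODES = {"INSERT": 1, "DELETE": 2, "UPDATE": 3, "UPSERT": 4}


def set_operation_type(header, fields, op_field="op"):
    """Return a copy of `header` with sdc.operation.type set from a field."""
    op_name = str(fields.get(op_field, "")).upper()
    if op_name not in OPERATION_CODES:
        raise ValueError("unsupported operation: %r" % op_name)
    updated = dict(header)
    # Header attributes are strings, so store the code as text.
    updated["sdc.operation.type"] = str(OPERATION_CODES[op_name])
    return updated


# Hypothetical record whose "op" field names the operation.
header = set_operation_type({}, {"op": "update", "id": "42"})
```

Raising an error for an unknown operation is one design choice among several; a flow could instead route such records to error handling.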

Record header attributes are string values. You can use record:attribute functions in any expression to include attribute values in calculations.

Important: Record header attributes do not have field paths. When using an attribute in an expression, use just the attribute name, surrounded by quotation marks since attributes are strings, as follows:
 ${record:attribute('<attribute name>')}
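
Because attribute values are strings, a value such as an offset has to be converted before it is used in arithmetic. The Python sketch below models the header as a dictionary keyed by attribute name, with no field path; the dictionary and the sample values are assumptions for illustration, not the Data Collector API.

```python
# Hypothetical header attributes; every value is a string.
header = {"file": "/data/in/sales.csv", "offset": "128"}

# Look up by attribute name only; there is no leading slash as in a field path.
raw_offset = header["offset"]

# The value is text, so "+" would concatenate; convert it before doing math.
next_offset = int(raw_offset) + 1
```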

For example, the following Expression Evaluator adds the file and offset record header attributes created by the Directory source to the record:

[Figure: the settings for an Expression Evaluator configured to add the file and offset record header attributes to the record]

Internal attributes

Data Collector generates and updates some read-only internal record header attributes as records move from stage to stage. These attributes can be viewed for debugging issues, but can only be updated by Data Collector.

The record:attribute function does not allow access to internal record header attributes. The following table describes the internal record header attributes and the functions that you can use to access the data in the flow:
Internal Record Header Attribute | Description | Related Function
stageCreator | The ID of the stage that created the record. | record:creator()
sourceId | Source of the record. Can include different information based on the source stage type. | record:id()
stagesPath | List of stages that processed the record in order, by stage name. | record:path()
trackingId | The route the record has taken through the flow, starting with the sourceId, then listing the stages that processed the record. | n/a
previousTrackingId | The tracking ID of the record before it entered the current stage. | n/a
errorStage | The stage that generated the error. In error records only. | record:errorStage()
errorStageLabel | The user-defined name for a stage. In error records only. | record:errorStageLabel()
errorCode | The error code. In error records only. | record:errorCode()
errorJobId | The ID of the job that started the flow. | n/a
errorMessage | The error message. In error records only. | record:errorMessage()
errorTimestamp | The time that the error occurred. In error records only. | record:errorTime()
errorStackTrace | The stack trace associated with the error. In error records only. | n/a

Header attribute-generating stages

The following table lists the stages that generate record header attributes:
Stage | Description
CDC-enabled sources | Include the CRUD operation type in the sdc.operation.type header attribute and can include additional CRUD and CDC information in record header attributes. For more information, see CDC-Enabled Sources.
Sources that process Avro data and the Data Parser processor | Include the Avro schema in an avroSchema record header attribute.
Sources that process Parquet data | Include the Parquet schema in a parquetSchema record header attribute, and include the index number of the element in a union the data is read from in an avro.union.typeIndex./id record header attribute unless the source is configured to skip union indexes.
Sources that process XML data | Can include namespaces in an xmlns record header attribute when you enable field XPaths.
Stages that generate events | Generate record header attributes for event records. For details about event record header attributes, see "Event Records" in the stage documentation.
Amazon S3 source | Can be configured to include system-defined and user-defined object metadata in record header attributes.
Amazon SQS Consumer source | Can be configured to include SQS message attributes in record header attributes.
Aurora PostgreSQL CDC Client source | When tables have primary keys, includes information about the primary keys in record header attributes.
Azure Blob Storage source | Can be configured to include system-defined, user-defined, and custom object metadata in record header attributes.
Azure Data Lake Storage Gen2 source | Can be configured to include system-defined and custom object metadata in record header attributes.
Azure Data Lake Storage Gen2 (Legacy) source | Includes information about the originating file for the record in record header attributes.
Directory source | Includes information about the originating file for the record in record header attributes.
File Tail source | Includes information about the originating file for the record in record header attributes. Can be configured to use tag attributes for sets of files.
Google Pub/Sub Subscriber source | When available, includes user-defined message attributes in record header attributes.
Groovy Scripting source | Can be configured to create record header attributes.
HTTP Client source | Includes response header fields in record header attributes.
HTTP Server source | Includes information about the requested URL and request header fields in record header attributes.
IBM Db2 source | Includes schema and table names in record header attributes.
JavaScript Scripting source | Can be configured to create record header attributes.
JDBC Multitable Consumer source | Includes table and data type information in JDBC record header attributes.
JDBC Query Consumer source | Can be configured to include table and data type information in JDBC record header attributes.
Jython Scripting source | Can be configured to create record header attributes.
Kafka Multitopic Consumer source | Includes information about the sources of the record in record header attributes. Can also include Kafka message headers in record header attributes.
MQTT Subscriber source | Includes information about the sources of the record in record header attributes.
MySQL Binary Log source | When tables have primary keys, includes information about the primary keys in record header attributes.
Oracle CDC source | When tables have primary keys, includes information about the primary keys in record header attributes. Can include a range of additional data depending on how you configure the source.
Oracle CDC Client source | When tables have primary keys, includes information about the primary keys in record header attributes.
Oracle Multitable Consumer source | Includes table and data type information in JDBC header attributes.
PostgreSQL CDC Client source | When tables have primary keys, includes information about the primary keys in record header attributes.
Pulsar Consumer (Legacy) source | Includes information in the properties field of the message and the topic that the message was read from in record header attributes.
RabbitMQ Consumer source | Includes RabbitMQ attributes in record header attributes.
REST Service source | Includes information about the URL and request header fields in record header attributes.
Salesforce source | Includes Salesforce information about the sources of the record in Salesforce header attributes.
Salesforce Bulk API 2.0 source | Includes Salesforce information about the sources of the record in Salesforce header attributes.
SAP HANA Query Consumer source | Can be configured to include table and data type information in JDBC header attributes. Can be configured to include SAP HANA connection information in SAP HANA header attributes.
SFTP/FTP/FTPS Client source | Includes information about the originating file for the record in record header attributes.
Snowflake Bulk source | Includes information about the sources of the record in Snowflake header attributes.
Couchbase Lookup processor | Includes information about the state of the looked-up document in a record header attribute.
Expression Evaluator processor | Can be configured to create or update record header attributes.
Groovy Evaluator processor | Can be configured to create or update record header attributes.
HTTP Client processor | Includes response header fields in record header attributes. Generates record header attributes when passing an incoming record downstream based on per-status action or timeout properties.
JavaScript Evaluator processor | Can be configured to create or update record header attributes.
Jython Evaluator processor | Can be configured to create or update record header attributes.
Schema Generator processor | Generates schemas and writes them to a user-defined record header attribute.

Record header attributes for record-based writes

Targets can use information in record header attributes to write data. Targets that write Avro data can use Avro schemas in the record header.

To use a record header attribute, configure the target to use the header attribute and ensure that the records include the header attribute.

You can use the Expression Evaluator or a scripting processor to add record header attributes.

You can use the following record header attributes in targets:
targetDirectory attribute in Azure Data Lake Storage targets and Local FS targets
The targetDirectory record header attribute defines the directory where the record is written. If the directory does not exist, the target creates the directory. The targetDirectory header attribute replaces the Directory Template property in the target.
When you use targetDirectory to provide the directory, the time basis configured for the target is used only for determining whether a record is late. Time basis is not used to determine the output directories to create or to write records to directories.
To use the targetDirectory header attribute, on the Output tab, select Directory in Header.
avroSchema attribute in targets that write Avro data
The avroSchema header attribute defines the Avro schema for the record. When you use this header attribute, you cannot define an Avro schema to use in the target.
To use the avroSchema header attribute, on the Data Format tab, select the Avro data format, and then for the Avro Schema Location property, select In Record Header.
parquetSchema attribute in targets that write Parquet data
The parquetSchema header attribute defines the Parquet schema for the record. When you use this header attribute, you cannot define a Parquet schema to use in the target.
To use the parquetSchema header attribute, on the Data Format tab, select the Parquet data format, and then for the Parquet Schema Location property, select In Record Header.
roll attribute in Azure Data Lake Storage targets and Local FS targets
The roll attribute, when present in the record header, triggers a roll of the file.
You can define the name of the roll header attribute. When you use an Expression Evaluator or a scripting processor to generate the attribute, specify in the target the same attribute name that you defined in the processor.
To use a roll header attribute, on the Output tab, select Use Roll Attribute and define the name of the attribute.

Generating attributes for record-based writes

You can use an Expression Evaluator processor or any scripting processor to generate record header attributes for record-based writes.

To use an Expression Evaluator processor or any scripting processor, you must generate record header attributes as expected by the target. Use the following guidelines to generate record header attributes:
Generating the target directory
When using an Expression Evaluator processor or any scripting processor to generate the target directory, note the following details:
  • The target expects the directory in a header attribute named "targetDirectory".
  • The target uses the directory exactly as written in the targetDirectory header attribute. Unlike directory templates, the directory specified in the targetDirectory attribute should not include any components that require evaluation, such as variables or runtime properties.
  • When you define the expression that evaluates to a directory, you can use any valid component, including expressions that evaluate data in the record. The processor evaluates the expression, so the attribute holds the resulting literal directory.

For example, suppose you want to write records to different directories based on the Data Collector that runs the flow, and the region and store ID where the transaction took place. You can set up a runtime property named DIR that defines the base for the directory and define DIR for each Data Collector that runs the flow. Then, you can use the following expression in the Expression Evaluator to define the targetDirectory attribute:

${runtime:conf('DIR')}/transactions/${record:value('/region')}/${record:value('/storeID')}
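
To make the evaluation order concrete, the following Python sketch mimics what such an expression computes in the processor: the directory is fully resolved before it is stored in the targetDirectory attribute, so the target receives a literal path. The dictionaries standing in for runtime properties and for the record, and all sample values, are hypothetical.

```python
def build_target_directory(runtime_conf, fields):
    """Resolve <DIR>/transactions/<region>/<storeID> to a literal path."""
    return "{}/transactions/{}/{}".format(
        runtime_conf["DIR"], fields["region"], fields["storeID"]
    )


runtime_conf = {"DIR": "/data/dc1"}           # hypothetical value, set per Data Collector
fields = {"region": "west", "storeID": "17"}  # hypothetical values from the record

# The attribute holds the resolved path, with nothing left to evaluate.
header = {"targetDirectory": build_target_directory(runtime_conf, fields)}
```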
Generating the Avro schema
When using an Expression Evaluator processor or any scripting processor to generate the Avro schema, note the following details:
  • The target expects the Avro schema in a header attribute named "avroSchema".
  • Use the standard Avro schema format, for example:
    {"type":"record","name":"table_name","namespace":"database_name",
    "fields":[{"name":"int_val","type":["null","int"],"default":null},
    {"name":"str_val","type":["null","string"],"default":null}]}
  • The database name and table name must be included in the Avro schema.
Tip: You might use an Avro schema generator to help generate the Avro schema.
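
If you generate the schema in code rather than by hand, a small helper can keep the required names in place. The Python sketch below builds a schema string in the format shown above from a database name, a table name, and a list of columns. The helper name and the choice to make every column a nullable union are assumptions for illustration.

```python
import json


def build_avro_schema(database, table, columns):
    """Return an Avro record schema as compact JSON text.

    `columns` is a list of (name, avro_type) pairs. Every column becomes a
    nullable union with a null default, mirroring the example format.
    """
    fields = [
        {"name": name, "type": ["null", avro_type], "default": None}
        for name, avro_type in columns
    ]
    schema = {
        "type": "record",
        "name": table,          # table name
        "namespace": database,  # database name
        "fields": fields,
    }
    return json.dumps(schema, separators=(",", ":"))


avro_schema = build_avro_schema(
    "database_name", "table_name", [("int_val", "int"), ("str_val", "string")]
)
# The schema text is what would be stored in the avroSchema attribute.
header = {"avroSchema": avro_schema}
```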
Generating the roll attribute
When using an Expression Evaluator processor or any scripting processor to generate the roll attribute, note the following details:
  • Use any name for the attribute and specify the attribute name in the target.
  • Configure an expression that defines when to roll files.
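
One way to picture a roll condition is a function that adds the attribute only to the record that should trigger the roll. The Python sketch below does this when a record's date differs from the date of the previous record. The attribute name roll, the date field, and the dictionary model of a record are all hypothetical; the target would be configured with the same attribute name.

```python
def mark_rolls(records, roll_attr="roll", date_field="date"):
    """Return one header dict per record, adding `roll_attr` where the date changes."""
    headers = []
    previous_date = None
    for fields in records:
        header = {}
        current_date = fields[date_field]
        # Roll when a new date starts, but not on the very first record.
        if previous_date is not None and current_date != previous_date:
            header[roll_attr] = "true"
        headers.append(header)
        previous_date = current_date
    return headers


# Hypothetical records; only the third one starts a new date.
records = [{"date": "2024-01-01"}, {"date": "2024-01-01"}, {"date": "2024-01-02"}]
headers = mark_rolls(records)
```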
To define these record header attributes in the Expression Evaluator, perform the following steps:
  1. On the Expressions tab of the Expression Evaluator, specify the Header Attribute name.

    To generate a target directory, use targetDirectory.

    To generate an Avro schema, use avroSchema.

    You can use any name for a roll indicator header attribute.

  2. For the Header Attribute Expression, define the expression that evaluates to the information you want the target to use.

For information about generating record header attributes with scripting processors, see the scripting processor documentation.