Amazon SQS Consumer
Use the Amazon SQS Consumer origin to read data from queues in Amazon Simple Queue Service (SQS). The origin can use multiple threads to enable parallel processing of data. To read data from Amazon S3, use the Amazon S3 origin instead. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
When you configure the Amazon SQS Consumer origin, you define the region and the set of queue name prefixes to use. These properties determine the queues that the origin processes. You also configure the authentication method that the origin uses to connect to Amazon SQS.
You can optionally include Amazon SQS message attributes and sender attributes in records as record header attributes.
You can also use a connection to configure the origin.
Authentication Method
You can configure the Amazon SQS Consumer origin to authenticate with Amazon Web Services (AWS) using an instance profile or AWS access keys. When accessing a public queue, you can connect anonymously using no authentication.
For more information about the authentication methods and details on how to configure each method, see Security in Amazon Stages.
Queue Name Prefix
The Amazon SQS Consumer origin uses the queue name prefix to determine the queues to process. You can define multiple queue name prefixes.
When you specify the queue name prefix, enter a string that represents the beginning of the queue names that you want to use. The origin processes data from every queue whose name begins with a matching prefix. You cannot use wildcards within the queue name prefix.

For example, say you have the following queues:

- sales-eu-france
- sales-eu-germany
- sales-us
- sales-egypt
If you use "sales" as the prefix, the origin processes messages from all of the queues.
If you use "sales-eu" as the prefix, the origin processes only sales-eu-france and sales-eu-germany.
If you use "sales-e" as the prefix, the origin processes all queues except for sales-us.
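The prefix matching described above can be sketched in plain Python (an illustrative sketch, not the origin's actual implementation):

```python
# The example queues from the documentation above.
queues = ["sales-eu-france", "sales-eu-germany", "sales-us", "sales-egypt"]

def matching_queues(prefix, queues):
    # A queue is processed when its name starts with the prefix;
    # wildcards are not supported.
    return [q for q in queues if q.startswith(prefix)]

print(matching_queues("sales", queues))     # all four queues
print(matching_queues("sales-eu", queues))  # ['sales-eu-france', 'sales-eu-germany']
print(matching_queues("sales-e", queues))   # every queue except sales-us
```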
Multithreaded Processing
The Amazon SQS Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline. The origin creates multiple concurrent threads based on the Max Threads property.
When performing multithreaded processing, the Amazon SQS Consumer origin determines the number of queues to process and creates the specified number of threads. When there are more queues than threads, the queues are divided up and assigned to different threads. Each thread processes data from a specific set of queues and cycles round-robin through the set of queues.
When a thread requests data from a queue, the queue returns messages based on the configured Number of Messages per Request property. The thread creates a batch of data and passes the batch to an available pipeline runner. After processing the batch, the thread continues to the next assigned queue.
A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. However, the order in which batches are written to destinations is not guaranteed.
For example, say you set the Max Threads property to 5 and the origin is configured to process 20 queues. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Each thread is assigned 4 queues to process. Each thread cycles through the queues, creating one batch of data at a time and passing it to a pipeline runner for processing.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
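The queue-to-thread assignment in this example can be sketched as follows. This is a simplified illustration with hypothetical queue names; the origin's actual partitioning logic may differ:

```python
# Simplified illustration: 20 queues divided among 5 threads.
queues = [f"queue-{i:02d}" for i in range(20)]  # hypothetical queue names
max_threads = 5

# Divide the queues among the threads; each thread then cycles
# round-robin through its own set, creating one batch at a time.
assignments = {t: queues[t::max_threads] for t in range(max_threads)}

for thread, assigned in assignments.items():
    print(f"thread {thread}: {assigned}")  # 4 queues per thread
```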
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Including SQS Message Attributes
You can include SQS message attributes in records as record header attributes. Use the SQS Message Attribute Level property to specify the attributes to include:

- Basic - Includes a basic set of standard SQS attributes.
- All Attributes - Includes all standard SQS attributes.

SQS attributes are added to record header attributes using the following naming convention: sqs.<SQS attribute name>.
SQS Attribute Level | Description |
---|---|
Basic | Includes a basic set of standard SQS attributes. |
All Attributes | Includes the Basic attributes plus additional standard SQS attributes. |
For more information about SQS message attributes, see the Amazon SQS documentation.
Including Sender Attributes
In addition to SQS message attributes, you can include sender attributes in record headers. Sender attributes are custom attributes included in messages by the message sender.
To include sender attributes:

1. Set the SQS Message Attribute Level property to All Attributes.
2. Configure the SQS Sender Attribute property, adding the name of each attribute that you want to include in the record.
Sender attributes are added to record header attributes using the following naming convention: sqs.messageAttr.<sender attribute name>. For example, a senderId attribute appears in the record header as sqs.messageAttr.senderId.
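The naming convention can be sketched as a simple mapping. Note that sender_attrs_to_headers is a hypothetical helper written for illustration, not part of Data Collector:

```python
def sender_attrs_to_headers(sender_attributes):
    # Hypothetical helper: prefix each configured sender attribute name
    # with "sqs.messageAttr." to form the record header attribute name.
    return {f"sqs.messageAttr.{name}": value
            for name, value in sender_attributes.items()}

headers = sender_attrs_to_headers({"senderId": "user-42"})
print(headers)  # {'sqs.messageAttr.senderId': 'user-42'}
```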
Data Formats
The Amazon SQS Consumer origin processes data differently based on the data format. The Amazon SQS Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Delimited
- Generates a record for each delimited line.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
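As one example of the formats above, a JSON message body containing a single JSON array or multiple concatenated JSON objects yields one record per object. A rough sketch of that behavior, using hypothetical message bodies:

```python
import json

def json_records(body):
    # Rough sketch: a JSON array yields one record per element; a body with
    # one or more concatenated JSON objects yields one record per object.
    body = body.strip()
    if body.startswith("["):
        return json.loads(body)
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(body):
        obj, end = decoder.raw_decode(body, pos)
        records.append(obj)
        pos = end
        # Skip whitespace between concatenated objects.
        while pos < len(body) and body[pos].isspace():
            pos += 1
    return records

print(json_records('[{"id": 1}, {"id": 2}]'))  # two records
print(json_records('{"id": 1} {"id": 2}'))     # two records
print(json_records('{"id": 3}'))               # one record
```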
Configuring an Amazon SQS Consumer
Configure an Amazon SQS Consumer origin to read messages from Amazon SQS.