Kinesis Consumer
The Kinesis Consumer source reads data from Amazon Kinesis Streams, Amazon DynamoDB, and Amazon CloudWatch. For information about supported versions, see Supported systems and versions.
When you configure the Kinesis Consumer source, you specify the region to use and the format of the data. You can configure the source to use a VPC interface endpoint.
You can configure additional Kinesis configuration properties that you require.
The Kinesis Consumer source can use multiple threads to enable parallel processing of data from the stream.
The Kinesis Consumer source stores offsets in DynamoDB lease tables. You can optionally define tags to apply to a lease table created by the source. When necessary, you can reset the source for Kinesis Consumer flows, but be sure to review the requirements and guidelines before doing so.
Multithreaded processing
The Kinesis Consumer source can perform parallel processing and enables the creation of a multithreaded flow. The source uses multiple concurrent threads based on the Max Threads property.
When you start the flow, each thread connects to the source system, creates a batch of data, and passes the batch to an available flow runner. A flow runner is a sourceless flow instance: an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source.
Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But because batches are processed by different flow runners, the order in which batches are written to targets is not guaranteed.
For example, say you set the Max Threads property to 5. When you start the flow, the source creates five threads, and Data Collector creates a matching number of flow runners. Upon receiving data, the source passes a batch to each of the flow runners for processing.
Each flow runner performs the processing associated with the rest of the flow. After a batch is written to flow targets, the flow runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent of the batches processed by other flow runners, so batches may be written in a different order than they were read.
At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.
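To make the ordering behavior concrete, the following Python sketch models flow runners as worker threads in a pool. It is an illustration only, assuming that a flow runner can be approximated by a thread; `run_flow` and `flow_runner` are hypothetical names, not Data Collector APIs. Records inside each batch keep their order, while the order in which whole batches are written depends on which runner finishes first.

```python
"""Hypothetical model of multithreaded batch processing (not Data Collector code)."""
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_flow(batches, max_threads=5):
    """Process each batch on one of max_threads runners; return batches in write order."""
    written = []
    lock = threading.Lock()

    def flow_runner(batch):
        # A runner handles one batch at a time and keeps record order within the batch.
        time.sleep(random.uniform(0, 0.01))  # simulate variable processing time
        processed = [record.upper() for record in batch]
        with lock:
            # Write order depends on which runner finishes first.
            written.append(processed)

    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        list(pool.map(flow_runner, batches))  # list() surfaces any runner exceptions
    return written


# Ten batches of three records each, processed by five simulated runners.
batches = [[f"b{i}-r{j}" for j in range(3)] for i in range(10)]
written = run_flow(batches, max_threads=5)
for batch in written:
    print(batch)
```

Running the script several times typically prints the batches in different orders, but the records inside each printed batch always appear in their original order.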
For more information about multithreaded flows, see Multithreaded flow overview.
Authentication method
You can configure the Kinesis Consumer source to authenticate with Amazon Web Services (AWS) using an instance profile, environment variables, a credential process profile, or AWS access keys.
For more information about the authentication methods and details on how to configure each method, see Security in Amazon stages.
Additional Kinesis properties
You can add custom Kinesis configuration properties to the Kinesis Consumer source.
When you add a Kinesis configuration property, enter the exact property name and the value. The source does not validate the property names or values.
You can add the following custom Kinesis configuration properties to the source:
- billingMode
- cleanupLeasesUponShardCompletion
- clientExecutionTimeout
- connectionMaxIdleMillis
- failoverTimeMillis
- listShardsBackoffTimeInMillis
- logWarningForTaskAfterMillis
- maxConnections
- maxConsecutiveRetriesBeforeThrottling
- maxGetRecordsThreadPool
- maxLeaseRenewalThreads
- maxListShardsRetryAttempts
- metricsBufferTimeMillis
- metricsMaxQueueSize
- requestTimeout
- retryGetRecordsInSeconds
- retryMode
- shutdownGraceMillis
- taskBackoffTimeMillis
- throttleRetries
- timeoutInSeconds
- useExpectContinue
- userAgentPrefix
- userAgentSuffix
- validateAfterInactivityMillis
- validateSequenceNumberBeforeCheckpointing
For a description of each property, see the Amazon Kinesis Streams documentation.
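Because the source does not validate property names, a misspelled or miscapitalized name can go unnoticed. The following Python sketch is a hypothetical pre-check you might run against your planned properties before entering them; `find_unsupported` is an illustrative helper, not part of the product, and the example values are arbitrary rather than recommended settings.

```python
# Hypothetical pre-check: the Kinesis Consumer source itself does not validate names or values.
SUPPORTED_PROPERTIES = {
    "billingMode", "cleanupLeasesUponShardCompletion", "clientExecutionTimeout",
    "connectionMaxIdleMillis", "failoverTimeMillis", "listShardsBackoffTimeInMillis",
    "logWarningForTaskAfterMillis", "maxConnections", "maxConsecutiveRetriesBeforeThrottling",
    "maxGetRecordsThreadPool", "maxLeaseRenewalThreads", "maxListShardsRetryAttempts",
    "metricsBufferTimeMillis", "metricsMaxQueueSize", "requestTimeout",
    "retryGetRecordsInSeconds", "retryMode", "shutdownGraceMillis", "taskBackoffTimeMillis",
    "throttleRetries", "timeoutInSeconds", "useExpectContinue", "userAgentPrefix",
    "userAgentSuffix", "validateAfterInactivityMillis", "validateSequenceNumberBeforeCheckpointing",
}


def find_unsupported(properties):
    """Return property names not in the supported list (exact, case-sensitive match)."""
    return sorted(name for name in properties if name not in SUPPORTED_PROPERTIES)


# Example values are arbitrary; "maxconnections" has the wrong capitalization.
custom = {"maxConnections": "50", "failoverTimeMillis": "10000", "maxconnections": "50"}
print(find_unsupported(custom))  # ['maxconnections']
```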
Read interval
You can configure the read interval for the Kinesis Consumer. The read interval determines how long the Kinesis Consumer waits before requesting additional data from Kinesis shards. By default, the Kinesis Consumer waits one second between requests.
Adjust the read interval to adapt to the Kinesis data load. For example, you might reduce the wait time to enable higher throughput or increase the wait time when the stream of data slows.
Amazon recommends a read interval of 1 second. The minimum read interval for the Kinesis Consumer is 200 milliseconds, which enables polling five times a second. To increase total capacity, you can create additional shards for the stream.
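The arithmetic behind these numbers is simple: polls per second equal 1000 divided by the read interval in milliseconds, and aggregate capacity scales with the shard count. The following Python sketch illustrates this under stated assumptions. The 2 MB per second figure is the AWS read limit per provisioned-mode shard and is an assumption about your stream's capacity mode; the helper names are hypothetical.

```python
MIN_READ_INTERVAL_MS = 200      # minimum read interval for the Kinesis Consumer
PER_SHARD_READ_MB_PER_SEC = 2   # assumption: AWS read limit for a provisioned-mode shard


def polls_per_second(read_interval_ms):
    """Return how many read requests per second a given read interval allows."""
    if read_interval_ms < MIN_READ_INTERVAL_MS:
        raise ValueError(f"read interval must be at least {MIN_READ_INTERVAL_MS} ms")
    return 1000 / read_interval_ms


def total_read_capacity_mb_per_sec(shard_count):
    """Aggregate read capacity grows linearly with the number of shards."""
    return shard_count * PER_SHARD_READ_MB_PER_SEC


print(polls_per_second(1000))             # 1.0 (the recommended interval)
print(polls_per_second(200))              # 5.0 (the minimum interval)
print(total_read_capacity_mb_per_sec(4))  # 8
```

Shortening the interval only helps up to the 200 ms floor; beyond that, adding shards is the way to raise throughput.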
Lease table tags
The Kinesis Consumer source stores offsets (the location where the source stops reading) in DynamoDB lease tables. You can optionally define tags to apply to a lease table created by the source. The source cannot add tags to existing lease tables.
Tags allow you to categorize DynamoDB resources by purpose, owner, environment, or any other categorization.
Each Kinesis application uses one DynamoDB lease table. If you have multiple Kinesis Consumer sources that use the same application, then all the sources use the same DynamoDB lease table. The source creates a DynamoDB lease table for the application if one doesn't already exist.
The source assigns the configured tags only when it creates the lease table. If a lease table already exists for the application, the source uses the existing table and does not assign any configured tags to the existing table.
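The create-or-reuse behavior can be sketched in a few lines of Python. This is a hypothetical in-memory model, not the source's implementation: a dictionary stands in for DynamoDB, and the lease table is assumed to be named after the application. It shows that a second source with different tags reuses the existing table without changing its tags.

```python
def get_or_create_lease_table(tables, application_name, tags):
    """Return (table, tags_applied); tags are applied only when the table is created.

    `tables` is a hypothetical stand-in for DynamoDB: table name -> {"tags": {...}}.
    Assumption: the lease table is named after the application.
    """
    if application_name in tables:
        # Existing table: reuse it and leave its tags untouched.
        return tables[application_name], False
    table = {"tags": dict(tags)}  # a newly created table receives the configured tags
    tables[application_name] = table
    return table, True


tables = {}
first, applied_first = get_or_create_lease_table(tables, "Sales", {"Owner": "data-eng"})
second, applied_second = get_or_create_lease_table(tables, "Sales", {"Owner": "analytics"})
print(applied_first, applied_second, second["tags"])  # True False {'Owner': 'data-eng'}
```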
Define tags on the Lease Table tab when you configure the source. To add tags, click Add and define the tag name and value. DynamoDB tag names and values are case-sensitive.
For more information about tagging for DynamoDB, see the Amazon Web Services documentation.
Resetting the Kinesis Consumer source
You can reset the Kinesis Consumer source to process all available data instead of continuing from the last-saved offset. For general information about resetting the source, see Managing job offsets.
When you reset the source for a Kinesis Consumer flow, Data Collector resets the source for every Kinesis Consumer flow that uses the same application name.
Requirement: Delete table permission on DynamoDB
Because the Kinesis Consumer source stores offsets in DynamoDB, the IAM user or instance profile associated with the flow must have delete table permission for tables matching the application name used in your flows, as follows:
- If you use access keys to authenticate with Amazon Web Services, the IAM user associated with the access keys must have delete table permission.
- If Data Collector uses instance profiles to authenticate, the instance profile for the EC2 instance must have delete table permission.
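The delete table requirement can be expressed as an IAM policy statement that allows the `dynamodb:DeleteTable` action on the lease table. The following Python sketch builds such a policy document, assuming the lease table is named after the application; the region, account ID, and helper name are hypothetical placeholders. It covers only the delete table requirement and is not a complete permission set for the source.

```python
import json


def delete_lease_table_policy(region, account_id, application_name):
    """Build an IAM policy that allows DeleteTable on the lease table for one application.

    Assumption: the DynamoDB lease table name matches the application name.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "dynamodb:DeleteTable",
                "Resource": f"arn:aws:dynamodb:{region}:{account_id}:table/{application_name}",
            }
        ],
    }


# Hypothetical region and account ID for illustration only.
policy = delete_lease_table_policy("us-east-1", "123456789012", "Sales")
print(json.dumps(policy, indent=2))
```

You would attach the resulting policy to the IAM user or to the role behind the instance profile, whichever your flows use to authenticate.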
Guidelines for resetting the source
To reset the source for a Kinesis Consumer flow, perform the following steps:
- Stop the flow that you want to reset. If other flows use the same application name, stop all of those flows.
- Reset the source: in the flow canvas or from the Home page, click the More icon, then click Reset Origin. When working with multiple flows that use the same application name, you can use any of the flows. You only need to reset one of them.
- Start the flow with the reset source.
- If you stopped any related flows, restart those flows.
For example, say you have three flows that read from Kinesis Streams using the Sales application name: NorthSales, CentralSales, and SouthSales. To reset the source for all three flows, stop all three flows and then reset the source for one of them, such as SouthSales. Then start the SouthSales flow, and then restart the other two flows.
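The ordering in this example can be captured as a small plan generator. The following Python sketch is hypothetical: `reset_plan` and its action tuples are illustrative names, not a Data Collector API. It stops every flow that shares the target flow's application name, resets the source once, starts the reset flow, and then restarts the others, leaving flows with other application names untouched.

```python
def reset_plan(flows, target):
    """Return the ordered actions for resetting all flows that share target's application name.

    `flows` maps flow name -> application name. Action tuples are hypothetical.
    """
    app = flows[target]
    related = [name for name, flow_app in flows.items() if flow_app == app]
    plan = [("stop", name) for name in related]
    plan.append(("reset_origin", target))  # one reset covers every flow for the application
    plan.append(("start", target))
    plan.extend(("start", name) for name in related if name != target)
    return plan


flows = {"NorthSales": "Sales", "CentralSales": "Sales", "SouthSales": "Sales", "Inventory": "Stock"}
for step in reset_plan(flows, "SouthSales"):
    print(step)
```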
Data formats
The Kinesis Consumer source processes data differently based on the data format. Kinesis Consumer can process the following types of data:
- Avro: Generates a record for every message. Includes precision and scale field attributes for each Decimal field.
- Binary: Generates a record with a single byte array field at the root of the record.
- Delimited: Generates a record for each delimited line.
- JSON: Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log: Generates a record for every log line.
- Protobuf: Generates a record for every protobuf message. By default, the source assumes messages contain multiple protobuf messages.
- SDC Record: Generates a record for every record. Use to process records generated by a Data Collector flow using the SDC Record data format.
- Text: Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML: Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the source treats the XML file as a single record.
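The JSON and XML rules above are the least obvious ones, so the following Python sketch illustrates how a payload might map to records. It is a simplified model under stated assumptions, not the source's parser: `json_records` and `xml_records` are hypothetical helpers, and `xml_records` only supports an element name directly under the root, not full simplified XPath expressions.

```python
import json
import xml.etree.ElementTree as ET


def json_records(payload):
    """Split a payload holding one JSON array or several concatenated JSON objects into records."""
    decoder = json.JSONDecoder()
    text = payload.strip()
    values, index = [], 0
    while index < len(text):
        value, index = decoder.raw_decode(text, index)
        values.append(value)
        # Skip whitespace between concatenated JSON values.
        while index < len(text) and text[index].isspace():
            index += 1
    if len(values) == 1 and isinstance(values[0], list):
        return values[0]  # a single array: one record per element
    return values  # one or more objects: one record per object


def xml_records(payload, delimiter=None):
    """Return one record per delimiter element directly under the root, else the whole document."""
    root = ET.fromstring(payload)
    if delimiter is None:
        return [root]  # no delimiter element: the whole document is a single record
    return root.findall(delimiter)  # direct children of the root with this tag


print(json_records('[{"id": 1}, {"id": 2}]'))   # [{'id': 1}, {'id': 2}]
print(json_records('{"id": 1}\n{"id": 2}'))     # [{'id': 1}, {'id': 2}]
orders = "<orders><order>a</order><order>b</order></orders>"
print([r.text for r in xml_records(orders, "order")])  # ['a', 'b']
```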
Reading from DynamoDB or CloudWatch
About this task
To read from DynamoDB or CloudWatch, you must use a VPC interface endpoint to connect to Kinesis. Then, specify the DynamoDB or CloudWatch region or VPC interface endpoint to use for the connection.
Use the following steps to specify the connection details, then configure the rest of the source as appropriate.
Procedure
Configuring a Kinesis Consumer source
About this task
Configure a Kinesis Consumer source to read data from Amazon Kinesis Streams.