Kinesis Consumer
The Kinesis Consumer origin reads data from Amazon Kinesis Streams. The origin also connects to Amazon DynamoDB to store offsets and to Amazon CloudWatch to report metrics. For information about supported versions, see Supported Systems and Versions.
When you configure the Kinesis Consumer origin, you specify the region to use and the format of the data. You can configure the origin to use a VPC interface endpoint. You can also use a connection to configure the origin.
You can configure additional Kinesis configuration properties that you require.
The Kinesis Consumer origin can use multiple threads to enable parallel processing of data from the stream.
The Kinesis Consumer origin stores offsets in DynamoDB lease tables. You can optionally define tags to apply to a lease table created by the origin. When necessary, you can reset the origin for Kinesis Consumer pipelines, but be sure to review the requirements and guidelines before doing so.
Multithreaded Processing
The Kinesis Consumer origin can perform parallel processing and enables the creation of a multithreaded pipeline. The origin uses multiple concurrent threads based on the Max Threads property.
When you start the pipeline, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin.
Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order that batches are written to destinations is not ensured.
For example, say you set the Max Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent from other batches processed by other pipeline runners, so batches may be written in a different order than they were read.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
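The runner model described above can be sketched in a few lines of Python. This is an illustrative simulation, not the Data Collector implementation: origin threads place batches on a queue, a fixed pool of "runners" picks them up, and because runners finish independently, batch completion order is not guaranteed even though record order within each batch is preserved.

```python
import queue
import threading

MAX_THREADS = 5  # corresponds to the Max Threads property


def make_batches(records, batch_size):
    """Split incoming records into batches, preserving record order."""
    for i in range(0, len(records), batch_size):
        yield records[i:i + batch_size]


def pipeline_runner(batch):
    """Stand-in for the processors, executors, and destinations after the origin."""
    return [record.upper() for record in batch]  # hypothetical processing


def run_pipeline(records, batch_size=2):
    """Process batches with MAX_THREADS concurrent runners."""
    batches = queue.Queue()
    for batch in make_batches(records, batch_size):
        batches.put(batch)

    results = []
    lock = threading.Lock()

    def worker():
        # Each runner handles one batch at a time until no batches remain.
        while True:
            try:
                batch = batches.get_nowait()
            except queue.Empty:
                return
            processed = pipeline_runner(batch)
            with lock:
                results.append(processed)  # completion order is not guaranteed

    threads = [threading.Thread(target=worker) for _ in range(MAX_THREADS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Record order holds within each batch, but the order in which batches land in `results` depends on thread scheduling, mirroring the behavior described above.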
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Authentication Method
You can configure the Kinesis Consumer origin to authenticate with Amazon Web Services (AWS) using an instance profile or AWS access keys.
For more information about the authentication methods and details on how to configure each method, see Security in Amazon Stages.
Additional Kinesis Properties
You can add additional custom Kinesis configuration properties to the Kinesis Consumer origin.
When you add a Kinesis configuration property, enter the exact property name and the value. The origin does not validate the property names or values.
You can add the following custom Kinesis configuration properties to the origin:
- billingMode
- cleanupLeasesUponShardCompletion
- clientExecutionTimeout
- connectionMaxIdleMillis
- failoverTimeMillis
- listShardsBackoffTimeInMillis
- logWarningForTaskAfterMillis
- maxConnections
- maxConsecutiveRetriesBeforeThrottling
- maxGetRecordsThreadPool
- maxLeaseRenewalThreads
- maxListShardsRetryAttempts
- metricsBufferTimeMillis
- metricsMaxQueueSize
- requestTimeout
- retryGetRecordsInSeconds
- retryMode
- shutdownGraceMillis
- taskBackoffTimeMillis
- throttleRetries
- timeoutInSeconds
- useExpectContinue
- userAgentPrefix
- userAgentSuffix
- validateAfterInactivityMillis
- validateSequenceNumberBeforeCheckpointing
For a description of each property, see the Amazon Kinesis Streams documentation.
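For illustration, a configuration might override a handful of these properties. The values below are hypothetical examples, not recommendations; enter each property name and value exactly as the Kinesis Client Library expects, since the origin does not validate them:

```
failoverTimeMillis = 10000
maxConnections = 50
retryMode = standard
billingMode = PAY_PER_REQUEST
```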
Read Interval
You can configure the read interval for the Kinesis Consumer. The read interval determines how long Kinesis Consumer waits before requesting additional data from Kinesis shards. By default, the Kinesis Consumer waits one second between requests.
Adjust the read interval to adapt to the Kinesis data load. For example, you might reduce the wait time to enable higher throughput or increase the wait time when the stream of data slows.
Amazon recommends a 1-second read interval. The minimum read interval for the Kinesis Consumer is 200 milliseconds, which allows polling five times a second. To increase total read capacity, you can add shards to the stream.
Lease Table Tags
The Kinesis Consumer origin stores offsets - the location where the origin stops reading - in DynamoDB lease tables. You can optionally define tags to apply to a lease table created by the origin. The origin cannot add tags to existing lease tables.
Tags allow you to categorize DynamoDB resources by purpose, owner, environment, or any other categorization.
Each Kinesis application uses one DynamoDB lease table. If you have multiple Kinesis Consumer origins that use the same application, then all the origins use the same DynamoDB lease table. The origin creates a DynamoDB lease table for the application if one doesn't already exist.
The origin assigns the configured tags only when it creates the lease table. If a lease table already exists for the application, the origin uses the existing table and does not assign any configured tags to the existing table.
Define tags on the Lease Table tab when you configure the origin. To add tags, click Add and define the tag name and value. DynamoDB tag names and values are case-sensitive.
For more information about tagging for DynamoDB, see the Amazon Web Services documentation.
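Because tag names and values are case-sensitive and AWS imposes general tagging restrictions (names of 1-128 characters, values of up to 256 characters, and the "aws:" prefix reserved), it can help to sanity-check tags before configuring them. The following validator is an illustrative sketch based on those general AWS restrictions, not part of the origin:

```python
def validate_tag(name, value):
    """Pre-check a DynamoDB tag against general AWS tagging restrictions."""
    if not 1 <= len(name) <= 128:
        raise ValueError("tag name must be 1-128 characters")
    if len(value) > 256:
        raise ValueError("tag value must be at most 256 characters")
    if name.lower().startswith("aws:"):
        raise ValueError('the "aws:" prefix is reserved by AWS')
    # Names are case-sensitive: "Env" and "env" are distinct tags.
    return name, value
```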
Resetting the Kinesis Consumer Origin
You can reset the Kinesis Consumer origin to process all available data instead of continuing from the last-saved offset. For general information about resetting the origin, see Resetting the Origin for Jobs.
When you reset the origin for a Kinesis Consumer pipeline, Data Collector resets the origin for every Kinesis Consumer pipeline that uses the same application name.
- Requirement: Delete table permission on DynamoDB
- Because the origin stores offsets in DynamoDB lease tables, the IAM user or instance profile associated with the pipeline must have delete table permission for tables matching the application name used in your pipelines, as follows:
- If you use access keys to authenticate with Amazon Web Services, the IAM user associated with the access keys must have delete table permission.
- If Data Collector uses instance profiles to authenticate, the instance profile for the EC2 instance must have delete table permission.
- Guidelines for resetting the origin
- To reset the origin for a Kinesis Consumer pipeline, perform the following steps:
- Stop the pipeline that you want to reset. If other pipelines use the same application name, stop all of those pipelines as well.
- Reset the origin: in the pipeline canvas or from the Home page, click the More icon, then click Reset Origin. When multiple pipelines use the same application name, you can reset the origin from any one of them; you only need to reset one.
- Start the pipeline with the reset origin.
- If you stopped any related pipelines, restart those pipelines.
For example, say you have three pipelines that read from Kinesis Streams using the Sales application name: NorthSales, CentralSales, and SouthSales. To reset the origin for all three pipelines, stop all three, then reset the origin for any one of them, say SouthSales. Restart the SouthSales pipeline, and then restart the other two pipelines.
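Because the lease table name matches the application name, the delete table permission can be scoped narrowly. The policy below is a hypothetical sketch for the Sales application name from the example, with placeholder region and account ID that you would replace with your own:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "dynamodb:DeleteTable",
      "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/Sales"
    }
  ]
}
```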
Data Formats
The Kinesis Consumer origin processes data differently based on the data format. Kinesis Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a precision and a scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Delimited
- Generates a record for each delimited line.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Connecting to DynamoDB or CloudWatch
The origin connects to DynamoDB to store offsets in lease tables and to CloudWatch to report metrics. When you configure the origin to use a VPC interface endpoint to connect to Kinesis, you must also specify the DynamoDB and CloudWatch regions or VPC interface endpoints to use.
Specify the connection details, then configure the rest of the origin as appropriate.
Configuring a Kinesis Consumer Origin
Configure a Kinesis Consumer origin to read data from Amazon Kinesis Streams.