Multithreaded flows
Multithreaded flow overview
A multithreaded flow is a Data Collector flow with a source that supports parallel execution, enabling one flow to run in multiple threads.
Multithreaded flows enable processing high volumes of data in a single flow on one Data Collector, thus taking full advantage of all available CPUs on the Data Collector machine. When using multithreaded flows, make sure to allocate sufficient resources to the flow and Data Collector.
A multithreaded flow honors the configured delivery guarantee for the flow, but does not guarantee the order in which batches of data are processed.
How it works
When you configure a multithreaded flow, you specify the number of threads that the source should use to generate batches of data. You can also configure the maximum number of flow runners that Data Collector uses to perform flow processing.
A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source.
Sources perform multithreaded processing based on the source systems they work with, but the following is true for all sources that generate multithreaded flows:
When you start the flow, the source creates the number of threads specified by its multithreaded property, and Data Collector creates the number of flow runners specified by the Max Runners flow property. Each thread connects to the source system, creates a batch of data, and passes the batch to an available flow runner.
Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But since batches are processed by different flow runners, the order that batches are written to targets is not ensured.
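The handoff described above can be sketched as a small simulation. This is not Data Collector code, just a minimal Python model of the concept: several "source threads" create batches and hand them to a pool of "flow runners", and because runners process independently, the write order can differ from the read order.

```python
import queue
import random
import threading
import time

NUM_THREADS = 3         # stands in for the source's thread-count property
BATCHES_PER_THREAD = 2

handoff = queue.Queue(maxsize=NUM_THREADS)  # source threads hand batches to runners
written = []                                # order in which "targets" receive batches
written_lock = threading.Lock()

def source_thread(tid):
    # Each source thread reads from the source system and creates batches.
    for b in range(BATCHES_PER_THREAD):
        handoff.put((tid, b))

def flow_runner():
    # Each runner processes one batch at a time; processing time varies,
    # so batches can be written out of read order.
    while True:
        batch = handoff.get()
        if batch is None:
            return
        time.sleep(random.uniform(0, 0.01))  # simulate processing work
        with written_lock:
            written.append(batch)

runners = [threading.Thread(target=flow_runner) for _ in range(NUM_THREADS)]
for r in runners:
    r.start()
sources = [threading.Thread(target=source_thread, args=(t,)) for t in range(NUM_THREADS)]
for s in sources:
    s.start()
for s in sources:
    s.join()
for _ in runners:
    handoff.put(None)   # sentinel: tell each runner to stop
for r in runners:
    r.join()

print(len(written))  # → 6: every batch is processed, though write order may vary
```

All six batches always arrive, but the order of `written` can change between runs, which mirrors the ordering behavior described above.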
For example, take the following multithreaded flow. The HTTP Server source processes HTTP POST and PUT requests passed from HTTP clients. When you configure the source, you specify the number of threads to use - in this case, the Max Concurrent Requests property:

Let's say you leave the Max Runners property at its default value of 0. With this setting, Data Collector generates a flow runner for each thread that the source creates.
With Max Concurrent Requests set to 5, when you start the flow, the source creates five threads and Data Collector creates five flow runners. Upon receiving data, the source passes a batch to each of the flow runners for processing.
Conceptually, the multithreaded flow looks like this:

Each flow runner performs the processing associated with the rest of the flow. After a batch is written to flow targets - in this case, Azure Data Lake Store 1 and 2 - the flow runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independently from batches processed by other flow runners, so the write-order of the batches can differ from the read-order.
At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.
Sources for multithreaded flows
- Amazon S3 - Reads objects stored in Amazon S3.
- Amazon SQS Consumer - Reads data from queues in Amazon Simple Queue Service (SQS).
- Azure Blob Storage - Reads data from Microsoft Azure Blob Storage.
- Azure Data Lake Storage Gen2 - Reads data from Microsoft Azure Data Lake Storage Gen2.
- Azure Data Lake Storage Gen2 (Legacy) - Reads data from Microsoft Azure Data Lake Storage Gen2.
- Azure IoT/Event Hub Consumer - Reads data from Microsoft Azure Event Hub.
- CoAP Server - Listens on a CoAP endpoint and processes the contents of all authorized CoAP requests.
- Couchbase - Reads data from Couchbase Server.
- Directory - Reads fully written files from a directory.
- Elasticsearch - Reads data from an Elasticsearch cluster.
- Google Pub/Sub Subscriber - Consumes messages from a Google Pub/Sub subscription.
- Groovy Scripting - Runs a Groovy script to create Data Collector records.
- HTTP Server - Listens on an HTTP endpoint and processes the contents of all authorized HTTP POST and PUT requests.
- IBM Db2 - Reads data from an IBM Db2 database.
- JavaScript Scripting - Runs a JavaScript script to create Data Collector records.
- JDBC Multitable Consumer - Reads database data from multiple tables through a JDBC connection.
- Jython Scripting - Runs a Jython script to create Data Collector records.
- Kafka Multitopic Consumer - Reads data from multiple topics in a Kafka cluster.
- Kinesis Consumer - Reads data from a Kinesis cluster.
- Oracle Bulkload - Reads data from multiple Oracle database tables, then stops the flow.
- Oracle Multitable Consumer - Reads data from multiple Oracle database tables.
- Pulsar Consumer - Reads messages from Apache Pulsar topics.
- REST Service - Listens on an HTTP endpoint, parses the contents of all authorized requests, and sends responses back to the originating REST API. Use as part of a microservices flow.
- Salesforce Bulk API 2.0 - Reads data from Salesforce using Salesforce Bulk API 2.0.
- Snowflake Bulk source - Reads data from multiple Snowflake tables, then stops the flow.
- TCP Server - Listens at the specified ports and processes incoming data over TCP/IP connections.
- UDP Multithreaded Source - Reads messages from one or more UDP ports.
- WebSocket Server - Listens on a WebSocket endpoint and processes the contents of all authorized WebSocket requests.
- Dev Data Generator - Generates random data for development and testing.
The sources use different properties and perform processing differently based on the source systems they work with. For details on how a source performs multithreaded processing, see "Multithreaded Processing" in the source documentation.
Processor caching
Since multithreaded flows use multiple flow runners to run multiple sourceless flow instances, processor caching in a multithreaded flow can differ from a flow that runs on a single thread.
Generally, when a processor caches data, each instance of the processor can only cache the data that passes through that particular flow runner. Be sure to consider this behavior when configuring multithreaded flows.
For example, if you configure a lookup processor to create a local cache, each instance of the lookup processor creates its own local cache. This should not be a problem since the cache is generally used to improve flow performance.
The exception is the Record Deduplicator processor. The Record Deduplicator caches records for comparison, up to a specified number of records or for a specified amount of time. When used in a multithreaded flow, the records in the cache are shared across all flow runners.
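The difference between per-runner and shared caches can be illustrated with a small Python sketch. The class names here are illustrative stand-ins, not Data Collector internals: one processor keeps an instance-level cache (one per flow runner), while the other keeps a class-level cache shared by every instance.

```python
import threading

class LookupProcessor:
    """Per-runner cache: each flow runner's instance caches only what it sees."""
    def __init__(self):
        self.cache = {}          # local to this runner's instance

    def process(self, record):
        self.cache[record] = True
        return record

class RecordDeduplicator:
    """Shared cache: a single cache visible to every runner's instance."""
    shared_cache = set()         # class-level, shared across instances
    lock = threading.Lock()

    def process(self, record):
        with self.lock:
            if record in self.shared_cache:
                return None      # duplicate already seen by some runner: drop it
            self.shared_cache.add(record)
        return record

# Two flow runners each get their own processor instances.
lookup_a, lookup_b = LookupProcessor(), LookupProcessor()
dedupe_a, dedupe_b = RecordDeduplicator(), RecordDeduplicator()

lookup_a.process("r1")
lookup_b.process("r1")          # cached again: each runner has its own copy
dedupe_a.process("r1")
dup = dedupe_b.process("r1")    # already in the shared cache, so dropped
```

With per-runner caches the record "r1" is cached twice, once per runner; with the shared cache, the second runner recognizes it as a duplicate.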
Tuning threads and runners
- Threads - Configure the maximum number of threads or concurrency in the source.
- Flow runners - Configure the maximum number of flow runners using the Max Runners flow property.
For example, say you have a flow with the Kinesis Consumer reading from 4 shards. In the source, you set the number of threads to 4. You also leave the flow Max Runners property with the default of 0, which creates a matching number of flow runners for the threads - in this case, 4. After you start a job for the flow and let it run for a while, you check back and find the following histogram in the Realtime Summary tab as you monitor the job:

The histogram shows a mean of 1.4, meaning that at any given time, an average of 1.4 flow runners are available, or idle.
If this is the peak load for the flow, this means you can reduce the number of flow runners used in the flow to 3 without sacrificing much performance. If Data Collector resources are needed elsewhere and you don't mind a minor hit to flow performance, you might reduce the number of flow runners to 2.
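The arithmetic behind this tuning decision can be sketched as follows. The sample values are hypothetical; in practice the mean comes from the Available Flow Runners histogram on the Realtime Summary tab rather than from raw samples.

```python
# Hypothetical monitoring samples of "available (idle) runners" for a
# flow configured with 4 flow runners.
samples = [1, 2, 1, 1, 2, 2, 1, 1, 2, 1]

mean_available = sum(samples) / len(samples)   # 1.4 in this example
configured_runners = 4

# If roughly 1.4 runners sit idle even at peak load, about that many can
# be removed without sacrificing much throughput.
suggested = configured_runners - int(mean_available)
print(mean_available, suggested)               # → 1.4 3
```

Dropping to 3 runners keeps headroom for peak load; dropping to 2 trades a minor performance hit for freed Data Collector resources, as described above.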
Resource usage
Since each flow runner performs all of the processing associated with the flow after the source, each flow runner in a multithreaded flow requires roughly the same resources as the same flow running on a single thread.
When working with multithreaded flows, you should monitor the Data Collector resource usage and increase the Data Collector Java heap size when appropriate.
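As a sketch of the heap adjustment, Java services like Data Collector are typically sized through JVM options set in a startup script or environment variable. The variable name and values below are assumptions for illustration; check your installation's startup script for the actual mechanism.

```shell
# Sketch: increase the Java heap for a Data Collector instance that reads
# JVM options from an environment variable (variable name is an assumption).
# Setting -Xms equal to -Xmx avoids heap resizing under load.
export SDC_JAVA_OPTS="-Xmx4096m -Xms4096m ${SDC_JAVA_OPTS}"
```

A rough starting point is to scale the heap with the number of flow runners, since each runner processes a full batch independently.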
Multithreaded flow summary
The following points summarize the key details about multithreaded flows:
- Use multithreaded sources to create a multithreaded flow. See Sources for multithreaded flows above for the full list.
- Unlike a basic, single-threaded flow, a multithreaded flow cannot guarantee the order of data. Data within a batch is processed in order, but since batches are created quickly and passed to different flow runners, the order in which batches are written to flow targets can change.
- Processors that cache information generally maintain a separate cache for each flow runner. The exception is the Record Deduplicator, which caches records across all flow runners to identify duplicates.
- To optimize performance and resource usage, check the Available Flow Runners histogram to see if flow runners are being used effectively.
- Monitor the resource usage of the flow and the Data Collector Java heap usage, and increase the heap size as needed.