Multithreaded flows

Multithreaded flow overview

A multithreaded flow is a Data Collector flow with a source that supports parallel execution, enabling one flow to run in multiple threads.

Multithreaded flows enable processing high volumes of data in a single flow on one Data Collector, thus taking full advantage of all available CPUs on the Data Collector machine. When using multithreaded flows, make sure to allocate sufficient resources to the flow and Data Collector.

A multithreaded flow honors the configured delivery guarantee for the flow, but does not guarantee the order in which batches of data are processed.

How it works

When you configure a multithreaded flow, you specify the number of threads that the source should use to generate batches of data. You can also configure the maximum number of flow runners that Data Collector uses to perform flow processing.

A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source.

Sources perform multithreaded processing based on the source systems they work with, but the following is true for all sources that generate multithreaded flows:

When you start the flow, the source creates a number of threads based on the multithreaded property configured in the source, and Data Collector creates a number of flow runners based on the flow Max Runners property to perform flow processing. Each thread connects to the source system, creates a batch of data, and passes the batch to an available flow runner.

Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.

Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But since batches are processed by different flow runners, the order in which batches are written to targets is not guaranteed.
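
The thread and runner model described above can be sketched in a few lines of Python. This is only an illustration of the concept, not Data Collector code: the names NUM_THREADS, NUM_RUNNERS, run_flow, source_thread, and process are hypothetical, and the "processing" is a stand-in that uppercases each record.

```python
import queue
import threading

NUM_THREADS = 3  # hypothetical stand-in for the source's thread property
NUM_RUNNERS = 3  # hypothetical stand-in for the flow Max Runners property


def run_flow(batches_per_thread=2, batch_size=4):
    """Simulate source threads handing batches to a pool of flow runners."""
    # The pool holds the IDs of runners that are currently available.
    runner_pool = queue.Queue()
    for runner_id in range(NUM_RUNNERS):
        runner_pool.put(runner_id)

    written = []  # (runner_id, processed_batch) in the order batches were written
    written_lock = threading.Lock()

    def process(runner_id, batch):
        # A runner handles one batch at a time and keeps record order.
        out = [record.upper() for record in batch]
        with written_lock:
            written.append((runner_id, out))

    def source_thread(thread_id):
        for b in range(batches_per_thread):
            batch = [f"t{thread_id}-b{b}-r{r}" for r in range(batch_size)]
            runner_id = runner_pool.get()  # wait for an available runner
            try:
                process(runner_id, batch)
            finally:
                runner_pool.put(runner_id)  # runner becomes available again

    threads = [
        threading.Thread(target=source_thread, args=(i,))
        for i in range(NUM_THREADS)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return written
```

Calling run_flow() returns six processed batches. The records inside each batch always appear in their original order, while the order of the batches themselves can vary from run to run.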

For example, take the following multithreaded flow. The HTTP Server source processes HTTP POST and PUT requests passed from HTTP clients. When you configure the source, you specify the number of threads to use - in this case, the Max Concurrent Requests property:

HTTP Server source HTTP tab with the Max Concurrent Requests property highlighted

Let's say you configure the flow to opt out of the Max Runners property. When you do this, Data Collector generates a matching number of flow runners for the number of threads.

With Max Concurrent Requests set to 5, when you start the flow the source creates five threads and Data Collector creates five flow runners. Upon receiving data, the source passes a batch to each of the flow runners for processing.

Conceptually, the multithreaded flow looks like this:

Flow with an HTTP Server source passing data to five flow runners.

Each flow runner performs the processing associated with the rest of the flow. After a batch is written to flow targets - in this case, Azure Data Lake Store 1 and 2 - the flow runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independently from batches processed by other flow runners, so the write-order of the batches can differ from the read-order.

At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.

Sources for multithreaded flows

You can use the following sources to create multithreaded flows:

The sources use different properties and perform processing differently based on the source systems they work with. For details on how a source performs multithreaded processing, see "Multithreaded Processing" in the source documentation.

Processor caching

Since multithreaded flows use multiple flow runners to run multiple sourceless flow instances, processor caching in a multithreaded flow can differ from a flow that runs on a single thread.

Generally, when a processor caches data, each instance of the processor can only cache the data that passes through that particular flow runner. Be sure to consider this behavior when configuring multithreaded flows.

For example, if you configure a lookup processor to create a local cache, each instance of the lookup processor creates its own local cache. This should not be a problem since the cache is generally used to improve flow performance.

The exception is the Record Deduplicator processor. The Record Deduplicator caches records for comparison for up to a specified number of records or amount of time. When used in a multithreaded flow, the records in the cache are shared across flow runners.
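
The difference between a per-runner cache and a shared cache can be illustrated with a small Python sketch. The classes LookupProcessor and SharedDeduplicator below are hypothetical and do not reflect how Data Collector implements these processors. In particular, the sketch omits the record-count and time limits that the Record Deduplicator applies to its cache.

```python
import threading


class LookupProcessor:
    """Hypothetical lookup processor: one instance, and one cache, per runner."""

    def __init__(self, backend):
        self.backend = backend
        self.cache = {}
        self.backend_calls = 0

    def lookup(self, key):
        if key not in self.cache:
            self.backend_calls += 1  # cache miss: query the backing system
            self.cache[key] = self.backend[key]
        return self.cache[key]


class SharedDeduplicator:
    """Hypothetical deduplicator whose cache is shared by all runners."""

    def __init__(self):
        self._seen = set()
        self._lock = threading.Lock()

    def is_duplicate(self, record_id):
        with self._lock:
            if record_id in self._seen:
                return True
            self._seen.add(record_id)
            return False


backend = {"us": "United States"}
runner_a_lookup = LookupProcessor(backend)
runner_b_lookup = LookupProcessor(backend)

runner_a_lookup.lookup("us")
runner_a_lookup.lookup("us")  # served from runner A's cache
runner_b_lookup.lookup("us")  # runner B has its own cache, so it misses once

# Each runner populated its own cache, so the backend was queried twice.
total_backend_calls = runner_a_lookup.backend_calls + runner_b_lookup.backend_calls

dedup = SharedDeduplicator()  # a single instance used by every runner
first_seen = dedup.is_duplicate("order-42")   # False: first occurrence
second_seen = dedup.is_duplicate("order-42")  # True: caught across runners
```

The per-runner caches cost one extra backend query but return the same results, which is why separate lookup caches are usually harmless. Deduplication, by contrast, only works across runners because the cache is shared.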

Tuning threads and runners

To optimize flow performance and resource usage, you can tune the number of threads and flow runners that a multithreaded flow uses.
Threads
    Configure the maximum number of threads or concurrency in the source.
    Before specifying a number of threads, consider how the source uses threads. All sources use threads to connect to the source system and create batches of data, but they can perform this task differently.
    For example, the JDBC Multitable Consumer source uses one thread for each table, so there's no need to configure the source to use more threads than the number of tables being queried.
    In contrast, the HTTP Server source listens at an HTTP endpoint. When you configure the number of threads to use, consider the maximum number of threads you might feasibly use in relation to peak load and the number of available flow runners.
    Note that idle threads consume few resources, so configuring more threads than you need carries little cost.
Flow runners
    Configure the maximum number of flow runners using the Max Runners flow property.
    Flow runners consume resources even when idle, so when choosing the number of runners to use, decide whether you want to optimize for performance, resource usage, or both.
    Flow runners process batches created by the source threads. The speed of processing can differ based on the complexity of the flow logic, the batch size, and other factors.
    To determine the number of flow runners to use, monitor the number of available runners when you run the flow. If you find that you have an abundance of available runners, you might reduce the number of runners that you allow. Conversely, if the flow runners are generally unavailable, increasing the number of flow runners can improve performance.

For example, say you have a flow with the Kinesis Consumer reading from 4 shards. In the source, you set the number of threads to 4. You also leave the flow Max Runners property with the default of 0, which creates a matching number of flow runners for the threads - in this case, 4. After you start a job for the flow and let it run for a while, you check back and find the following histogram in the Realtime Summary tab as you monitor the job:

Realtime Summary tab showing the Available Pipeline Runners Histogram

The histogram shows a mean of 1.4, which means that, on average, about 1.4 of the 4 flow runners are available at any given time.

If this is the peak load for the flow, this means you can reduce the number of flow runners used in the flow to 3 without sacrificing much performance. If Data Collector resources are needed elsewhere and you don't mind a minor hit to flow performance, you might reduce the number of flow runners to 2.
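
The reasoning in this example comes down to simple arithmetic: subtract the mean number of available runners from the configured number to estimate how many runners are busy on average, then round. The following Python sketch shows that calculation. The function name runner_range is hypothetical, and the result is a rough heuristic rather than an official sizing rule, since a mean does not capture short spikes in load.

```python
import math


def runner_range(configured_runners, mean_available):
    """Return (aggressive, conservative) runner counts from a histogram mean.

    Heuristic only: the mean number of busy runners is rounded down for a
    resource-saving setting and rounded up for a performance-safe setting.
    """
    mean_busy = configured_runners - mean_available
    aggressive = max(1, math.floor(mean_busy))
    conservative = max(1, math.ceil(mean_busy))
    return aggressive, conservative


# Values from the example: 4 runners configured, mean of 1.4 available.
# About 2.6 runners are busy on average, which rounds to 2 or 3.
suggested = runner_range(4, 1.4)  # (2, 3)
```

The conservative value of 3 matches the setting that gives up little performance, and the aggressive value of 2 matches the setting that frees more resources at a minor performance cost.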

Resource usage

Since each flow runner performs all processing associated with the flow after the source, each thread in a multithreaded flow requires roughly the same resources as the same flow running on a single thread.

When working with multithreaded flows, you should monitor the Data Collector resource usage and increase the Data Collector Java heap size when appropriate.

Multithreaded flow summary

The following points summarize the key details about multithreaded flows:

  • Use multithreaded sources to create a multithreaded flow. You can use the following sources at this time:
  • Unlike a basic, single-threaded flow, a multithreaded flow cannot guarantee the order of data.

    Data within a batch is processed in order, but since batches are created quickly and passed to different threads, the order of batches can change as they are written to flow targets.

  • Processors that cache information generally have a separate cache for each instance of the flow. The exception is the Record Deduplicator, which can identify duplicate records across all flow runners.
  • To optimize performance and resource usage, check the Available Flow Runners histogram to see if flow runners are being used effectively.
  • We recommend monitoring the resource usage of the flow and the Data Collector heap usage, and increasing the allocated resources and Java heap size as needed.