Pulsar Consumer
The Pulsar Consumer source reads messages from one or more topics in an Apache Pulsar cluster. The source can use multiple threads to process data in parallel. For information about supported versions, see Supported systems and versions.
The Pulsar Consumer source subscribes to Pulsar topics, processes incoming messages, and then sends acknowledgements back to Pulsar as the messages are read.
When you configure a Pulsar Consumer source, you define the URL to connect to Pulsar, and you configure the Pulsar security features to use when connecting. You can also use a connection to configure the source.
You define the subscription name and consumer name to use for the source and the topics to subscribe to. When the flow starts, Pulsar either creates a single consumer with the specified consumer name or creates one consumer for each thread using the specified consumer name with a sequential integer appended. If the subscription and topics do not exist, Pulsar also creates the subscription and topics.
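The consumer naming rule can be sketched in a few lines of Python. This is an illustration only: the mapping from subscription type to consumer count follows the Multithreaded processing section, while the function name, the lowercase subscription type strings, the hyphen separator, and the starting index of 0 are assumptions rather than documented behavior.

```python
def consumer_names(base_name, subscription_type, num_threads):
    """Return the consumer names a flow might register (hypothetical helper)."""
    # Shared and failover subscriptions get one consumer per thread;
    # an exclusive subscription gets a single consumer for all threads.
    if subscription_type in ("shared", "failover"):
        # Separator and starting index are assumed for illustration.
        return [f"{base_name}-{i}" for i in range(num_threads)]
    if subscription_type == "exclusive":
        return [base_name]
    raise ValueError(f"unknown subscription type: {subscription_type!r}")


print(consumer_names("orders-reader", "shared", 3))
print(consumer_names("orders-reader", "exclusive", 3))
```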
You can configure the schema used to determine compatibility with a Pulsar topic. You can also configure advanced properties as needed, such as the type of subscription to create or the initial offset to begin reading from.
The Pulsar Consumer source includes record header attributes that enable you to use information about the record in flow processing.
For more information about Pulsar topics, subscriptions, and consumers, see the Apache Pulsar documentation.
Topics selector
The Pulsar Consumer source can subscribe to a single topic or to multiple topics.
To define the topics that the source subscribes to, configure the Topics Selector property on the Pulsar tab.
The source provides the following methods of subscribing to topics:
- Single Topic
- Subscribes to a single topic. Use the following format to specify the topic name:
{persistent|non-persistent}://<tenant>/<namespace>/<topic name>
- Topics List
- Subscribes to multiple topics defined in a list of topic names. Use the Add icon to add additional topic names. Define each topic name using the same format required for a single topic.
- Topics Pattern
- Subscribes to multiple topics defined by a naming pattern. Use the following format to specify the pattern:
{persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
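To make the topic name format concrete, the following Python sketch parses a fully qualified topic name and filters a list of names with a pattern. It is a rough illustration: the helper names and the sample tenant, namespace, and topic names are hypothetical, and Python's regular expression engine stands in for whatever matching Pulsar performs.

```python
import re

# Assumed shape of a fully qualified topic name, taken from the format above:
# {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
TOPIC_NAME = re.compile(r"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$")


def parse_topic(name):
    """Split a topic name into (domain, tenant, namespace, topic)."""
    match = TOPIC_NAME.match(name)
    if match is None:
        raise ValueError(f"not a fully qualified topic name: {name!r}")
    return match.groups()


def select_by_pattern(pattern, topic_names):
    """Return the topic names that match the whole pattern."""
    compiled = re.compile(pattern)
    return [name for name in topic_names if compiled.fullmatch(name)]


# Hypothetical topic names used only for this example.
topics = [
    "persistent://sales/orders/created",
    "persistent://sales/orders/cancelled",
    "persistent://sales/invoices/created",
]
print(parse_topic(topics[0]))
print(select_by_pattern(r"persistent://sales/orders/.*", topics))
```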
Multithreaded processing
The Pulsar Consumer source can create a multithreaded flow to process data in parallel. The Pulsar Consumer source creates the number of threads indicated by the Number of Threads property on the Pulsar tab.
The subscription type, set on the Advanced tab, determines the relationship between consumers and threads. For a shared subscription or a failover subscription, the source creates a consumer for each thread. For an exclusive subscription, the source creates one consumer for all the threads. Each thread processes a batch of messages, passing its batch to an available flow runner.
A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source. Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But because batches are processed by different flow runners, the order in which batches are written to targets is not guaranteed.
For example, say you set the Number of Threads property to 5. When you start the flow, the source creates five threads, and Data Collector creates a matching number of flow runners. Upon receiving data, the source passes a batch to each of the flow runners for processing.
At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.
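The batch handoff in this example can be modeled with standard Python threads. The sketch below is a simplified stand-in, not the product's implementation: a single producer loop plays the role of the source threads, the `runner` function and the uppercase transformation are hypothetical, and a sentinel value stops each runner. It shows that records keep their order inside a batch while the completion order of batches can vary from run to run.

```python
import queue
import threading

NUM_THREADS = 5  # mirrors the Number of Threads value in the example


def runner(batches, results, lock):
    """Hypothetical flow runner: process one batch at a time until stopped."""
    while True:
        batch = batches.get()
        if batch is None:  # sentinel: no more batches for this runner
            break
        # Stand-in for the processors and targets that follow the source.
        processed = [record.upper() for record in batch]
        with lock:
            results.append(processed)


batches = queue.Queue()
results = []
lock = threading.Lock()
runners = [
    threading.Thread(target=runner, args=(batches, results, lock))
    for _ in range(NUM_THREADS)
]
for thread in runners:
    thread.start()

# The source hands over ten batches of three records each.
for n in range(10):
    batches.put([f"batch{n}-rec{i}" for i in range(3)])
for _ in runners:
    batches.put(None)  # one sentinel per runner
for thread in runners:
    thread.join()

# Record order inside each batch is intact; batch order may differ per run.
for batch in results:
    print(batch)
```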
For more information about multithreaded flows, see Multithreaded flow overview.
Offset management
The first time that a Pulsar Consumer source receives messages from a topic, an offset entry is created for that subscription and topic. The offset entry is created and maintained by Pulsar. Where the source begins receiving messages depends on whether a stored offset entry exists:
- No stored offset
- When the subscription and topic combination does not have a previously stored offset, the Pulsar Consumer source begins receiving messages based on the value of the initial offset defined on the Advanced tab of the source.
- Previously stored offset
- When the subscription and topic combination has a previously stored offset, the Pulsar Consumer source receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the flow, processing resumes with the first message after the last committed offset.
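The two cases above reduce to a small decision, sketched here in Python. This is a deliberately simplified model: it treats offsets as integer positions in a list, whereas Pulsar tracks positions with its own message identifiers, and the function name and the "earliest" and "latest" option names are assumptions made for illustration.

```python
def start_index(stored_offset, initial_offset, backlog_size):
    """Return the position of the first message to read (hypothetical model).

    stored_offset: position of the last committed message, or None if the
        subscription and topic combination has no stored offset.
    initial_offset: "earliest" or "latest" (assumed option names).
    backlog_size: number of messages already in the topic.
    """
    if stored_offset is not None:
        # Previously stored offset: resume with the next unprocessed message.
        return stored_offset + 1
    if initial_offset == "earliest":
        return 0
    if initial_offset == "latest":
        # Skip the existing backlog and wait for new messages.
        return backlog_size
    raise ValueError(f"unknown initial offset: {initial_offset!r}")


print(start_index(None, "earliest", 10))
print(start_index(None, "latest", 10))
print(start_index(4, "latest", 10))
```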
Record header attributes
- User-defined information in the properties field of the message
- The source includes any user-defined information in the properties field of the message - outside of the payload field - as record header attributes. When generating the record header attribute, the source uses the same names as the user-defined key/value pairs in the properties field. For example, if the Pulsar message includes an environment key/value pair in the properties field, the source generates a record header attribute named environment.
- pulsar.topic
- The source includes the topic that the message was read from in the pulsar.topic record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with header attributes.
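As a rough illustration of how these attributes are assembled, the Python sketch below builds the header attributes for one message. The function name and sample values are hypothetical, and letting pulsar.topic take precedence over a user-defined property of the same name is an assumption, since that case is not described here.

```python
def header_attributes(topic, properties):
    """Build record header attributes for one message (hypothetical helper)."""
    # User-defined properties keep their original names.
    attributes = dict(properties)
    # Assumption: the source-provided topic attribute wins on a name clash.
    attributes["pulsar.topic"] = topic
    return attributes


attrs = header_attributes(
    "persistent://sales/orders/created",
    {"environment": "staging"},
)
print(attrs["environment"])
print(attrs["pulsar.topic"])
```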
Schema properties
- Schema tab
- The schema specified on the Schema tab determines the compatibility between the source and a Pulsar topic.
If the source reads from a topic in a Pulsar namespace configured to enforce schema validation, then you must specify a schema on the Schema tab. The source passes the schema to Pulsar. Then Pulsar uses the schema to verify that the source is compatible with a topic.
- Data Format tab
- The schema specified on the Data Format tab supports message processing.
If you configure the source to read messages in Avro format, then you can specify a schema on the Data Format tab. The source uses the specified schema to process messages.
If you specify a schema on both tabs, then specify the same schema.
Security
Configure the Pulsar Consumer source to use the security features available in the Pulsar cluster.
- TLS transport encryption
- The Pulsar cluster encrypts all traffic between the Pulsar server and the stage. The Pulsar server provides a key and certificate, which the stage uses to verify the server's identity.
- TLS authentication
- The stage provides keys and certificates, which the Pulsar server uses to verify the stage's identity. TLS authentication requires TLS transport encryption.
- JWT authentication
- The stage provides Pulsar a JSON Web Token (JWT), which identifies the stage and grants permission for some actions. JWT authentication requires TLS transport encryption.
- OAuth authentication
- The stage provides Pulsar an OAuth 2.0 access token, which identifies the stage and associates the stage with a role.
Enabling TLS transport encryption
Enable TLS transport encryption to encrypt all traffic between the Pulsar server and the stage.
Procedure
Enabling TLS authentication
Enable TLS authentication so that Pulsar can authenticate the stage with certificates.
Procedure
- Enable TLS transport encryption.
- On the Security tab of the stage, select Enable Mutual Authentication.
- Create the client certificate and client private key PEM files for the stage to use.
- Store the client certificate and client private key PEM files created for the stage in the Data Collector resources directory, $SDC_RESOURCES.
- On the Security tab of the stage, enter the name of the client files in the Client Certificate PEM and Client Key PEM properties.
Enabling JWT authentication
Enable JWT authentication so that Pulsar can authenticate the stage with a JSON Web Token (JWT).
Before you begin
Procedure
- Enable TLS transport encryption.
- On the Security tab, select Use JWT.
- In the Token property, enter the token string that represents a signed JWT for the stage.
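As a quick sanity check before pasting a value into the Token property, the following Python sketch confirms that a string has the three dot-separated segments of a signed JWT and prints its claims. It uses only the standard library. The helper names, the sample secret, and the sample claim are all hypothetical, and the code does not verify the signature against the key that the Pulsar cluster uses.

```python
import base64
import hashlib
import hmac
import json


def b64url(data):
    """Base64url-encode bytes without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment):
    """Decode a base64url segment, restoring any stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(claims, secret):
    """Create a sample HS256-signed token, for demonstration only."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256)
    return signing_input + "." + b64url(signature.digest())


def peek_claims(token):
    """Return the claims of a token without verifying its signature."""
    segments = token.split(".")
    if len(segments) != 3:
        raise ValueError("a signed JWT has exactly three segments")
    return json.loads(b64url_decode(segments[1]))


# Hypothetical secret and claim; a real token comes from your administrator.
token = sign_hs256({"sub": "flow-stage"}, b"example-secret")
print(peek_claims(token))
```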
Enabling OAuth authentication
Enable OAuth authentication so that Pulsar can authenticate the stage with an access token.
Procedure
Data formats
The Pulsar Consumer source processes data differently based on the data format. Pulsar Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The source can process collectd messages, NetFlow 5 and NetFlow 9 messages, and the following types of syslog messages:
- Delimited
- Generates a record for each delimited line.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the source assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use this format to process records generated by a Data Collector flow using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the source treats the XML file as a single record.
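To illustrate the two JSON layouts mentioned above, the following Python sketch turns a message payload into one record per JSON object, whether the payload holds several concatenated objects or a single array. It is a minimal sketch using the standard json module: the function name is hypothetical, and the source's actual parser and its configuration options are not shown.

```python
import json


def json_records(text):
    """Return one record per JSON object in the payload (hypothetical helper)."""
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(text):
        # Skip whitespace between consecutive JSON values.
        if text[pos].isspace():
            pos += 1
            continue
        value, pos = decoder.raw_decode(text, pos)
        if isinstance(value, list):
            records.extend(value)  # a single array of objects
        else:
            records.append(value)  # one of several concatenated objects
    return records


print(json_records('{"id": 1}\n{"id": 2}'))
print(json_records('[{"id": 1}, {"id": 2}]'))
```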
Configuring a Pulsar Consumer source
About this task
Configure a Pulsar Consumer source to read messages from Apache Pulsar.