Pulsar Consumer
The Pulsar Consumer origin reads messages from one or more topics in an Apache Pulsar cluster. The origin can use multiple threads to process data in parallel. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
The Pulsar Consumer origin subscribes to Pulsar topics, processes incoming messages, and then sends acknowledgements back to Pulsar as the messages are read.
When you configure a Pulsar Consumer origin, you define the URL to connect to Pulsar, and you configure the Pulsar security features to use when connecting. You can also use a connection to configure the origin.
You define the subscription name and consumer name to use for the origin and the topics to subscribe to. When the pipeline starts, Pulsar either creates a single consumer with the specified consumer name or creates one consumer for each thread using the specified consumer name with a sequential integer appended. If the subscription and topics do not exist, Pulsar also creates the subscription and topics.
You can configure the schema used to determine compatibility with a Pulsar topic. You can also configure advanced properties as needed, such as the type of subscription to create or the initial offset to begin reading from.
The Pulsar Consumer origin includes record header attributes that enable you to use information about the record in pipeline processing.
For more information about Pulsar topics, subscriptions, and consumers, see the Apache Pulsar documentation.
Topics Selector
The Pulsar Consumer origin can subscribe to a single topic or to multiple topics.
To define the topics that the origin subscribes to, configure the Topics Selector property on the Pulsar tab.
The origin provides the following methods of subscribing to topics:
- Single Topic
- Subscribes to a single topic. Use the following format to specify the topic name:
{persistent|non-persistent}://<tenant>/<namespace>/<topic name>
- Topics List
- Subscribes to multiple topics defined in a list of topic names. Use the Add icon to add more topic names. Define each topic name using the same format required for a single topic.
- Topics Pattern
- Subscribes to multiple topics defined by a naming pattern. Use the following format to specify the pattern:
{persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
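The following Python sketch shows how the topic formats relate. It splits a fully qualified topic name into its parts and filters a list of topic names with a Topics Pattern value. The helpers `parse_topic` and `topics_matching` are hypothetical and are not part of Pulsar or Data Collector. The sketch also assumes that the topic name itself contains no further slashes. It only approximates how a pattern selects topics, because Pulsar itself determines which existing topics a pattern subscription covers.

```python
import re
from typing import NamedTuple, Optional

# {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
# ASSUMPTION: the topic name part contains no further "/" characters.
TOPIC_RE = re.compile(
    r"^(?P<domain>persistent|non-persistent)://"
    r"(?P<tenant>[^/]+)/(?P<namespace>[^/]+)/(?P<topic>[^/]+)$"
)


class TopicName(NamedTuple):
    domain: str
    tenant: str
    namespace: str
    topic: str


def parse_topic(name: str) -> Optional[TopicName]:
    """Return the parts of a fully qualified topic name, or None if malformed."""
    m = TOPIC_RE.match(name)
    if m is None:
        return None
    return TopicName(m["domain"], m["tenant"], m["namespace"], m["topic"])


def topics_matching(pattern: str, topics: list[str]) -> list[str]:
    """Return the well-formed topic names that fully match a Topics Pattern value."""
    regex = re.compile(pattern)
    return [t for t in topics if parse_topic(t) and regex.fullmatch(t)]
```

For example, `topics_matching("persistent://public/default/sensor-.*", ["persistent://public/default/sensor-1", "persistent://public/default/orders", "non-persistent://public/default/sensor-2"])` keeps only the first topic. The domain prefix is part of the pattern, so a persistent pattern does not select the non-persistent topic.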
For more information about defining topic names and about subscribing to multiple topics, see the Apache Pulsar documentation.
Multithreaded Processing
The Pulsar Consumer origin can create a multithreaded pipeline to process data in parallel. The origin creates the number of threads specified in the Number of Threads property on the Pulsar tab.
The subscription type, set on the Advanced tab, determines the relationship between consumers and threads. For a shared subscription or a failover subscription, the origin creates a consumer for each thread. For an exclusive subscription, the origin creates one consumer for all the threads. Each thread processes a batch of messages, passing its batch to an available pipeline runner.
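The relationship between subscription type, threads, and consumers can be sketched in Python as follows. The function `consumer_names` is hypothetical. The suffix format is also an assumption: numbering starts at 1 and no separator is used. The documentation states only that the origin appends a sequential integer to the consumer name.

```python
from enum import Enum


class SubscriptionType(Enum):
    EXCLUSIVE = "Exclusive"
    SHARED = "Shared"
    FAILOVER = "Failover"


def consumer_names(base_name: str, sub_type: SubscriptionType, threads: int) -> list[str]:
    """Return one name per consumer that the origin would create."""
    if threads < 1:
        raise ValueError("Number of Threads must be at least 1")
    if sub_type is SubscriptionType.EXCLUSIVE:
        # Exclusive: a single consumer serves all threads.
        return [base_name]
    # Shared or failover: one consumer per thread, with a sequential integer appended.
    # ASSUMPTION: numbering starts at 1 and uses no separator.
    return [f"{base_name}{i}" for i in range(1, threads + 1)]
```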
A pipeline runner is a sourceless pipeline instance: an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. However, because different pipeline runners process the batches, the order in which batches are written to destinations is not guaranteed.
For example, say you set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
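This ordering behavior can be illustrated with a simplified Python model. In the model, a thread pool sized to Number of Threads stands in for the pipeline runners. Reading from Pulsar, the origin threads, and the real processors and destinations are all replaced by stand-ins, and `run_pipeline` is a hypothetical name. Each runner handles a whole batch, so record order within a batch survives, but the order in which batches reach the destination list depends on thread scheduling.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(batches: list[list[int]], num_threads: int) -> list[list[int]]:
    """Process batches on num_threads runners; return batches in write order."""
    written: list[list[int]] = []
    lock = threading.Lock()

    def runner(batch: list[int]) -> None:
        # One runner processes one whole batch, keeping its record order intact.
        processed = [record * 10 for record in batch]  # stand-in for processors
        with lock:  # stand-in for a destination write
            written.append(processed)

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for batch in batches:
            pool.submit(runner, batch)
    # Leaving the "with" block waits for every submitted batch to finish.
    return written
```

Running `run_pipeline([[1, 2, 3], [4, 5], [6, 7, 8, 9]], 5)` always returns each batch with its records in order, but the batches themselves may come back in any order.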
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Offset Management
The first time that a Pulsar Consumer origin receives messages from a topic, Pulsar creates an offset entry for that subscription and topic. Pulsar creates and maintains the offset entry. Where the origin begins reading depends on whether an offset is stored:
- No stored offset
- When the subscription and topic combination does not have a previously stored offset, the Pulsar Consumer origin begins receiving messages based on the value of the initial offset defined on the Advanced tab of the origin.
- Previously stored offset
- When the subscription and topic combination has a previously stored offset, the Pulsar Consumer origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
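The decision above can be written as a short Python sketch. It is a simplified model with several assumptions. Offsets are plain integer positions, whereas Pulsar actually tracks message IDs and acknowledgements. The Initial Offset choices are assumed to be Earliest and Latest. `start_position` is a hypothetical helper.

```python
from enum import Enum
from typing import Optional


class InitialOffset(Enum):
    # ASSUMPTION: the two initial offset choices offered on the Advanced tab.
    EARLIEST = "Earliest"  # oldest message still retained in the topic
    LATEST = "Latest"      # only messages published from now on


def start_position(stored: Optional[int], initial: InitialOffset, next_publish: int) -> int:
    """Return the position of the first message the origin reads.

    stored       -- position of the last processed message, or None if no offset exists
    next_publish -- position that the next published message will receive
    """
    if stored is not None:
        # A stored offset wins: resume with the next unprocessed message.
        return stored + 1
    # No stored offset: fall back to the configured initial offset.
    return 0 if initial is InitialOffset.EARLIEST else next_publish
```

A stored offset always takes precedence, so the Initial Offset setting matters only the first time a subscription reads a topic.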
Record Header Attributes
The Pulsar Consumer origin includes the following information in record header attributes:
- User-defined information in the properties field of the message
- The origin includes any user-defined information in the properties field of the message, outside of the payload field, as record header attributes. When generating record header attributes, the origin uses the same names as the user-defined key/value pairs in the properties field. For example, if the Pulsar message includes an environment key/value pair in the properties field, the origin generates a record header attribute named environment.
- pulsar.topic
- The origin includes the topic that the message was read from in the pulsar.topic record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
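The mapping from message to header attributes can be modeled in Python as follows. Both helpers are hypothetical. `attribute_or_default` approximates record:attributeOrDefault by returning the default when the attribute is missing or empty. It is not the expression language function itself.

```python
def header_attributes(properties: dict[str, str], topic: str) -> dict[str, str]:
    """Build record header attributes from a message's properties and topic."""
    attributes = dict(properties)       # user-defined keys keep their names
    attributes["pulsar.topic"] = topic  # topic the message was read from
    return attributes


def attribute_or_default(attributes: dict[str, str], name: str, default: str) -> str:
    """Return the named attribute, or the default if it is missing or empty."""
    value = attributes.get(name)
    return value if value else default
```

For a message with the property `environment=prod` read from `persistent://public/default/orders`, the record carries two attributes: `environment` and `pulsar.topic`.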
Schema Properties
- Schema tab
- The schema specified on the Schema tab determines the compatibility between the origin and a Pulsar topic.
If the origin reads from a topic in a Pulsar namespace configured to enforce schema validation, then you must specify a schema on the Schema tab. The origin passes the schema to Pulsar, and Pulsar uses the schema to verify that the origin is compatible with the topic.
- Data Format tab
- The schema specified on the Data Format tab supports message processing.
If you configure the origin to read messages in Avro format, then you can specify a schema on the Data Format tab. The origin uses the specified schema to process messages.
If you specify a schema on both tabs, use the same schema on both.
Security
Configure the Pulsar Consumer origin to use the security features available in the Pulsar cluster.
- TLS transport encryption
- The Pulsar cluster encrypts all traffic between the Pulsar server and the stage. The Pulsar server provides a key and certificate, which the stage uses to verify the server's identity. For details, see the Pulsar documentation on TLS transport encryption.
- TLS authentication
- The stage provides keys and certificates, which the Pulsar server uses to verify the stage's identity. TLS authentication requires TLS transport encryption. For details, see the Pulsar documentation on TLS authentication.
- JWT authentication
- The stage provides Pulsar a JSON Web Token (JWT), which identifies the stage and grants permission for some actions. JWT authentication requires TLS transport encryption. For details, see the Pulsar documentation on JWT authentication.
- OAuth authentication
- The stage provides Pulsar an OAuth 2.0 access token, which identifies the stage and associates the stage with a role. For details, see the Pulsar documentation on OAuth authentication.
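The dependencies above can be expressed as a pre-flight check in Python. This is a hypothetical sketch. The field names are illustrative and are not the stage's property names. The sketch encodes only the two rules stated on this page: TLS authentication and JWT authentication each require TLS transport encryption.

```python
from dataclasses import dataclass


@dataclass
class SecurityConfig:
    # Hypothetical mirror of the stage's security options.
    tls_encryption: bool = False
    mutual_auth: bool = False  # TLS authentication
    use_jwt: bool = False


def validate(cfg: SecurityConfig) -> list[str]:
    """Return the problems found; an empty list means the combination is valid."""
    problems = []
    if cfg.mutual_auth and not cfg.tls_encryption:
        problems.append("TLS authentication requires TLS transport encryption")
    if cfg.use_jwt and not cfg.tls_encryption:
        problems.append("JWT authentication requires TLS transport encryption")
    return problems
```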
Enabling TLS Transport Encryption
Enable TLS transport encryption to encrypt all traffic between the Pulsar server and the stage.
Enabling TLS Authentication
Enable TLS authentication so that Pulsar can authenticate the stage with certificates.
- Enable TLS transport encryption.
- On the Security tab of the stage, select Enable Mutual Authentication.
- Create the client certificate and client private key PEM files for the stage to use. For information about creating client certificates for Pulsar, see the Pulsar documentation.
- Store the client certificate and client private key PEM files created for the stage in the Data Collector resources directory, $SDC_RESOURCES.
- On the Security tab of the stage, enter the names of the client certificate and client private key files in the Client Certificate PEM and Client Key PEM properties.
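Before starting the pipeline, you might want to confirm that the file names entered in the stage resolve to PEM files in the resources directory. The following Python sketch shows one way to do that. The function `resolve_pem_files` is hypothetical. The check is deliberately shallow: it only verifies that each file exists and starts with a PEM `-----BEGIN ` header. It raises `KeyError` if `SDC_RESOURCES` is not set.

```python
import os
from pathlib import Path


def resolve_pem_files(cert_name: str, key_name: str) -> tuple[Path, Path]:
    """Resolve the Client Certificate PEM and Client Key PEM names under $SDC_RESOURCES.

    Raises FileNotFoundError if a file is missing and ValueError if a file
    does not start with a PEM header.
    """
    resources = Path(os.environ["SDC_RESOURCES"])
    paths = (resources / cert_name, resources / key_name)
    for path in paths:
        if not path.is_file():
            raise FileNotFoundError(f"{path} not found in $SDC_RESOURCES")
        if not path.read_text().lstrip().startswith("-----BEGIN "):
            raise ValueError(f"{path} does not look like a PEM file")
    return paths
```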
Enabling JWT Authentication
Enable JWT authentication so that Pulsar can authenticate the stage with a JSON Web Token (JWT).
- Enable TLS transport encryption.
- On the Security tab, select Use JWT.
- In the Token property, enter the token string that represents a signed JWT for the stage.
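A signed JWT is three base64url segments (header, claims, signature) joined by dots. Pulsar's token authentication typically reads the client role from the `sub` claim. It accepts tokens signed with a secret key or with a key pair. In practice you would usually generate tokens with Pulsar's own `pulsar tokens` tooling. The following Python sketch builds an HS256 token using only the standard library so that the structure is visible. The subject and secret are placeholder assumptions.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_hs256_token(subject: str, secret: bytes) -> str:
    """Create a JWT signed with HMAC-SHA256 whose sub claim names the role."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps({"sub": subject}).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url(signature)}"


def claims(token: str) -> dict:
    """Decode the claims segment. This does NOT verify the signature."""
    payload = token.split(".")[1]
    padded = payload + "=" * (-len(payload) % 4)  # restore stripped padding
    return json.loads(base64.urlsafe_b64decode(padded))
```

The resulting string is what goes into the Token property. Anyone who holds a token can use it, so treat it like a password.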
Enabling OAuth Authentication
Enable OAuth authentication so that Pulsar can authenticate the stage with an access token.
Data Formats
The Pulsar Consumer origin processes data differently based on the data format. Pulsar Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and the following types of syslog messages:
- Delimited
- Generates a record for each delimited line.
- JSON
- Generates a record for each JSON object. You can process JSON messages that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
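The two JSON layouts described above produce records differently. A Python sketch of the difference follows. The function `json_records` and its `json_array` flag are hypothetical, and the sketch assumes the content type is chosen explicitly rather than detected. A single array yields one record per element. Multiple objects are decoded one after another with `json.JSONDecoder.raw_decode`.

```python
import json


def json_records(payload: str, json_array: bool) -> list:
    """Split one JSON message payload into records."""
    text = payload.strip()
    if json_array:
        # A single top-level array: one record per element.
        return list(json.loads(text))
    # Multiple JSON objects separated by whitespace: one record per object.
    decoder = json.JSONDecoder()
    records, pos = [], 0
    while pos < len(text):
        obj, end = decoder.raw_decode(text, pos)
        records.append(obj)
        # Skip whitespace, including newlines, before the next object.
        while end < len(text) and text[end].isspace():
            end += 1
        pos = end
    return records
```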
Configuring a Pulsar Consumer Origin
Configure a Pulsar Consumer origin to read messages from Apache Pulsar.