Pulsar Consumer (Legacy)
The Pulsar Consumer (Legacy) origin reads messages from one or more topics in an Apache Pulsar cluster. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
Data Collector also includes the Pulsar Consumer origin, which can use multiple threads to process data in parallel. Because the Pulsar Consumer origin can process different batches simultaneously, the origin can process messages faster.
The Pulsar Consumer (Legacy) origin subscribes to Pulsar topics, processes incoming messages, and then sends acknowledgements back to Pulsar as the messages are read.
When you configure a Pulsar Consumer (Legacy) origin, you define the URL to connect to Pulsar, and you configure the Pulsar security features to use when connecting. You can also use a connection to configure the origin.
You define the subscription name and consumer name to use for the origin and the topics to subscribe to. When the pipeline starts, Pulsar creates a consumer with the specified consumer name. If the subscription and topics do not exist, Pulsar also creates the subscription and topics.
You can configure the schema used to determine compatibility with a Pulsar topic. You can also configure advanced properties as needed, such as the type of subscription to create or the initial offset to begin reading from.
The Pulsar Consumer (Legacy) origin includes record header attributes that enable you to use information about the record in pipeline processing.
For more information about Pulsar topics, subscriptions, and consumers, see the Apache Pulsar documentation.
Topics Selector
The Pulsar Consumer (Legacy) origin can subscribe to a single topic or to multiple topics. In either case, the origin uses one thread for the read.
To define the topics that the origin subscribes to, configure the Topics Selector property on the Pulsar tab.
The origin provides the following methods of subscribing to topics:
- Single Topic
- Subscribes to a single topic. Use the following format to specify the topic
name:
{persistent|non-persistent}://<tenant>/<namespace>/<topic name>
- Topics List
- Subscribes to multiple topics defined in a list of topic names. Use the Add icon to add additional topic names. Define each topic name using the same format required for a single topic.
- Topics Pattern
- Subscribes to multiple topics defined by a naming pattern. Use the following
format to specify the
pattern:
{persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
For more information about defining topic names and about subscribing to multiple topics, see the Apache Pulsar documentation.
Parallel Reads of a Topic
The Pulsar Consumer (Legacy) origin can subscribe to a single topic or to multiple topics. In either case, the origin uses one thread to read from the topic or topics. To perform parallel reads from the same topic, you can configure multiple pipelines with a Pulsar Consumer origin that subscribe to the same topic.
To configure multiple origins to subscribe to the same topic, you'll need to determine whether the origins use a single shared subscription, multiple exclusive subscriptions, or multiple failover subscriptions. A shared subscription allows multiple consumers to attach to the same subscription. An exclusive subscription allows only a single consumer to attach to the subscription. A failover subscription allows multiple consumers to attach to the same subscription, but only one consumer receives messages at a time. If the initial primary consumer disconnects, messages are delivered to the next consumer.
For more information about the Pulsar subscription modes, see Subscription Modes in the Pulsar documentation.
You can configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same Pulsar topic in the following ways:
- Single shared subscription
- To configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same topic
using the same shared subscription, configure the following origin
properties:
- Subscription Name - On the Pulsar tab, configure each origin to use the same subscription name.
- Topic - On the Pulsar tab, configure each origin to use the same topic name.
- Subscription Type - On the Advanced tab, configure each origin to use the shared subscription type.
- Multiple exclusive subscriptions
- To configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same topic
using multiple exclusive subscriptions, configure the following origin
properties:
- Subscription Name - On the Pulsar tab, configure each origin to use a unique subscription name.
- Topic - On the Pulsar tab, configure each origin to use the same topic name.
- Subscription Type - On the Advanced tab, configure each origin to use the exclusive subscription type.
- Multiple failover subscriptions
- To configure multiple Pulsar Consumer (Legacy) origins to subscribe to the same topic
using multiple failover subscriptions, configure the following origin
properties:
- Subscription Name - On the Pulsar tab, configure each pair of origins to
use a unique subscription name.
For example, configure the origins in pipelineA and pipelineB to use subscription1. The origin in pipelineA serves as the primary consumer, and the origin in pipelineB is the next in line to receive messages if the pipelineA origin disconnects. Then, configure the origins in pipelineC and pipelineD to use subscription2.
- Topic - On the Pulsar tab, configure each origin to use the same topic name.
- Subscription Type - On the Advanced tab, configure each origin to use the failover subscription type.
- Subscription Name - On the Pulsar tab, configure each pair of origins to
use a unique subscription name.
Offset Management
The first time that a Pulsar Consumer (Legacy) origin receives messages from a topic, an offset entry is created for that subscription and topic. The offset entry is created and maintained by Pulsar.
- No stored offset
- When the subscription and topic combination does not have a previously stored offset, the Pulsar Consumer (Legacy) origin begins receiving messages based on the value of the initial offset defined on the Advanced tab of the origin.
- Previously stored offset
- When the subscription and topic combination has a previously stored offset, the Pulsar Consumer (Legacy) origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
Record Header Attributes
- User-defined information in the properties field of the message
- The origin includes any user-defined information in the properties field of
the message - outside of the payload field - as record header attributes.
When generating the record header attribute, the origin uses the same names
as the user-defined key/value pairs in the properties field. For example, if
the Pulsar message includes an
environment
key/value pair in the properties field, the origin generates a record header attribute namedenvironment
. - pulsar.topic
- The origin includes the topic that the message was read from in the
pulsar.topic
record header attribute.
You can use the record:attribute
or
record:attributeOrDefault
functions to access the information
in the attributes. For more information about working with record header attributes,
see Working with Header Attributes.
Schema Properties
- Schema tab
- The schema specified on the Schema tab determines the compatibility between
the origin and a Pulsar topic.
If the origin reads from a topic in a Pulsar namespace configured to enforce schema validation, then you must specify a schema on the Schema tab. The origin passes the schema to Pulsar. Then Pulsar uses the schema to verify that the origin is compatible with a topic.
- Data Format tab
- The schema specified on the Data Format tab supports message
processing.
If you configure the origin to read messages in Avro format, then you can specify a schema on the Data Format tab. The origin uses the specified schema to process messages.
If you specify a schema on both tabs, then specify the same schema.
Security
Configure the Pulsar Consumer (Legacy) origin to use the security features available in the Pulsar cluster.
The origin supports the following security features:
- TLS transport encryption
- The Pulsar cluster encrypts all traffic between the Pulsar server and the stage. The Pulsar server provides a key and certificate, which the stage uses to verify the server's identity. For details, see the Pulsar documentation on TLS transport encryption.
- TLS authentication
- The stage provides keys and certificates, which the Pulsar server uses to verify the stage's identity. TLS authentication requires TLS transport encryption. For details, see the Pulsar documentation on TLS authentication.
- JWT authentication
- The stage provides Pulsar a JSON Web Token (JWT), which identifies the stage and grants permission for some actions. JWT authentication requires TLS transport encryption. For details, see the Pulsar documentation on JWT authentication.
- OAuth authentication
- The stage provides Pulsar an OAuth 2.0 access token, which identifies the stage and associates the stage with a role. For details, see the Pulsar documentation on OAuth authentication.
Enabling TLS Transport Encryption
Enable TLS transport encryption to encrypt all traffic between the Pulsar server and the stage.
Enabling TLS Authentication
Enable TLS authentication so that Pulsar can authenticate the stage with certificates.
- Enable TLS transport encryption.
- On the Security tab of the stage, select Enable Mutual Authentication.
-
Create the client certificate and client private key PEM files for the stage to
use.
For information about creating client certificates for Pulsar, see the Pulsar documentation.
- Store the client certificate and client private key PEM files created for the stage in the Data Collector resources directory, $SDC_RESOURCES.
- On the Security tab of the stage, enter the name of the client files in the Client Certificate PEM and Client Key PEM properties.
Enabling JWT Authentication
Enable JWT authentication so that Pulsar can authenticate the stage with a JSON Web Token (JWT).
- Enable TLS transport encryption.
- On the Security tab, select Use JWT.
- In the Token property, enter the token string that represents a signed JWT for the stage.
Enabling OAuth Authentication
Enable OAuth authentication so that Pulsar can authenticate the stage with an access token.
Data Formats
The Pulsar Consumer (Legacy) origin processes data differently based on the data format. Pulsar Consumer (Legacy) can process the following types of data:
- Avro
- Generates a record for every message. Includes a
precision
andscale
field attribute for each Decimal field. - Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and the following types of syslog messages:
- Delimited
- Generates a record for each delimited line.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Configuring a Pulsar Consumer (Legacy) Origin
Configure a Pulsar Consumer (Legacy) origin to read messages from Apache Pulsar.