Google Pub/Sub Subscriber
The Google Pub/Sub Subscriber source consumes messages from a Google Pub/Sub subscription. For information about supported versions, see Supported systems and versions.
When you configure the source, you define the Google Pub/Sub subscription ID to receive messages from. You define the project ID and credentials to use when connecting to Google Pub/Sub. You can also use a connection to configure the source.
The Google Pub/Sub Subscriber source can use multiple threads to enable parallel processing of data from a Google Pub/Sub subscription.
When available, the Google Pub/Sub Subscriber source includes user-defined message attributes in record header attributes.
Credentials
When the Google Pub/Sub Subscriber source consumes messages from a Google Pub/Sub subscription, the source must pass credentials to Google Pub/Sub.
- Google Cloud default credentials
- Credentials in a file
- Credentials in a stage property
For details on how to configure each option, see Security in Google Cloud stages.
Multithreaded processing
The Google Pub/Sub Subscriber source can perform parallel processing and enables the creation of a multithreaded flow. The source uses multiple concurrent threads based on the Num Flow Runners property.
When you start the flow, each thread connects to the source system, creates a batch of data, and passes the batch to an available flow runner. A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source.
Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But since batches are processed by different flow runners, the order that batches are written to targets is not ensured.
For example, say you set the Num Flow Runners property to 5. When you start the flow, the source creates five threads, and Data Collector creates a matching number of flow runners. Upon receiving data, the source passes a batch to each of the flow runners for processing.
Each flow runner performs the processing associated with the rest of the flow. After a batch is written to flow targets, the flow runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent from other batches processed by other flow runners, so batches may be written differently from the read order.
At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.
For more information about multithreaded flows, see Multithreaded flow overview.
Record header attributes
The Google Pub/Sub Subscriber source includes user-defined message attributes in record
header attributes when they are available. When the source processes Avro data, it includes the Avro schema in
an avroSchema record header attribute.
A Google Pub/Sub message contains a payload and optional attributes that describe the payload content. If the Google Pub/Sub Subscriber source consumes a message with optional attributes, the source includes the message attributes in record header attributes.
You can use the record:attribute or
record:attributeOrDefault functions to access the information
in the attributes. For more information about working with record header attributes,
see Working with header attributes.
Data formats
The Google Pub/Sub Subscriber source processes data differently based on the data format. Google Pub/Sub Subscriber can process the following types of data:
- Avro
- Generates a record for every message. Includes a
precisionandscalefield attribute for each Decimal field. - The stage includes the Avro schema in an
avroSchemarecord header attribute. You can use one of the following methods to specify the location of the Avro schema definition:- Message/Data Includes Schema - Use the schema in the message.
- In Flow Configuration - Use the schema that you provide in the stage configuration.
- Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the stage to look up the schema in Confluent Schema Registry by the schema ID embedded in the message or by the schema ID or subject specified in the stage configuration.
- Using a schema in the stage configuration or retrieving a schema from Confluent Schema Registry overrides any schema that might be included in the message and can improve performance.
- Binary
- Generates a record with a single byte array field at the root of the record.
- When the data exceeds the user-defined maximum data size, the source cannot process the data. Because the record is not created, the source cannot pass the record to the flow to be written as an error record. Instead, the source generates a stage error.
- Delimited
- Generates a record for each delimited line.
- The CSV parser that you choose determines the delimiter properties that you configure and how the stage handles parsing errors. You can specify if the data includes a header line and whether to use it. You can define the number of lines to skip before reading, the character set of the data, and the root field type to use for the generated record.
- You can also configure the stage to replace a string constant with null values and to ignore control characters.
- For more information about reading delimited data, see Reading delimited data.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- When an object exceeds the maximum object length defined for the source, the source processes the object based on the error handling configured for the stage.
- Protobuf
- Generates a record for every protobuf message. By default, the source assumes messages contain multiple protobuf messages.
- Protobuf messages must match the specified message type and be described in the descriptor file.
- When the data for a record exceeds 1 MB, the source cannot continue processing data in the message. The source handles the message based on the stage error handling property and continues reading the next message.
- For information about generating the descriptor file, see Protobuf data format prerequisites.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector flow using the SDC Record data format.
- For error records, the source provides the original record as read from the source in the original flow, as well as error information that you can use to correct the record.
- When processing error records, the source expects the error file names and contents as generated by the original flow.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- When a line or section exceeds the maximum line length defined for the source, the source truncates it. The source adds a boolean field named Truncated to indicate if the line was truncated.
- For more information about processing text with a custom delimiter, see Text data format with custom delimiters.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the source treats the XML file as a single record.
- Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
- You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
- When a record exceeds the user-defined maximum record length, the source skips the record and continues processing with the next record. It sends the skipped record to the flow for error handling.
- Use the XML data format to process valid XML documents. For more information about XML processing, see Reading and processing XML data.
-
Tip: If you want to process invalid XML documents, you can try using the text data format with custom delimiters. For more information, see Processing XML data with custom delimiters.
Configuring a Google Pub/Sub Subscriber source
About this task
Configure a Google Pub/Sub Subscriber source to consume messages from a Google Pub/Sub subscription.
Procedure
-
In the Properties panel, on the General tab, configure the
following properties:
General Property Description Name Stage name. Description Optional description. On Record Error Error record handling for the stage: - Discard - Discards the record.
- Send to Error - Sends the record to the flow for error handling.
- Stop Flow - Stops the flow.
-
On the Pub/Sub tab, configure the following
properties:
Pub/Sub Property Description Subscription ID Google Pub/Sub subscription ID to receive messages from. Num Flow Runners The number of threads that the source generates and uses for multithreaded processing. Enter a positive integer or an expression that evaluates to a positive integer. By default, the source uses the following expression to configure the property based on the number of available CPU cores on the Data Collector machine:${runtime:availableProcessors()}This expression is evaluated when you start the flow.
Max Batch Size (records) Maximum amount of records to include in a batch. Honors values up to the Data Collector maximum batch size. Default is 1000. The Data Collector default is 1000.
Max Batch Wait Time (ms) Amount of time the source will wait to fill a batch before sending an empty batch. -
On the Credentials tab, configure the following
properties:
Credentials Property Description Connection 6.4.1-0111 and later
Connection that defines the information that is required to connect to an external system. To connect to an external system, you can select a connection that contains the details, or you can specify the details in local properties. When you select a connection, the flow canvas hides properties that are defined in the connection.
Project ID Google Cloud project ID to use.
Credentials Provider Provider for Google Cloud credentials: - Default credentials provider - Uses Google Cloud default credentials.
- Service account credentials file (JSON) - Uses credentials stored in a JSON service account credentials file.
- Service account credentials (JSON) - Uses JSON-formatted credentials information from a service account credentials file.
Credentials File Path (JSON) Path to the Google Cloud service account credentials file used to connect. The credentials file must be a JSON file. Enter a path relative to the Data Collector resources directory,
$SDC_RESOURCES, or enter an absolute path.Credentials File Content (JSON) Contents of a Google Cloud service account credentials JSON file used to connect. Enter JSON-formatted credential information in plain text, or use an expression to call the information from runtime resources or a credential store.
-
Optionally, click the Advanced tab to tune the
performance of the source.
The defaults for these properties should work in most cases:
Advanced Property Description Number of Subscribers Number of subscribers to spawn. Default is 1.
Subscriber Thread Pool Size Size of the thread pool for each subscriber. Enter a positive integer or an expression that evaluates to a positive integer. By default, the source uses the following expression to configure the property based on the number of available CPU cores on the Data Collector machine:${5 * 10 * runtime:availableProcessors()}Custom Endpoint Optional endpoint to receive messages, entered in the following format: <host name>:<port number>Use to test with a Cloud SDK emulator for Google Pub/Sub.
-
On the Data Format tab, configure the following
property:
Data Format Property Description Data Format Type of data to be read. Use one of the following options: - Avro
- Binary
- Delimited
- JSON
- Protobuf
- SDC Record
- Text
- XML
-
For Avro data, on the Data Format tab, configure the
following properties:
Avro Property Description Avro Schema Location Location of the Avro schema definition to use when processing data: - Message/Data Includes Schema - Use the schema in the message.
- In Flow Configuration - Use the schema provided in the stage configuration.
- Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.
Using a schema in the stage configuration or in Confluent Schema Registry can improve performance.
Avro Schema Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data. You can optionally use the
runtime:loadResourcefunction to load a schema definition stored in a runtime resource file.Schema Registry URLs Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add and then enter the URL in the following format: http://<host name>:<port number>Basic Auth User Info User information needed to connect to Confluent Schema Registry when using basic authentication. Enter the key and secret from the
schema.registry.basic.auth.user.infosetting in Schema Registry using the following format:<key>:<secret>Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.Lookup Schema By Method used to look up the schema in Confluent Schema Registry: - Subject - Look up the specified Avro schema subject.
- Schema ID - Look up the specified Avro schema ID.
- Embedded Schema ID - Look up the Avro schema ID embedded in each message.
Schema Subject Avro schema subject to look up in Confluent Schema Registry. If the specified subject has multiple schema versions, the stage uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.
Schema ID Avro schema ID to look up in Confluent Schema Registry. -
For binary data, on the Data Format tab, configure the
following properties:
Binary Property Description Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error. -
For delimited data, on the Data Format tab, configure the
following properties:
Delimited Property Description Header Line Indicates whether a file contains a header line, and whether to use the header line. Delimiter Format Type Delimiter format type. Use one of the following options: - Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
Available when using the Apache Commons parser type.
Multi Character Field Delimiter Characters that delimit fields. Default is two pipe characters (||).
Available when using the Apache Commons parser with the multi-character delimiter format.
Multi Character Line Delimiter Characters that delimit lines or records. Default is the newline character (\n).
Available when using the Apache Commons parser with the multi-character delimiter format.
Delimiter Character Delimiter character. Select one of the available options or use Other to enter a custom character. You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.
Default is the pipe character ( | ).
Available when using the Apache Commons parser with a custom delimiter format.
Field Separator One or more characters to use as delimiter characters between columns. Available when using the Univocity parser.
Escape Character Escape character. Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.
Quote Character Quote character. Available when using the Apache Commons parser with the custom or multi-character delimiter format. Also available when using the Univocity parser.
Line Separator Line separator. Available when using the Univocity parser.
Allow Comments Allows commented data to be ignored for custom delimiter format. Available when using the Univocity parser.
Comment Character Character that marks a comment when comments are enabled for custom delimiter format.
Available when using the Univocity parser.
Enable Comments Allows commented data to be ignored for custom delimiter format. Available when using the Apache Commons parser.
Comment Marker Character that marks a comment when comments are enabled for custom delimiter format. Available when using the Apache Commons parser.
Lines to Skip Number of lines to skip before reading data. Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
CSV Parser Parser to use to process delimited data: - Apache Commons - Provides robust parsing and a wide range of delimited format types.
- Univocity - Can provide faster processing for wide delimited files, such as those with over 200 columns.
Default is Apache Commons.
Max Columns Maximum number of columns to process per record. Available when using the Univocity parser.
Max Character per Column Maximum number of characters to process in each column. Available when using the Univocity parser.
Skip Empty Lines Allows skipping empty lines. Available when using the Univocity parser.
Allow Extra Columns Allows processing records with more columns than exist in the header line. Available when using the Apache Commons parser to process data with a header line.
Extra Column Prefix Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>.For example,
_extra_1. Default is_extra_.Available when using the Apache Commons parser to process data with a header line while allowing extra columns.
Max Record Length (chars) Maximum length of a record in characters. Longer records are not read. This property can be limited by the Data Collector parser buffer size. For more information, see Maximum record size.
Available when using the Apache Commons parser.
Ignore Empty Lines Allows empty lines to be ignored. Available when using the Apache Commons parser with the custom delimiter format.
Root Field Type Root field type to use: - List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new flows.
- List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain flows created before 1.1.0.
Parse NULLs Replaces the specified string constant with null values. NULL Constant String constant to replace with null values. Charset Character encoding of the files to be processed. Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters. -
For JSON data, on the Data Format tab, configure the
following properties:
JSON Property Description JSON Content Type of JSON content. Use one of the following options: - JSON array of objects
- Multiple JSON objects
Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
Max Object Length (chars) Maximum number of characters in a JSON object. Longer objects are diverted to the flow for error handling.
This property can be limited by the Data Collector parser buffer size. For more information, see Maximum record size.
Charset Character encoding of the files to be processed. Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters. -
For protobuf data, on the Data Format tab, configure the
following properties:
Protobuf Property Description Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.For information about generating the descriptor file, see Protobuf data format prerequisites.
Message Type The fully-qualified name for the message type to use when reading data. Use the following format:
Use a message type defined in the descriptor file.<package name>.<message type>.Delimited Messages Indicates if a message might include more than one protobuf message. Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
-
For SDC Record data, on the Data Format tab, configure the
following properties:
SDC Record Property Description Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
-
For text data, on the Data Format tab, configure the
following properties:
Text Property Description Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated. Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.
This property can be limited by the Data Collector parser buffer size. For more information, see Maximum record size.
Use Custom Delimiter Uses custom delimiters to define records instead of line breaks. Custom Delimiter One or more characters to use to define records. Include Custom Delimiter Includes delimiter characters in the record. Charset Character encoding of the files to be processed. Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters. -
For XML data, on the Data Format tab, configure the
following properties:
XML Property Description Delimiter Element Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:- An XML element directly under the root element.
Use the XML element name without surrounding angle brackets ( < > ) . For example, msg instead of <msg>.
- A simplified XPath expression that specifies the
data to use.
Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.
For more information about valid syntax, see Simplified XPath syntax.
Compression Format The compression format of the files: - None - Processes only uncompressed files.
- Compressed File - Processes files that are compressed by the supported compression formats.
- Archive - Processes files that are archived by the supported archive formats.
- Compressed Archive - Processes files that are archived and compressed by the supported archive and compression formats.
File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json. Default is *, which processes all files.
Preserve Root Element Includes the root element in the generated records. When omitting a delimiter to generate a single record, the root element is the root element of the XML document.
When specifying a delimiter to generate multiple records, the root element is the XML element specified as the delimiter element or is the last XML element in the simplified XPath expression specified as the delimiter element.
Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute. When not selected, this information is not included in the record. By default, the property is not selected.
Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces. For information about using namespaces with an XML element, see Using XML elements with namespaces.
For information about using namespaces with XPath expressions, see Using XPath expressions with namespaces.
Using simple or bulk edit mode, click the Add icon to add additional namespaces.
Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields. By default, the property is not selected.
Max Record Length (chars) The maximum number of characters in a record. Longer records are diverted to the flow for error handling.
This property can be limited by the Data Collector parser buffer size. For more information, see Maximum record size.
Charset Character encoding of the files to be processed. Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters. - An XML element directly under the root element.