Salesforce Bulk API 2.0
The Salesforce Bulk API 2.0 source reads existing data from Salesforce using Salesforce Bulk API 2.0. To read from Salesforce with the SOAP or Bulk API, or to subscribe to notifications, use the Salesforce source. For information about supported versions, see Supported systems and versions.
When you configure the Salesforce Bulk API 2.0 source, you specify the authentication to use.
When processing existing data, you configure the SOQL query, offset field, and optional initial offset to use. The source can perform a full or incremental read at specified intervals. Under certain circumstances, the source can also process deleted records.
The Salesforce Bulk API 2.0 source can use multiple threads to process query result sets in parallel.
By default, the source generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field.
You can specify the prefix to use for Salesforce attributes, or you can disable attribute generation entirely. You can also configure other advanced options, such as disabling query validation or using mutual authentication and an HTTP proxy for the connection.
The source can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
Querying data
The Salesforce Bulk API 2.0 source executes a query to read existing data from Salesforce. Use the Salesforce Object Query Language (SOQL) to write the query.
The Salesforce Bulk API 2.0 source uses an offset field and an initial offset
or start ID to determine where to start reading data within an object. By default, the
offset field is defined as the Salesforce Id system field, which
contains a unique identifier for each record in a Salesforce object.
You can configure the maximum number of columns that the query can return and the maximum number of seconds that the source waits for a response from the query.
Salesforce Bulk API 2.0 is an asynchronous API. The source creates a query job, and periodically polls Salesforce until the job is complete. Salesforce returns query results in one or more result sets. Result sets can be processed in parallel by enabling multithreaded processing.
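The create-then-poll pattern described above can be sketched in Python as follows. This is a minimal illustration, not the source's implementation: `wait_for_job`, its parameters, and the fake status sequence are hypothetical, and the terminal state names are assumptions made for the sketch.

```python
import time
from typing import Callable

# States that end polling. These names are assumed for this sketch.
TERMINAL_STATES = {"JobComplete", "Failed", "Aborted"}


def wait_for_job(get_state: Callable[[], str],
                 poll_interval_s: float = 1.0,
                 timeout_s: float = 60.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until it returns a terminal state or the timeout passes."""
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_s:
            raise TimeoutError(f"job still {state!r} after {waited:.0f}s")
        sleep(poll_interval_s)
        waited += poll_interval_s


# Hypothetical stand-in for a status request: the job finishes on the third poll.
fake_states = iter(["UploadComplete", "InProgress", "JobComplete"])
final_state = wait_for_job(lambda: next(fake_states), sleep=lambda s: None)
print(final_state)  # JobComplete
```

In a real flow, the source performs this polling itself. The sketch only shows why a response timeout matters for long-running query jobs.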
If the flow stops before it finishes reading all data, the Salesforce Bulk API 2.0 source saves the last read offset value. When the flow starts again, the source uses the last read offset value to continue processing from where it stopped. You can reset the offset to process all requested objects.
Unlike the Salesforce source, the Salesforce Bulk API 2.0 source does not stop the flow when an initial query is complete. To stop the flow automatically, enable the source to generate events and use the Pipeline Finisher executor. For more information, see Event generation.
In rare cases, the query returns data with a type that does not match the data type specified in the schema for a field. For example, the query might return a Float when the schema specifies an Integer. You can use the Mismatched Types Behavior property on the Advanced tab to configure how the source handles mismatched types. The source can retain the returned data, truncate the returned data to match the specified type, or round the returned data to match the specified type.
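To make the three options concrete, the following Python sketch applies each one to a Float returned for an Integer field. The function and the behavior names are hypothetical, and the rounding rule the source actually applies is not stated here, so treat the `round` branch as an assumption.

```python
import math


def coerce_to_integer(value: float, behavior: str):
    """Apply one of three illustrative behaviors to a float in an Integer field."""
    if behavior == "retain":
        return value              # keep the returned data unchanged
    if behavior == "truncate":
        return math.trunc(value)  # drop the fractional part
    if behavior == "round":
        return round(value)       # nearest integer; Python rounds exact .5 ties to even
    raise ValueError(f"unknown behavior: {behavior}")


for behavior in ("retain", "truncate", "round"):
    print(behavior, coerce_to_integer(12.7, behavior))
# retain 12.7
# truncate 12
# round 13
```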
Bulk API 2.0 queries
When querying existing data with version 2.0 of the Bulk API, you define the SOQL query and related properties to determine the data returned from Salesforce.
- SOQL query
- When processing existing data, use the following query guidelines:
  - In the WHERE clause, include the offset field and the offset value. The source uses an offset field and value to determine the data that is returned. Include both in the WHERE clause of the query.
  - In the WHERE clause, use ${OFFSET} to represent the offset value. For example, when you start a flow, the following query returns all data from the object where the data in the offset field is greater than the initial offset value:
    SELECT Id, Name FROM <object> WHERE <offset field> > ${OFFSET}
    Note: When the offset values are strings, enclose ${OFFSET} in single quotation marks.
  - To avoid returning duplicate data, use the offset field as the first field in the ORDER BY clause.
    Note: Using a field that is not the Id field in the ORDER BY clause can slow performance.
- Additional properties
- You can configure several additional properties on the Query tab. For example:
  - Offset Field - Typically the Id system field, the offset field should be an indexed field in the record. Default is the Id field.
  - Initial Offset - First offset value to use when the flow starts or after you reset the source.
  - Include Deleted Records - An optional property. Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.
Example
Let's say that you want to read all names and account numbers from the Salesforce Account object a single time.
- SOQL Query - Include the offset field and offset value in the WHERE and ORDER BY clauses, as well as the fields to return, as follows:
  SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id
- Repeat Query - Set to No Repeat to run the query a single time.
- Initial Offset - Use the default value of fifteen zeros (000000000000000) for the offset value to ensure that the source reads all records in the object.
- Offset Field - Use the default, Id, for the offset field.
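As a rough illustration of what the first query looks like once the offset value is filled in, the following Python sketch substitutes the default initial offset into the example query. It uses Python's `string.Template` only because its placeholder syntax happens to match `${OFFSET}`. This is not how the source resolves the constant internally.

```python
from string import Template

# Query and initial offset copied from the example above.
soql = Template(
    "SELECT Id, Name, AccountNumber FROM Account "
    "WHERE Id > '${OFFSET}' ORDER BY Id"
)
initial_offset = "000000000000000"

first_query = soql.substitute(OFFSET=initial_offset)
print(first_query)
# SELECT Id, Name, AccountNumber FROM Account WHERE Id > '000000000000000' ORDER BY Id
```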
Full and incremental mode
- Incremental mode
- When the source performs an incremental query, it uses the initial offset as the offset value in the first SOQL query. As the source completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
- Full mode
- When the source performs a full query, it runs the specified SOQL query. The source uses the initial offset as the offset value in the SOQL query each time it requests data.
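The difference between the two modes comes down to which offset value each repeated query starts from. The following Python sketch simulates that with an in-memory list. The function name, the mode strings, and the sample Id values are hypothetical and exist only for illustration.

```python
def run_queries(records, initial_offset, mode, runs):
    """Return the offset value used by each query in `runs` repeated queries.

    records: sorted offset-field values currently in the object (illustrative).
    mode: "incremental" or "full" (names chosen for this sketch).
    """
    offsets_used = []
    saved_offset = initial_offset
    for _ in range(runs):
        start = saved_offset if mode == "incremental" else initial_offset
        offsets_used.append(start)
        # Equivalent of: WHERE <offset field> > ${OFFSET} ORDER BY <offset field>
        returned = [r for r in records if r > start]
        if returned:
            saved_offset = returned[-1]  # last offset value processed
    return offsets_used


ids = ["001", "002", "003"]
print(run_queries(ids, "000", "incremental", runs=2))  # ['000', '003']
print(run_queries(ids, "000", "full", runs=2))         # ['000', '000']
```

In incremental mode the second query starts after the last processed Id, so it returns only newer records. In full mode every query starts from the initial offset and returns the whole object again.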
Multithreaded processing
The Salesforce Bulk API 2.0 source can perform parallel processing and enables the creation of a multithreaded flow.
When you enable multithreaded processing, the Salesforce Bulk API 2.0 source uses multiple concurrent threads based on the Number of Threads property. When you start the flow, the source creates the number of threads specified in the property.
Salesforce Bulk API 2.0 query results are returned in one or more result sets, each of which can be processed in parallel up to the Number of Threads configuration setting that you specify. You can control the maximum size of a result set with the Maximum Records per Query Result Set advanced stage property. If you are working with a very large number of query results, you might experience a timeout before receiving all of the data from Salesforce. To prevent a timeout, you can use this property to split the results into smaller sets.
When you don’t provide a value for the Maximum Records per Query Result Set property, the server uses a default value based on the service.
A Salesforce Bulk API 2.0 source thread reads each result set into one or more batches of records. Upon filling each batch, the source passes it to an available flow runner.
A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source. Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But because batches are processed by different flow runners, the order in which batches are written to targets is not guaranteed.
For example, say you enable multithreaded processing and set the Number of Threads property to 5. When you start the flow, the source creates five threads, and Data Collector creates a matching number of flow runners. Upon receiving data, the source passes a batch to each of the flow runners for processing.
Each flow runner performs the processing associated with the rest of the flow. After a batch is written to flow targets, the flow runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent of the batches processed by other flow runners, so batches may be written in a different order than they were read.
At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.
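The ordering behavior in this example can be imitated with a small Python thread pool. This is only an analogy under stated assumptions: `process_batch` is a hypothetical stand-in for a flow runner, the batch contents are made up, and the real engine schedules work differently.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

NUM_THREADS = 5  # mirrors the Number of Threads value in the example above


def process_batch(batch_id, records):
    """Hypothetical stand-in for a flow runner handling one batch."""
    time.sleep(random.uniform(0, 0.01))  # simulate uneven processing time
    return batch_id, [r.upper() for r in records]  # record order is kept


batches = {i: [f"rec-{i}-{j}" for j in range(3)] for i in range(8)}

written = []  # batches in the order they finish, which may differ from read order
with ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
    futures = [pool.submit(process_batch, i, recs) for i, recs in batches.items()]
    for future in as_completed(futures):
        written.append(future.result())

print([batch_id for batch_id, _ in written])
```

Running the sketch several times typically prints the batch IDs in a different order each time, while the records inside each batch stay in their original order.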
For more information about multithreaded flows, see Multithreaded flow overview.
Processing deleted records
The Salesforce Bulk API 2.0 source can retrieve deleted records from the Salesforce recycle bin for processing.
To process deleted records, use the Include Deleted Records property on the Query tab.
Reading custom objects or fields
If the source reads custom Salesforce objects or fields, you might want to use a Field Renamer in the flow to rename the custom fields.
When you extend Salesforce objects, custom object and field names are appended
with the suffix __c. For example, if you create a custom Transaction
object, Salesforce names the object Transaction__c. The Transaction
object might contain fields named Credit_Card__c, Fare_Amount__c, and
Payment_Type__c.
Instead of using field names appended with the suffix __c
throughout the rest of the flow, you can add a Field Renamer to remove the suffix
from the field names.
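The effect of such a rename can be sketched in Python as follows. The function is hypothetical and the sample field values are invented. In a flow, the Field Renamer does this work, so the sketch only shows the resulting field names.

```python
def strip_custom_suffix(record: dict, suffix: str = "__c") -> dict:
    """Return a copy of the record with the suffix removed from field names."""
    return {
        (name[: -len(suffix)] if name.endswith(suffix) else name): value
        for name, value in record.items()
    }


record = {"Id": "a01", "Credit_Card__c": "VISA", "Fare_Amount__c": 12.5,
          "Payment_Type__c": "CRD"}
print(strip_custom_suffix(record))
# {'Id': 'a01', 'Credit_Card': 'VISA', 'Fare_Amount': 12.5, 'Payment_Type': 'CRD'}
```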
For more information about Salesforce custom objects, see the Salesforce documentation.
Salesforce attributes
The Salesforce Bulk API 2.0 source generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field. The source receives these details from Salesforce.
Salesforce attributes include a user-defined prefix to differentiate the
Salesforce attributes from other attributes. By default, the prefix is
salesforce. (including the trailing period). You can change the prefix that
the source uses, or you can configure the source not to create Salesforce attributes.
Salesforce header attribute
The Salesforce Bulk API 2.0 source generates a Salesforce record header attribute to provide additional information about each record. The source receives these details from Salesforce.
You can use the record:attribute or
record:attributeOrDefault functions to access the information in
the attribute.
| Salesforce Header Attribute | Description |
|---|---|
| <Salesforce prefix>sobjectType | Provides the Salesforce source object for the record. Generated when the source executes a query. |
For more information about record header attributes, see Record Header Attributes.
Salesforce field attributes
The Salesforce Bulk API 2.0 source generates Salesforce field attributes that provide additional information about each field, such as the data type of the Salesforce field. The source receives these details from Salesforce.
You can use the record:fieldAttribute or
record:fieldAttributeOrDefault functions to access the information
in the attribute.
| Salesforce Field Attribute | Description |
|---|---|
| <Salesforce prefix>salesforceType | Provides the original Salesforce data type for the field. |
| <Salesforce prefix>length | Provides the original length for all string and textarea fields. |
| <Salesforce prefix>precision | Provides the original precision for all double fields. |
| <Salesforce prefix>scale | Provides the original scale for all double fields. |
| <Salesforce prefix>digits | Provides the maximum number of digits for all integer fields. |
For more information about field attributes, see Field Attributes.
Event generation
The Salesforce Bulk API 2.0 source can generate events that you can use in an event stream. When you enable event generation, the source generates an event when it completes processing the data returned by the specified query.
You can use the events in any logical way. For example:
- With the Pipeline Finisher executor to stop the flow and transition the flow to a Finished state when the source completes processing available data.
  When you restart a flow stopped by the Pipeline Finisher executor, the source processes data based on how you configured the source. For example, if you configure the source to repeat an incremental query, the source saves the offset when the executor stops the flow. When it restarts, the source continues processing from the last-saved offset. If you configure the source to repeat a full query, when you restart the flow, the source uses the initial offset.
  For an example, see Stopping a flow after processing all available data.
- With the Email executor to send a custom email after receiving an event.
  For an example, see Sending email during flow processing.
- With a target to store information about completed queries.
  For an example, see Preserving an audit trail of events.
Event record
| Record Header Attribute | Description |
|---|---|
| sdc.event.type | Event type. Uses the following type: no-more-data. |
| sdc.event.version | Integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The no-more-data event record includes no record fields.
Changing the API version
About this task
By default, the Salesforce Bulk API 2.0 source uses version 57.0.0 of the Salesforce API. You can use a different Salesforce API version if you need to access functionality not present in version 57.0.0.
Procedure
Configuring a Salesforce Bulk API 2.0 source
About this task
Configure a Salesforce Bulk API 2.0 source to read data from Salesforce using Salesforce Bulk API 2.0.