Salesforce Bulk API 2.0
The Salesforce Bulk API 2.0 origin reads existing data from Salesforce using Salesforce Bulk API 2.0. To read from Salesforce with the SOAP or Bulk API, or to subscribe to notifications, use the Salesforce origin. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
When you configure the Salesforce Bulk API 2.0 origin, you specify the authentication to use. You can also use a connection to configure the origin.
When processing existing data, you configure the SOQL query, offset field, and optional initial offset to use. The origin can perform a full or incremental read at specified intervals. Under certain circumstances, the origin can also process deleted records.
The Salesforce Bulk API 2.0 origin can use multiple threads to process query result sets in parallel.
By default, the origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field.
You can specify the prefix to use for Salesforce attributes, or you can disable attribute generation entirely. You can also configure other advanced options, such as disabling query validation or using mutual authentication and an HTTP proxy for the connection.
The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Querying Data
The Salesforce Bulk API 2.0 origin executes a query to read existing data from Salesforce. Use the Salesforce Object Query Language (SOQL) to write the query.
The Salesforce Bulk API 2.0 origin uses an offset field and an initial offset or start ID to determine where to start reading data within an object. By default, the offset field is defined as the Salesforce Id system field, which contains a unique identifier for each record in a Salesforce object.
You can configure the maximum number of columns that the query can return and the maximum number of seconds that the origin waits for a response from the query.
Salesforce Bulk API 2.0 is an asynchronous API. The origin creates a query job, and periodically polls Salesforce until the job is complete. Salesforce returns query results in one or more result sets. Result sets can be processed in parallel by enabling multithreaded processing.
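The create-then-poll pattern described above can be sketched roughly as follows. This is a minimal illustration, not the origin's implementation: get_job_state is a hypothetical callable that stands in for a status request to Salesforce, and the state names, polling interval, and timeout are assumptions made for the example.

```python
import time
from typing import Callable

# Assumed terminal state names, used here only for illustration.
TERMINAL_STATES = {"JobComplete", "Failed", "Aborted"}


def wait_for_job(
    get_job_state: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll a hypothetical job-status callable until the job reaches a terminal state."""
    waited = 0.0
    while True:
        state = get_job_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            raise TimeoutError(f"job still {state!r} after {waited} seconds")
        sleep(poll_interval)
        waited += poll_interval


# Simulated job that reports completion on the third status check.
# The sleep function is replaced with a no-op so the example runs instantly.
states = iter(["UploadComplete", "InProgress", "JobComplete"])
final_state = wait_for_job(lambda: next(states), poll_interval=1.0, sleep=lambda s: None)
print(final_state)
```

Passing the sleep function as a parameter keeps the loop easy to exercise without real waiting. Only after the job completes would a client start fetching the result sets.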
If the pipeline stops before it finishes reading all data, the Salesforce Bulk API 2.0 origin saves the last read offset value. When the pipeline starts again, the origin uses the last read offset value to continue processing from where it stopped. You can reset the origin to process all requested objects.
Unlike the Salesforce origin, the Salesforce Bulk API 2.0 origin does not stop the pipeline when an initial query is complete. To stop the pipeline automatically, enable the origin to generate events and use the Pipeline Finisher executor. For more information, see Event Generation.
In rare cases, the query returns data with a type that does not match the data type specified in the schema for a field. For example, the query might return a Float when the schema specifies an Integer. You can use the Mismatched Types Behavior property on the Advanced tab to configure how the origin handles mismatched types. The origin can retain the returned data, truncate the returned data to match the specified type, or round the returned data to match the specified type.
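To make the three options concrete, the following sketch applies each of them to a Float returned for a field whose schema specifies an Integer. The behavior labels and the handle_mismatch function are hypothetical names chosen for this example, not property values or origin code.

```python
import math


def handle_mismatch(value: float, behavior: str):
    """Apply one of three illustrative behaviors to a float returned for an integer field."""
    if behavior == "retain":
        return value              # keep the returned data unchanged
    if behavior == "truncate":
        return math.trunc(value)  # drop the fractional part
    if behavior == "round":
        return round(value)       # Python rounds exact halves to the nearest even integer
    raise ValueError(f"unknown behavior: {behavior!r}")


returned = 42.7
results = {b: handle_mismatch(returned, b) for b in ("retain", "truncate", "round")}
print(results)
```

The text does not state which rounding rule the origin applies to exact halves, so the use of Python's built-in round here is an assumption.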
Bulk API 2.0 Queries
When querying existing data with version 2.0 of the Bulk API, you define the SOQL query and related properties to determine the data returned from Salesforce.
- SOQL query
- When processing existing data, use the following query guidelines:
  - In the WHERE clause, include the offset field and the offset value. The origin uses both to determine the data that is returned.
  - In the WHERE clause, use ${OFFSET} to represent the offset value. For example, when you start a pipeline, the following query returns all data from the object where the data in the offset field is greater than the initial offset value:
    SELECT Id, Name FROM <object> WHERE <offset field> > ${OFFSET}
    Note: When the offset values are strings, enclose ${OFFSET} in single quotation marks.
  - To avoid returning duplicate data, use the offset field as the first field in the ORDER BY clause.
    Note: Using a field that is not the Id field in the ORDER BY clause can slow performance.
- Additional properties
- You can configure several additional properties on the Query tab. For example:
  - Offset Field - Typically the Id system field, the offset field should be an indexed field in the record. Default is the Id field.
  - Initial Offset - First offset value to use when the pipeline starts or after you reset the origin.
  - Include Deleted Records - An optional property. Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.
Example
Let's say that you want to read all names and account numbers from the Salesforce Account object a single time.
- SOQL Query - Include the offset field and offset value in the WHERE and ORDER BY clauses, as well as the fields to return, as follows:
  SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id
- Repeat Query - Set to No Repeat to run the query a single time.
- Initial Offset - Use the default value of fifteen zeros (000000000000000) for the offset value to ensure that the origin reads all records in the object.
- Offset Field - Use the default, Id, for the offset field.
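To see what the query looks like once the offset value is filled in, the following sketch substitutes a value for ${OFFSET} in the example query. It uses Python's string.Template only because it happens to share the ${...} placeholder syntax. The origin performs its own substitution, and render_query and the second Id value are made up for the illustration.

```python
from string import Template

QUERY = "SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id"


def render_query(query: str, offset: str) -> str:
    """Replace the ${OFFSET} placeholder with a concrete offset value."""
    return Template(query).substitute(OFFSET=offset)


# First query after the pipeline starts: the initial offset of fifteen zeros.
first_query = render_query(QUERY, "000000000000000")
print(first_query)

# A later query would use the last offset value that was read (made-up Id here).
next_query = render_query(QUERY, "001xx000003DGb2AAG")
print(next_query)
```

Because the Id values are strings, the placeholder stays inside single quotation marks in the query text.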
Full and Incremental Mode
- Incremental mode
- When the origin performs an incremental query, it uses the initial offset as the offset value in the first SOQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
- Full mode
- When the origin performs a full query, it runs the specified SOQL query. The origin uses the initial offset as the offset value in the SOQL query each time it requests data.
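The difference between the two modes can be sketched with a small simulation. It is a simplified model under stated assumptions: offsets are plain integers rather than Salesforce Ids, run_query stands in for the SOQL query, and each snapshot represents the data available when a query runs. All names are hypothetical.

```python
from typing import List

INITIAL_OFFSET = 0


def run_query(offset: int, available: List[int]) -> List[int]:
    """Stand-in for: SELECT ... WHERE <offset field> > ${OFFSET} ORDER BY <offset field>."""
    return sorted(v for v in available if v > offset)


def simulate(mode: str, snapshots: List[List[int]]) -> List[List[int]]:
    """Run one query per snapshot and return the rows that each query read."""
    saved_offset = INITIAL_OFFSET
    reads = []
    for available in snapshots:
        # Incremental mode continues from the saved offset.
        # Full mode starts from the initial offset every time.
        offset = saved_offset if mode == "incremental" else INITIAL_OFFSET
        rows = run_query(offset, available)
        reads.append(rows)
        if rows:
            saved_offset = rows[-1]
    return reads


# Two new records (4 and 5) arrive between the first and second query.
snapshots = [[1, 2, 3], [1, 2, 3, 4, 5]]
incremental_reads = simulate("incremental", snapshots)
full_reads = simulate("full", snapshots)
print(incremental_reads)
print(full_reads)
```

In this model, the second incremental query reads only the new records, while the second full query reads everything again.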
Multithreaded Processing
The Salesforce Bulk API 2.0 origin can perform parallel processing and enables the creation of a multithreaded pipeline.
When you enable multithreaded processing, the Salesforce Bulk API 2.0 origin uses multiple concurrent threads based on the Number of Threads property. When you start the pipeline, the origin creates the number of threads specified in the property.
Salesforce Bulk API 2.0 query results are returned in one or more result sets, each of which can be processed in parallel up to the Number of Threads configuration setting that you specify. You can control the maximum size of a result set with the Maximum Records per Query Result Set advanced stage property. If you are working with a very large number of query results, you might experience a timeout before receiving all of the data from Salesforce. To prevent a timeout, you can use this property to split the results into smaller sets.
When you don’t provide a value for the Maximum Records per Query Result Set property, the server uses a default value based on the service.
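The effect of limiting the result set size can be pictured with a short sketch. Salesforce performs the actual splitting on the server, so the split_into_result_sets function below is purely hypothetical and only shows how a maximum size turns one large result into several smaller sets that threads can process in parallel.

```python
from typing import List, Sequence


def split_into_result_sets(records: Sequence[dict], max_records: int) -> List[List[dict]]:
    """Split records into consecutive result sets of at most max_records each."""
    if max_records <= 0:
        raise ValueError("max_records must be positive")
    return [
        list(records[start:start + max_records])
        for start in range(0, len(records), max_records)
    ]


# Ten made-up records split with a maximum of four records per result set.
records = [{"Id": f"{n:03d}"} for n in range(10)]
result_sets = split_into_result_sets(records, 4)
sizes = [len(result_set) for result_set in result_sets]
print(sizes)
```

A smaller maximum produces more result sets, each of which takes less time to transfer.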
A Salesforce Bulk API 2.0 origin thread reads each result set into one or more batches of records. Upon filling each batch, the origin passes it to an available pipeline runner.
A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.
Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order in which batches are written to destinations is not guaranteed.
For example, say you enable multithreaded processing and set the Number of Threads property to 5. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.
Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent from other batches processed by other pipeline runners, so batches might be written in a different order than they were read.
At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
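The threading model described above can be approximated in a few lines of Python. This is a simplified sketch, not Data Collector code: each origin thread calls its own run_pipeline function directly instead of handing batches to a pool of pipeline runners, and the batch size, thread count, and record names are assumptions for the example.

```python
import queue
import threading
from typing import List

BATCH_SIZE = 2
NUM_THREADS = 3  # plays the role of the Number of Threads property

# Four made-up result sets of five records each.
result_sets = [[f"rs{r}-rec{i}" for i in range(5)] for r in range(4)]

pending = queue.Queue()
for result_set in result_sets:
    pending.put(result_set)

written: List[List[str]] = []  # batches in the order they reached the "destination"
written_lock = threading.Lock()


def run_pipeline(batch: List[str]) -> None:
    """Stand-in for a pipeline runner: process one batch, then write it."""
    processed = [record.upper() for record in batch]
    with written_lock:
        written.append(processed)


def origin_thread() -> None:
    """Read result sets until none remain, splitting each into batches."""
    while True:
        try:
            result_set = pending.get_nowait()
        except queue.Empty:
            return
        for start in range(0, len(result_set), BATCH_SIZE):
            run_pipeline(result_set[start:start + BATCH_SIZE])


threads = [threading.Thread(target=origin_thread) for _ in range(NUM_THREADS)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len(written), "batches written")
```

Records keep their order inside each batch, but the order of batches in the written list can change from run to run because the threads work independently.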
For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.
Processing Deleted Records
The Salesforce Bulk API 2.0 origin can retrieve deleted records from the Salesforce recycle bin for processing.
To process deleted records, use the Include Deleted Records property on the Query tab.
Reading Custom Objects or Fields
If the origin reads custom Salesforce objects or fields, you might want to use a Field Renamer in the pipeline to rename the custom fields.
When you extend Salesforce objects, custom object and field names are appended with the suffix __c. For example, if you create a custom Transaction object, Salesforce names the object Transaction__c. The Transaction object might contain fields named Credit_Card__c, Fare_Amount__c, and Payment_Type__c.
Instead of using field names appended with the suffix __c throughout the rest of the pipeline, you can add a Field Renamer to remove the suffix from the field names.
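The effect of such a rename can be sketched outside the pipeline as follows. The code is not Field Renamer configuration. It only illustrates the transformation on a dictionary that stands in for a record, and the strip_custom_suffix function and the sample values are hypothetical.

```python
SUFFIX = "__c"


def strip_custom_suffix(record: dict) -> dict:
    """Return a copy of the record with the __c suffix removed from field names."""
    renamed = {}
    for name, value in record.items():
        new_name = name[: -len(SUFFIX)] if name.endswith(SUFFIX) else name
        if new_name in renamed:
            # A standard field and a custom field could share a base name.
            raise ValueError(f"field name {new_name!r} occurs twice after renaming")
        renamed[new_name] = value
    return renamed


# Made-up record from the custom Transaction__c object.
record = {
    "Id": "a00000000000001",
    "Credit_Card__c": "Visa",
    "Fare_Amount__c": 12.5,
    "Payment_Type__c": "CRD",
}
renamed_record = strip_custom_suffix(record)
print(list(renamed_record))
```

The collision check is a design choice for this sketch: removing the suffix can make a custom field name identical to a standard field name, and silently overwriting one of them would lose data.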
For more information about Salesforce custom objects, see the Salesforce documentation.
Salesforce Attributes
The Salesforce Bulk API 2.0 origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field. The origin receives these details from Salesforce.
Salesforce attributes include a user-defined prefix to differentiate the Salesforce attributes from other attributes. By default, the prefix is salesforce. (including the trailing period). You can change the prefix that the origin uses, and you can configure the origin not to create Salesforce attributes.
Salesforce Header Attribute
The Salesforce Bulk API 2.0 origin generates a Salesforce record header attribute to provide additional information about each record. The origin receives these details from Salesforce.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attribute.
Salesforce Header Attribute | Description |
---|---|
<Salesforce prefix>sobjectType | Provides the Salesforce source object for the record. Generated when the origin executes a query. |
For more information about record header attributes, see Record Header Attributes.
Salesforce Field Attributes
The Salesforce Bulk API 2.0 origin generates Salesforce field attributes that provide additional information about each field, such as the data type of the Salesforce field. The origin receives these details from Salesforce.
You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attribute.
Salesforce Field Attribute | Description |
---|---|
<Salesforce prefix>salesforceType | Provides the original Salesforce data type for the field. |
<Salesforce prefix>length | Provides the original length for all string and textarea fields. |
<Salesforce prefix>precision | Provides the original precision for all double fields. |
<Salesforce prefix>scale | Provides the original scale for all double fields. |
<Salesforce prefix>digits | Provides the maximum number of digits for all integer fields. |
For more information about field attributes, see Field Attributes.
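As a rough picture of how the prefix and the attribute names combine, the following sketch models the attributes of one field as a dictionary. It is not the expression language itself. The attribute values are made up, and field_attribute_or_default is a hypothetical helper that only approximates a lookup with a fallback value.

```python
PREFIX = "salesforce."  # default prefix; configurable in the origin

# Made-up attributes for a single string field, keyed by full attribute name.
field_attributes = {
    PREFIX + "salesforceType": "string",
    PREFIX + "length": "255",
}


def field_attribute_or_default(attributes: dict, name: str, default: str) -> str:
    """Return the attribute value, or the default when the attribute is absent."""
    return attributes.get(name, default)


sf_type = field_attribute_or_default(field_attributes, PREFIX + "salesforceType", "unknown")
precision = field_attribute_or_default(field_attributes, PREFIX + "precision", "n/a")
print(sf_type, precision)
```

A string field carries no precision attribute in this model, so the lookup falls back to the default.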
Event Generation
The Salesforce Bulk API 2.0 origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query.
You can use the events in several ways. For example:
- With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.
  When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to repeat an incremental query, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. If you configure the origin to repeat a full query, when you restart the pipeline, the origin uses the initial offset.
  For an example, see Stopping a Pipeline After Processing All Available Data.
- With the Email executor to send a custom email after receiving an event.
  For an example, see Sending Email During Pipeline Processing.
- With a destination to store information about completed queries.
  For an example, see Preserving an Audit Trail of Events.
Event Record
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type. Uses the following type: no-more-data |
sdc.event.version | Integer that indicates the version of the event record type. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The no-more-data event record includes no record fields.
Changing the API Version
The Salesforce Bulk API 2.0 origin uses version 57.0.0 of the Salesforce API by default. You can use a different Salesforce API version if you need to access functionality not present in version 57.0.0.
Configuring a Salesforce Bulk API 2.0 Origin
Configure a Salesforce Bulk API 2.0 origin to read data from Salesforce using Salesforce Bulk API 2.0.