Snowflake
The Snowflake destination writes data to one or more tables in a Snowflake database. You can use the Snowflake destination to write to any accessible Snowflake database, including those hosted on Amazon S3, Google Cloud Storage, Microsoft Azure, and private Snowflake installations. For information about supported versions, see Supported Systems and Versions in the Data Collector documentation.
The Snowflake destination stages CSV files to either an internal Snowflake stage or an external stage in Amazon S3, Google Cloud Storage, or Microsoft Azure. Then, the destination sends a command to Snowflake to process the staged files.
You can use the Snowflake destination to write new data or change data capture (CDC) data to Snowflake. When processing new data, the destination can load data to Snowflake using the COPY command or Snowpipe. When processing CDC data, the destination uses the MERGE command.
The Snowflake destination writes data from record fields to table columns based on matching names. The destination can compensate for data drift by creating new columns and tables in Snowflake when new fields and table references appear in records.
When you configure the Snowflake destination, you specify the Snowflake region, account and connection information and the number of connections to use to write to Snowflake. You can specify an organization name to use and define additional Snowflake connection properties, as needed. You can also use a connection to configure the destination.
You configure the Snowflake warehouse, database, schema, and the tables to use. You specify load method, error handling, and staging details, and optionally define advanced properties for Amazon S3 or Microsoft Azure.
You can optionally enable data drift. You can have the destination create new tables, as needed. You can also specify whether to create all new columns as Varchar instead of inferring the type, and whether to create Decimal columns for decimal data.
You can configure the root field for the row, and any first-level fields that you want to exclude from the record. You can also configure the destination to replace missing fields or fields containing invalid data types with the specified default values, and to replace newline characters in string fields with a specified character. You can specify the quoting mode, define quote and escape characters, and configure the destination to trim spaces. When processing CDC data, you specify the primary key location.
Before you use the Snowflake destination, you must complete several prerequisite tasks.
Sample Use Cases
Here are a couple of common scenarios for using the Snowflake destination:
- Replicating a database
- Say you want to replicate data being written to five tables in an Oracle database schema. You want to write both existing data and incoming CDC data to Snowflake.
To do this, you create two pipelines, one to load the existing data and one to process incoming data, as follows:
- First pipeline for replicating data - The first pipeline uses the multithreaded JDBC Multitable Consumer origin to read from the tables that you want to replicate. To take advantage of Snowflake’s bulk load abilities, you configure the origin to use a very large batch size - somewhere between 20,000 and 50,000 records per batch. You set the number of threads to five to read data from all five tables concurrently, and you increase the connection pool size to five to allow writing to five tables in Snowflake concurrently.
In the pipeline, you use as many processors as needed to process the data. Then, you configure the Snowflake destination to load the data into Snowflake.
If you wanted the data to be immediately available after being written to Snowflake, you would use the default COPY command load method. But since you can tolerate a bit of latency, you use the faster, cheaper Snowpipe to load the data. Using Snowpipe requires performing some prerequisite steps in Snowflake.
After the initial load is complete, you stop the first pipeline and start the second pipeline to process incoming CDC data.
- Second pipeline for CDC data - In the second pipeline, you use the
Oracle CDC Client origin and the Snowflake destination. You
configure this origin to use a very large batch size as well,
somewhere between 20,000 and 50,000 records per batch.
In the destination, you select the Processing CDC Data (Use MERGE) property to perform CRUD operations when writing to Snowflake. This results in the destination using the MERGE command to load data into Snowflake. You specify a field in the records that contains the table name to use when writing to Snowflake, and you define the key columns for each table or configure the destination to query Snowflake for the key columns.
To improve performance, you also increase the connection pool size. For more information, see Performance Optimization.
- Offloading from Hadoop
- Say you have a Hadoop data lake that you want to move into Snowflake. In this case, you only need one pipeline that includes the multithreaded Hadoop FS Standalone origin, all of the processors that you need, and the Snowflake destination.
Prerequisites
Before you configure the Snowflake destination, complete the following prerequisites:
- Create an internal or external Snowflake stage.
- To use the COPY command to load new data, complete the COPY prerequisites.
- To use Snowpipe to load new data, complete the Snowpipe prerequisites.
- To use the MERGE command to load CDC data, complete the MERGE prerequisites.
Create a Snowflake Stage
Before using the destination in a pipeline, you must create a Snowflake internal or external stage.
The Snowflake destination stages CSV files to either an internal Snowflake stage or an external stage in Amazon S3, Google Cloud Storage, or Microsoft Azure. Then, the destination sends a command to Snowflake to process the staged files.
To use an external stage, create the external stage with the cloud service provider that hosts your Snowflake warehouse.
- Snowflake internal stage
- You can stage data in Snowflake internal user stages or named stages. Do not use internal table stages.
- Amazon S3 external stage
- To stage data in an Amazon S3 external stage, create a Snowflake external stage in a bucket in the same S3 region that hosts your Snowflake virtual warehouse. For example, if your Snowflake warehouse is in AWS US West, then create the Snowflake external stage in a bucket in the AWS US West region.
- Google Cloud Storage external stage
- To stage data in a Google Cloud Storage external stage, create a Snowflake storage integration in Google Cloud Storage. This is a multistep process described in the Snowflake documentation that ends with creating a Snowflake external stage. Be sure to complete all required steps.
- Microsoft Azure external stage
- To stage data in a Microsoft Azure external stage, complete the
following tasks:
- Configure Snowflake authentication for the Microsoft
Azure Blob Storage container that you want to use.
You can use an SAS token or an Azure account name and key for authentication. For information about configuring SAS token authentication, see the Snowflake documentation.
- Create a Snowflake external stage in the container.
When you create a Snowflake external stage, you specify a URL that defines the name and location for the stage. Include a trailing slash in the URL to ensure that Snowflake loads all staged data. You might also include a prefix in the stage name to indicate that the external stage is for Data Collector.
For example, the following URL creates an external stage named sdc-externalstage in azure://myaccount.blob.core.windows.net/mycontainer/load/ and loads all staged data to Snowflake:
azure://myaccount.blob.core.windows.net/mycontainer/load/sdc-externalstage/
You can create an Azure stage using the Snowflake web interface or SQL. For more information, see Creating an Azure Stage in the Snowflake documentation.
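The URL convention described above can be sketched in a few lines of Python. This is only an illustration: the function name `azure_stage_url` and its parameters are hypothetical and are not part of Data Collector or Snowflake. The sketch joins the container, path, and stage name and always ends the result with a single trailing slash.

```python
def azure_stage_url(account: str, container: str, path: str, stage_name: str) -> str:
    """Build an Azure external stage URL that always ends with a trailing slash."""
    # Strip stray slashes so the segments join cleanly, and skip empty segments.
    segments = [s.strip("/") for s in (container, path, stage_name) if s.strip("/")]
    return f"azure://{account}.blob.core.windows.net/" + "/".join(segments) + "/"


# Reproduces the example URL from the text, using the "sdc-" prefix in the stage name.
print(azure_stage_url("myaccount", "mycontainer", "load", "sdc-externalstage"))
```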
AWS Credentials
When the Snowflake destination stages data on Amazon S3, it must pass credentials to Amazon Web Services.
- Instance profile
- When the execution Data Collector runs on an Amazon EC2 instance that has an associated instance profile, Data Collector uses the instance profile credentials to automatically authenticate with AWS.
- AWS access key pair
- When the execution Data Collector does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an instance profile, you can connect with an AWS access key pair.
To connect with an AWS access key pair, on the Staging tab, set the Authentication Method property to AWS Keys. Then define the access key ID and secret access key.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
Google Cloud Storage Credentials
Before staging data on Google Cloud Storage, the Snowflake destination must pass credentials to Google Cloud. You can provide credentials using one of the following options:
- Google Cloud default credentials
- Credentials in a file
- Credentials in a stage property
For details on how to configure each option, see Security in Google Cloud Stages.
COPY Prerequisites
When processing new data, you can configure the destination to use the COPY command to load data to Snowflake tables.
Using the COPY command to load data requires a role with one of the following sets of access privileges:
- Required privileges when using an internal Snowflake stage:
Object Type | Privilege |
---|---|
Internal Snowflake stage | READ, WRITE |
Table | SELECT, INSERT |
- Required privileges when using an external stage:
Object Type | Privilege |
---|---|
External stage | USAGE |
Table | SELECT, INSERT |
If necessary, create a custom role in Snowflake and grant the role the required access privileges. Then, associate the role with the stage. For more information, see Define a Role.
Snowpipe Prerequisites
When processing new data, you can use Snowpipe, the Snowflake continuous ingestion engine, to load data to Snowflake tables.
- In Snowflake, create a pipe for Snowpipe to use to load data.
For more information about creating a pipe, see the Snowflake documentation.
- In Snowflake, generate a private key PEM and a public key PEM.
For details about key-pair authentication, see the Snowflake documentation. You do not need to generate JSON Web Tokens (JWT) as described in Step 5.
When you configure the destination, you specify the private key PEM and password and the public key PEM.
- In Snowflake, assign the public key to the Snowflake user account configured in
the destination.
You can use the Snowflake console or the ALTER USER command.
- Optionally, to secure the private key PEM and password, use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
- In Snowflake, create a custom role and grant the role the access privileges
required to use Snowpipe.
For a list of the required Snowpipe access privileges, see the Snowflake documentation.
- Then, associate the role with the stage.
For more information, see Define a Role.
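As a rough sketch of the key assignment step, the following Python helper turns a public key PEM into an ALTER USER statement. Snowflake expects the key body without the PEM header and footer lines, so the helper strips them. The function name, the user name `sdc_user`, and the short key body are hypothetical placeholders, not values from this documentation.

```python
def alter_user_statement(user: str, public_key_pem: str) -> str:
    """Return an ALTER USER statement that assigns a public key to a user."""
    lines = [line.strip() for line in public_key_pem.strip().splitlines()]
    # Keep only the base64 body: drop blank lines and the BEGIN/END delimiter lines.
    body = "".join(line for line in lines if line and not line.startswith("-----"))
    return f"ALTER USER {user} SET RSA_PUBLIC_KEY='{body}';"


# Placeholder key material for illustration only; a real key body is much longer.
pem = """-----BEGIN PUBLIC KEY-----
AAAA
BBBB
-----END PUBLIC KEY-----
"""
print(alter_user_statement("sdc_user", pem))
```

You would run the resulting statement in Snowflake with a role that is allowed to alter the user.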
MERGE Prerequisites
When processing CDC data, you can configure the destination to use the MERGE command to load data to Snowflake tables.
Using the MERGE command to load data requires a role with one of the following sets of access privileges:
- Required privileges when using an internal Snowflake stage:
Object Type | Privilege |
---|---|
Internal Snowflake stage | READ, WRITE |
Table | SELECT, INSERT, UPDATE, DELETE |
- Required privileges when using an external stage:
Object Type | Privilege |
---|---|
External stage | USAGE |
Table | SELECT, INSERT, UPDATE, DELETE |
If necessary, create a custom role in Snowflake and grant the role the required access privileges. Then, associate the role with the stage. For more information, see Define a Role.
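The privilege sets listed for the COPY and MERGE commands can be expressed as GRANT statements. The following Python sketch generates them from the load method and stage type. The helper name and the stage, table, and role names are hypothetical, and the sketch covers only the privileges listed in this section; your account may need additional grants, such as usage on the database, schema, or warehouse.

```python
# Privileges taken from the COPY and MERGE prerequisite tables in this section.
TABLE_PRIVILEGES = {
    "COPY": ["SELECT", "INSERT"],
    "MERGE": ["SELECT", "INSERT", "UPDATE", "DELETE"],
}
STAGE_PRIVILEGES = {
    "internal": ["READ", "WRITE"],
    "external": ["USAGE"],
}


def grant_statements(load_method, stage_kind, stage, table, role):
    """Return the GRANT statements for one stage and one table."""
    stage_privs = ", ".join(STAGE_PRIVILEGES[stage_kind])
    table_privs = ", ".join(TABLE_PRIVILEGES[load_method])
    return [
        f"GRANT {stage_privs} ON STAGE {stage} TO ROLE {role};",
        f"GRANT {table_privs} ON TABLE {table} TO ROLE {role};",
    ]


for statement in grant_statements("MERGE", "external", "sdc_stage", "orders", "sdc_role"):
    print(statement)
```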
Implementation Notes
Note the following implementation requirements when working with the Snowflake destination:
- It is best practice to configure the Snowflake warehouse to auto-resume upon receiving new queries.
- If you change the destination table schema manually rather than enabling data drift handling, restart the pipeline to allow the destination to discover schema changes.
Load Methods
- COPY command for new data
- The COPY command, the default load method, performs a bulk synchronous load to Snowflake, treating all records as INSERTS. Use this method to write new data to Snowflake tables.
- Snowpipe for new data
- Snowpipe, the Snowflake continuous ingestion service, performs an asynchronous load to Snowflake, treating all records as INSERTS. Use this method to write new data to Snowflake tables. When needed, you can configure the destination to use a custom Snowflake endpoint.
- MERGE command for CDC data
- Like the COPY command, the MERGE command performs a bulk synchronous load to Snowflake. But instead of treating all records as INSERT, it inserts, updates, upserts, and deletes records as appropriate. Use this method to write change data capture (CDC) data to Snowflake tables using CRUD operations.
For more information about Snowpipe or the COPY or MERGE commands, see the Snowflake documentation.
Primary Key Location
- Query Snowflake
- The destination queries Snowflake for the primary key columns for each table that it writes to. Use this option only when primary key columns are defined in the Snowflake tables.
- Use JDBC record header attribute
- The destination uses primary key columns stored in the jdbc.primaryKeySpecification record header attribute.
- Use specified columns for each table
- The destination uses the primary key columns that you specify in the Table Key Columns properties.
Error Handling
You can configure Snowflake error handling in the Snowflake destination. The error handling properties determine how the on_error option is defined in the Snowflake SQL query.
Since the Snowflake MERGE command does not allow the on_error option, you cannot configure error handling when the destination processes CDC data.
- COPY command
- Use the Error Behavior property on the Snowflake tab when you use the COPY
command to load data to Snowflake. When you use Snowpipe or the MERGE
command, the Error Behavior property is ignored.
The Error Behavior property provides the following error handling options:
- Snowpipe
- You can use the Snowpipe Error Behavior property on the Snowpipe tab when you enable the Use Snowflake and the Snowpipe Auto Create properties on the Snowflake tab. The destination cannot manage Snowpipe error handling when the destination does not create the Snowpipe.
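To illustrate where the on_error option appears, the following Python sketch builds a Snowflake COPY INTO statement with an ON_ERROR copy option. This is an assumption-laden illustration, not the query that the destination actually issues: the helper name, table name, and stage name are hypothetical, and the set of values shown is a subset of what Snowflake accepts rather than the list of Error Behavior options.

```python
# A subset of the ON_ERROR values that Snowflake accepts for COPY INTO.
ON_ERROR_VALUES = {"CONTINUE", "SKIP_FILE", "ABORT_STATEMENT"}


def copy_statement(table: str, stage: str, on_error: str = "ABORT_STATEMENT") -> str:
    """Build a COPY INTO statement that loads staged CSV files into a table."""
    if on_error not in ON_ERROR_VALUES:
        raise ValueError(f"unsupported ON_ERROR value: {on_error}")
    return (
        f"COPY INTO {table} FROM @{stage} "
        f"FILE_FORMAT = (TYPE = CSV) ON_ERROR = {on_error};"
    )


print(copy_statement("orders", "sdc_stage", on_error="CONTINUE"))
```

A MERGE statement has no equivalent option, which is why error handling cannot be configured for CDC data.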
Define a Role
The Snowflake destination requires a Snowflake role that grants all privileges needed to load data using the configured load method. Each load method requires a different set of privileges.
Before configuring the destination, ensure that you have granted the required privileges to a Snowflake role, as explained in Prerequisites.
- Assign the custom role as the default role
- In Snowflake, assign the custom role as the default role for the Snowflake user account specified in the stage. A Snowflake user account is associated with a single default role.
- Override the default role with the custom role
- In the stage, use the Role property to specify the name of the custom role. The custom role overrides the role assigned to the Snowflake user account specified in the stage.
- Use a Snowflake connection
- When working with Control Hub, you can configure a Snowflake connection to provide connection details for Snowflake stages.
Performance Optimization
Use the following tips to optimize for performance and cost-effectiveness when using the Snowflake destination:
- Increase the batch size
- The maximum batch size is determined by the origin in the pipeline and typically has a default value of 1,000 records. To take advantage of Snowflake's bulk loading abilities, increase the maximum batch size in the pipeline origin to 20,000-50,000 records. Be sure to increase the Data Collector Java heap size, as needed. For more information, see Java Heap Size in the Data Collector documentation.
- Configure pipeline runners to wait indefinitely when idle
- With the default configuration, a pipeline runner generates an empty batch
after waiting idly for 60 seconds. As a result, the destination continues to
execute metadata queries against Snowflake, even though no data needs to be
processed. To reduce Snowflake charges when a pipeline runner waits idly,
set the Runner Idle Time pipeline property to -1. This configures pipeline
runners to wait indefinitely when idle without generating empty batches,
which allows Snowflake to pause processing.
Important: Configuring pipeline runners to wait indefinitely when idle is strongly recommended. Using the default pipeline runner idle time can result in unnecessary Snowflake resource consumption and runtime costs.
- Use multiple threads
- When writing to Snowflake using Snowpipe or the COPY command, you can use multiple threads to improve performance when you include a multithreaded origin in the pipeline. When Data Collector resources allow, using multiple threads enables processing multiple batches of data concurrently.
- Enable additional connections to Snowflake
- When writing to multiple Snowflake tables using the COPY or MERGE commands, increase the number of connections that the Snowflake destination makes to Snowflake. Each additional connection allows the destination to write to an additional table, concurrently.
Row Generation
When writing a record to a table, the Snowflake destination includes all record fields in the resulting row, by default. The destination uses the root field, /, as the basis for the resulting row.
You can configure the Row Field property to specify a map or list-map field in the record as the basis for the row. The resulting record includes only the data from the specified map or list-map field and excludes all other record data. Use this functionality when the data that you want to write to Snowflake exists in a single map or list-map field within the record.
If you want to use the root field, but do not want to include all fields in the resulting row, you can configure the destination to ignore all specified first-level fields.
The Snowflake destination converts all map or list-map fields within the specified root field to the Snowflake Variant data type. The Snowflake destination fully supports the Variant data type.
By default, records with missing fields or with invalid data types in fields are treated as error records. You can configure the destination to replace missing fields and data of invalid types with user-defined default values. Then, you specify the default values to use for each data type. You can also configure the destination to replace newline characters in string fields with a replacement character.
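The row generation options above can be pictured with a small Python sketch that treats a record as a dictionary. Everything here is a simplification under stated assumptions: the function and parameter names are hypothetical, the row field is a plain dictionary key rather than a field path, and only missing fields are replaced with defaults (invalid data types are not modeled).

```python
def build_row(record, row_field=None, ignore_fields=(), columns=None,
              defaults=None, newline_replacement=None):
    """Derive a row from a record, loosely following the options described above."""
    # Use the whole record (the root field) unless a map field is named as the basis.
    source = record if row_field is None else record[row_field]
    # Drop the first-level fields that should not appear in the row.
    row = {name: value for name, value in source.items() if name not in ignore_fields}
    # Fill missing fields with the default configured for the column's data type.
    for name, field_type in (columns or {}).items():
        if name not in row and field_type in (defaults or {}):
            row[name] = defaults[field_type]
    # Replace newline characters in string values, if a replacement is configured.
    if newline_replacement is not None:
        for name, value in row.items():
            if isinstance(value, str):
                row[name] = value.replace("\r\n", "\n").replace("\n", newline_replacement)
    return row


record = {"id": 1, "note": "line one\nline two", "debug": True}
print(build_row(record, ignore_fields={"debug"},
                columns={"id": "Integer", "note": "String", "city": "String"},
                defaults={"String": "N/A"}, newline_replacement=" "))
```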
Specifying Tables
You can use the Snowflake destination to write to one or more tables in a Snowflake schema.
- Single table
- To write data to a single table, enter the table name as follows:
<table_name>
- Multiple tables
- To write data to multiple tables, specify a field in the record that defines the database and tables.
Use the Table property on the Snowflake tab to specify the tables to write to.
Enabling Data Drift Handling
The Snowflake destination can automatically compensate for changes in column or table requirements, also known as data drift. You cannot enable data drift handling when the destination is configured to use Snowpipe.
- Create new columns
- The destination can create new columns in Snowflake tables when new fields appear in records. For example, if a record suddenly includes a new Address2 field, the destination creates a new Address2 column in the target table.
By default, the destination creates new columns based on the data in the new fields, such as creating a Double column for decimal data. You can, however, configure the destination to create all new columns as Varchar.
- Create new tables
- The destination can create new tables as needed. For example, say the destination writes data to tables based on the region name in the Region field. When a new SW-3 region shows up in a record, the destination creates a new SW-3 table in Snowflake and writes the record to the new table.
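The two kinds of drift described above can be sketched as a comparison between a record and the known schema. The following Python example is a hypothetical model, not the destination's implementation: the function name, the in-memory `schema` dictionary, and the exact string matching of names are all assumptions made for illustration.

```python
def drift_actions(record, table_field, schema):
    """Return the schema changes a record would require.

    schema maps each known table name to the set of its column names.
    """
    table = record[table_field]
    if table not in schema:
        # A new table reference: create the table with every field in the record.
        return [("create_table", table, sorted(record))]
    # An existing table: add only the fields that have no matching column.
    new_fields = sorted(set(record) - schema[table])
    return [("add_column", table, name) for name in new_fields]


schema = {"SW-1": {"Region", "Name", "Address"}}

# A new Address2 field appears in a record for an existing table.
print(drift_actions({"Region": "SW-1", "Name": "a", "Address": "b", "Address2": "c"},
                    "Region", schema))

# A record references the new SW-3 region, which has no table yet.
print(drift_actions({"Region": "SW-3", "Name": "a"}, "Region", schema))
```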
Generated Data Types
When creating new tables or creating new columns in existing tables, the Snowflake destination uses field names to generate the new column names.
Record Field Data Type | Snowflake Column Data Type |
---|---|
Byte Array | Binary |
Char | Char |
String | Varchar |
Byte, Integer, Long, Short | Number |
Decimal, Double | Double |
Boolean | Boolean |
Date | Date |
Datetime | Timestamp_Ntz |
Float | Float |
Time | Time |
Zoned Datetime | Timestamp_Tz |
Map, List-Map | Variant |
The Snowflake destination fully supports the Variant data type.
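The conversion table above maps directly onto a lookup. The following Python sketch encodes it and adds a flag that mirrors the option to create all new columns as Varchar. The names `SNOWFLAKE_TYPES`, `column_type`, and `all_varchar` are hypothetical, and applying the flag to every field type, including maps, is an assumption.

```python
# Record field data type -> Snowflake column data type, as listed in the table above.
SNOWFLAKE_TYPES = {
    "Byte Array": "BINARY",
    "Char": "CHAR",
    "String": "VARCHAR",
    "Byte": "NUMBER",
    "Integer": "NUMBER",
    "Long": "NUMBER",
    "Short": "NUMBER",
    "Decimal": "DOUBLE",
    "Double": "DOUBLE",
    "Boolean": "BOOLEAN",
    "Date": "DATE",
    "Datetime": "TIMESTAMP_NTZ",
    "Float": "FLOAT",
    "Time": "TIME",
    "Zoned Datetime": "TIMESTAMP_TZ",
    "Map": "VARIANT",
    "List-Map": "VARIANT",
}


def column_type(field_type: str, all_varchar: bool = False) -> str:
    """Return the Snowflake column type generated for a record field type."""
    if all_varchar:
        # Skip type inference and create every new column as Varchar.
        return "VARCHAR"
    return SNOWFLAKE_TYPES[field_type]


print(column_type("Zoned Datetime"))
print(column_type("Long", all_varchar=True))
```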
Creating Tables
If you configure the Snowflake destination to use Snowpipe or to handle data drift, you can also configure the destination to create tables. Once configured, the destination creates tables when the specified or needed tables do not exist in Snowflake.
- Table
- The destination creates a table if the table specified in the Table property does not exist.
- Table columns
- In the created tables, the destination determines the columns from the first batch of data processed. The destination infers the data types from the data in the batch.
- Primary key columns
- In the created tables, the destination creates primary key columns as follows:
- If configured to use Snowpipe or the COPY command for new data, the destination uses the primary key information in the jdbc.primaryKeySpecification record header attribute to create primary key columns.
Several origins, such as the Oracle CDC Client origin and the PostgreSQL CDC Client origin, populate the record header attribute automatically. You can also use a processor, such as the Expression Evaluator, to populate the header attribute within the pipeline.
When the jdbc.primaryKeySpecification record header attribute does not exist, the destination creates tables without primary key columns.
- If configured to use the MERGE command for CDC data and handle data drift, the destination uses the primary key information in the Primary Key Location property to create primary key columns. To allow for existing records that do not include the new primary key columns, the new primary key columns are nullable.
- If configured to use the MERGE command for CDC data but not handle data drift, the destination uses the primary key information in the Primary Key Location property to create primary key columns. The new primary key columns are not nullable.
CRUD Operation Processing
The Snowflake destination can insert, update, upsert, or delete data when you configure the destination to process CDC data. When processing CDC data, the destination uses the MERGE command to write data to Snowflake.
The destination determines the operation to perform from the sdc.operation.type record header attribute. The destination performs operations based on the following numeric values:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
- 4 for UPSERT
If your pipeline has a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline has a non-CDC origin, you can use the Expression Evaluator processor or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
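The mapping from numeric values to operations can be sketched as a lookup on the record header. The following Python example is illustrative only: the function name and the dictionary that stands in for record header attributes are hypothetical, and raising an error for a missing or unknown value is an assumption about behavior that this section does not describe.

```python
# Numeric values of the sdc.operation.type header attribute, as listed above.
OPERATIONS = {1: "INSERT", 2: "DELETE", 3: "UPDATE", 4: "UPSERT"}


def operation_for(header: dict) -> str:
    """Resolve the CRUD operation named by a record's header attributes."""
    raw = header.get("sdc.operation.type")
    if raw is None:
        raise KeyError("record has no sdc.operation.type header attribute")
    # Accept the value as either a string or an integer.
    code = int(raw)
    if code not in OPERATIONS:
        raise ValueError(f"unsupported operation type: {code}")
    return OPERATIONS[code]


print(operation_for({"sdc.operation.type": "3"}))
```

In a pipeline with a non-CDC origin, a processor would set this attribute on each record before the record reaches the destination.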
Configuring a Snowflake Destination
Configure a Snowflake destination to write data to Snowflake tables. Before you use the destination in a pipeline, complete the prerequisite tasks.