Oracle Multitable Consumer
The Oracle Multitable Consumer source reads Oracle database data from multiple tables and multiple schemas through a JDBC connection. For information about supported versions, see Supported systems and versions.
Use the source to read multiple tables from one or more schemas in the same database. For example, you might use the source to perform database replication.
When you configure the source, you specify connection information and custom JDBC configuration properties to determine how the source connects to the database. You can also use a connection to configure the source.
You define groups of database tables to read. The source generates SQL queries based on the table configurations that you define, and then returns data as a map with column names and field values.
When you define the table configurations, you can optionally override the default key column and specify the initial offset to use. By default, the source processes tables incrementally, using primary key columns or user-defined offset columns to track its progress. You can configure the source to perform non-incremental processing to enable it to also process tables that do not have a key or offset column.
You can configure the source to perform multithreaded partition processing, multithreaded table processing, or use the default - a mix of both. You also specify the processing batch strategy. When configuring partitions, you can configure the offset size, number of active partitions, and offset conditions.
You can configure advanced properties, such as the initial order to read from tables, connection related properties, and transaction isolation. And you can specify what the source does when encountering an unsupported data type.
When the flow stops, the Oracle Multitable Consumer source notes where it stops reading. When the flow starts again, the source continues processing from where it stopped by default. You can reset the offset to process all available data, using any initial offsets that you defined.
By default, the source generates JDBC record header and field attributes that provide additional information about each record and field.
You can configure advanced connection properties. To use a JDBC version older than 4.0, you specify the driver class name and define a health check query.
The source can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
Before you use the Oracle Multitable Consumer source, you must complete a prerequisite task.
Prerequisite
Before using the Oracle Multitable Consumer source, install the Oracle JDBC driver for the database. The source cannot access the database until you install this driver.
- Download the Oracle JDBC driver from the Oracle website.Note: Writing XML data to Oracle requires installing the Oracle Data Integrator Driver for XML. For more information, see the Oracle documentation.
- Install the driver as an external library into the JDBC branded Oracle stage
library,
streamsets-datacollector-jdbc-branded-oracle-lib, which includes the source.
Oracle data types
The Oracle Multitable Consumer source converts Oracle data types into Data Collector data types.
| Oracle Data Type | Data Collector Data Type |
|---|---|
| Number | Decimal |
| Char | String |
| Varchar, Varchar2 | String |
| Nchar, NvarChar2 | String |
| Binary_float | Float |
| Binary_double | Double |
| Date | Datetime |
| Timestamp | Datetime |
| Timestamp with time zone | Zoned_datetime |
| Timestamp with local time zone | Zoned_datetime |
| Long | String |
| Blob | Byte_array |
| Clob | String |
| Nclob | String |
| XMLType | String |
Unsupported data types
- Stops the flow
- If the stage encounters an unsupported data type, the stage stops the
flow after completing the processing of the previous records and
displays the following
error:
JDBC_37 - Unsupported type 1111 for column. - Converts to string
- If the stage encounters an unsupported data type, the stage converts the data to string when possible, and then continues processing. Not all unsupported data types can successfully be converted to string. When using this option, verify that the data is converted to string as expected.
Table configuration
When you configure an Oracle Multitable Consumer source, you define a table configuration for each group of tables that you want to read. A table configuration defines a group of tables with the same table name pattern, that are from one or more schemas with the same name pattern, and that have proper primary keys or the same user-defined offset columns.
You can define one or more table configurations.
For example, you can define one table configuration to replicate a database that has a
proper primary key for each table. You simply enter the schema name and use the default
table name pattern % which matches all tables in the schema.
store_astore_bstore_ccustomers
The three store tables use orderID as the primary key. You want to
override the primary key for the customers table, and so need to define
customerID as the offset column for that table. You want to read
all available data in the tables, so do not need to define an initial offset value.
- Schema - SALES
- Table Name Pattern - store%
- Schema - SALES
- Table Name Pattern - customers
- Override Offset Columns - enabled
- Offset Columns - customerID
Schema, table name, and exclusion patterns
You define the group of tables that the Oracle Multitable Consumer source reads by defining schema and table name patterns for the table configuration. The source reads all tables with names that match the table pattern in the schemas with names that match the schema pattern.
The schema and table name patterns use the SQL LIKE syntax. For
example, the LIKE syntax uses the percentage wildcard (%) to represent any string of
zero or more characters. The schema name pattern st% matches schemas
whose names start with st. The default table name pattern
% matches all tables in the specified schemas.
For more information about valid patterns for the SQL LIKE syntax, see the Microsoft documentation.
You can optionally define a schema or table exclusion pattern to exclude some schemas or tables from being read. The schema and table exclusion patterns use a Java-based regular expression, or regex. For more information about using regular expressions with Data Collector, see Regular expressions overview.
US_WEST
and US_EAST schemas except for tables that start with
dept. You enter the following schema, table name pattern, and table
exclusion pattern: - Schema - US%
- Table Name Pattern - %
- Table Exclusion Pattern - dept.*
Since you do not need to exclude any schemas, you simply leave the schema exclusion pattern empty.
sys and system schemas. You enter the
following schema, table name pattern, and schema exclusion pattern and leave the table
exclusion pattern blank: - Schema - %
- Table Name Pattern - %
- Schema Exclusion Pattern - sys|system
Offset column and value
The Oracle Multitable Consumer source uses an offset column and initial offset value to determine where to start reading data within tables and partitions.
By default, the source uses the primary key of the tables as the offset column and uses no initial offset value. When you use multithreaded table processing and the table has a composite primary key, the source uses each primary key as an offset column. You cannot use composite keys with multithreaded partition processing.
SELECT * FROM <table> ORDER BY <offset column_1>, <offset column_2>, ...Where <offset column_n> represents each primary key of the
table, such as when the table has a composite primary key. When you restart the
flow or when the source switches back to a previously read table, the source
adds a WHERE clause to the SQL query to continue reading from the last saved
offset.
To use this default behavior, you do not need to configure any of the offset properties.
- Override the primary key as the offset column
- You can override the primary key and define another offset column or columns. Or if the table doesn’t have a primary key, you can define the offset column or columns to use.
- Define an initial offset value
- The initial offset value is a value within the offset column where you want the JDBC Multitable Consumer source to start reading. When you define an initial offset value, you must first enter the offset column name and then the value. If you are using the default primary key as the offset column, enter the name of the primary key.
- Define additional offset column conditions
- You can use the expression language to define additional conditions that the source uses to determine where to start reading data. The source adds the defined condition to the WHERE clause of the SQL query.
Multithreaded processing modes
The Oracle Multitable Consumer source performs parallel processing and enables the creation of a multithreaded flow. The source can use multiple threads to process entire tables or partitions within tables.
By default, the source performs multithreaded partition processing for the tables that fulfill the partition processing requirements, while performing multithreaded table processing for all other tables. When using the default behavior, the source notes the tables that allow partition processing in the Data Collector log. When needed, you can configure the source to require partition processing for all tables or to perform only table processing. You can also allow the single-threaded non-incremental processing of tables when needed.
- Multithreaded table processing - The source can use up to one thread per table. Can process tables with multiple offset columns.
- Multithreaded partition processing - The source can use up to one
thread per table partition. Use to process larger volumes of data than
multithreaded table processing.
Multithreaded partition processing requires a single primary key or user-defined offset column of a supported data type, and additional details for partition creation. Tables with composite keys or a key or user-defined offset column of an unsupported data type cannot be partitioned.
- Off - Use to perform multithreaded table processing.
Can be used to perform non-incremental loads of tables without key or offset columns, when enabled.
- On (Best Effort) - Use to perform partition processing where possible and allow
multithreaded table processing for tables with multiple key or offset columns.
Can be used to perform non-incremental loads of tables without key or offset columns, when enabled.
- On (Required) - Use to perform partition processing for all specified tables.
Does not allow performing other types of processing for tables that do not meet the partition processing requirements.
Multithreaded table processing
As the flow runs, each thread connects to the source system, creates a batch of data, and passes the batch to an available flow runner. A flow runner is a sourceless flow instance - an instance of the flow that includes all of the processors, executors, and targets in the flow and handles all flow processing after the source.
Each flow runner processes one batch at a time, just like a flow that runs on a single thread. When the flow of data slows, the flow runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time flow property to specify the interval or to opt out of empty batch generation.
Multithreaded flows preserve the order of records within each batch, just like a single-threaded flow. But since batches are processed by different flow runners, the order that batches are written to targets is not ensured.
The order of batch processing depends on many factors. For more information, see Processing queue.
For more information about multithreaded flows, see Multithreaded flow overview.
Example
Say you are reading from ten tables. You set the Number of Threads property to 5 and the Maximum Pool Size property to 6. When you start the flow, the source retrieves the list of tables. The source then creates five threads to read from the first five tables, and by default Data Collector creates a matching number of flow runners. Upon receiving data, a thread passes a batch to each of the flow runners for processing.
At any given moment, the five flow runners can each process a batch, so this multithreaded flow processes up to five batches at a time. When incoming data slows, the flow runners sit idle, available for use as soon as the data flow increases.
Multithreaded partition processing
By default, the Oracle Multitable Consumer source performs multithreaded partition processing for all tables that meet the partition processing requirements, and performs table processing for all other tables.
To perform multithreaded processing of partitions within a table, you enable partition processing in the table configuration, then specify the partition size and the maximum number of partitions to use. Limiting the number of partitions also limits the number of threads that can be dedicated to processing data in the table.
When you configure a set of tables for unlimited partitions, the source creates up to twice as many partitions as the flow thread count. For example, if you have 5 threads, the table can have up to 10 partitions.
Similar to multithreaded table processing, each thread reads data from a single partition, and each partition can have a maximum of one thread read from it at a time.
When processing partitions, the processing order depends on many factors. For a full description, see Processing queue.
Partition processing requirements
- Single key or offset column
- The table must have a single primary key or user-defined offset column. Performing multithreaded partition processing on a table with composite keys generates an error and stops the flow.
- Numeric data type
- To use partition processing, the primary key or user-defined offset column must have a numeric data type that allows arithmetic partitioning.
Multiple offset value handling
When processing partitions, the Oracle Multitable Consumer source can process multiple
records with the same offset value. For example, the source can process multiple records
with the same timestamp in a transaction_date offset column.
When you stop the flow as the source is processing a series of records with the same offset value, the source notes the offset. Then, when you restart the flow, it starts with a record with the next logical offset value, skipping any unprocessed records that use the same last-saved offset.
For example, say you specified a datetime column as a user-defined offset column, and five records in the table share the same datetime value. Now say you happen to stop the flow after it processes the second record. The flow stores the datetime value as the offset where it stopped. When you restart the flow, processing begins with the next datetime value, skipping the three unprocessed records with the last-saved offset value.
Best effort: Processing non-compliant tables
To process tables in a table configuration that might not meet the partition processing requirements, you can use the On (Best Effort) option when you configure the Multithreaded Partition Processing mode property.
When you select the best effort option, the source performs multithreaded partition processing for all tables that meet the partition processing requirements. The source performs multithreaded table processing for tables that include multiple key or offset columns. If you enable non-incremental processing, the source can also process all tables that do not include key or offset columns.
Non-incremental processing
You can configure the Oracle Multitable Consumer source to perform non-incremental processing for tables with no primary keys or user-defined offset columns. By default, the source performs incremental processing and does not process tables without a key or offset column.
You can enable non-incremental processing for the set of tables defined in a table configuration on the Tables tab.
Use On (Best Effort) or Off to perform non-incremental processing of the table. With either option selected, table is processed using a single thread, like multithreaded table processing.
- The source uses a single thread to process all available data in the table.
- After the source processes all available data, it notes that the table has been
processed as an offset. So, if you stop and restart the flow after the
source completes all processing, the source does not reprocess the table.
If you want to reprocess data in the table, you can reset the source before restarting the flow. This resets the source for all tables that the source processes.
- If the flow stops while the source is still processing available data, when the flow restarts, the source reprocesses the entire table. This occurs because the table has no key or offset column to allow for tracking progress.
For example, say you configure the source to use five threads and process a set of tables that includes a table with no key or offset column. To process data in this table, you enable the Enable Non-Incremental Load table configuration property. You also set Multithreaded Partition Processing Mode to On (Best Effort) to allow the source to use multithreaded partition processing when possible and allow both non-incremental processing and multithreaded table processing when needed.
When you start the flow, the source allocates one thread to the table that requires non-incremental processing. It processes the table data using multithreaded table processing until all data is processed. When the thread completes processing all available data, the source notes this as part of the offset and the thread becomes available to process data from other tables. In the meantime, the four other threads process data from the rest of the tables using multithreaded partition processing when possible.
Batch strategy
You can specify the batch strategy to use when processing data. The batch strategy behaves differently depending on whether you use multithreaded table processing or multithreaded partition processing.
Process all available rows
- Multithreaded table processing
-
When the source performs multithreaded table processing for all tables, each thread processes all available rows from a table. A thread runs a SQL query and processes all of the results for a table. Then, the thread switches to the next available table.
- Multithreaded partition processing
- Multithreaded partition processing is similar to multithreaded table processing, except that it works at a partition level. Each thread runs a SQL query for a partition and processes multiple batches of data from the results. When all data in the partition is processed, the thread switches to the next available partition.
Switch tables
- Multithreaded table processing
- When the source performs multithreaded table processing for all tables, each thread processes one batch of data, then switches to an available table and repeats the process. When a flow starts, each thread runs a SQL query, generates a result set, and processes a batch of records from the result set. The database driver caches the remaining records for the same thread to access again. Then, the thread switches to the next available table.
- Multithreaded partition processing
-
Multithreaded partition processing is similar to multithreaded table processing. The behavior around caching the result set and the number of batches to process from the result set is the same, but at a partition level. Threads also skip processing partitions that already have a result set cached from a different thread.
The difference is, when a thread works on a partition, all partitions from the same table are moved to the end of the processing queue. This allows the source to switch to the next available table.
For examples of how tables and partitions rotate through the processing queue, see Processing queue.
Initial table order strategy
- None
- Reads the tables in the order that they are listed in the database.
- Alphabetical
- Reads the tables in alphabetical order.
- Referential Constraints
- Reads the tables based on the dependencies between the tables. The source reads the parent table first, and then reads the child tables that refer to the parent table with a foreign key.
The source uses this table order only for the initial reading of the tables. When threads switch back to previously read tables, they read from the next available table, regardless of the defined order.
Processing queue
The Oracle Multitable Consumer source maintains a virtual queue to determine the data to process from different tables. The queue includes each table defined in the source. When a table is to be processed by partition, multiple partitions for the table are added to the queue, limited by the Max Partitions property defined for each table configuration on the Tables tab.
The source rotates and reorganizes the queue based on the Per Batch Strategy property. The source processes data from the queue with the threads specified in the Number of Threads property and the Batches from Result Set property.
The following scenarios to show how the queue works.
Multithreaded table processing only
- Multithreaded Partition Processing Mode property is set to Off.
- Multithreaded Partition Processing Mode property is set to On (Best Effort) and no tables meet the partition processing requirements.
A B C DWhen
a thread becomes available, it processes data from the first table in the queue. The
processing of the tables depends on how you define the Per Batch Strategy property:- Process All Available Rows in the Table
- With this batch strategy, threads process all data in a table before proceeding to the next table.
- Switch Tables
- With this batch strategy, each thread processes a single batch from a table before moving to the next available table, so the threads cycle through the tables quickly.
Multithreaded partition processing only
- Multithreaded Partition Processing Mode property is set to On (Required).
- Multithreaded Partition Processing Mode property is set to On (Best Effort) and all tables meet the partition processing requirements.
Say you have tables A, B, and C, and all three tables have a lot of data to process. Tables A and B are configured with a maximum of 3 active partitions. And since table C has the largest volume of data, you allow an unlimited number of partitions, which means double the number of threads for the flow. Again, let's use the alphabetical initial table ordering.
- Process All Available Rows in the Table
- As when processing tables, when processing partitions with this batch
strategy, threads process all data in a partition before proceeding to the
next partition. Note: In practice, this means that rows from subsequent tables can be processed before a previous table is completed, since available threads continue to pick up partitions from the queue.
- Switch Tables
- As when processing tables, when processing partitions with this batch strategy, each thread processes a single batch from a partition before moving to the next available partition, so the threads cycle through the partitions quickly.
Both multithreaded partition and table processing
- Multithreaded Partition Processing Mode property is set to On (Best Effort) and some tables meet the partition processing requirements while other tables do not.
When processing a mix of full tables and partitioned tables, the queue basically behaves the same as when processing only partitions, with full tables being processed as a single partition. Let's walk through it....
A B1 B2 B3 C1 C2 C3 C4 C5 C6When a
thread becomes available, it processes the first table or partition in the queue. The order of the
queue depends on how you define the Per Batch Strategy, as follows:- Process All Available Rows in the Table
- When processing tables and partitions with this batch strategy, threads process all data in a
table or partition before proceeding to the next partition. An unpartitioned table, like table A, is
processed like a table with a single partition.
When the flow starts, the 3 threads process a set of batches from table A and from partitions B1 and B2, leaving the queue like this:
B3 C1 C2 C3 C4 C5 C6As each thread completes processing, it picks up the next table or partition at the front of the queue. After each of the 3 threads takes another table or partition, the queue looks like this:C3 C4 C5 C6 - Switch Tables
- When processing tables and partitions with this batch strategy, each thread processes a single batch from a table or partition before moving to the next available table or partition, so the threads cycle through the queue quickly.
Generated records
The Oracle Multitable Consumer source generates record header attributes and field attributes that provide additional information about each record and field.
CRUD operation header attributes
- sdc.operation.type
- The source evaluates the operation type associated
with each entry that it processes. When appropriate, it writes the
operation type to the
sdc.operation.typerecord header attribute. - oracle.cdc.operation
- The source also writes the CRUD operation type to an
oracle.cdc.operationattribute. This attribute was implemented in an earlier release and is supported for backward compatibility.
JDBC header attributes
By default, the Oracle Multitable Consumer source generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record.
You can use the record:attribute or
record:attributeOrDefault functions to access the information
in the attributes. For more information about working with record header attributes,
see Working with header attributes.
JDBC record header attributes include a jdbc prefix to differentiate
the JDBC attributes from other record header attributes.
You can configure the source to skip creating JDBC header attributes by clearing the Create JDBC Header Attributes property on the Advanced tab.
| JDBC Header Attribute | Description |
|---|---|
| jdbc.tables |
Provides a
comma-separated list of source tables for the fields in the
record.
Note: Not all JDBC drivers
provide this information.
|
| jdbc.partition | Provides the full offset key for the partition that produced the record. |
| jdbc.threadNumber | Provides the number of the thread that produced the record. |
| jdbc.vendor | Provides the name of the database vendor. |
| jdbc.<column name>.jdbcType | Provides the numeric value of the original SQL data type for each field in the record. See the Java documentation for a list of the data types that correspond to numeric values. |
| jdbc.<column name>.precision | Provides the original precision for all numeric and decimal fields. |
| jdbc.<column name>.scale | Provides the original scale for all numeric and decimal fields. |
| jdbc.primaryKeySpecification | Provides a JSON-formatted
string that lists the columns that form the primary key in the table and the
metadata for those columns. For example, a table with a
composite primary key contains the following attribute:
A table without a primary key
contains the attribute with an empty value:
|
JDBC field attributes
The Oracle Multitable Consumer source generates field attributes for columns converted to the Decimal or Datetime data types in Data Collector. The attributes provide additional information about each field.
- Decimal and Numeric data types are converted to the Data Collector Decimal data type, which does not store scale and precision.
- The Timestamp data type is converted to the Data Collector Datetime data type, which does not store nanoseconds.
| Data Collector Data Type | Generated Field Attribute | Description |
|---|---|---|
| Decimal | precision | Provides the original precision for every decimal or numeric column. |
| Decimal | scale | Provides the original scale for every decimal or numeric column. |
| Datetime | nanoSeconds | Provides the original nanoseconds for every timestamp column. |
You can use the record:fieldAttribute or
record:fieldAttributeOrDefault functions to access the information
in the attributes. For more information about working with field attributes, see Field attributes.
Event generation
The Oracle Multitable Consumer source can generate events that you can use in an event stream. When you enable event generation, the source generates an event when it completes processing the data returned by the specified queries for all tables. It also generates events when it completes processing the data returned from a table and the data returned from a schema.
- With the Pipeline Finisher executor to
stop the flow and transition the flow to a Finished state when the source completes processing available data.
For an example, see Stopping a flow after processing all available data.
- With the Email executor to send a custom email
after receiving an event.
For an example, see Sending email during flow processing.
-
With a target to store information about completed queries.
For an example, see Preserving an audit trail of events.
For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
Event records
| Record Header Attribute | Description |
|---|---|
| sdc.event.type | Event type. Uses one of the following types:
|
| sdc.event.version | Integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The Oracle Multitable Consumer source can generate the following event records:
- no-more-data
- The Oracle Multitable Consumer source generates a no-more-data event record when the source completes processing all data returned by the queries for all tables.
- schema-finished
- The Oracle Multitable Consumer source generates a schema-finished event record when the source completes processing all data within a schema.
- table-finished
- The Oracle Multitable Consumer source generates a table-finished event record when the source completes processing all data within a table.
Configuring an Oracle Multitable Consumer source
About this task
Configure an Oracle Multitable Consumer source to read Oracle database data from multiple tables.