JDBC Query Consumer
The JDBC Query Consumer source reads database data using a user-defined SQL query through a JDBC connection. The source returns data as a map with column names and field values. For information about supported versions, see Supported systems and versions.
Data Collector includes database-specific sources, such as the Oracle Bulkload source. When available, use a database-specific source. Data Collector also provides CDC sources to process changed data and the JDBC Multitable Consumer source to perform database replication or to read from multiple tables in the same database.
When you configure the JDBC Query Consumer source, you define the SQL query that the source uses to read data from a single table or from a join of tables.
When you configure the source, you specify connection information, query interval, and custom JDBC configuration properties to determine how the source connects to the database. You can also use a connection to configure the source.
You configure the query mode and SQL query to define the data returned by the database. When in full query mode and reading from certain databases, you can use a stored procedure instead of a SQL query. When the source database has high-precision timestamps, such as IBM Db2 TIMESTAMP(9) fields, you can configure the source to write strings rather than datetime values to maintain the precision.
You can configure the JDBC Query Consumer source to perform change data capture for databases that store the information in a table. You can also specify what the source does when it encounters an unsupported data type.
You can specify custom properties that your driver requires. You can configure advanced connection properties. To use a JDBC version older than 4.0, you specify the driver class name and define a health check query.
By default, the source generates JDBC record header and field attributes that provide additional information about each record and field.
The source can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
Database vendors and drivers
The JDBC Query Consumer source can read database data from multiple database vendors.
| Database Vendor | Supported Versions | Tested Versions |
|---|---|---|
| MySQL | MySQL 5.7 and later | |
| Oracle | Also supported: | |
| PostgreSQL | | |
| Microsoft SQL Server | | |
MySQL data types
The JDBC Query Consumer source converts MySQL data types into Data Collector data types.
| MySQL Data Type | Data Collector Data Type |
|---|---|
| Bigint | Long |
| Bigint Unsigned | Decimal |
| Binary | Byte Array |
| Blob | Byte Array |
| Char | String |
| Date | Date |
| Datetime | Datetime |
| Decimal | Decimal |
| Double | Double |
| Enum | String |
| Float | Float |
| Int | Integer |
| Int Unsigned | Long |
| Json | String |
| Linestring | Byte Array |
| Medium Int | Integer |
| Medium Int Unsigned | Long |
| Numeric | Decimal |
| Point | Byte Array |
| Polygon | Byte Array |
| Set | String |
| Smallint | Short |
| Smallint Unsigned | Integer |
| Text | String |
| Time | Time |
| Timestamp | Datetime |
| Tinyint, Tinyint Unsigned | Short |
| Varbinary | Byte Array |
| Varchar | String |
| Year | Date |
Oracle data types
The JDBC Query Consumer source converts Oracle data types into Data Collector data types.
| Oracle Data Type | Data Collector Data Type |
|---|---|
| Number | Decimal |
| Char | String |
| Varchar, Varchar2 | String |
| Nchar, NvarChar2 | String |
| Binary_float | Float |
| Binary_double | Double |
| Date | Datetime |
| Timestamp | Datetime |
| Timestamp with time zone | Zoned_datetime |
| Timestamp with local time zone | Zoned_datetime |
| Long | String |
| Blob | Byte_array |
| Clob | String |
| Nclob | String |
| XMLType | String |
PostgreSQL data types
The JDBC Query Consumer source converts PostgreSQL data types into Data Collector data types.
| PostgreSQL Data Type | Data Collector Data Type |
|---|---|
| Bigint | Long |
| Boolean | Boolean |
| Bytea | Byte Array |
| Char | String |
| Date | Date |
| Decimal | Decimal |
| Double Precision | Double |
| Enum | String |
| Integer | Integer |
| Money | Double |
| Numeric | Decimal |
| Real | Float |
| Smallint | Short |
| Text | String |
| Time, Time with Time Zone | Time |
| Timestamp, Timestamp with Time Zone | Time |
| Varchar | String |
SQL Server data types
The JDBC Query Consumer source converts SQL Server data types into Data Collector data types.
| SQL Server Data Type | Data Collector Data Type |
|---|---|
| Bigint | Long |
| Binary | Byte_Array |
| Bit | Boolean |
| Char | String |
| Date | Date |
| Datetime, Datetime2 | Datetime |
| Datetimeoffset | Zoned_datetime |
| Decimal | Decimal |
| Float | Double |
| Image | Byte_Array |
| Int | Integer |
| Money | Decimal |
| Nchar | String |
| Ntext | String |
| Numeric | Decimal |
| Nvarchar | String |
| Real | Float |
| Smalldatetime | Datetime |
| Smallint | Short |
| Smallmoney | Decimal |
| Text | String |
| Time | Time |
| Tinyint | Short |
| Varbinary | Byte_Array |
| Varchar | String |
| XML | String |
Unsupported data types
When the source encounters an unsupported data type, it takes one of the following actions, depending on how you configure it:
- Stops the flow
- If the stage encounters an unsupported data type, the stage stops the flow after completing the processing of the previous records and displays the following error:
JDBC_37 - Unsupported type 1111 for column.
- Converts to string
- If the stage encounters an unsupported data type, the stage converts the data to string when possible, and then continues processing. Not all unsupported data types can successfully be converted to string. When using this option, verify that the data is converted to string as expected.
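The two behaviors can be sketched in Python as follows. This is a minimal illustration, not Data Collector code: the function name, the action labels `STOP` and `TO_STRING`, and the exception class are all hypothetical. The value 1111 in the error message is the constant `java.sql.Types.OTHER`, the generic code that JDBC drivers report for database-specific types.

```python
JDBC_TYPE_OTHER = 1111  # java.sql.Types.OTHER


class UnsupportedTypeError(Exception):
    """Raised when the flow must stop on an unsupported column type."""


def handle_unsupported(column, jdbc_type, value, action):
    """Apply the configured action (hypothetical labels) to one value."""
    if action == "STOP":
        # Mirrors the JDBC_37 message quoted above, with the column name added.
        raise UnsupportedTypeError(
            f"JDBC_37 - Unsupported type {jdbc_type} for column {column}"
        )
    if action == "TO_STRING":
        # str() stands in for the conversion. Real conversions can fail or
        # produce unexpected text, so the output should be verified.
        return str(value)
    raise ValueError(f"unknown action: {action}")


# Convert-to-string behavior: processing continues with a string value.
converted = handle_unsupported("location", JDBC_TYPE_OTHER, (1.5, 2.5), "TO_STRING")

# Stop behavior: the error surfaces and processing halts.
try:
    handle_unsupported("location", JDBC_TYPE_OTHER, (1.5, 2.5), "STOP")
    stopped = False
except UnsupportedTypeError as err:
    stopped = True
    message = str(err)
```

The string option keeps the flow running at the cost of a weaker type, which is why the guidance above recommends checking the converted values.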
Installing the JDBC driver
You install the driver into the JDBC stage library, streamsets-datacollector-jdbc-lib, which includes the source.
To use the JDBC driver with multiple stage libraries, install the driver into each stage library associated with the stages.
For example, if you want to use a MySQL JDBC driver with the JDBC Lookup processor and with the MySQL Binary Log source, you install the driver as an external library for the JDBC stage library, streamsets-datacollector-jdbc-lib, and for the MySQL Binary Log stage library, streamsets-datacollector-mysql-binlog-lib.
For information about installing additional drivers, see Install external libraries.
Offset column and offset value
The JDBC Query Consumer source uses an offset column and initial offset value to determine where to start reading data within a table. Include both the offset column and the offset value in the WHERE clause of the SQL query.
The offset column must be a column in the table with unique non-null values, such as a primary key or indexed column. The initial offset value is a value within the offset column where you want the source to start reading.
When the source performs an incremental query, you must configure the offset column and offset value. For full queries, you can optionally configure them.
Full and incremental mode
The JDBC Query Consumer source can perform queries in two modes:
- Incremental mode
- To use incremental mode, you must select the Incremental Mode property and configure an offset column and initial offset value for the source. When you define the SQL query, you must use the ${OFFSET} parameter to represent the offset value in the WHERE clause.
- Full mode
- To use full mode, you must clear the Incremental Mode property for the source. You can optionally configure an offset column and initial offset value and can define any type of SQL query.
Recovery
The JDBC Query Consumer source supports recovery after a deliberate or unexpected stop when it performs incremental queries. Recovery is not supported for full queries.
In incremental mode, the source uses offset values in the offset column to determine where to continue processing after a deliberate or unexpected stop. To ensure seamless recovery in incremental mode, use a primary key or indexed column as the offset column. As the JDBC Query Consumer source processes data, it tracks the offset value internally. When the flow stops, the source notes where it stopped processing data. When you restart the flow, it continues from the last-saved offset.
When the JDBC Query Consumer source performs full queries, the source runs the full query again after you restart the flow.
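The difference between the two recovery behaviors can be sketched with Python's built-in `sqlite3` module. This is a simplified model under stated assumptions, not the source's implementation: `run_incremental` is a hypothetical helper, the `invoice` table and its rows are invented, and plain string substitution stands in for however Data Collector evaluates `${OFFSET}`.

```python
import sqlite3


def run_incremental(conn, query_template, offset_column, offset):
    """Run one incremental query and return (rows, new_offset)."""
    # Replace the ${OFFSET} placeholder with the last-saved offset value.
    sql = query_template.replace("${OFFSET}", str(offset))
    cursor = conn.execute(sql)
    columns = [d[0] for d in cursor.description]
    rows = [dict(zip(columns, r)) for r in cursor.fetchall()]
    if rows:
        # Rows are ordered by the offset column, so the last row holds
        # the highest offset processed so far.
        offset = rows[-1][offset_column]
    return rows, offset


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invoice (id INTEGER PRIMARY KEY, amount REAL)")
conn.executemany("INSERT INTO invoice VALUES (?, ?)",
                 [(1, 10.0), (2, 20.0), (3, 30.0)])

QUERY = "SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id"

# First run starts from the initial offset value, 0.
first_rows, saved_offset = run_incremental(conn, QUERY, "id", 0)

# The flow stops, new rows arrive, and the flow restarts.
conn.executemany("INSERT INTO invoice VALUES (?, ?)", [(4, 40.0), (5, 50.0)])
second_rows, saved_offset = run_incremental(conn, QUERY, "id", saved_offset)

# A full query carries no saved position, so a restart reads every row again.
full_rows = conn.execute("SELECT * FROM invoice").fetchall()
```

After the restart, the incremental query returns only the rows with IDs 4 and 5, while the full query returns all five rows.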
SQL query
The SQL query defines the data returned from the database. You define the query in the SQL Query property on the JDBC tab.
To keep the query in a resource file, use the runtime:loadResource function in the SQL Query property to load the query from the resource file at runtime. For example, you might enter the following expression for the property:
${runtime:loadResource("myquery.sql", false)}
When running the source in full query mode and reading from certain databases, you can define a stored procedure, then call the stored procedure using the SQL Query property.
SQL query for incremental mode
When you define the SQL query for incremental mode, the JDBC Query Consumer source requires both a WHERE clause and an ORDER BY clause in the query.
Use the following guidelines when you define the WHERE and ORDER BY clauses in the query:
- In the WHERE clause, include the offset column and the offset value
- The source uses an offset column and value to determine the data that is returned. Include both in the WHERE clause of the query.
- Use the OFFSET parameter to represent the offset value
- In the WHERE clause, use ${OFFSET} to represent the offset value.
- In the ORDER BY clause, include the offset column as the first column
- To avoid returning duplicate data, use the offset column as the first column in the ORDER BY clause.
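The guidelines above can be turned into a quick pre-flight check before pasting a query into the source. The following Python sketch is a hypothetical helper, not part of Data Collector: it uses simple regular expressions, so it only handles straightforward single-statement queries and does not parse SQL properly.

```python
import re


def check_incremental_query(sql, offset_column):
    """Return a list of guideline violations for an incremental query."""
    problems = []

    # Capture the WHERE clause text up to ORDER BY or the end of the query.
    where = re.search(r"\bWHERE\b(.*?)(\bORDER\s+BY\b|$)", sql,
                      re.IGNORECASE | re.DOTALL)
    if not where:
        problems.append("missing WHERE clause")
    else:
        clause = where.group(1)
        if "${OFFSET}" not in clause:
            problems.append("WHERE clause does not use ${OFFSET}")
        if not re.search(rf"\b{re.escape(offset_column)}\b", clause,
                         re.IGNORECASE):
            problems.append("WHERE clause does not include the offset column")

    # Capture the first column named in the ORDER BY clause.
    order = re.search(r'\bORDER\s+BY\s+([\w."`\[\]]+)', sql, re.IGNORECASE)
    if not order:
        problems.append("missing ORDER BY clause")
    else:
        # Drop any table qualifier and quoting around the column name.
        first = order.group(1).split(".")[-1].strip('"`[]')
        if first.lower() != offset_column.lower():
            problems.append("offset column is not first in ORDER BY")

    return problems


good = check_incremental_query(
    "SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id", "id")
no_where = check_incremental_query("SELECT * FROM invoice ORDER BY id", "id")
bad_order = check_incremental_query(
    "SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY created, id", "id")
```

An empty result means the query follows all three guidelines. A non-empty result lists which guideline to revisit.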
For example, the following query reads from the invoice table where the ID column is the offset column. The query returns all data where the ID is greater than the offset and orders the data by the ID:
SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id
SQL query for full mode
You can define any type of SQL query for full mode.
For example, the following query returns all data from the invoice table:
SELECT * FROM invoice
When you define the SQL query for full mode, you can optionally include the WHERE and ORDER BY clauses using the same guidelines as for incremental mode. However, using these clauses to read from large tables can cause performance issues.
Stored procedure in full mode
When reading from certain databases, you can call a stored procedure from the JDBC Query Consumer source. Currently, you can use stored procedures with MySQL, PostgreSQL, and SQL Server databases.
You can call a stored procedure when using the JDBC Query Consumer source in full mode. Using stored procedures in other modes is not supported.
- In your database, define the stored procedure.
- In the source, on the JDBC tab, configure the SQL Query property to call the stored procedure. Use the appropriate syntax for your database.
- Also on the JDBC tab, clear the Incremental Mode property, which is selected by default.
- Test the flow to ensure that the procedure performs as expected.
Examples
- MySQL database
- To read all data from a MySQL table, you might create a stored procedure as follows:
CREATE PROCEDURE <procedure_name>() BEGIN SELECT * FROM <table_name>; END;
- PostgreSQL database
- To read all data from a PostgreSQL table, you might create a stored procedure as follows:
create or replace function <procedure_name>() returns table (id int) language plpgsql as $$ begin return query select * from <table_name>; end;$$
- SQL Server database
- To read all data from a SQL Server table, you might create a stored procedure as follows:
CREATE PROCEDURE <procedure_name> AS SELECT * FROM <table_name> RETURN
JDBC attributes
The JDBC Query Consumer source generates record header attributes and field attributes that provide additional information about each record and field. The source receives these details from the JDBC driver.
JDBC header attributes
By default, the JDBC Query Consumer source generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record. The source receives these details from the JDBC driver.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with header attributes.
JDBC header attributes include a user-defined prefix to differentiate the JDBC header attributes from other record header attributes. By default, the prefix is jdbc.
To change the prefix or to stop the source from creating JDBC header attributes, use the JDBC Header Prefix and Create JDBC Header Attributes properties on the Advanced tab.
| JDBC Header Attribute | Description |
|---|---|
| <JDBC prefix>.tables | Provides a comma-separated list of source tables for the fields in the record. Note: Not all JDBC drivers provide this information. For example, at this time, the MySQL MariaDB and PostgreSQL drivers provide a comma-separated list of source tables in random order. In contrast, the Oracle and Microsoft SQL Server drivers provide only an empty string. |
| <JDBC prefix>.<column name>.jdbcType | Provides the numeric value of the original SQL data type for each field in the record. See the Java documentation for a list of the data types that correspond to numeric values. |
| <JDBC prefix>.<column name>.precision | Provides the original precision for all numeric and decimal fields. |
| <JDBC prefix>.<column name>.scale | Provides the original scale for all numeric and decimal fields. |
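To make the naming pattern in the table concrete, the following Python sketch builds the attribute names for a small record. It is illustrative only: the function and the sample columns are hypothetical, the values are stored as strings by assumption, and the sketch joins the prefix to each name with a dot, following the `<JDBC prefix>.<column name>` notation in the table. The numeric type codes 4 and 3 are `java.sql.Types.INTEGER` and `java.sql.Types.DECIMAL`.

```python
def jdbc_header_attributes(tables, columns, prefix="jdbc"):
    """Build header attribute names and values for one record (sketch)."""
    # One attribute lists the source tables for the record.
    attrs = {f"{prefix}.tables": ",".join(tables)}
    for col in columns:
        base = f"{prefix}.{col['name']}"
        # Numeric java.sql.Types code of the original column type.
        attrs[f"{base}.jdbcType"] = str(col["jdbc_type"])
        # Precision and scale are added only for columns that supply them.
        if "precision" in col:
            attrs[f"{base}.precision"] = str(col["precision"])
            attrs[f"{base}.scale"] = str(col["scale"])
    return attrs


attrs = jdbc_header_attributes(
    tables=["invoice"],
    columns=[
        {"name": "id", "jdbc_type": 4},  # java.sql.Types.INTEGER
        {"name": "amount", "jdbc_type": 3, "precision": 10, "scale": 2},  # DECIMAL
    ],
)
```

Under these assumptions, the scale of the amount column would be available as jdbc.amount.scale, which is the kind of name you would pass to the record:attribute function.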
JDBC field attributes
The JDBC Query Consumer source generates field attributes for columns converted to the Decimal or Datetime data types in Data Collector. The attributes provide additional information about each field.
The attributes preserve details that the data type conversion drops:
- Decimal and Numeric data types are converted to the Data Collector Decimal data type, which does not store scale and precision.
- The Timestamp data type is converted to the Data Collector Datetime data type, which does not store nanoseconds.
| Data Collector Data Type | Generated Field Attribute | Description |
|---|---|---|
| Decimal | precision | Provides the original precision for every decimal or numeric column. |
| Decimal | scale | Provides the original scale for every decimal or numeric column. |
| Datetime | nanoSeconds | Provides the original nanoseconds for every timestamp column. |
You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attributes. For more information about working with field attributes, see Field attributes.
Event generation
The JDBC Query Consumer source can generate events that you can use in an event stream. When you enable event generation, the source generates an event when it completes processing the data returned by the specified query. The source also generates an event when a query completes successfully and when it fails to complete.
You can use JDBC Query Consumer events in several ways, for example:
- With the Pipeline Finisher executor to stop the flow and transition the flow to a Finished state when the source completes processing available data.
  When you restart a flow stopped by the Pipeline Finisher executor, the source processes data based on how you configured the source. For example, if you configure the source to run in incremental mode, the source saves the offset when the executor stops the flow. When it restarts, the source continues processing from the last-saved offset. In contrast, if you configure the source to run in full mode, when you restart the flow, the source uses the initial offset, if specified.
  For an example, see Stopping a flow after processing all available data.
- With the Email executor to send a custom email after receiving an event.
  For an example, see Sending email during flow processing.
- With a target to store information about completed queries.
  For an example, see Preserving an audit trail of events.
For more information about dataflow triggers and the event framework, see Dataflow triggers overview.
Event record
| Record Header Attribute | Description |
|---|---|
| sdc.event.type | Event type. Uses one of the following types: |
| sdc.event.version | Integer that indicates the version of the event record type. |
| sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
- No-more-data
- The source generates a no-more-data event record when it completes processing all data returned by a query.
- Query success
- The source generates a query success event record when it completes processing the data returned from a query.
- Query failure
- The source generates a query failure event record when it fails to complete processing the data returned from a query.
Configuring a JDBC Query Consumer source
About this task
Configure a JDBC Query Consumer source to use a single configured SQL query to read database data through a JDBC connection.