The BigQuery Connector stage can be configured to run on multiple processing nodes and
read data from the data source. In this scenario, each of the processing nodes for the stage
retrieves a set (partition) of records from the data source. The partitions are then combined to
produce the entire result set for the output link of the stage.
To enable partitioned reads, set the Enable partitioned reads property to Yes.
When the stage is configured for partitioned reads, it runs the statement that is specified in
the Select statement property on each processing node. You can use
special placeholders in the statements to ensure that each of the processing nodes retrieves a
distinct partition of records from the data source.
The following placeholders can be used in the statements to specify distinct partitions of
records to retrieve in individual processing nodes:
- [[node-number]] - replaced at run time with the zero-based index of the current processing node:
0 on the first processing node, 1 on the second processing node, 2 on the third processing node,
and so forth.
- [[node-number-base-one]] - replaced at run time with the one-based index of the current
processing node: 1 on the first processing node, 2 on the second processing node, and so forth.
- [[node-count]] - replaced at run time with the total number of processing nodes for the stage.
By default, this number is the number of nodes in the parallel configuration file. The location of
the parallel configuration file is specified in the APT_CONFIG_FILE environment variable. To run
the stage on a subset of the nodes in the parallel configuration file, define node constraints on
the Advanced page under the Stage page in the stage editor.
When partitioned reads are enabled, the connector resolves any [[node-number]],
[[node-number-base-one]], and [[node-count]] placeholders in the Select
statement property value.
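The substitution described above can be illustrated with a short Python sketch. This is not the connector's implementation; it only mimics the documented behavior with plain string replacement. The function name resolve_placeholders is hypothetical, and the one-based value for [[node-number-base-one]] is an assumption inferred from the placeholder name.

```python
def resolve_placeholders(statement: str, node_number: int, node_count: int) -> str:
    """Return the statement with the partitioned-read placeholders substituted.

    Illustrative only: mimics the documented substitution, not the connector code.
    """
    if not 0 <= node_number < node_count:
        raise ValueError("node_number must be in the range 0..node_count-1")
    replacements = {
        "[[node-number]]": str(node_number),               # zero-based index
        "[[node-number-base-one]]": str(node_number + 1),  # assumed one-based index
        "[[node-count]]": str(node_count),                 # total number of nodes
    }
    for placeholder, value in replacements.items():
        statement = statement.replace(placeholder, value)
    return statement


# Hypothetical template that splits rows across nodes by the remainder of column C1.
template = "SELECT C1 FROM TABLE1 WHERE MOD(C1, [[node-count]]) = [[node-number]]"

# One resolved statement per processing node, here for a two-node configuration.
statements = [resolve_placeholders(template, n, 2) for n in range(2)]
for resolved in statements:
    print(resolved)
```

Each processing node receives the same template but a different resolved statement, which is what allows the nodes to retrieve distinct partitions.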
Ensure that the records returned by statements on individual processing nodes have matching field
definitions. For example, if the statement on one of the processing nodes returns records with the
columns C1 INTEGER, C2 VARCHAR(20), and C3 DATE, ensure that the statements on all the remaining
processing nodes return records with the same column definitions.
Example
Assume that the stage was configured to read records from the data source table TABLE1, which has
the following columns: C1 INTEGER, C2 VARCHAR(10) and C3 DATE. Also, assume that the stage was
configured to run in parallel on four processing nodes and perform partitioned reads. The
Select statement property was set to the following value:
SELECT C1, C2, C3 FROM TABLE1 WHERE MOD(C1, [[node-count]]) = [[node-number]]
The connector runs one of the following statements on each processing node:

Table 1. Statements that are run on each of the processing nodes

| Node | Statement |
| --- | --- |
| 1 | SELECT C1, C2, C3 FROM TABLE1 WHERE MOD(C1, 4) = 0 |
| 2 | SELECT C1, C2, C3 FROM TABLE1 WHERE MOD(C1, 4) = 1 |
| 3 | SELECT C1, C2, C3 FROM TABLE1 WHERE MOD(C1, 4) = 2 |
| 4 | SELECT C1, C2, C3 FROM TABLE1 WHERE MOD(C1, 4) = 3 |
Assume that the source table contains the following rows:

Table 2. Rows in the source table

| C1 | C2 | C3 |
| --- | --- | --- |
| 1 | Value one | 2021-01-01 |
| 2 | Value two | 2021-01-02 |
| 3 | Value three | 2021-01-03 |
| 4 | Value four | 2021-01-04 |
| 5 | Value five | 2021-01-05 |
| 6 | Value six | 2021-01-06 |
| 7 | Value seven | 2021-01-07 |
| 8 | Value eight | 2021-01-08 |
| 9 | Value nine | 2021-01-09 |
| 10 | Value ten | 2021-01-10 |
The following rows are fetched by the first processing node (node number 0):

Table 3. Rows fetched by the first processing node

| C1 | C2 | C3 |
| --- | --- | --- |
| 4 | Value four | 2021-01-04 |
| 8 | Value eight | 2021-01-08 |
The following rows are fetched by the second processing node (node number 1):

Table 4. Rows fetched by the second processing node

| C1 | C2 | C3 |
| --- | --- | --- |
| 1 | Value one | 2021-01-01 |
| 5 | Value five | 2021-01-05 |
| 9 | Value nine | 2021-01-09 |
The following rows are fetched by the third processing node (node number 2):

Table 5. Rows fetched by the third processing node

| C1 | C2 | C3 |
| --- | --- | --- |
| 2 | Value two | 2021-01-02 |
| 6 | Value six | 2021-01-06 |
| 10 | Value ten | 2021-01-10 |
The following rows are fetched by the fourth processing node (node number 3):

Table 6. Rows fetched by the fourth processing node

| C1 | C2 | C3 |
| --- | --- | --- |
| 3 | Value three | 2021-01-03 |
| 7 | Value seven | 2021-01-07 |
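
The partitioning in this example can be reproduced with a small Python simulation, which is a convenient way to check that a predicate gives every row to exactly one node. This is a sketch only: the helper rows_for_node is hypothetical, the rows are the sample rows from the source table above, and the Python % operator stands in for MOD, which is a safe substitution here because every C1 value is non-negative.

```python
# Sample rows from the source table in this example: (C1, C2, C3).
ROWS = [
    (1, "Value one", "2021-01-01"),
    (2, "Value two", "2021-01-02"),
    (3, "Value three", "2021-01-03"),
    (4, "Value four", "2021-01-04"),
    (5, "Value five", "2021-01-05"),
    (6, "Value six", "2021-01-06"),
    (7, "Value seven", "2021-01-07"),
    (8, "Value eight", "2021-01-08"),
    (9, "Value nine", "2021-01-09"),
    (10, "Value ten", "2021-01-10"),
]
NODE_COUNT = 4


def rows_for_node(rows, node_number, node_count):
    """Mimic WHERE MOD(C1, [[node-count]]) = [[node-number]] for one node."""
    return [row for row in rows if row[0] % node_count == node_number]


# One partition per processing node, indexed by the zero-based node number.
partitions = [rows_for_node(ROWS, n, NODE_COUNT) for n in range(NODE_COUNT)]
keys_per_node = [[row[0] for row in part] for part in partitions]

for node_number, keys in enumerate(keys_per_node):
    print(f"node {node_number}: C1 values {keys}")

# Every source row must land in exactly one partition: nothing lost, nothing duplicated.
combined = sorted(key for keys in keys_per_node for key in keys)
assert combined == [row[0] for row in ROWS]
```

If the final assertion fails for a predicate of your own, some rows are either read by more than one node or not read at all.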