You can query remote Confluent Tableflow tables using the IBM® watsonx.data Spark engine through zero-copy data federation. Spark supports both Confluent Managed Storage and provider-integrated storage options.
Before you begin
Confluent requirements:
- Active Confluent Cloud account
- Kafka cluster with Tableflow-enabled topics
- Tableflow API key and secret
- REST Catalog endpoint
To obtain credentials:
- Log in to Confluent Cloud
- Navigate to your Tableflow-enabled topic
- Create a new API key and note the key and secret
- Copy the REST Catalog endpoint (format: https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog/organizations/{org-id}/environments/{env-id})
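The endpoint follows a fixed pattern, so you can assemble it from your region, organization ID, and environment ID. The following minimal Python sketch fills in the format shown above. The function name `tableflow_catalog_uri` is hypothetical, and the whitespace check is an assumption added to catch copy-and-paste errors such as a stray space in the path.

```python
def tableflow_catalog_uri(region: str, org_id: str, env_id: str) -> str:
    """Build a Tableflow REST Catalog endpoint from its parts.

    Hypothetical helper: it only fills in the documented URL format.
    """
    parts = {"region": region, "org_id": org_id, "env_id": env_id}
    for name, value in parts.items():
        # Reject empty values and any whitespace, which would produce an invalid URL.
        if not value or any(ch.isspace() for ch in value):
            raise ValueError(f"invalid {name}: {value!r}")
    return (
        f"https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog"
        f"/organizations/{org_id}/environments/{env_id}"
    )


print(tableflow_catalog_uri("us-east-1", "abc123", "env-xyz"))
# https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/abc123/environments/env-xyz
```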
watsonx.data requirements:
- Provisioned Spark engine
- Network connectivity to Confluent Cloud endpoint
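To check network connectivity before you configure Spark, you can try a TCP connection to the endpoint host on port 443. The following Python sketch is an assumption-based illustration, not a watsonx.data tool: the function names are hypothetical, a successful connection says nothing about whether your API key is valid, and the result is meaningful only if you run it from a host on the same network path as the Spark engine.

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit


def endpoint_host_port(uri: str) -> Tuple[str, int]:
    """Return (host, port) for an HTTPS endpoint URI, defaulting to port 443."""
    parts = urlsplit(uri)
    if parts.scheme != "https" or not parts.hostname:
        raise ValueError(f"expected an https:// URI, got {uri!r}")
    return parts.hostname, parts.port or 443


def can_connect(uri: str, timeout: float = 5.0) -> bool:
    """Try to open a TCP connection to the endpoint; return True if it succeeds."""
    host, port = endpoint_host_port(uri)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # DNS failures, refused connections, and timeouts all raise OSError subclasses.
        return False
```

For example, `can_connect("https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/abc123/environments/env-xyz")` returns `True` when the Tableflow host accepts connections on port 443.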
Storage-specific requirements:
- Confluent Managed Storage: No additional requirements
- Customer integration (AWS S3):
  - S3 bucket in the same region as your Kafka cluster
  - S3 access key and secret key
Procedure
- Configure Spark catalog properties for remote lakehouse access.
Create a configuration dictionary with your Tableflow connection details to enable zero-copy querying.
For Confluent Managed Storage:
tableflow_config = {
    "spark.sql.catalog.tableflow": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.tableflow.type": "rest",
    "spark.sql.catalog.tableflow.uri": "https://tableflow.{CLOUD_REGION}.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}",
    "spark.sql.catalog.tableflow.credential": "{APIKEY}:{SECRET}",
    "spark.sql.catalog.tableflow.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.tableflow.rest-metrics-reporting-enabled": "false",
    "spark.sql.catalog.tableflow.s3.remote-signing-enabled": "true",
    "spark.sql.catalog.tableflow.client.region": "{CLOUD_REGION}"
}
For Customer integration (AWS S3):
Add these properties to the dictionary:
    "spark.sql.catalog.tableflow.s3.access-key-id": "{S3_ACCESS_KEY}",
    "spark.sql.catalog.tableflow.s3.secret-access-key": "{S3_SECRET_KEY}",
    "spark.sql.catalog.tableflow.s3.region": "{S3_REGION}"
Replace the placeholders:
- {CLOUD_REGION}: Your Confluent cluster region (for example, us-east-1)
- {ORG_ID}: Your Confluent organization ID
- {ENV_ID}: Your Confluent environment ID
- {APIKEY} and {SECRET}: Your Tableflow API key and secret
- {S3_ACCESS_KEY}, {S3_SECRET_KEY}, {S3_REGION}: Your S3 access key, secret key, and bucket region (AWS S3 integration only)
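Note: The catalog name tableflow is a local alias, so you can use any name. If you change it, replace tableflow in every spark.sql.catalog.tableflow property and in your queries.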
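If you generate the configuration in code instead of editing placeholders by hand, the following minimal Python sketch builds the same properties for either storage option, takes the catalog alias as a parameter, and flags any placeholder left unreplaced. The function names, the rule that all three S3 values must be supplied together, and the region check (derived from the S3 bucket requirement in Before you begin) are assumptions for illustration, not part of the watsonx.data or Iceberg APIs.

```python
import re
from typing import Dict, List, Optional

# Matches leftover placeholders such as {ORG_ID} or {apikey}.
PLACEHOLDER = re.compile(r"\{[A-Za-z0-9_-]+\}")


def build_tableflow_conf(
    catalog: str,
    region: str,
    org_id: str,
    env_id: str,
    api_key: str,
    secret: str,
    s3_access_key: Optional[str] = None,
    s3_secret_key: Optional[str] = None,
    s3_region: Optional[str] = None,
) -> Dict[str, str]:
    """Build the Tableflow catalog properties; `catalog` is the local alias."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": (
            f"https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog"
            f"/organizations/{org_id}/environments/{env_id}"
        ),
        f"{prefix}.credential": f"{api_key}:{secret}",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.rest-metrics-reporting-enabled": "false",
        f"{prefix}.s3.remote-signing-enabled": "true",
        f"{prefix}.client.region": region,
    }
    s3_values = (s3_access_key, s3_secret_key, s3_region)
    if any(v is not None for v in s3_values):
        # Customer integration (AWS S3): assume all three values are required together.
        if any(v is None for v in s3_values):
            raise ValueError("pass s3_access_key, s3_secret_key, and s3_region together")
        # The S3 bucket must be in the same region as the Kafka cluster.
        if s3_region != region:
            raise ValueError(f"S3 region {s3_region!r} differs from cluster region {region!r}")
        conf[f"{prefix}.s3.access-key-id"] = s3_access_key
        conf[f"{prefix}.s3.secret-access-key"] = s3_secret_key
        conf[f"{prefix}.s3.region"] = s3_region
    return conf


def unreplaced_placeholders(conf: Dict[str, str]) -> List[str]:
    """Return the sorted, de-duplicated placeholders still present in the values."""
    found = set()
    for value in conf.values():
        found.update(PLACEHOLDER.findall(value))
    return sorted(found)
```

To apply the result in a notebook, call `.config(key, value)` on `SparkSession.builder` for each item before `getOrCreate()`.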
- Choose a submission method.
You can query remote Tableflow tables using one of three methods: SparkLab, the REST API, or the CPDCTL CLI.
- Query remote Tableflow tables.
Using SparkLab:
- Open SparkLab in watsonx.data.
- Create a new PySpark notebook.
- Add the following code:
from pyspark.sql import SparkSession
# Create a Spark session with the Tableflow catalog configuration
spark = (
    SparkSession.builder
    .appName("Query Confluent Tableflow")
    .config("spark.sql.catalog.tableflow", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.tableflow.type", "rest")
    # Replace with your REST Catalog endpoint
    .config("spark.sql.catalog.tableflow.uri", "https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/abc123/environments/env-xyz")
    # Replace with your Tableflow API key and secret
    .config("spark.sql.catalog.tableflow.credential", "your-api-key:your-secret")
    .config("spark.sql.catalog.tableflow.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.tableflow.s3.remote-signing-enabled", "true")
    .config("spark.sql.catalog.tableflow.client.region", "us-east-1")
    .getOrCreate()
)
# Discover available namespaces (one per Kafka cluster ID)
print("Available namespaces:")
spark.sql("SHOW NAMESPACES IN tableflow").show()
# List the tables in a namespace; backticks are required because cluster IDs contain hyphens
namespace = "lkc-xxxxx"  # Replace with your Kafka cluster ID
print(f"Tables in {namespace}:")
spark.sql(f"SHOW TABLES IN tableflow.`{namespace}`").show()
# Query the first 10 rows of a table
table_name = "your_topic_name"  # Replace with your topic name
df = spark.sql(f"SELECT * FROM tableflow.`{namespace}`.`{table_name}` LIMIT 10")
df.show()
# Count the rows in the table
row_count = spark.sql(
    f"SELECT COUNT(*) AS row_count FROM tableflow.`{namespace}`.`{table_name}`"
).collect()[0]["row_count"]
print(f"Total rows: {row_count}")
- Run the notebook.
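Kafka cluster IDs such as lkc-5g8orq contain hyphens, so the namespace must be quoted with backticks in Spark SQL, and the same applies to any table name that contains special characters. If you build many queries, a small helper keeps the quoting consistent. The following Python sketch is hypothetical; it follows the Spark SQL rule that a backtick inside a quoted identifier is escaped by doubling it.

```python
def quote_ident(name: str) -> str:
    """Quote a Spark SQL identifier with backticks, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def table_ref(catalog: str, namespace: str, table: str) -> str:
    """Build a fully qualified, quoted catalog.namespace.table reference."""
    return ".".join(quote_ident(part) for part in (catalog, namespace, table))


print(table_ref("tableflow", "lkc-5g8orq", "topic_0"))
# `tableflow`.`lkc-5g8orq`.`topic_0`
```

In the notebook, you could then write `spark.sql(f"SELECT * FROM {table_ref('tableflow', namespace, table_name)} LIMIT 10")`.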
Using REST API:
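- Create a PySpark script file (for example, query_tableflow.py) that contains your Tableflow queries, and upload it to an S3 bucket that the Spark engine can access.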
- Submit the application:
curl -X POST "https://{wxd-host}/lakehouse/api/v2/spark_engines/{engine_id}/applications" \
  -H "Authorization: Bearer {token}" \
  -H "Content-Type: application/json" \
  -d '{
    "application_details": {
      "application": "s3://bucket/query_tableflow.py",
      "conf": {
        "spark.sql.catalog.tableflow": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.tableflow.type": "rest",
        "spark.sql.catalog.tableflow.uri": "https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}",
        "spark.sql.catalog.tableflow.credential": "{apikey}:{secret}",
        "spark.sql.catalog.tableflow.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.tableflow.s3.remote-signing-enabled": "true",
        "spark.sql.catalog.tableflow.client.region": "us-east-1"
      }
    }
  }'
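If you prefer to submit from Python rather than curl, the same request can be built with the standard library. This minimal sketch mirrors the endpoint, headers, and body of the curl example; the function name `build_submit_request` is hypothetical, and the request is built but not sent.

```python
import json
import urllib.request
from typing import Dict


def build_submit_request(
    host: str, engine_id: str, token: str, application: str, conf: Dict[str, str]
) -> urllib.request.Request:
    """Build, but do not send, the application submission request from the curl example."""
    url = f"https://{host}/lakehouse/api/v2/spark_engines/{engine_id}/applications"
    body = {"application_details": {"application": application, "conf": conf}}
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )
```

To send it, pass the request to `urllib.request.urlopen`. The response format is not described in this topic, so inspect the response before relying on specific fields.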
Using CPDCTL CLI:
cpdctl wx-data sparkjob create \
  --engine-id {engine_id} \
  --path s3://{bucket_name}/query_tableflow.py \
  --conf '{
    "spark.app.name": "{spark_app_name}",
    "spark.sql.catalog.tableflow": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.tableflow.type": "rest",
    "spark.sql.catalog.tableflow.uri": "https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}",
    "spark.sql.catalog.tableflow.credential": "{apikey}:{secret}",
    "spark.sql.catalog.tableflow.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.tableflow.s3.remote-signing-enabled": "true",
    "spark.sql.catalog.tableflow.client.region": "us-east-1"
  }' \
  --api-key {api_key}
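To script CLI submissions, you can build the argument list in Python and serialize the configuration with `json.dumps`, which avoids quoting the JSON by hand. This sketch uses only the flags shown in the command above; the function name `cpdctl_submit_args` is hypothetical, and it builds the command without running it.

```python
import json
from typing import Dict, List, Optional


def cpdctl_submit_args(
    engine_id: str,
    script_path: str,
    conf: Dict[str, str],
    api_key: str,
    app_name: Optional[str] = None,
) -> List[str]:
    """Build the argv list for `cpdctl wx-data sparkjob create` without running it."""
    job_conf = dict(conf)
    if app_name is not None:
        # Mirrors the spark.app.name entry in the CLI example.
        job_conf["spark.app.name"] = app_name
    return [
        "cpdctl", "wx-data", "sparkjob", "create",
        "--engine-id", engine_id,
        "--path", script_path,
        "--conf", json.dumps(job_conf),
        "--api-key", api_key,
    ]
```

You can run the result with `subprocess.run(args, check=True)`. Because the arguments are passed as a list, no shell is involved and the JSON needs no extra escaping.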
Results
You can now query data in Confluent Tableflow without copying it into watsonx.data. As Tableflow materializes new messages from the Kafka topics into the tables, subsequent queries return them.
Example output
Available namespaces:
+------------+
|namespace |
+------------+
|lkc-5g8orq |
+------------+
Tables in lkc-5g8orq:
+------------+---------+-----------+
|namespace |tableName|isTemporary|
+------------+---------+-----------+
|lkc-5g8orq |topic_0 |false |
|lkc-5g8orq |topic_2 |false |
+------------+---------+-----------+
Total rows: 4
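To see when Tableflow last committed new data to a table, you can query the table's Iceberg `snapshots` metadata table. This sketch assumes that the Tableflow REST Catalog exposes Iceberg metadata tables to Spark in the usual way, which this topic does not confirm; the function names are hypothetical.

```python
def quote_ident(name: str) -> str:
    """Quote a Spark SQL identifier with backticks, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def latest_snapshots_sql(catalog: str, namespace: str, table: str, limit: int = 5) -> str:
    """Return SQL that lists the most recent Iceberg snapshots of a table."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    ref = ".".join(quote_ident(part) for part in (catalog, namespace, table))
    return (
        "SELECT committed_at, snapshot_id, operation "
        f"FROM {ref}.snapshots ORDER BY committed_at DESC LIMIT {int(limit)}"
    )
```

In a notebook, you could run `spark.sql(latest_snapshots_sql("tableflow", "lkc-5g8orq", "topic_0")).show(truncate=False)`.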