Querying Confluent Tableflow using the Spark engine

You can query remote Confluent Tableflow tables using the IBM® watsonx.data Spark engine through zero-copy data federation. The Spark engine supports both Confluent Managed Storage and Customer integration (AWS S3) storage.

Before you begin

Confluent requirements:

  • Active Confluent Cloud account
  • Kafka cluster with Tableflow-enabled topics
  • Tableflow API key and secret
  • REST Catalog endpoint

    To obtain credentials:

    1. Log in to Confluent Cloud
    2. Navigate to your Tableflow-enabled topic
    3. Create a new API key and note the key and secret
    4. Copy the REST Catalog endpoint (format: https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog/organizations/{org-id}/environments/{env-id})
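    Before you paste the endpoint into a Spark configuration, it can help to check that it matches the format shown in step 4. The following is a minimal sketch, assuming the endpoint always follows that format; the function name parse_tableflow_endpoint and the returned keys are illustrative and not part of any Confluent or IBM API.

```python
import re

# Pattern mirrors the endpoint format quoted in step 4 above.
# Whitespace is excluded from every segment, so a stray space in the URL is rejected.
_ENDPOINT_RE = re.compile(
    r"^https://tableflow\.(?P<region>[^./\s]+)\.aws\.confluent\.cloud"
    r"/iceberg/catalog/organizations/(?P<org_id>[^/\s]+)"
    r"/environments/(?P<env_id>[^/\s]+)/?$"
)


def parse_tableflow_endpoint(endpoint: str) -> dict:
    """Split a REST Catalog endpoint into region, organization ID, and environment ID.

    Hypothetical helper: raises ValueError if the endpoint does not match the
    documented format.
    """
    match = _ENDPOINT_RE.match(endpoint.strip())
    if match is None:
        raise ValueError(f"Unexpected REST Catalog endpoint format: {endpoint!r}")
    return match.groupdict()
```

    The extracted region, organization ID, and environment ID correspond to the {CLOUD_REGION}, {ORG_ID}, and {ENV_ID} placeholders used in the procedure.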

watsonx.data requirements:

  • Provisioned Spark engine
  • Network connectivity to Confluent Cloud endpoint

Storage-specific requirements:

  • Confluent Managed Storage: No additional requirements
  • Customer integration (AWS S3):
    • S3 bucket in the same region as your Kafka cluster
    • S3 access key and secret key

Procedure

  1. Configure Spark catalog properties for remote lakehouse access.

    Create a configuration dictionary with your Tableflow connection details to enable zero-copy querying.

    For Confluent Managed Storage:

    tableflow_config = {
        "spark.sql.catalog.tableflow": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.tableflow.type": "rest",
        "spark.sql.catalog.tableflow.uri": "https://tableflow.{CLOUD_REGION}.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}",
        "spark.sql.catalog.tableflow.credential": "{APIKEY}:{SECRET}",
        "spark.sql.catalog.tableflow.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.tableflow.rest-metrics-reporting-enabled": "false",
        "spark.sql.catalog.tableflow.s3.remote-signing-enabled": "true",
        "spark.sql.catalog.tableflow.client.region": "{CLOUD_REGION}"
    }

    For Customer integration (AWS S3):

    Add these additional properties to the dictionary:

    "spark.sql.catalog.tableflow.s3.access-key-id": "{S3_ACCESS_KEY}",
    "spark.sql.catalog.tableflow.s3.secret-access-key": "{S3_SECRET_KEY}",
    "spark.sql.catalog.tableflow.s3.region": "{S3_REGION}"

    Replace the placeholders:

    • {CLOUD_REGION}: Your Confluent cluster region (e.g., us-east-1)
    • {ORG_ID}: Your Confluent organization ID
    • {ENV_ID}: Your Confluent environment ID
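    • {APIKEY}:{SECRET}: Your Tableflow API key and secret, separated by a colon
    • {S3_ACCESS_KEY}, {S3_SECRET_KEY}, {S3_REGION}: Your S3 credentials and bucket region (for Customer integration only)

    Note: The catalog name tableflow is a local alias. You can use any name, provided that you use the same name in every spark.sql.catalog.<name> property and in your queries.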
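    The placeholder substitution in this step can also be done programmatically. The following is a minimal sketch that assembles the same properties from individual values; the names build_tableflow_conf and apply_conf are hypothetical helpers introduced for illustration, and the property keys are copied from the configuration shown above.

```python
def build_tableflow_conf(region, org_id, env_id, api_key, api_secret,
                         catalog="tableflow",
                         s3_access_key=None, s3_secret_key=None, s3_region=None):
    """Return Spark catalog properties for a Tableflow REST catalog (illustrative helper)."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": (
            f"https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog"
            f"/organizations/{org_id}/environments/{env_id}"
        ),
        f"{prefix}.credential": f"{api_key}:{api_secret}",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.rest-metrics-reporting-enabled": "false",
        f"{prefix}.s3.remote-signing-enabled": "true",
        f"{prefix}.client.region": region,
    }
    # The S3 properties apply only to Customer integration (AWS S3).
    s3_values = (s3_access_key, s3_secret_key, s3_region)
    if any(value is not None for value in s3_values):
        if not all(s3_values):
            raise ValueError("Provide the S3 access key, secret key, and region together")
        conf[f"{prefix}.s3.access-key-id"] = s3_access_key
        conf[f"{prefix}.s3.secret-access-key"] = s3_secret_key
        conf[f"{prefix}.s3.region"] = s3_region
    return conf


def apply_conf(builder, conf):
    """Apply each property to a builder that exposes config(key, value)."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

    In a notebook, apply_conf(SparkSession.builder, conf).getOrCreate() would produce a session equivalent to chaining the .config() calls by hand. Reading the API key and secret from environment variables rather than hard-coding them keeps credentials out of the notebook.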
  2. Choose a submission method.

    You can query remote Tableflow tables using one of three methods:

    Method             | Best for                                                  | Documentation
    -------------------|-----------------------------------------------------------|-----------------------------------------------
    SparkLab (VS Code) | Interactive development, testing, debugging               | VS Code Development Environment
    REST API           | Automation, CI/CD pipelines, programmatic control         | Submitting Spark Application by using REST API
    CPDCTL CLI         | Command-line submissions, shell scripts, DevOps workflows | Submitting Spark Application by using CPDCTL
  3. Query remote Tableflow tables.

    Using SparkLab:

    1. Open SparkLab in watsonx.data.
    2. Create a new PySpark notebook.
    3. Add the following code:
      from pyspark.sql import SparkSession
      
      # Create a Spark session with the Tableflow catalog configuration
      spark = (
          SparkSession.builder
          .appName("Query Confluent Tableflow")
          .config("spark.sql.catalog.tableflow", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.tableflow.type", "rest")
          .config("spark.sql.catalog.tableflow.uri", "https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/abc123/environments/env-xyz")
          .config("spark.sql.catalog.tableflow.credential", "your-api-key:your-secret")
          .config("spark.sql.catalog.tableflow.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
          .config("spark.sql.catalog.tableflow.s3.remote-signing-enabled", "true")
          .config("spark.sql.catalog.tableflow.client.region", "us-east-1")
          .getOrCreate()
      )
      
      # Discover available namespaces (Kafka cluster IDs)
      print("Available namespaces:")
      spark.sql("SHOW NAMESPACES IN tableflow").show()
      
      # List tables in a namespace
      namespace = "lkc-xxxxx"  # Replace with your cluster ID
      print(f"Tables in {namespace}:")
      spark.sql(f"SHOW TABLES IN tableflow.`{namespace}`").show()
      
      # Query a table (backticks keep names that contain hyphens or dots valid in Spark SQL)
      table_name = "your_topic_name"
      df = spark.sql(f"SELECT * FROM tableflow.`{namespace}`.`{table_name}` LIMIT 10")
      df.show()
      
      # Get row count
      count = spark.sql(f"SELECT COUNT(*) AS count FROM tableflow.`{namespace}`.`{table_name}`").collect()[0]['count']
      print(f"Total rows: {count}")
    4. Run the notebook.
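    Namespace names such as lkc-xxxxx contain a hyphen, so they must be quoted in Spark SQL. The following is a small sketch of a quoting helper that builds the fully qualified table name in one place; quote_identifier, qualified_table, and preview_table are hypothetical names, and the spark argument is assumed to be an existing SparkSession.

```python
def quote_identifier(name: str) -> str:
    """Wrap a name in backticks, doubling any embedded backtick (Spark SQL escaping)."""
    return "`" + name.replace("`", "``") + "`"


def qualified_table(catalog: str, namespace: str, table: str) -> str:
    """Build a quoted catalog.namespace.table reference."""
    return ".".join(quote_identifier(part) for part in (catalog, namespace, table))


def preview_table(spark, namespace: str, table: str, catalog: str = "tableflow", limit: int = 10):
    """Return the first rows of a Tableflow table.

    `spark` is assumed to be a SparkSession configured with the catalog properties above.
    """
    full_name = qualified_table(catalog, namespace, table)
    return spark.sql(f"SELECT * FROM {full_name} LIMIT {int(limit)}")
```

    For example, preview_table(spark, "lkc-xxxxx", "your_topic_name").show() would run the same preview query as the notebook code, with every part of the name quoted.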

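    Using the REST API:

    1. Create a PySpark script file (e.g., query_tableflow.py) that contains your queries, and upload it to object storage that the Spark engine can read (s3://bucket/query_tableflow.py in the following request).
    2. Submit the application: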
    curl -X POST "https://{wxd-host}/lakehouse/api/v2/spark_engines/{engine_id}/applications" \
      -H "Authorization: Bearer {token}" \
      -H "Content-Type: application/json" \
      -d '{
        "application_details": {
          "application": "s3://bucket/query_tableflow.py",
          "conf": {
            "spark.sql.catalog.tableflow": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.tableflow.type": "rest",
            "spark.sql.catalog.tableflow.uri": "https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}",
            "spark.sql.catalog.tableflow.credential": "{APIKEY}:{SECRET}",
            "spark.sql.catalog.tableflow.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            "spark.sql.catalog.tableflow.s3.remote-signing-enabled": "true",
            "spark.sql.catalog.tableflow.client.region": "us-east-1"
          }
        }
      }'
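    For automation, the same request can be assembled in Python instead of curl. The following is a minimal sketch that uses only the standard library and mirrors the URL path, headers, and payload shape of the curl command above; the function names are illustrative, and any additional headers that your deployment requires are not covered here.

```python
import json
import urllib.request


def build_application_payload(script_uri: str, conf: dict) -> dict:
    """Build the request body in the same shape as the curl example."""
    return {"application_details": {"application": script_uri, "conf": dict(conf)}}


def build_submit_request(host: str, engine_id: str, token: str, payload: dict) -> urllib.request.Request:
    """Prepare (but do not send) the POST request for submitting a Spark application."""
    url = f"https://{host}/lakehouse/api/v2/spark_engines/{engine_id}/applications"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

    Passing the prepared request to urllib.request.urlopen() sends it. Keeping construction separate from sending makes the payload easy to inspect or log, with the token redacted, in a CI/CD pipeline.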

    Using the CPDCTL CLI:

    cpdctl wx-data sparkjob create \
      --engine-id {engine_id} \
      --path s3://{bucket_name}/query_tableflow.py \
      --conf '{
        "spark.app.name":"{spark_app_name}",
        "spark.sql.catalog.tableflow":"org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.tableflow.type":"rest",
        "spark.sql.catalog.tableflow.uri":"https://tableflow.us-east-1.aws.confluent.cloud/iceberg/catalog/organizations/{ORG_ID}/environments/{ENV_ID}",
        "spark.sql.catalog.tableflow.credential":"{APIKEY}:{SECRET}",
        "spark.sql.catalog.tableflow.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.tableflow.s3.remote-signing-enabled":"true",
        "spark.sql.catalog.tableflow.client.region":"us-east-1"
      }' \
      --api-key {api_key}
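    When the CLI is called from a script, quoting the JSON value of --conf by hand is error-prone. The following is a sketch that builds the argument list in Python so that no shell quoting is needed; the subcommand and flags are copied from the command above and are not verified against other CPDCTL versions, and build_cpdctl_command is a hypothetical helper.

```python
import json


def build_cpdctl_command(engine_id: str, script_path: str, conf: dict, api_key: str) -> list:
    """Return the argument list for the cpdctl command shown above."""
    return [
        "cpdctl", "wx-data", "sparkjob", "create",
        "--engine-id", engine_id,
        "--path", script_path,
        # json.dumps produces the single JSON argument that --conf receives
        "--conf", json.dumps(conf),
        "--api-key", api_key,
    ]
```

    The resulting list can be passed to subprocess.run(command, check=True), which runs the CLI without an intermediate shell.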

Results

You can now query Confluent Tableflow tables in place, without copying data. The tables automatically reflect new messages published to the underlying Kafka topics.

Example output

Available namespaces:
+------------+
|namespace   |
+------------+
|lkc-5g8orq  |
+------------+

Tables in lkc-5g8orq:
+------------+---------+-----------+
|namespace   |tableName|isTemporary|
+------------+---------+-----------+
|lkc-5g8orq  |topic_0  |false      |
|lkc-5g8orq  |topic_2  |false      |
+------------+---------+-----------+

Total rows: 4
