Tracking remote tasks

The following steps describe how to track remote Airflow tasks.

Some tasks that are scheduled by Airflow run in a location outside of Airflow. To track such tasks, you must establish a connection between the Airflow operator and the subprocess that it executes.

Passing the context of the current Airflow task instance to a subprocess

Databand can track remote process execution in the context of your current Airflow DagRun and TaskInstance if the following variables are available in the remote environment at the time of execution. Both the JVM and Python SDKs support these variables; the sketch after this list shows one way to build them.

AIRFLOW_CTX_DAG_ID
The ID of the parent DAG that started the remote execution.
AIRFLOW_CTX_EXECUTION_DATE
The execution date of the DAG run that started the remote execution.
AIRFLOW_CTX_TASK_ID
The ID of the task to associate with the run.
AIRFLOW_CTX_TRY_NUMBER
The attempt number of the current task instance run.
AIRFLOW_CTX_UID
The unique identifier of the Airflow instance, used to distinguish runs performed in different environments.
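
For illustration, the following sketch shows one way to build these variables inside a custom operator's execute() method from the Airflow task context. The helper name databand_airflow_context_env is hypothetical; get_airflow_instance_uid comes from the dbnd_airflow package, as in the examples later in this section.

from dbnd_airflow.utils import get_airflow_instance_uid

def databand_airflow_context_env(context):
    # Hypothetical helper: maps the Airflow task context passed to execute()
    # onto the environment variables that Databand expects in the remote process.
    return {
        "AIRFLOW_CTX_DAG_ID": context["dag"].dag_id,
        "AIRFLOW_CTX_EXECUTION_DATE": context["execution_date"].isoformat(),
        "AIRFLOW_CTX_TASK_ID": context["task"].task_id,
        "AIRFLOW_CTX_TRY_NUMBER": str(context["task_instance"].try_number),
        "AIRFLOW_CTX_UID": get_airflow_instance_uid(),
    }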

Additionally, the remote execution needs access to the Databand service. You can provide it by using the following variables or any of the SDK configuration methods that Databand supports (a combined sketch follows the list):

DBND__CORE__DATABAND_URL
Databand environment URL.
DBND__CORE__DATABAND_ACCESS_TOKEN
Databand access token.
DBND__TRACKING
Enables job tracking when set to True.
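
As an illustration only, the following sketch combines both groups of variables when launching a remote process from a Python callable. The launcher script and the DBND_TOKEN environment variable are placeholders, and databand_airflow_context_env() is the hypothetical helper from the previous sketch.

import os
import subprocess

def launch_remote_job(**context):
    # Start from the current environment, then add the tracking context.
    env = dict(os.environ)
    env.update(databand_airflow_context_env(context))  # Airflow context (see sketch above)
    env.update({
        "DBND__TRACKING": "True",
        "DBND__CORE__DATABAND_URL": "https://tracker.databand.ai",      # your Databand URL
        "DBND__CORE__DATABAND_ACCESS_TOKEN": os.environ["DBND_TOKEN"],  # placeholder token source
    })
    # Placeholder command that starts the remote process with the tracking context.
    subprocess.run(["./launch_remote_job.sh"], env=env, check=True)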

By default, sensitive data such as the access token is added to your spark-submit command. To disable this behavior, set the tracking_spark.provide_databand_service_endpoint option to false in the dbnd_config Airflow connection properties:

{
    "tracking_spark": {
        "provide_databand_service_endpoint": false
    }
}
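
If you prefer to manage the connection in code rather than through the Airflow UI, a minimal sketch (assuming Airflow 2.x and that a connection type of HTTP is acceptable for the dbnd_config connection) could look like this:

import json
from airflow.models import Connection
from airflow.settings import Session

# Sketch only: the conn_type value is an assumption; the connection ID must be dbnd_config.
conn = Connection(
    conn_id="dbnd_config",
    conn_type="http",
    extra=json.dumps({"tracking_spark": {"provide_databand_service_endpoint": False}}),
)

session = Session()
session.add(conn)
session.commit()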

The Databand team is constantly integrating new operators for remote metadata tracking. Currently, passing the execution context in addition to regular tracking is automatically supported for the following operators:

  • EmrAddStepsOperator
  • EmrPySparkOperator
  • DatabricksSubmitRunOperator
  • DataProcPySparkOperator (Airflow 1.0)
  • DataprocSubmitJobOperator (Airflow 2.0+)
  • DataprocSubmitPySparkJobOperator (Airflow 2.0+)
  • SparkSubmitOperator
  • BashOperator
  • SubDagOperator

For Spark-related operators, Databand provides an option to configure the Spark job with the Databand agent and query listener. For more information, see Installing on a Spark cluster.

Custom integration

The best way to inject these variables is to use the built-in mechanism of your remote operator, if it has one. For example, you can pass the variables to your Spark operator as follows:


from dbnd_airflow.utils import get_airflow_instance_uid

MyCustomDataProcPySparkOperator(
    # ...
    dataproc_pyspark_properties={
        "spark.env.AIRFLOW_CTX_DAG_ID": "{{ dag.dag_id }}",
        "spark.env.AIRFLOW_CTX_EXECUTION_DATE": "{{ ds }}",
        "spark.env.AIRFLOW_CTX_TASK_ID": "{{ task.task_id }}",
        "spark.env.AIRFLOW_CTX_TRY_NUMBER": "{{ ti.try_number }}",

        "spark.env.AIRFLOW_CTX_UID": get_airflow_instance_uid(),

        # static variables, can be set on the cluster itself
        "spark.env.DBND__TRACKING": "True",
        "spark.env.DBND__CORE__DATABAND_URL": "https://tracker.databand.ai",
        "spark.env.DBND__CORE__DATABAND_ACCESS_TOKEN": "TOKEN",
    },
    # ...
)

If your operator doesn't provide a way to set environment variables in one of the supported formats, you can directly modify the command line that you generate:


from dbnd_airflow.utils import get_airflow_instance_uid

airflow_ctx_uid = get_airflow_instance_uid()
cmd = (
    f"spark-submit ... "
    f"--conf spark.env.AIRFLOW_CTX_DAG_ID={context.dag.dag_id} "
    f"--conf spark.env.AIRFLOW_CTX_EXECUTION_DATE={context.execution_date} "
    f"--conf spark.env.AIRFLOW_CTX_TASK_ID={context.task.task_id} "
    f"--conf spark.env.AIRFLOW_CTX_TRY_NUMBER={context.task_instance.try_number} "
    f"--conf spark.env.AIRFLOW_CTX_UID={airflow_ctx_uid}"
)