Tracking remote tasks

Some tasks that are scheduled by Airflow run in a location outside of Airflow. To track such tasks, you must establish the connection between the Airflow operator and the subprocess execution.

Passing the context of the current Airflow task instance to a subprocess

Databand can track remote process execution in the context of your current Airflow DagRun and TaskInstance if the following variables are available at execution time. Both the JVM and Python SDKs support these variables.

AIRFLOW_CTX_DAG_ID

The ID of the parent DAG that started the remote execution.

AIRFLOW_CTX_EXECUTION_DATE

The execution date of the DAG run that triggered the remote execution.

AIRFLOW_CTX_TASK_ID

The ID of the task to associate with the run.

AIRFLOW_CTX_TRY_NUMBER

The attempt number of the current run.

AIRFLOW_CTX_UID

A unique identifier of the Airflow instance, used to distinguish runs performed in different environments.
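
As a minimal sketch (not from the source docs), a remote Python process can check that these context variables were propagated into its environment before tracking starts; the variable names are exactly the ones listed above:

import os

# Illustrative check only: verify that the Airflow context variables listed
# above are present in the remote process environment.
AIRFLOW_CONTEXT_VARS = [
    "AIRFLOW_CTX_DAG_ID",
    "AIRFLOW_CTX_EXECUTION_DATE",
    "AIRFLOW_CTX_TASK_ID",
    "AIRFLOW_CTX_TRY_NUMBER",
    "AIRFLOW_CTX_UID",
]

missing = [name for name in AIRFLOW_CONTEXT_VARS if not os.environ.get(name)]
if missing:
    print(f"Missing Airflow context variables, tracking may not attach to the DagRun: {missing}")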

Additionally, the remote execution needs access to the Databand service. You can provide it through the following variables or any of the SDK configuration methods supported by Databand:

DBND__CORE__DATABAND_URL

Databand environment URL.

DBND__CORE__DATABAND_ACCESS_TOKEN

Databand access token.

DBND__TRACKING

Enables job tracking.
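
For example, here is a minimal sketch (assuming your deployment lets you set environment variables for the remote process; the URL and token values are placeholders):

import os

# Placeholders only: point the remote process at your Databand environment.
os.environ["DBND__TRACKING"] = "True"
os.environ["DBND__CORE__DATABAND_URL"] = "https://tracker.databand.ai"
os.environ["DBND__CORE__DATABAND_ACCESS_TOKEN"] = "<your-access-token>"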

By default, sensitive data such as the access token is added to your spark-submit command. To disable this, set the tracking_spark.provide_databand_service_endpoint option in the dbnd_config Airflow connection properties:

{
    "tracking_spark": {
        "provide_databand_service_endpoint": false
    }
}
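
If you prefer to manage this connection in code rather than through the Airflow UI, a sketch along the following lines could create it; the conn_type value is an assumption, and in this sketch the JSON above is stored in the connection's extra field:

import json

from airflow.models import Connection
from airflow.utils.session import create_session

# Sketch only: register a dbnd_config connection whose "extra" holds the
# tracking_spark settings shown above. conn_type is a placeholder.
with create_session() as session:
    session.add(
        Connection(
            conn_id="dbnd_config",
            conn_type="http",
            extra=json.dumps(
                {"tracking_spark": {"provide_databand_service_endpoint": False}}
            ),
        )
    )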

Currently, passing the execution context in addition to regular tracking is automatically supported for the following operators:

  • EmrAddStepsOperator
  • EmrPySparkOperator
  • DatabricksSubmitRunOperator
  • DataProcPySparkOperator (Airflow 1.0)
  • DataprocSubmitJobOperator (Airflow 2.0+)
  • DataprocSubmitPySparkJobOperator (Airflow 2.0+)
  • SparkSubmitOperator
  • BashOperator
  • SubDagOperator
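
For these operators no extra wiring is needed. As an illustration (the DAG, dates, and script path below are placeholders, not from the source), a plain DAG using one of the listed operators is enough once Databand's Airflow tracking is enabled:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Illustrative DAG only: BashOperator is one of the operators whose execution
# context is passed to the remote process automatically.
with DAG(
    dag_id="remote_tracking_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
) as dag:
    run_remote_job = BashOperator(
        task_id="run_remote_job",
        bash_command="python /path/to/remote_script.py",
    )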

For Spark-related operators, Databand provides an option to configure a Spark job with the Databand agent and query listener. For more information, see Installing on a Spark cluster.

The Databand team is constantly integrating new operators for remote metadata tracking. Contact us if you don't see your operator on the list.

Custom integration

The best way to inject these variables is to use your remote operator's built-in mechanism for passing configuration, if it has one. For example, you can pass the variables to your Spark operator as follows:


from dbnd_airflow.utils import get_airflow_instance_uid

MyCustomDataProcPySparkOperator(
    # ...
    dataproc_pyspark_properties={
        # Airflow context, rendered through Jinja templating at runtime
        "spark.env.AIRFLOW_CTX_DAG_ID": "{{dag.dag_id}}",
        "spark.env.AIRFLOW_CTX_EXECUTION_DATE": "{{ds}}",
        "spark.env.AIRFLOW_CTX_TASK_ID": "{{task.task_id}}",
        "spark.env.AIRFLOW_CTX_TRY_NUMBER": "{{ti.try_number}}",

        "spark.env.AIRFLOW_CTX_UID": get_airflow_instance_uid(),

        # static variables, can be set on the cluster itself
        "spark.env.DBND__TRACKING": "True",
        "spark.env.DBND__CORE__DATABAND_URL": "https://tracker.databand.ai",
        "spark.env.DBND__CORE__DATABAND_ACCESS_TOKEN": "TOKEN",
    },
    # ...
)

If your operator doesn't provide a way to pass environment variables in one of the supported formats, you can modify the command line you generate directly.


from dbnd_airflow.utils import get_airflow_instance_uid

airflow_ctx_uid = get_airflow_instance_uid()
cmd = (
    f"spark-submit  ...  "
    f"--conf spark.env.AIRFLOW_CTX_DAG_ID={context['dag'].dag_id} "
    f"--conf spark.env.AIRFLOW_CTX_EXECUTION_DATE={context['execution_date']} "
    f"--conf spark.env.AIRFLOW_CTX_TASK_ID={context['task'].task_id} "
    f"--conf spark.env.AIRFLOW_CTX_TRY_NUMBER={context['task_instance'].try_number} "
    f"--conf spark.env.AIRFLOW_CTX_UID={airflow_ctx_uid}"
)
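
For completeness, here is a hypothetical sketch (the operator name and constructor arguments are illustrative, not from the source) of where such a command is typically assembled: inside the execute() method of a custom operator, where context is the Airflow task context dictionary used above.

import subprocess

from airflow.models import BaseOperator
from dbnd_airflow.utils import get_airflow_instance_uid


class MyRemoteSparkSubmitOperator(BaseOperator):
    """Hypothetical operator that shells out to spark-submit with the Airflow context attached."""

    def __init__(self, application, **kwargs):
        super().__init__(**kwargs)
        self.application = application

    def execute(self, context):
        cmd = (
            f"spark-submit "
            f"--conf spark.env.AIRFLOW_CTX_DAG_ID={context['dag'].dag_id} "
            f"--conf spark.env.AIRFLOW_CTX_EXECUTION_DATE={context['execution_date']} "
            f"--conf spark.env.AIRFLOW_CTX_TASK_ID={context['task'].task_id} "
            f"--conf spark.env.AIRFLOW_CTX_TRY_NUMBER={context['task_instance'].try_number} "
            f"--conf spark.env.AIRFLOW_CTX_UID={get_airflow_instance_uid()} "
            f"{self.application}"
        )
        subprocess.run(cmd, shell=True, check=True)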