Tracking remote tasks
Some tasks that Airflow schedules run in a location outside of Airflow. To track such tasks, you must establish the connection between the Airflow operator and the subprocess execution.
Passing the context of the current Airflow task instance into a subprocess
Databand can track remote process execution in the context of your current Airflow DagRun and TaskInstance if the following variables are available at the time of execution. Both the JVM and Python SDKs support these variables.
AIRFLOW_CTX_DAG_ID - The ID of the parent DAG that started the remote execution.
AIRFLOW_CTX_EXECUTION_DATE - The date when the remote execution started.
AIRFLOW_CTX_TASK_ID - The task ID to associate with a run.
AIRFLOW_CTX_TRY_NUMBER - The attempt number for the current run.
AIRFLOW_CTX_UID - The unique identifier of the Airflow instance, used to distinguish runs performed on different environments.
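As a quick sanity check, the remote process can verify that all of the context variables listed above reached it before tracking starts. The following is a minimal sketch using only the Python standard library; the helper name missing_airflow_context is hypothetical and is not part of the Databand SDK.

```python
import os

# Variable names taken from the list above.
REQUIRED_AIRFLOW_CTX_VARS = (
    "AIRFLOW_CTX_DAG_ID",
    "AIRFLOW_CTX_EXECUTION_DATE",
    "AIRFLOW_CTX_TASK_ID",
    "AIRFLOW_CTX_TRY_NUMBER",
    "AIRFLOW_CTX_UID",
)


def missing_airflow_context(env=None):
    """Return the context variables that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AIRFLOW_CTX_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_airflow_context()
    if missing:
        print("Missing Airflow context variables:", ", ".join(missing))
```

Running this at the start of the remote job gives an early hint when the operator did not forward the context.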
Additionally, remote execution needs to access the Databand service. You can use the following variables or any of the SDK configuration methods that are supported by Databand:
DBND__CORE__DATABAND_URL - The Databand environment URL.
DBND__CORE__DATABAND_ACCESS_TOKEN - The Databand access token.
DBND__TRACKING - Enables job tracking.
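When the remote work is started as a local subprocess, one way to supply these settings is through the child's environment. The sketch below assumes that approach; the helper name databand_env, the URL, and the token value are placeholders for illustration only.

```python
import os


def databand_env(url, access_token, base_env=None):
    """Return a copy of the environment with the Databand settings added (hypothetical helper)."""
    env = dict(os.environ if base_env is None else base_env)
    env["DBND__CORE__DATABAND_URL"] = url
    env["DBND__CORE__DATABAND_ACCESS_TOKEN"] = access_token
    # Environment values must be strings.
    env["DBND__TRACKING"] = "True"
    return env


# Placeholder values; read the real token from a secret store rather than hard-coding it.
child_env = databand_env("https://tracker.databand.ai", "TOKEN")
```

The resulting dictionary can be passed as the env argument of subprocess.run, so the settings apply only to the child process and do not leak into the parent.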
By default, sensitive data like an access token is added to your spark-submit command. To disable adding the data, use the tracking_spark.provide_databand_service_endpoint option in the dbnd_config Airflow connection properties:
{
  "tracking_spark": {
    "provide_databand_service_endpoint": false
  }
}
Currently, passing the execution context in addition to regular tracking is automatically supported for the following operators:
EmrAddStepsOperator
EmrPySparkOperator
DatabricksSubmitRunOperator
DataProcPySparkOperator (Airflow 1.0)
DataprocSubmitJobOperator (Airflow 2.0+)
DataprocSubmitPySparkJobOperator (Airflow 2.0+)
SparkSubmitOperator
BashOperator
SubDagOperator
For Spark-related operators, Databand provides an option to configure a Spark job with the Databand agent and query listener. For more information, see Installing on a Spark cluster.
The Databand team is constantly integrating new operators for remote metadata tracking. Contact us if you don't see your operator on the list.
Custom integration
The best way to inject these variables is to use the built-in mechanism of your remote operator, if it has one. For example, you can pass these variables to your Spark operator with the following code:
from dbnd_airflow.utils import get_airflow_instance_uid

MyCustomDataProcPySparkOperator(
    # ...
    dataproc_pyspark_properties={
        # Airflow context, filled in through Jinja templates
        "spark.env.AIRFLOW_CTX_DAG_ID": "{{dag.dag_id}}",
        "spark.env.AIRFLOW_CTX_EXECUTION_DATE": "{{ds}}",
        "spark.env.AIRFLOW_CTX_TASK_ID": "{{task.task_id}}",
        "spark.env.AIRFLOW_CTX_TRY_NUMBER": "{{ti.try_number}}",
        "spark.env.AIRFLOW_CTX_UID": get_airflow_instance_uid(),
        # static variables, can be set on the cluster itself
        "spark.env.DBND__TRACKING": "True",
        "spark.env.DBND__CORE__DATABAND_URL": "https://tracker.databand.ai",
        "spark.env.DBND__CORE__DATABAND_ACCESS_TOKEN": "TOKEN",
    },
    # ...
)
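If several operators need the same properties, the mapping from variable names to Spark properties can be produced by a small function. This is a sketch only: to_spark_env_properties is a hypothetical helper, and the spark.env. prefix simply follows the convention used in the example above.

```python
def to_spark_env_properties(env_vars, prefix="spark.env."):
    """Prefix each variable name and convert each value to a string (hypothetical helper)."""
    return {f"{prefix}{name}": str(value) for name, value in env_vars.items()}


properties = to_spark_env_properties(
    {
        "AIRFLOW_CTX_DAG_ID": "{{dag.dag_id}}",
        "AIRFLOW_CTX_TASK_ID": "{{task.task_id}}",
        "DBND__TRACKING": True,
    }
)
```

Converting every value with str() avoids handing non-string values, such as a Python boolean, to an API that expects string properties.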
If your operator doesn't have a way to provide environment variables in one of the supported formats, you can directly change the command line that you are generating:
from dbnd_airflow.utils import get_airflow_instance_uid

airflow_ctx_uid = get_airflow_instance_uid()
# context is the Airflow task context dictionary, for example the one passed to execute()
cmd = (
    f"spark-submit ... "
    f"--conf spark.env.AIRFLOW_CTX_DAG_ID={context['dag'].dag_id} "
    # isoformat() keeps the date free of spaces, which would split the argument
    f"--conf spark.env.AIRFLOW_CTX_EXECUTION_DATE={context['execution_date'].isoformat()} "
    f"--conf spark.env.AIRFLOW_CTX_TASK_ID={context['task'].task_id} "
    f"--conf spark.env.AIRFLOW_CTX_TRY_NUMBER={context['task_instance'].try_number} "
    f"--conf spark.env.AIRFLOW_CTX_UID={airflow_ctx_uid}"
)
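Concatenating string fragments by hand makes it easy to drop a space between two --conf options or to leave a value unquoted. As an alternative sketch, the command can be assembled as an argument list and joined with shlex.join from the standard library (Python 3.8+). The helper name build_spark_submit, the job.py application, and the sample values are hypothetical.

```python
import shlex


def build_spark_submit(app, env_vars):
    """Build a spark-submit command with one --conf per variable (hypothetical helper)."""
    args = ["spark-submit"]
    for name, value in env_vars.items():
        args += ["--conf", f"spark.env.{name}={value}"]
    args.append(app)
    # shlex.join quotes any argument that contains spaces or shell metacharacters.
    return shlex.join(args)


cmd = build_spark_submit(
    "job.py",
    {
        "AIRFLOW_CTX_DAG_ID": "my_dag",
        "AIRFLOW_CTX_EXECUTION_DATE": "2024-01-01 00:00:00",
    },
)
```

Here the execution date contains a space, so the whole --conf value is wrapped in single quotes and reaches spark-submit as one argument.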