Using the API
This section provides examples of using the API.
Example 1
The following is an example of how to use the API without integrating with a specific tool. It consists of a sample payload and the request that sends it to Databand.
Example
import gzip
import json
import uuid
from datetime import datetime, timedelta
import pytz
import requests
run_uid = uuid.uuid4()
simple_payload = [
{
"eventType": "FAIL",
"eventTime": datetime.utcnow().replace(tzinfo=pytz.utc),
"inputs": [],
"job": {"facets": {}, "namespace": "airflow-prod", "name": "my_dag"},
"outputs": [],
"run": {
"facets": {
"nominalTime": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/NominalTimeRunFacet.json",
"nominalStartTime": datetime.utcnow().replace(tzinfo=pytz.utc),
},
"log": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"logBody": "very helpful log.. and very long",
"logUrl": "https://bucket.s3.somewhere.com/.../file.log",
},
"startTime": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"startTime": datetime.utcnow().replace(tzinfo=pytz.utc)
- timedelta(minutes=5),
},
"errorMessage": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ErrorMessageRunFacet.json",
"message": "org.apache.spark.sql.AnalysisException: Table or view not found: wrong_table_name; line 1 pos 14",
"programmingLanguage": "JAVA",
"stackTrace": 'Exception in thread "main" java.lang.RuntimeException: A test exception\nat io.openlineage.SomeClass.method(SomeClass.java:13)\nat io.openlineage.SomeClass.anotherMethod(SomeClass.java:9)',
},
"tags": {
"projectName": "test_project",
"runName": "test_run_name",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"_producer": "https://some.producer.com/version/1.0",
},
},
"runId": run_uid,
},
"producer": "https://custom.api",
"schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
},
{
"eventTime": datetime.utcnow().replace(tzinfo=pytz.utc),
"eventType": "FAIL",
"job": {
"facets": {},
"namespace": "airflow-prod",
"name": "my_dag.failing_task_with_log",
},
"run": {
"facets": {
"nominalTime": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/SQLJobFacet.json",
"nominalStartTime": datetime.utcnow().replace(tzinfo=pytz.utc),
},
"log": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"logBody": "very helpful log.. and very long",
"logUrl": "https://bucket.s3.somewhere.com/.../file.log",
},
"startTime": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
"startTime": datetime.utcnow().replace(tzinfo=pytz.utc)
- timedelta(minutes=5),
},
"errorMessage": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ErrorMessageRunFacet.json",
"message": "org.apache.spark.sql.AnalysisException: Table or view not found: wrong_table_name; line 1 pos 14",
"programmingLanguage": "JAVA",
"stackTrace": 'Exception in thread "main" java.lang.RuntimeException: A test exception\nat io.openlineage.SomeClass.method(SomeClass.java:13)\nat io.openlineage.SomeClass.anotherMethod(SomeClass.java:9)',
},
"parent": {
"_producer": "https://some.producer.com/version/1.0",
"_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet",
"job": {"name": "my_dag", "namespace": "airflow-prod"},
"run": {"runId": run_uid},
},
},
"runId": uuid.uuid4(),
},
"producer": "https://custom.api",
"schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
},
]
ACCESS_TOKEN = "<your_access_token>"
DATABAND_CUSTOM_INTEGRATION_FULL_API = "<your_api_endpoint_provided_in_ui>"
if __name__ == "__main__":
resp = requests.post(
DATABAND_CUSTOM_INTEGRATION_FULL_API,
        data=gzip.compress(json.dumps(simple_payload, default=str).encode("utf-8")),
headers={
"Authorization": f"Bearer {ACCESS_TOKEN}",
"Content-Type": "application/json",
"Content-Encoding": "gzip",
},
timeout=30,
)
if resp.ok:
print("success")
else:
resp.raise_for_status()
Example 2
The following example is a Python script that shows how to map from other tools (in this example, GitLab) to the Databand schema. The example shows how you can create a monitor and provide the data to the Databand API. You do not have to follow this example exactly to map from your orchestration or data integration tools, but any code you supply must cover the following (a minimal outline is sketched after this list):
- How often to pull the data
- A way to pull the data from your orchestration or data integration tool
- A conversion to the Databand schema
- A request to the custom integration API
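The sketch below is a minimal outline of such a monitor, not a complete implementation: fetch_runs_from_tool, convert_to_databand_schema, and POLL_INTERVAL_SECONDS are placeholders that you would replace with logic and settings for your own tool, while the request itself mirrors Example 1.
import gzip
import json
from time import sleep

import requests

ACCESS_TOKEN = "<your_access_token>"
DATABAND_CUSTOM_INTEGRATION_FULL_API = "<your_api_endpoint_provided_in_ui>"
POLL_INTERVAL_SECONDS = 120  # how often to pull data from your tool


def fetch_runs_from_tool() -> list[dict]:
    # Placeholder: pull run and job data from your orchestration or data integration tool.
    return []


def convert_to_databand_schema(runs: list[dict]) -> list[dict]:
    # Placeholder: map the tool-specific data to run events in the Databand schema.
    return []


def send_to_databand(events: list[dict]) -> None:
    # POST the gzip-compressed JSON payload to the custom integration API.
    resp = requests.post(
        DATABAND_CUSTOM_INTEGRATION_FULL_API,
        data=gzip.compress(json.dumps(events, default=str).encode("utf-8")),
        headers={
            "Authorization": f"Bearer {ACCESS_TOKEN}",
            "Content-Type": "application/json",
            "Content-Encoding": "gzip",
        },
        timeout=30,
    )
    resp.raise_for_status()


if __name__ == "__main__":
    while True:
        events = convert_to_databand_schema(fetch_runs_from_tool())
        if events:
            send_to_databand(events)
        sleep(POLL_INTERVAL_SECONDS)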
The first code block in this example handles the request to the custom integration API. It defines a DbndClient class with an openlineage_api method that sends a compressed JSON payload containing tracking data to the specified endpoint, using a POST request with gzip compression and authorization headers.
In this example, a run corresponds to an individual execution of a CI/CD pipeline. Everything that happens from the point the pipeline begins until it completes successfully, fails, or is canceled is considered part of a single run. Over time, Databand builds a history of runs for the pipeline, so users can go back to any historical run and see all of the metrics and logs that were collected at the time.
Within a GitLab pipeline, there are stages. Stages act as groups of tasks within a pipeline, but the individual stages are not reported as jobs of their own. With Databand's API, each stage can be mapped as a task within its parent pipeline, so in Databand all metadata is logically divided and assigned to its corresponding stage (see the sketch after this list). In the event of a pipeline incident, the user can:
- Tell the specific state and duration of each individual stage
- View logs and error messages in the context of the stage in which they occurred
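For illustration only, a stage-level event built this way might look roughly like the following, with the stage reported as its own job and a parent facet pointing back to the pipeline run (the names, times, and IDs here are made up):
# Illustrative only: a GitLab stage reported as a task whose parent is the pipeline run.
stage_event = {
    "eventType": "COMPLETE",
    "eventTime": "2024-01-01T00:05:00Z",
    "job": {"name": "build", "namespace": "my-gitlab-project"},  # the stage
    "run": {
        "runId": "<stage_run_uid>",
        "facets": {
            "parent": {
                "job": {"name": "main", "namespace": "my-gitlab-project"},  # the pipeline
                "run": {"runId": "<pipeline_run_uid>"},
            },
        },
    },
    "producer": "https://some.producer.com/version/1.0",
    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
}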
Example
import gzip
import json
import requests
class DbndClient:
def __init__(
self, access_token: str, databand_custom_integration_full_api: str
) -> None:
self.default_headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
self.databand_custom_integration_full_api = databand_custom_integration_full_api
def openlineage_api(self, data: list[dict]) -> None:
resp = requests.post(
self.databand_custom_integration_full_api,
data=gzip.compress(json.dumps(data).encode("utf-8")),
headers=self.default_headers | {"Content-Encoding": "gzip"},
timeout=30,
)
resp.raise_for_status()
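For example, once the placeholders are filled in with your own endpoint and token, the client could be used like this (the events list would be the payload built in the rest of the example):
# Example usage (placeholders only).
client = DbndClient(
    access_token="<your_databand_access_token>",
    databand_custom_integration_full_api="<your_api_endpoint_provided_in_ui>",
)
events: list[dict] = []  # run events in the Databand schema, as built later in this example
client.openlineage_api(events)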
This example code interacts with GitLab, using a GitlabClient class to retrieve and process information about your pipelines and jobs.
import logging
from collections import defaultdict
from datetime import datetime
from functools import partial
from time import sleep
from typing import Any
from requests import Session
logger = logging.getLogger(__file__)
class GitlabPipelineInfo:
pipeline: dict
job_stages: dict
class GitlabClient:
def __init__(self, gitlab_project_id: str, gitlab_token: str) -> None:
self.gitlab_project_id = gitlab_project_id
self.session = Session()
self.session.request = partial(self.session.request, timeout=15)
self.session.headers.update({"Private-Token": gitlab_token}) # type: ignore
@property
def gitlab_base_url(self):
return f"https://gitlab.com/api/v4/projects/{self.gitlab_project_id}"
def _get(self, url):
"""
Performs a GET request to a specified URL with retry logic for rate limits.
Args:
url (str): The URL to send the GET request to.
Returns:
requests.Response: The response object from the request.
"""
while True:
resp = self.session.get(url)
if not resp.ok:
logger.warning("Failed fetching url %s: %s", url, resp)
if resp.status_code != 429:
return resp
logger.info("got 429, sleeping")
sleep(60)
def list_pipelines(self, last_updated_after: datetime) -> Any:
"""
Lists GitLab pipelines updated after a specified time.
Args:
            last_updated_after (datetime): The starting point for fetching pipelines; only pipelines updated after this time are returned.
Returns:
list[dict]: A list of dictionaries representing pipelines.
[]: An empty list if no pipelines are found or the request fails.
"""
resp = self._get(
f"{self.gitlab_base_url}/pipelines?updated_after={last_updated_after.strftime('%Y-%m-%dT%H:%M:%S.%fZ')}&order_by=updated_at&sort=asc&per_page=100"
)
if resp.ok:
return resp.json()
return []
def get_pipeline(self, pipeline_id: int) -> Any:
"""
Fetches details of a specific GitLab pipeline by its ID.
Args:
pipeline_id (int): The ID of the pipeline to retrieve.
Returns:
dict: A dictionary containing pipeline details if the request is successful.
None: If the pipeline is not found or the request fails.
"""
resp = self._get(f"{self.gitlab_base_url}/pipelines/{pipeline_id}")
if resp.ok:
return resp.json()
return None
def get_jobs(self, pipeline_id: int) -> Any:
"""
Retrieves all jobs associated with a given GitLab pipeline.
Args:
pipeline_id (int): The ID of the pipeline to retrieve jobs from.
Returns:
list[dict]: A list of dictionaries containing job details.
[]: An empty list if no jobs are found or the request fails.
"""
resp = self._get(
f"{self.gitlab_base_url}/pipelines/{pipeline_id}/jobs?per_page=100&include_retried=true"
)
if resp.ok:
return resp.json()
return []
def get_pipeline_full_info(self, pipeline_id: int) -> GitlabPipelineInfo:
"""
        Retrieves full information (pipeline details and jobs) for a given GitLab pipeline.
Args:
pipeline_id (int): pipeline ID to retrieve info for.
Returns:
GitlabPipelineInfo: pipeline full info (pipeline + jobs details)
"""
pipeline_run = GitlabPipelineInfo()
pipeline_run.pipeline = self.get_pipeline(pipeline_id)
if not pipeline_run.pipeline:
logger.error("Can't get pipeline %s information from Gitlab")
jobs = self.get_jobs(pipeline_id)
if not jobs:
logger.info("No jobs found for pipeline %s on Gitlab", pipeline_id)
sorted_jobs: list[Any] = sorted(jobs, key=lambda x: x["created_at"])
stages: dict[str, list[Any]] = defaultdict(list)
# group by stage
for job in sorted_jobs:
stages[job["stage"]].append(job)
pipeline_run.job_stages = stages
return pipeline_run
@staticmethod
def get_last_pipeline_datetime(
pipeline: GitlabPipelineInfo, last_pipeline_datetime: datetime
) -> datetime:
created_at = datetime.strptime(
pipeline.pipeline.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ"
)
if not last_pipeline_datetime:
return created_at
if last_pipeline_datetime < created_at:
return created_at
return last_pipeline_datetime
GITLAB_JOB_STATUS_MAP = {
"failed": "FAIL",
"warning": "COMPLETE",
"pending": "START",
"running": "RUNNING",
"manual": "START",
"scheduled": "START",
"canceled": "ABORT",
"success": "COMPLETE",
"skipped": "ABORT",
"created": "START",
}
GITLAB_PIPELINE_STATUS_MAP = {
"created": "START",
"waiting_for_resource": "START",
"preparing": "START",
"pending": "START",
"running": "RUNNING",
"success": "COMPLETE",
"failed": "FAIL",
"canceled": "ABORT",
"skipped": "ABORT",
"manual": "START",
"scheduled": "START",
}
def map_job_status(gitlab_job_status: str) -> str:
"""
Maps a GitLab job status to a corresponding status for OLRunState.
Args:
gitlab_job_status (str): The GitLab job status.
Returns:
str: The mapped status for OLRunState.
"""
status = GITLAB_JOB_STATUS_MAP.get(gitlab_job_status, "START")
return status
def map_pipeline_status(gitlab_pipeline_status: str) -> str:
"""
Maps a GitLab pipeline status to a corresponding status for OLRunState.
Args:
gitlab_pipeline_status (str): The GitLab pipeline status.
Returns:
str: The mapped status for OLRunState.
"""
    return GITLAB_PIPELINE_STATUS_MAP.get(gitlab_pipeline_status, "START")
# mapping between gitlab statuses to openlineage statuses
def calc_stage_status(jobs: list[Any]):
job_statuses = [job["status"] for job in jobs]
if all(js == "skipped" for js in job_statuses):
return "skipped", None
not_started = {"queued", "skipped", "scheduled"}
if all(js in not_started for js in job_statuses):
return "scheduled", None
finished_statuses = {"failed", "skipped", "cancelled", "success"}
if all(js in finished_statuses for js in job_statuses):
try:
finished_at = max(job["finished_at"] for job in jobs if job["finished_at"])
except ValueError:
finished_at = None
for status in ("failed", "cancelled", "success"):
if status in job_statuses:
return status, finished_at
return "skipped", finished_at
return "running", None
finished_states_str = {"FAIL", "COMPLETE", "ABORT"}
def calc_event_time(job: Any, status: str) -> str:
if status in finished_states_str:
event_time = job["finished_at"]
elif job["started_at"]:
event_time = job["started_at"]
else:
event_time = job["created_at"]
return event_time
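As a quick sanity check, the client defined above could be exercised on its own, for example (the project ID and token are placeholders):
# Example usage (placeholders only): list recent pipelines and fetch full info for the first one.
from datetime import datetime, timedelta

client = GitlabClient("<your_gitlab_project_id>", "<your_gitlab_token>")
pipelines = client.list_pipelines(datetime.utcnow() - timedelta(days=1))
if pipelines:
    info = client.get_pipeline_full_info(pipelines[0]["id"])
    print(info.pipeline["status"], list(info.job_stages))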
This example code synchronizes GitLab pipeline data with Databand. It uses the GitlabClient class to interact with the GitLab API and the DbndClient class to send the data to Databand.
# © Copyright Databand.ai, an IBM Company 2024
import logging
from datetime import datetime, timedelta
from time import sleep
from typing import Any
from uuid import NAMESPACE_URL, UUID, uuid5
from docs.custom_integration.databand_client import DbndClient
from docs.custom_integration.gitlab_client import (
GitlabClient,
GitlabPipelineInfo,
calc_event_time,
calc_stage_status,
map_job_status,
map_pipeline_status,
)
logger = logging.getLogger(__name__)
GITLAB_TOKEN = "<your_gitlab_token>"
GITLAB_PROJECT_ID = "<your_gitlab_project_id>"
DATABAND_ACCESS_TOKEN = "<your_databand_access_token>"
DATABAND_CUSTOM_INTEGRATION_FULL_API = "<your_api_endpoint_provided_in_ui>" # example: https://client.databand.ai/api/v1/tracking/open-lineage/3a8bc7f8-0c65-115f-98f6-aa662e60cbe7/events/bulk
DBND_CLIENT = DbndClient(
access_token=DATABAND_ACCESS_TOKEN,
databand_custom_integration_full_api=DATABAND_CUSTOM_INTEGRATION_FULL_API,
)
GITLAB_CLIENT = GitlabClient(GITLAB_PROJECT_ID, GITLAB_TOKEN)
_PRODUCER = "https://some.producer.com/version/1.0"
_SCHEMA_URL = (
"https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/SQLJobFacet.json"
)
RUN_ID_NAMESPACE = uuid5(NAMESPACE_URL, DATABAND_CUSTOM_INTEGRATION_FULL_API)
def generate_run_id(job_name: str, parent_run_id: UUID | None) -> str:
    return str(uuid5(parent_run_id or RUN_ID_NAMESPACE, job_name))
class RunPayloadBuilder:
def __init__(self) -> None:
self.runs = []
self.pipeline_run_uid = None
self.pipeline_job_name = None
self.pipeline_project_name = None
self.namespace = None
def get_data(self) -> list[dict]:
return self.runs
def add_job(self, job: Any) -> None:
"""
Adds a new job to the run data.
Args:
job (Any): A dictionary representing the job to add.
Returns:
None
"""
execution_time = {
"nominalStartTime": str(job["created_at"]),
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
}
start_time = None
if job["started_at"]:
start_time = {
"startTime": str(job["started_at"]),
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
}
parent_facet = {
"job": {
"name": job["parent_name"],
"namespace": self.namespace,
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
},
"run": {
"runId": job["paren_run_uid"],
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
},
}
inputs = []
if job["upstream_name"]:
dataset = {
"name": job["upstream_name"],
"namespace": self.namespace,
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
}
inputs.append(dataset)
event_time = calc_event_time(job, job["status"])
data = {
"eventTime": str(event_time),
"eventType": map_job_status(job["status"]),
"inputs": inputs,
"job": {"name": job["name"], "namespace": self.namespace},
"outputs": [],
"run": {
"runId": job["run_uid"],
"facets": {
"startTime": start_time,
"nominalTime": execution_time,
"parent": parent_facet,
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
},
},
"producer": _PRODUCER,
"schemaURL": _SCHEMA_URL,
}
self.runs.append(data)
def set_jobs(self, jobs: Any) -> None:
for job in jobs:
run_uid = generate_run_id(job["name"], UUID(self.pipeline_run_uid))
parent_run_uid = generate_run_id(job["stage"], UUID(self.pipeline_run_uid))
virtual_job = {
"created_at": job["created_at"],
"started_at": job["started_at"],
"status": job["status"],
"finished_at": job["finished_at"],
"name": job["name"],
"parent_name": job["stage"],
"paren_run_uid": parent_run_uid,
"run_uid": run_uid,
"upstream_name": None,
}
self.add_job(virtual_job)
def set_stages(self, stages: Any) -> None:
"""
This function iterates over the provided stages and their associated jobs
to create virtual jobs representing the stages in the pipeline run.
Each stage is represented as a virtual job with metadata.
In GitLab, stages act as task groups but are not directly reported as jobs.
This function bridges the gap by creating virtual jobs for each stage, allowing them to be tracked as tasks in Databand.
Args:
stages (Any): A list of tuples where each tuple contains the stage name and a list of jobs associated with that stage.
"""
prev_stage_task_id = None
for stage_name, jobs in stages:
created_at = min(job["created_at"] for job in jobs if job["created_at"])
try:
started_at = min(job["started_at"] for job in jobs if job["started_at"])
except ValueError:
started_at = None
# Calculate the status and finish time of the stage based on its jobs
stage_status, finished_at = calc_stage_status(jobs)
# Generate a unique run ID for the stage
run_uid = generate_run_id(stage_name, UUID(self.pipeline_run_uid))
# Create a virtual job representing the stage
virtual_job = {
"created_at": created_at,
"started_at": started_at,
"status": stage_status,
"finished_at": finished_at,
"name": stage_name,
"parent_name": self.pipeline_job_name,
"paren_run_uid": self.pipeline_run_uid,
"run_uid": run_uid,
"upstream_name": prev_stage_task_id,
}
# Add the virtual job to the pipeline
self.add_job(virtual_job)
# Update the previous stage task ID for establishing upstream relationship
prev_stage_task_id = stage_name
def set_pipeline(self, pipeline: Any) -> None:
"""
Sets up the initial pipeline details in the run data.
Args:
pipeline (Any): A dictionary representing the pipeline details.
tracking_source_uid (str): The unique ID for tracking source.
Returns:
None
"""
status = map_pipeline_status(pipeline["status"])
event_time = calc_event_time(pipeline, status)
start_time_facet = None
if pipeline["started_at"]:
start_time_facet = {
"startTime": str(pipeline["started_at"]),
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
}
self.pipeline_run_uid = generate_run_id(str(pipeline["id"]), None)
self.pipeline_job_name = pipeline["ref"]
self.pipeline_project_name = pipeline["web_url"].split("/")[4]
self.namespace = pipeline["web_url"].split("/")[4]
data = {
"eventTime": str(event_time),
"eventType": status,
"inputs": [],
"job": {"name": self.pipeline_job_name, "namespace": self.namespace},
"outputs": [],
"run": {
"runId": self.pipeline_run_uid,
"facets": {
"startTime": start_time_facet,
"nominalTime": {
"nominalStartTime": str(pipeline["created_at"]),
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
},
"tags": {"projectName": self.pipeline_project_name},
"_producer": _PRODUCER,
"_schemaURL": _SCHEMA_URL,
},
},
"producer": _PRODUCER,
"schemaURL": _SCHEMA_URL,
}
self.runs.append(data)
def sync_pipeline(pipeline_id: int) -> GitlabPipelineInfo:
pipeline_full_info = GITLAB_CLIENT.get_pipeline_full_info(pipeline_id)
payload_builder = RunPayloadBuilder()
payload_builder.set_pipeline(pipeline_full_info.pipeline)
payload_builder.set_stages(pipeline_full_info.job_stages.items())
flattened_jobs = [
item for sublist in pipeline_full_info.job_stages.values() for item in sublist
]
payload_builder.set_jobs(flattened_jobs)
DBND_CLIENT.openlineage_api(payload_builder.get_data())
return pipeline_full_info
if __name__ == "__main__":
    pipeline_cursor = datetime.utcnow() - timedelta(days=1)
while True:
pipelines = GITLAB_CLIENT.list_pipelines(pipeline_cursor)
if not pipelines:
logger.info("No new pipelines on gitlab")
for i, pipeline in enumerate(pipelines):
logger.info(
"========= Pipeline %s (%s/%s)", pipeline["id"], i, len(pipelines)
)
try:
pipeline_full_info = sync_pipeline(pipeline["id"])
pipeline_cursor = GITLAB_CLIENT.get_last_pipeline_datetime(
pipeline_full_info, pipeline_cursor
)
except Exception as e:
logger.exception(
"Failed syncing pipline %s error: %s", pipeline["id"], e
)
logger.info("sleeping %s", datetime.now())
sleep(2 * 60)
Known issues
When you send a POST request to Databand, remember to include the following header in the request:
"Content-Type": "application/json"