Workflow DAGs installation and configuration (Open Data for Industries)

Open Data for Industries uses third-party tools to create functional workloads that ingest and convert data on the platform. Learn how to implement and configure the workflow definitions in the form of Directed Acyclic Graphs (DAGs).

Third-party libraries are used for data conversion: The Open Data for Industries data format conversion process uses industry-standard solutions that are provided by third-party vendors. These solutions are in the form of external libraries. The details of the third-party solutions are outside the scope of this document.
Obtaining the DAGs: You can access all currently implemented DAGs from the Open Group Community Projects repository, or you can create your own DAGs.
DAG license is owned by Schlumberger: The license for the sgy-to-zgy DAG in the OSDU Platform project is owned by Schlumberger, and the DAG is not open source.

Apache Airflow installation on Open Data for Industries

The Apache Airflow installation process is a child process to the software utility installation process. For more information, see Installing software utilities.

The Apache Airflow manifest file declares all the Kubernetes objects that it needs. After the manifest file is applied to the Kubernetes-enabled Red Hat® OpenShift® Container Platform, the system moves from its present state to the desired state as defined by the manifest file. It creates and wires everything that is needed to install Apache Airflow on the Open Data for Industries platform.

Kubernetes objects that are declared in the manifest file:
  • Namespace
  • List
  • ConfigMap
  • Deployment
  • PodDisruptionBudget
  • Role
  • RoleBinding
  • ClusterRole
  • ServiceAccount
  • Group
  • Secret
  • Service
  • StatefulSet
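
After the manifest file is applied, you can verify that the main Apache Airflow workloads were created. The following is a minimal sketch that uses the Python kubernetes client to list the Deployments and StatefulSets in the Airflow namespace; it assumes a valid kubeconfig and the default namespace osdu-airflow, which might differ in your environment.

  # Sketch: list the Airflow workloads that the manifest file created.
  # Assumes a valid kubeconfig and the default namespace osdu-airflow.
  from kubernetes import client, config

  config.load_kube_config()  # use config.load_incluster_config() inside a pod
  apps = client.AppsV1Api()

  namespace = "osdu-airflow"
  for deployment in apps.list_namespaced_deployment(namespace).items:
      print("Deployment:", deployment.metadata.name)
  for stateful_set in apps.list_namespaced_stateful_set(namespace).items:
      print("StatefulSet:", stateful_set.metadata.name)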

DAGs processing

Apache Airflow is the workflow management platform that is used to build and run the workflows. It represents a workflow as a DAG.

The DAG implementations use Open Data for Industries endpoints to validate, parse, and store records from the data files to the data and metadata repositories on the Open Data for Industries storage layer.

A DAG consists of tasks (individual pieces of work) and instructions on what to do if a task fails.

A task describes how to:
  • Fetch data.
  • Run an analysis.
  • Trigger other systems.
  • Do other pieces of work.

Many data-intensive processes need an ordered execution of the tasks. The individual tasks can run in sequence or in parallel.

Task orchestration, which is done through DAGs, is important for achieving a synchronized processing state.

Task orchestration uses three common types of tasks, which are subclasses of the Airflow BaseOperator class.
Operators
Predefined tasks that you can string together to build most parts of your DAGs.
Sensors
A special subclass of operators that wait for an external event to happen.
TaskFlow
A custom Python function that is packaged up as a task.

The operators and sensors are both templates, and when you call one of them in a DAG file, you're making a task.

The DAG definitions use pre-built and custom operators. In that sense, the concepts of task and operator are interchangeable.
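
To illustrate these concepts, the following minimal sketch defines a DAG in which one task fetches data, two tasks then run in parallel, and a final task stores the records. The DAG ID, task IDs, and callables are hypothetical examples and are not part of the Open Data for Industries DAG definitions; the PythonOperator import path can differ between Airflow releases.

  # Hypothetical sketch of a DAG with sequential and parallel tasks.
  from datetime import datetime

  from airflow import DAG
  # On newer Airflow releases the import path is airflow.operators.python.
  from airflow.operators.python_operator import PythonOperator

  def fetch_data():
      print("fetch the input file")

  def parse_records():
      print("parse the records")

  def validate_records():
      print("validate the records")

  def store_records():
      print("store the records")

  with DAG(dag_id="example_ordered_workflow",
           start_date=datetime(2023, 1, 1),
           schedule_interval=None,
           catchup=False) as dag:
      fetch = PythonOperator(task_id="fetch_data", python_callable=fetch_data)
      parse = PythonOperator(task_id="parse_records", python_callable=parse_records)
      validate = PythonOperator(task_id="validate_records", python_callable=validate_records)
      store = PythonOperator(task_id="store_records", python_callable=store_records)

      # parse and validate run in parallel after fetch; store runs after both.
      fetch >> [parse, validate] >> store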

Operators used in the Open Data for Industries DAG definitions

Two types of operators are used in the Open Data for Industries DAG definitions.
Python Operator

Python-based operators. The DAG definitions contain instructions on how to orchestrate the order and message exchange between the different tasks.

The Open Data for Industries Apache Airflow utility installs a common Python SDK for all the Python operators, which are used in Open Data for Industries DAGs. For more information, see the Python SDK OSDU repository.

Note: Ingestion of Manifest artifacts is done through Python operators. For more information, see Sample steps to ingest Manifest files.
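
For illustration only, the following minimal sketch shows the kind of message exchange between Python operators by using Airflow XComs. The task IDs and payload are hypothetical and do not reproduce the Manifest ingestion logic.

  # Hypothetical sketch: one task pushes a value, a downstream task pulls it
  # through an Airflow XCom. Not code from the shipped Manifest DAG.
  from datetime import datetime

  from airflow import DAG
  from airflow.operators.python_operator import PythonOperator

  def count_records(**context):
      # Push a value to XCom for the downstream task.
      context["ti"].xcom_push(key="record_count", value=42)

  def save_records(**context):
      count = context["ti"].xcom_pull(task_ids="count_records", key="record_count")
      print("saving", count, "records")

  with DAG(dag_id="example_xcom_exchange", start_date=datetime(2023, 1, 1),
           schedule_interval=None, catchup=False) as dag:
      count = PythonOperator(task_id="count_records", python_callable=count_records,
                             provide_context=True)  # required on Airflow 1.10.x
      save = PythonOperator(task_id="save_records", python_callable=save_records,
                            provide_context=True)
      count >> save
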
KubernetesPodOperator
This operator uses the Kubernetes Pod object to decouple the workflow logic from the Airflow context. It creates a new context that can run logic that is written in different programming languages. By using the KubernetesPodOperator, you ensure that any business logic can be reused in different forms and contexts. A minimal sketch of such a task follows the list of benefits below.

The following diagram shows the interaction between the Apache Airflow context and the Kubernetes Pod instance:

Some benefits of using the KubernetesPodOperator:
Increased flexibility for deployments
You do not need to write new Apache Airflow logic, or you need to implement significantly less of it. Furthermore, in Kubernetes Pod objects the logic is fully containerized.
Flexibility of configurations and dependencies
Custom Docker images ensure that the task environment, configuration, and dependencies are independent.
Usage of Kubernetes secrets for added security
With the KubernetesPodOperator, you can use Kubernetes secrets to store all the sensitive data.
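
The following is a minimal sketch of a KubernetesPodOperator task, assuming an Airflow release where the operator is importable from the contrib path; newer releases import it from airflow.providers.cncf.kubernetes.operators.kubernetes_pod. The image name is a placeholder, and the environment value is only an example that is taken from this topic.

  # Hypothetical sketch of a containerized task; not one of the shipped DAGs.
  # The import path depends on the Airflow release.
  from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

  # Inside a with DAG(...) block like the earlier sketch:
  convert = KubernetesPodOperator(
      task_id="convert_file",
      name="convert-file",
      namespace="osdu-airflow",            # namespace in which the pod is created
      image="example-registry/example-converter:latest",  # placeholder image
      env_vars={"STORAGE_SVC_URL": "http://os-storage-ibm:8080/api/storage/v2/"},
      get_logs=True,                       # stream the pod log into the task log
      is_delete_operator_pod=True,         # remove the pod after the task finishes
  )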

DAGs operators and configuration

IBM Open Data for Industries runs ordered processes as DAG definitions. Each DAG that is implemented in Open Data for Industries uses either the Python Operator or the KubernetesPodOperator.

For each DAG, the following list shows the operator that is used, the supported input format, the configurations that are needed, and a short description.

Osdu_ingest (Manifest artifacts ingestion)
  • Operator used: Python Operator
  • Supported input format: JSON
  • Configurations needed: You need to configure the operating system variables or the Apache Airflow environment variables.
  • Description: The Manifest DAG uses custom Python operators to orchestrate the workflow. The operators refer to variables at the operating system level or to variables from the Airflow context.

csv-parser-dag (CSV file ingestion)
  • Operator used: KubernetesPodOperator
  • Supported input format: CSV
  • Configurations needed: You need to configure the Apache Airflow environment variables.
  • Description: The CSV DAG uses a Kubernetes Pod object to run the ingestion logic. The ingestion logic is containerized as an image and is configured with the DAG definition.

sgy-to-zgy (SGY to ZGY file conversion)
  • Operator used: KubernetesPodOperator
  • Supported input format: SEGY
  • Configurations needed: You need to configure the Apache Airflow environment variables.
  • Description: The SGY to ZGY file conversion DAG uses a Kubernetes Pod object to run the conversion logic. The conversion logic is containerized as an image and configured with the DAG definition.

openvds_import (SGY to Open VDS file conversion)
  • Operator used: KubernetesPodOperator
  • Supported input format: SEGY
  • Configurations needed: You need to configure the Apache Airflow environment variables.
  • Description: The SGY to Open VDS file conversion DAG uses a Kubernetes Pod object to run the conversion logic. The conversion logic is containerized as an image and configured with the DAG definition.

Energistics_xml_ingest (WITSML file ingestion)
  • Operator used: Python Operator and KubernetesPodOperator
  • Supported input format: XML
  • Configurations needed: You need to configure the operating system level variables.
  • Description: The DAG uses custom Python operators and the KubernetesPodOperator to orchestrate the workflow. The operators refer to the variables at the operating system level.
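
For illustration, the following minimal sketch shows how a DAG task might read both kinds of configuration. It is not code from the shipped DAGs; the variable names are examples that are taken from the tables in this topic.

  # Illustrative sketch only: resolve configuration from an Apache Airflow
  # variable and from an operating system environment variable.
  import os

  from airflow.models import Variable

  # Apache Airflow variable (set through the Airflow UI, CLI, or REST API)
  batch_count = Variable.get("core__ingestion__batch_count", default_var="3")

  # Operating system environment variable (set on the scheduler and worker pods)
  api_config_path = os.environ.get(
      "OSDU_API_CONFIG", "/opt/airflow/dags/repo/osdu_api_config.yaml")

  print(batch_count, api_config_path)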

DAGs deployment and configuration

The Apache Airflow installation on Open Data for Industries is a containerized provisioning of the framework that is achieved through various Kubernetes objects. Apache Airflow provisions the DAG definitions from a configured directory. The scheduler picks up any changes in that directory to update the DAG definitions in the running context. The executors reference the same directory when tasks are scheduled on them.
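
If you need to confirm which directory the scheduler and executors read, the following minimal check can be run where the Airflow libraries are installed, for example inside the scheduler pod.

  # Sketch: print the configured DAG directory.
  from airflow.configuration import conf

  print(conf.get("core", "dags_folder"))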

Configure and enable DAGs

The previous steps ensure that the scheduler arranges the DAGs and that the DAGs are available on the Apache Airflow web server.

Before the DAGs are triggered for functional processes, you must configure and enable them.

  1. Log in to the Airflow web server.
    1. Log in to the OpenShift admin console.
    2. Select the project where Apache Airflow is installed. The default project is osdu-airflow.
    3. Select the Networking option and then Routes.
    4. Locate the route of the Apache Airflow web console and click the URL in the Location column of that row.
    5. Get the login credentials for the Apache Airflow web console.
      1. Log in to OpenShift admin console.
      2. Select the project osdu.
      3. Select the menu option Workloads and then Secrets.
      4. Click the row that is named props-secret.
      5. Copy the Airflow username and password.
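    As an alternative to the console steps, the following minimal sketch reads the same credentials with the Python kubernetes client. It assumes a kubeconfig with access to the osdu project; because the key names inside props-secret can differ between installations, the sketch prints all keys.

      # Sketch: decode the props-secret values from the osdu project.
      import base64

      from kubernetes import client, config

      config.load_kube_config()
      secret = client.CoreV1Api().read_namespaced_secret("props-secret", "osdu")
      for key, value in secret.data.items():
          print(key, "=", base64.b64decode(value).decode("utf-8"))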
  2. For each DAG, validate that the variables that are configured as part of the Apache Airflow provisioning by the Open Data for Industries utilities installation process are set as listed below. Each entry shows the key, its value type, and a sample value. A sketch for checking that the required keys exist follows the list.
    Manifest artifact ingestion
    This DAG definition gets some of the variables from the operating system environment and some from the trigger request JSON object. For more information, see Sample steps to ingest Manifest files.
      • core__ingestion__batch_save_enabled (Boolean): true
      • core__ingestion__batch_save_size (Integer): 20 (only if core__ingestion__batch_save_enabled is set to true)
      • core__auth__access_token (String): {{keycloak_JWT_Access_Token}}
      • core__ingestion__batch_count (Integer): 3
      • core__config__dataload_config_path (String): abc
      • core__service__search__url (String): http://os-search-ibm.osdu.svc.cluster.local:8080/api/search/v2/query
      • core__service__schema__url (String): http://os-schema-ibm.osdu.svc.cluster.local:8080/api/schema-service/v1/schema
      • core__service__storage__url (String): http://os-storage-ibm.osdu.svc.cluster.local:8080/api/storage/v2/records
      • core__service__file__host (String): http://os-file-ibm.osdu.svc.cluster.local:8080/api/file
      • core__service__workflow__host (String): http://os-workflow-ibm.osdu.svc.cluster.local:8080/api/workflow
      • client_id (String): osdu-login
      • client_secret (String): a_valid_client_secret_for_the_environment
      • core__config__show_skipped_ids (Boolean): true
    CSV Parser ingestion
    This DAG definition uses the Kubernetes Pod Operator to process tasks. It requires the Docker image URL and namespace to create a Kubernetes pod. The variable env_var provides all the service endpoints that the operator needs to process the input file.
      • DOCKER_IMAGE (String): community.opengroup.org:5555/osdu/platform/data-flow/ingestion/csv-parser/csv-parser/csv-parser-dag-ibm-m10
      • NAMESPACE (String): {{project_name}}
      • env_var (JSON):
        {
          "FILE_SERVICE_ENDPOINT": "http://os-file-ibm:8080/api/file/v2",
          "OSDU_STORAGE_BATCH_SIZE": "50",
          "OSDU_STORAGE_THREAD_POOL_SIZE": "10",
          "PARTITION_API": "http://os-partition-ibm:8080/api/partition/v1",
          "SCHEMA_SERVICE_ENDPOINT": "http://os-schema-ibm:8080/api/schema-service/v1",
          "SEARCH_SERVICE_ENDPOINT": "http://os-search-ibm:8080/api/search/v2",
          "STORAGE_SERVICE_ENDPOINT": "http://os-storage-ibm:8080/api/storage/v2",
          "UNIT_SERVICE_ENDPOINT": "http://os-unit-ibm:8080/api/unit/v2",
          "WORKFLOW_SERVICE_ENDPOINT": "http://os-workflow-ibm:8080/api/workflow/v1",
          "ibm.cos.secret_key": "{{cos.secret_key}}",
          "DATASET_SERVICE_ENDPOINT": "http://os-file-ibm:8080/api/file/v2",
          "spring.security.oauth2.resourceserver.jwt.jwk-set-uri": "keycloak_route/auth/realms/OSDU/protocol/openid-connect/certs",
          "AUTHORIZE_API": "http://os-entitlements-ibm-v2:8080/api/entitlements/v2",
          "amqphub.amqp10jms.remoteUrl": "amqp://ex-aao-amqp-0-svc:5672",
          "schema.update.event.topic": "{{schema-update-event-topic}}",
          "partition.keycloak.url": "keycloak_route",
          "amqphub.amqp10jms.username": "{{amquser}}",
          "amqphub.amqp10jms.password": "{{amqpass}}",
          "partition.keycloak.grant_type": "password",
          "partition.keycloak.realm": "OSDU",
          "partition.keycloak.user": "partition-service-admin",
          "partition.keycloak.client_id": "partition-service",
          "tenantCache.exp.time.in.second": "60",
          "ibm.health-check-uri": "/api/schema-service/v1/info,/api/schema-service/v1/schemas/system",
          "DELAY": "2000",
          "STATUS_CHANGED_MESSAGING_ENABLED": "false",
          "partition.keycloak.client_secert": "{{partition_secret}}",
          "partition.keycloak.password": "{{partition_password}}"
        }

    SGY to ZGY conversion
    This DAG definition uses the Kubernetes Pod Operator to process tasks. It requires the Docker image URL and namespace to create a Kubernetes pod. The variable sgy_to_zgy_env_var provides all the service endpoints that the operator needs to process the input file. Additionally, other variables that control the processing are needed to optimize the process.
      • SGY_TO_ZGY_DOCKER_IMAGE (String): community.opengroup.org:5555/osdu/platform/data-flow/ingestion/segy-to-zgy-conversion/segy-to-zgy-conversion-v0-13-1:e9941b0ac5c244cb77f8fb366c1a32df406c8f07
      • SGY_TO_ZGY_NAMESPACE (String): osdu-airflow
      • sgy_to_zgy_env_var (JSON):
        {
          "STORAGE_SVC_URL": "http://os-storage-ibm:8080/api/storage/v2/",
          "SD_SVC_URL": "http://os-seismic-store-service:5000/api/v3",
          "SD_SVC_TOKEN": "",
          "SPATIALREF_SVC_TOKEN": "authorization",
          "STORAGE_SVC_TOKEN": "",
          "STORAGE_SVC_API_KEY": "",
          "SD_SVC_API_KEY": "ok",
          "OSDU_DATAPARTITIONID": "opendes",
          "SD_READ_CACHE_PAGE_SIZE": "4195024",
          "SD_READ_CACHE_MAX_PAGES": "256",
          "SEGYTOZGY_VERBOSITY": "3",
          "SEGYTOZGY_GENERATE_INDEX": "1",
          "IBM_COS_URL": "{{minio_cos_url}}"
        }

    SGY to Open VDS conversion
    This DAG definition uses the Kubernetes Pod Operator to process tasks. It requires the Docker image URL and namespace to create a Kubernetes pod. The variable sgy_to_vds_env_var provides all the service endpoints that the operator needs to process the input file.
      • SGY_TO_VDS_DOCKER_IMAGE (String): community.opengroup.org:5555/osdu/platform/domain-data-mgmt-services/seismic/open-vds/openvds-ingestion:2.2.1
      • SGY_TO_VDS_NAMESPACE (String): {{project_name}}
      • sgy_to_vds_env_var (JSON):
        {
          "IBM_COS_URL": "{{minio_cos_url}}",
          "SD_SVC_URL": "http://seismic-store-service:5000/api/v3",
          "SD_SVC_API_KEY": "test",
          "SDAPI_LOGLEVEL": "2"
        }
    WITSML ingestion
    This DAG definition uses the KubernetesPodOperator and Python operators to ingest WITSML XML documents. It requires NAMESPACE to create a Kubernetes pod. The Python operators are available as part of the common Python SDK, which is installed as part of the Open Data for Industries Airflow installation.
      • NAMESPACE (String): {{project_name}}
      • OSDU_API_CONFIG (String): /opt/airflow/dags/repo/osdu_api_config.yaml
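    The following minimal sketch checks that the required keys exist; run it where the Airflow libraries are available, for example inside the scheduler pod. The list of keys is illustrative and must match the entries above for the DAGs that you use.

      # Sketch: report Airflow variables that are still missing.
      from airflow.models import Variable

      required_keys = [
          "core__ingestion__batch_save_enabled",
          "core__ingestion__batch_count",
          "core__service__storage__url",
          "core__service__workflow__host",
      ]
      missing = [key for key in required_keys
                 if Variable.get(key, default_var=None) is None]
      print("Missing variables:", missing or "none")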
  3. Validate the props-ingestion configuration map in the osdu namespace.

    Ensure that the key osdu.airflow.url is set to the proper value of the Apache Airflow web service URL. This setting enables the Open Data for Industries core services to interact with Apache Airflow through REST APIs.

  4. Enable the DAG definitions that appear on the Apache Airflow console.

    Toggle the ON/OFF switch that is located to the left of the DAG name. As a result, the DAGs are ready to process the triggered workflow requests.

  5. For the WITSML ingestion DAG, validate the following items:
    • On the OpenShift or Kubernetes cluster where the Open Data for Industries services are installed, check that the configuration map airflow-env exists in the Apache Airflow namespace. Furthermore, check that it has an OSDU_API_CONFIG key with a value that points to the location of the configuration file osdu_api_config.yaml.
    • The git-sync process copied the following two files to the DAG location on the Apache Airflow namespace pods (scheduler, web server, and worker):
      • osdu_api_config.yaml: A configuration file for the WITSML DAG, which contains the path of the community WITSML parser.
      • witsml_parser_dag.py: The WITSML DAG definition, which contains all the tasks.