Implementing Python Worker Tasks

Python worker tasks encapsulate Python processing libraries.

By default, the Platform generates:

  • workers/python-engine-worker, a preconfigured Python worker project that encapsulates the python-engine module.

  • processing/python-engine, a Python module library with a run() function that takes a path to a collector archive and uses the dbgene-data library to read it. It also contains a pytest test that illustrates how to run unit tests against your code; a minimal sketch of such a test is shown later in this section.

    The dbgene-data Python library, available in our Nexus PyPI repository, contains utilities for reading, writing and checking a collector archive. You can get help with the following entry points (see the example after this list):

    • help(dbgene.data.utils)

    • help(dbgene.data.DataFrameDict)

    • help(dbgene.data.DataChecker)

    • help(dbgene.data.constants)

    In addition, the Optimization Server Python Bridge library helps you communicate with the worker-shell within your Python task. For more details, please refer to the Optimization Server documentation, Chapter Workers.
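
For example, here is a quick way to explore the dbgene-data entry points listed above from a Python shell (a minimal sketch, assuming the package is installed from the Nexus PyPI repository):

import dbgene.data
import dbgene.data.utils
import dbgene.data.constants

help(dbgene.data.utils)          # utilities for reading and writing collector archives
help(dbgene.data.DataFrameDict)  # the dictionary of DataFrames, one entry per scenario table
help(dbgene.data.DataChecker)    # validation rules for checking a DataFrameDict
help(dbgene.data.constants)      # shared constants used across the library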

Python processing libraries rely on the DataFrame data structure to handle their input and output.

Python Processing

Each scenario table is represented as a DataFrame. Each record is uniquely identified by its db_gene_internal_id. Foreign key columns reference the pointed record's unique ID.

In the example below, two records are shown, each referencing a plant through its plant_id:

db_gene_internal_id,plant_id,name,id,duration_in_hours
01ead727-c2be-1b7c-8cd8-90a2d2c6143c,01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Prepare,PRE,4
01ead727-c2bf-1b7c-8cd8-90a2d2c6143c,01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Unscrew,UNS,2
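
As an illustration, here is a hedged sketch of how such a reference can be resolved with plain pandas, assuming the tables are exposed as pandas DataFrames; the plant table and its names are made up for the example:

import pandas as pd

# The task table from the example above
tasks = pd.DataFrame({
    "db_gene_internal_id": ["01ead727-c2be-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2bf-1b7c-8cd8-90a2d2c6143c"],
    "plant_id": ["01ead727-c2cf-1b7c-8cd8-90a2d2c6143c",
                 "01ead727-c2d0-1b7c-8cd8-90a2d2c6143c"],
    "name": ["Prepare", "Unscrew"],
    "id": ["PRE", "UNS"],
    "duration_in_hours": [4, 2],
})

# A hypothetical plant table; plant_id above points at these internal IDs
plants = pd.DataFrame({
    "db_gene_internal_id": ["01ead727-c2cf-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2d0-1b7c-8cd8-90a2d2c6143c"],
    "name": ["Lyon", "Nantes"],
})

# Resolve the foreign key by joining on the referenced record's internal ID
tasks_with_plant = tasks.merge(
    plants, left_on="plant_id", right_on="db_gene_internal_id",
    suffixes=("", "_plant"))
print(tasks_with_plant[["name", "name_plant", "duration_in_hours"]])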

These DataFrames can be handled using the Platform and Optimization Server libraries.

A processing library code typically relies on the following structure:

import sys

# DataFrameDict and DataChecker are provided by the dbgene-data library;
# the exact import path for CsvCollectorArchive may differ in your version
from dbgene.data import CsvCollectorArchive, DataChecker, DataFrameDict

# input data is received in the form of an archive_path
def run(archive_path: str) -> DataFrameDict:
    # data is extracted as a dictionary of DataFrames
    input_data_frame_dict: DataFrameDict = DataFrameDict()
    input_data_frame_dict.load_collector_archive(CsvCollectorArchive(archive_path))

    # run the engine
    engine_result_dict = run_engine(input_data_frame_dict)

    # finalize data output preparation
    data_frame_dict = prepare_output(engine_result_dict)

    # optional data output validation
    validate_output(data_frame_dict)

    return data_frame_dict

# this function relies on DataChecker validation rules to ensure
# the output dictionary is consistent
def validate_output(data_frame_dict: DataFrameDict) -> None:
    print("Checking output...")
    checker = DataChecker()
    checker.internal_id(SCHEDULE)
    checker.uniqueness(SCHEDULE, INTERNAL_ID_FIELD)
    errors = checker.check_everything(data_frame_dict)
    if len(errors) > 0:
        print("There is(are) validation error(s):")
        for error in errors:
            print(" - " + error.get_message())
        sys.exit(1)
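
The pytest test mentioned at the beginning of this section exercises this same entry point. Here is a minimal sketch of such a test, assuming the processing module is importable as engine, that a sample archive is checked in under a hypothetical tests/resources path, and that the output contains a schedule table:

import engine

def test_run_produces_schedule():
    # run the full processing chain on a sample collector archive
    result = engine.run("tests/resources/sample_collector.zip")
    # DataFrameDict is expected to behave like a dictionary of DataFrames
    assert "schedule" in result
    assert len(result["schedule"]) > 0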

In the generated microservice, each Python worker contains a run-python-engine.py file, which is where the actual processing code is called in the worker context. The pattern is to call the processing library's run() function with the path to the input collector archive and retrieve the resulting DataFrame dictionary as its return value.

The extract below shows how this works:

# Init the worker bridge (ExecutionContext)
execution_context = ExecutionContext()
execution_context.start()

print("Reading input...")
input_collector_path = execution_context.get_input_path("inputCollector")

print("Processing...")
# If you need to emit metrics, you have to pass the execution_context to your engine.
data_frame_dict = engine.run(input_collector_path)

print("Writing output...")
output_path = execution_context.get_output_path("outputCollector")
output_archive = CsvCollectorArchive(output_path, "w")
data_frame_dict.store_collector_archive(output_archive)

# Stop the ExecutionContext properly
execution_context.stop()
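
If the engine can raise an exception, you may want to guarantee that the bridge is still shut down. One possible variant of the extract above wraps the processing in a try/finally; this sketch reuses only the calls already shown:

execution_context = ExecutionContext()
execution_context.start()
try:
    input_collector_path = execution_context.get_input_path("inputCollector")
    data_frame_dict = engine.run(input_collector_path)

    output_path = execution_context.get_output_path("outputCollector")
    data_frame_dict.store_collector_archive(CsvCollectorArchive(output_path, "w"))
finally:
    # stop the ExecutionContext even if processing failed
    execution_context.stop()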

The Platform lets you debug Python workers with IntelliJ. For more details, please refer to the section Using an Integrated Development Environment.