Implementing Python Worker Tasks
Python worker tasks encapsulate Python processing libraries.
By default, the Platform generates:
- workers/python-engine-worker, a preconfigured Python worker project that encapsulates the python-engine module.
- processing/python-engine, a Python module library with a run() function that takes a path to a collector archive and uses the dbgene-data library to read it. It also contains a pytest test that illustrates how to run unit tests against your code.

The dbgene-data Python library, available in our Nexus PyPI repository, contains utilities for reading, writing and checking a collector archive. You can find help with:
- help(dbgene.data.utils)
- help(dbgene.data.DataFrameDict)
- help(dbgene.data.DataChecker)
- help(dbgene.data.constants)
In addition, the Optimization Server Python Bridge library helps you communicate with the worker-shell from within your Python task. For more details, please refer to the Optimization Server documentation, Chapter Workers.

Python processing libraries rely on the dataframe data structure to handle their input and output.
Python Processing

Each scenario table is represented as a dataframe. Each record is uniquely identified by its db_gene_internal_id column. Foreign key columns hold the db_gene_internal_id of the record they reference.
In the example below, two records each reference a plant record through their plant_id column:

```csv
db_gene_internal_id,plant_id,name,id,duration_in_hours
01ead727-c2be-1b7c-8cd8-90a2d2c6143c,01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Prepare,PRE,4
01ead727-c2bf-1b7c-8cd8-90a2d2c6143c,01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Unscrew,UNS,2
```
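The internal-ID convention can be illustrated without any Platform library. The sketch below uses only the Python standard library: it parses the two records above and resolves each plant_id against a plant table. The plant table, its columns and its plant names are hypothetical, invented for illustration; only the internal IDs come from the example.

```python
import csv
import io

# The two activity records from the example: plant_id holds the
# db_gene_internal_id of the referenced plant record.
ACTIVITIES_CSV = """db_gene_internal_id,plant_id,name,id,duration_in_hours
01ead727-c2be-1b7c-8cd8-90a2d2c6143c,01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Prepare,PRE,4
01ead727-c2bf-1b7c-8cd8-90a2d2c6143c,01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Unscrew,UNS,2
"""

# Hypothetical plant table: columns and names are invented for illustration,
# only the internal IDs are taken from the activity records.
PLANTS_CSV = """db_gene_internal_id,name
01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Plant A
01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Plant B
"""


def read_table(text: str) -> list[dict[str, str]]:
    """Parse CSV text into one dictionary per record."""
    return list(csv.DictReader(io.StringIO(text)))


def resolve_foreign_key(
    rows: list[dict[str, str]], fk_column: str, target_rows: list[dict[str, str]]
) -> list[tuple[dict[str, str], dict[str, str]]]:
    """Pair each row with the target record whose internal ID it references."""
    by_id = {record["db_gene_internal_id"]: record for record in target_rows}
    pairs = []
    for row in rows:
        target = by_id.get(row[fk_column])
        if target is None:
            raise KeyError(f"{fk_column}={row[fk_column]!r} matches no record")
        pairs.append((row, target))
    return pairs


activities = read_table(ACTIVITIES_CSV)
plants = read_table(PLANTS_CSV)
for activity, plant in resolve_foreign_key(activities, "plant_id", plants):
    print(f"{activity['name']} ({activity['duration_in_hours']} h) -> {plant['name']}")
```

With pandas dataframes, the same lookup is usually written as `activities.merge(plants, left_on="plant_id", right_on="db_gene_internal_id")`; the dictionary index used here simply makes the lookup by unique ID explicit.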
These dataframes can be handled with the Platform and Optimization Server libraries.
Processing library code typically follows this structure:
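```python
import sys

# DataFrameDict and DataChecker come from dbgene-data; the import location of
# CsvCollectorArchive is an assumption, check your generated project.
from dbgene.data import CsvCollectorArchive, DataChecker, DataFrameDict
# Assumption: INTERNAL_ID_FIELD is exposed by dbgene.data.constants.
from dbgene.data.constants import INTERNAL_ID_FIELD

# Name of the output table to validate (hypothetical, project-specific value).
SCHEDULE = "schedule"


# input data is received in the form of an archive_path
def run(archive_path: str) -> DataFrameDict:
    # data is extracted as a dictionary of Dataframes
    input_data_frame_dict: DataFrameDict = DataFrameDict()
    input_data_frame_dict.load_collector_archive(CsvCollectorArchive(archive_path))
    # run the engine (run_engine is your own engine entry point)
    engine_result_dict = run_engine(input_data_frame_dict)
    # finalize data output preparation (prepare_output is project-specific)
    data_frame_dict = prepare_output(engine_result_dict)
    # optional data output validation
    validate_output(data_frame_dict)
    return data_frame_dict


# this function relies on DataChecker validation rules to ensure
# the output dictionary is consistent
def validate_output(data_frame_dict: DataFrameDict) -> None:
    print("Checking output...")
    checker = DataChecker()
    checker.internal_id(SCHEDULE)
    checker.uniqueness(SCHEDULE, INTERNAL_ID_FIELD)
    errors = checker.check_everything(data_frame_dict)
    if errors:
        print("There are validation errors:")
        for error in errors:
            print(" - " + error.get_message())
        # exit with a non-zero status when the output is inconsistent
        sys.exit(1)
```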
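To make the intent of the two DataChecker rules concrete, the sketch below reimplements similar checks in plain Python over a list of row dictionaries, without dbgene-data. It assumes that internal_id() requires every row to carry a non-empty internal ID and that uniqueness() rejects duplicate values in the given column; the actual rule semantics may differ, so consult help(dbgene.data.DataChecker). The ID_COLUMN constant, table name and rows are hypothetical.

```python
from collections import Counter

# Local stand-in, assumed to match the internal ID column name shown in the
# CSV example; it is not imported from dbgene.data.constants.
ID_COLUMN = "db_gene_internal_id"


def check_internal_ids(table_name: str, rows: list[dict]) -> list[str]:
    """Report rows without an internal ID and internal IDs used more than once.

    Assumption: this approximates the intent of checker.internal_id() and
    checker.uniqueness(); it is not the dbgene-data implementation.
    """
    errors = []
    for index, row in enumerate(rows):
        if not row.get(ID_COLUMN):
            errors.append(f"{table_name}: row {index} has no {ID_COLUMN}")
    counts = Counter(row[ID_COLUMN] for row in rows if row.get(ID_COLUMN))
    for value, count in sorted(counts.items()):
        if count > 1:
            errors.append(f"{table_name}: {ID_COLUMN} {value!r} appears {count} times")
    return errors


# Hypothetical output rows: one duplicated ID and one missing ID.
schedule_rows = [
    {ID_COLUMN: "a1", "start": 0},
    {ID_COLUMN: "a1", "start": 4},
    {ID_COLUMN: "", "start": 6},
]
errors = check_internal_ids("schedule", schedule_rows)
for message in errors:
    print(" - " + message)
```

Returning the messages instead of calling sys.exit(1) keeps the check easy to assert on in a pytest test; the validate_output function above exits with a non-zero status instead.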
In the generated microservice, each Python worker contains a run-python-engine.py file, which is where the processing code is called in the worker context. The pattern is to call the processing library's run() function with the path of the input collector archive, retrieve the resulting dataframe dictionary as its return value, and write that dictionary to the output collector.
The extract below shows how this works:
```python
# Init the worker bridge (ExecutionContext)
execution_context = ExecutionContext()
execution_context.start()
print("Reading input...")
input_collector_path = execution_context.get_input_path("inputCollector")
print("Processing...")
# If you need to emit metrics, you have to pass the execution_context to your engine.
data_frame_dict = engine.run(input_collector_path)
print("Writing output...")
output_path = execution_context.get_output_path("outputCollector")
output_archive = CsvCollectorArchive(output_path, "w")
data_frame_dict.store_collector_archive(output_archive)
# Stop the ExecutionContext properly
execution_context.stop()
```
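The same sequence can be exercised outside a worker-shell. The sketch below replaces ExecutionContext with a hypothetical StubExecutionContext that imitates only start(), stop(), get_input_path() and get_output_path(), and wraps the sequence in a hypothetical run_worker() helper. The engine and output writer are placeholders for engine.run and store_collector_archive. Whether the real bridge requires stop() after a failure is not covered here; calling it from a finally block is a defensive design choice.

```python
class StubExecutionContext:
    """Hypothetical stand-in for the bridge's ExecutionContext in local tests.

    It only imitates the four methods used in run-python-engine.py and maps
    input/output names to fixed paths; the real class talks to the worker-shell.
    """

    def __init__(self, inputs: dict[str, str], outputs: dict[str, str]):
        self.inputs = inputs
        self.outputs = outputs
        self.started = False
        self.stopped = False

    def start(self) -> None:
        self.started = True

    def stop(self) -> None:
        self.stopped = True

    def get_input_path(self, name: str) -> str:
        return self.inputs[name]

    def get_output_path(self, name: str) -> str:
        return self.outputs[name]


def run_worker(context, process, write_output) -> None:
    """Same sequence as run-python-engine.py, with stop() guaranteed by finally."""
    context.start()
    try:
        result = process(context.get_input_path("inputCollector"))
        write_output(result, context.get_output_path("outputCollector"))
    finally:
        context.stop()


written: dict[str, str] = {}


def record_output(data: str, path: str) -> None:
    # Placeholder for store_collector_archive: remember what would be written where.
    written[path] = data


def failing_engine(path: str) -> str:
    raise RuntimeError(f"engine failed on {path}")


ok_context = StubExecutionContext({"inputCollector": "/tmp/in"}, {"outputCollector": "/tmp/out"})
run_worker(ok_context, lambda path: f"processed {path}", record_output)
print(written, ok_context.stopped)

failed_context = StubExecutionContext({"inputCollector": "/tmp/in"}, {"outputCollector": "/tmp/out"})
try:
    run_worker(failed_context, failing_engine, record_output)
except RuntimeError as exc:
    print("error:", exc, "| stopped:", failed_context.stopped)
```

Injecting the context and the callables this way lets a pytest test, such as the one generated in processing/python-engine, check the worker sequence without running Optimization Server.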
The Platform lets you debug Python workers with IntelliJ. For more details, please refer to Section Using an Integrated Development Environment.