Implementing Python Worker Tasks
Python worker tasks encapsulate Python processing libraries.
By default, the Platform generates:

- `workers/python-engine-worker`, a preconfigured Python worker project that encapsulates the `python-engine` module.
- `processing/python-engine`, a Python module library with a `run()` function that takes a path to a collector archive and uses the `dbgene-data` library to read it. It also contains a pytest test that illustrates how to run unit tests against your code (a minimal sketch is shown below).

The `dbgene-data` Python library, available in our Nexus PyPI repository, contains utilities for reading, writing and checking a collector archive. You can find help with:

- `help(dbgene.data.utils)`
- `help(dbgene.data.DataFrameDict)`
- `help(dbgene.data.DataChecker)`
- `help(dbgene.data.constants)`

In addition, the Optimization Server Python Bridge library helps you communicate with the worker-shell within your Python task. For more details, please refer to the Optimization Server documentation, Chapter Workers.
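The generated pytest test mentioned above can serve as a starting point for your own unit tests. Here is a minimal sketch, assuming the processing module is importable as `engine` (as in the generated worker script) and that a sample collector archive is available in your test data; both names are placeholders to adapt to your project layout:

```python
# Minimal pytest sketch. ASSUMPTIONS: the processing module is importable as
# "engine" (as in the generated worker script) and a sample collector archive
# is shipped with your test data; adapt both names to your project layout.
from dbgene.data import DataFrameDict

import engine


def test_run_returns_a_data_frame_dict():
    # run the engine on a sample collector archive (placeholder path)
    result = engine.run("tests/data/sample_collector_archive")

    # the processing library is expected to return its output tables
    # as a DataFrameDict
    assert isinstance(result, DataFrameDict)
```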
Python processing libraries rely on the dataframe data structure to handle their input and output.
Python Processing
Each scenario table is represented as a dataframe. Each record is identified by a unique ID stored in the `db_gene_internal_id` column. Foreign key columns reference the unique ID of the record they point to.
In the example below, two records are shown, each referencing a plant through the `plant_id` column.

```csv
db_gene_internal_id,plant_id,name,id,duration_in_hours
01ead727-c2be-1b7c-8cd8-90a2d2c6143c,01ead727-c2cf-1b7c-8cd8-90a2d2c6143c,Prepare,PRE,4
01ead727-c2bf-1b7c-8cd8-90a2d2c6143c,01ead727-c2d0-1b7c-8cd8-90a2d2c6143c,Unscrew,UNS,2
```
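For illustration only, the sketch below shows how such a table and its foreign key could be resolved with plain pandas. Whether the Platform's dataframes expose the pandas API should be checked against `help(dbgene.data.DataFrameDict)`, and the `plants` table and its values are invented for this example:

```python
# Illustration only: resolving a foreign key between two tables with pandas.
# The "plants" table and its values are invented; real dataframes come from
# the dbgene-data DataFrameDict.
import pandas as pd

tasks = pd.DataFrame({
    "db_gene_internal_id": ["01ead727-c2be-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2bf-1b7c-8cd8-90a2d2c6143c"],
    "plant_id":            ["01ead727-c2cf-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2d0-1b7c-8cd8-90a2d2c6143c"],
    "name": ["Prepare", "Unscrew"],
    "id": ["PRE", "UNS"],
    "duration_in_hours": [4, 2],
})

plants = pd.DataFrame({
    "db_gene_internal_id": ["01ead727-c2cf-1b7c-8cd8-90a2d2c6143c",
                            "01ead727-c2d0-1b7c-8cd8-90a2d2c6143c"],
    "name": ["Plant A", "Plant B"],  # invented values
})

# plant_id points to the plant's db_gene_internal_id, so joining on those
# columns resolves the reference
tasks_with_plant = tasks.merge(
    plants, left_on="plant_id", right_on="db_gene_internal_id",
    suffixes=("", "_plant"),
)
print(tasks_with_plant[["name", "name_plant", "duration_in_hours"]])
```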
These dataframes can be handled using the Platform and Optimization Server libraries. Processing library code typically relies on the following structure:
```python
# (this extract assumes the usual imports: sys, DataFrameDict, DataChecker,
# CsvCollectorArchive and the SCHEDULE / INTERNAL_ID_FIELD constants)

# input data is received in the form of an archive_path
def run(archive_path: str) -> DataFrameDict:
    # data is extracted as a dictionary of dataframes
    input_data_frame_dict: DataFrameDict = DataFrameDict()
    input_data_frame_dict.load_collector_archive(CsvCollectorArchive(archive_path))

    # run the engine
    engine_result_dict = run_engine(input_data_frame_dict)

    # finalize data output preparation
    data_frame_dict = prepare_output(engine_result_dict)

    # optional data output validation
    validate_output(data_frame_dict)

    return data_frame_dict


# this function relies on DataChecker validation rules to ensure
# the output dictionary is consistent
def validate_output(data_frame_dict: DataFrameDict):
    print("Checking output...")
    checker = DataChecker()
    checker.internal_id(SCHEDULE)
    checker.uniqueness(SCHEDULE, INTERNAL_ID_FIELD)
    errors = checker.check_everything(data_frame_dict)
    if len(errors) > 0:
        print("There is(are) validation error(s):")
        for error in errors:
            print(" - " + error.get_message())
        sys.exit(1)
    return None
```
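In this structure, `run_engine()` and `prepare_output()` represent your own engine and output-preparation code. Purely as a sketch, here is what a toy engine step could look like when the tables are handled as a plain dictionary of pandas dataframes; how individual dataframes are obtained from and put back into a `DataFrameDict` should be checked against `help(dbgene.data.DataFrameDict)`, and the `task` table name and computed column are invented for the example:

```python
# Sketch only: a toy engine step working on a plain dict of pandas dataframes.
# How individual dataframes are obtained from (and put back into) a
# DataFrameDict is not shown here; see help(dbgene.data.DataFrameDict).
# The "task" table name and the computed column are invented for this example.
import pandas as pd


def run_engine(tables: dict) -> dict:
    # read an input table (invented name)
    tasks = tables["task"].copy()

    # toy computation: derive a duration in minutes from duration_in_hours
    tasks["duration_in_minutes"] = tasks["duration_in_hours"] * 60

    # return the modified tables; a prepare_output() step would assemble them
    # into the DataFrameDict returned by run()
    return {"task": tasks}


if __name__ == "__main__":
    sample = {"task": pd.DataFrame({"name": ["Prepare"], "duration_in_hours": [4]})}
    print(run_engine(sample)["task"])
```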
In the generated microservice, each Python worker contains a `run-python-engine.py` file, which is where the actual processing code is called in the worker context. The pattern is to call the processing library code with the path to the input collector archive and retrieve the resulting dataframe dictionary as the return value of the execution.
The extract below shows how this happens:
```python
# Init the worker bridge (ExecutionContext)
execution_context = ExecutionContext()
execution_context.start()

print("Reading input...")
input_collector_path = execution_context.get_input_path("inputCollector")

print("Processing...")
# If you need to emit metrics, you have to pass the execution_context to your engine.
data_frame_dict = engine.run(input_collector_path)

print("Writing output...")
output_path = execution_context.get_output_path("outputCollector")
output_archive = CsvCollectorArchive(output_path, "w")
data_frame_dict.store_collector_archive(output_archive)

# Stop the ExecutionContext properly
execution_context.stop()
```
The Platform allows you to use IntelliJ to debug Python workers. For more details, please refer to Section Using an Integrated Development Environment.