User generated nodes
Use the Python snippet node to include simple code of your own when processing data. For more complex processing, you can also create and use custom operators.
Python snippet operator
This operator allows you to input custom Python code snippets to work on the table data with strict security controls.
Node input
- python_code: A string containing the Python code snippet that transforms the input PyArrow table. This code is executed securely within a restricted namespace.
- new_columns: A list of new column names added by the user's transformation code. Any additional columns added by the code are automatically removed. table is not allowed as a new column name.
The code is parsed using Abstract Syntax Tree (AST) analysis. The analysis checks that the syntax is correct, that no unsafe or blacklisted operations are used, and that naming rules are followed (for example, table cannot be used as a column name).
The code then runs in a tightly controlled environment: a restricted, curated namespace that exposes only explicitly allowed functions and modules.
Node output
The node returns a PyArrow table with the user's code transformations applied, containing:
- Columns from the original input (if not dropped by the user code)
- Columns listed in new_columns.
The following metadata is also available:
- columns_added: Number of new columns.
- new_column_names: Names of new columns.
- columns_modified: Names of modified columns.
- rows_added: Number of new rows.
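The output rules above can be illustrated with a small sketch. It is not the product implementation: plain dictionaries of lists stand in for PyArrow tables, the function name finalize_output is hypothetical, and the way modified columns and added rows are detected here is an assumption.

```python
from typing import Any, Dict, List, Tuple

# Stand-in for a PyArrow table: column name -> list of values.
Columns = Dict[str, List[Any]]


def finalize_output(
    original: Columns, result: Columns, new_columns: List[str]
) -> Tuple[Columns, Dict[str, Any]]:
    """Keep original and declared columns, then describe what changed."""
    allowed = set(original) | set(new_columns)
    # Columns that were neither in the input nor declared are dropped.
    kept = {name: values for name, values in result.items() if name in allowed}

    added = [name for name in kept if name not in original]
    # Assumption: a column counts as modified when its values differ.
    modified = [
        name for name in kept if name in original and kept[name] != original[name]
    ]
    rows_before = len(next(iter(original.values()), []))
    rows_after = len(next(iter(kept.values()), []))

    metadata = {
        "columns_added": len(added),
        "new_column_names": added,
        "columns_modified": modified,
        "rows_added": rows_after - rows_before,  # assumption: simple difference
    }
    return kept, metadata


original = {"id": [1, 2], "pages": [3, 4]}
result = {"id": [1, 2], "pages": [4, 5], "score": [0.1, 0.2], "tmp": [0, 0]}
kept, metadata = finalize_output(original, result, new_columns=["score"])
print(list(kept))  # ['id', 'pages', 'score']
print(metadata)
```

In this sketch the undeclared tmp column is dropped, score is reported as a new column, and pages is reported as modified.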
Execution environment
An execution context is created to run the code. User code can access the table through the variable table, and each column is available as a variable named after the column.
Two helper functions are provided:
- add_column: This function can be used to add a column to the table. Usage: table = add_column(table, 'column_name', column_data), where column_name must be included in the new_columns list.
- update_column: This function can be used to modify an existing column. Usage: table = update_column(table, 'column_name', column_data). It should only be used for columns that already exist in the table.
Security features
Only a limited set of built-in Python functions is permitted during execution. These include commonly used safe functions such as:
abs, round, max, min, sum, list, tuple, set, dict, range, map, filter, zip,
sorted, reversed, type, isinstance, getattr, hasattr, all, any, len,
str, int, float, bool
To prevent malicious or unsafe behavior, the following names and operations are explicitly blocked:
'__import__', 'exec', 'eval', 'compile', 'open', 'file', 'input', 'raw_input', 'reload', 'globals', 'locals', 'vars', 'dir',
'help', 'copyright', 'credits', 'license', 'quit', 'exit', '__builtins__', '__builtin__', '__file__', '__name__'
If the user code attempts to use any of these, the operator raises an exception.
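The kind of check described here can be sketched with Python's standard ast module. This is an illustration only, not the operator's validation code: the function name validate_snippet is hypothetical, the blocked set is a subset of the list above, and the real analysis may apply further rules.

```python
import ast
from typing import List

# Assumption: a subset of the blocked names listed in this section.
BLOCKED_NAMES = {"__import__", "exec", "eval", "compile", "open", "globals", "locals"}


def validate_snippet(code: str, new_columns: List[str]) -> List[str]:
    """Return a list of problems found in the snippet (empty if none)."""
    if "table" in new_columns:
        return ["'table' is not allowed as a new column name"]
    try:
        tree = ast.parse(code)
    except SyntaxError as err:
        return [f"syntax error: {err.msg}"]

    problems = []
    for node in ast.walk(tree):
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            problems.append("imports are not permitted")
        elif isinstance(node, ast.Name) and node.id in BLOCKED_NAMES:
            problems.append(f"blocked name: {node.id}")
        elif isinstance(node, ast.Attribute) and node.attr in BLOCKED_NAMES:
            problems.append(f"blocked attribute: {node.attr}")
    return problems


print(validate_snippet("x = abs(-1)", ["x"]))  # []
print(validate_snippet("eval('1')", []))       # ['blocked name: eval']
```

Because the check works on the parsed tree and not on the source text, a blocked name that appears only inside a string literal or comment does not trigger a false positive.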
The code is run in a curated environment that exposes only a small set of safe modules and functions. Users have access to:
math, datetime, date, timedelta, str, re, numpy, pandas, pyarrow as pa and pyarrow.compute as pc
No other modules or functions are available, and external imports are not permitted.
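A curated namespace of this kind can be approximated in plain Python by passing a custom __builtins__ mapping to exec. The sketch below is an assumption about the general technique, not the operator's code; run_restricted is a hypothetical name, and only a few of the permitted built-ins and modules are included. Restricting built-ins alone is not a complete sandbox, which is presumably why it is combined with the AST checks described earlier.

```python
import math
import re
from typing import Any, Dict

# Assumption: a small subset of the permitted built-ins listed above.
SAFE_BUILTINS = {
    "abs": abs, "len": len, "max": max, "min": min, "sum": sum,
    "round": round, "range": range, "list": list,
    "str": str, "int": int, "float": float,
}


def run_restricted(code: str, variables: Dict[str, Any]) -> Dict[str, Any]:
    """Execute code with only the curated built-ins and modules visible."""
    namespace = {"__builtins__": SAFE_BUILTINS, "math": math, "re": re}
    namespace.update(variables)
    exec(code, namespace)
    return namespace


result = run_restricted("total = sum(values) + math.floor(2.7)", {"values": [1, 2, 3]})
print(result["total"])  # 8

# Names outside the curated set are simply undefined inside the snippet.
try:
    run_restricted("open('secrets.txt')", {})
except NameError as err:
    print("rejected:", err)
```

An import statement fails in the same way, because __import__ is not part of the curated built-ins.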
Python snippet examples
Example 1: Updating an existing column
The following snippet updates the pages_processed column:
update_processed_columns = [s.as_py() + 1 for s in pages_processed]
table = update_column(table, "pages_processed", update_processed_columns)
Example 2: Adding a new column
This example demonstrates how to add a new column to the table:
table = add_column(table, 'new_column_name', value)
Example 3: Extracting entities as scalar fields
This example shows how to add an extracted entity as a scalar field in the vector store.
After processing invoice documents through the Extract operator (with entity extraction enabled, using the Invoice document class) and the Entity curation operator, you can use the following Python snippet operator to extract specific entities, such as invoice_date, and make them available as separate columns for downstream operators such as Vector store (Milvus).
invoice_dates = [
    row.get('Invoice', [{}])[0].get('invoice_date', None)
    if row and isinstance(row, dict) and row.get('Invoice')
    else None
    for row in table.column('transformed_entities').to_pylist()
]
table = add_column(table, 'invoice_date', invoice_dates)
This Python snippet operator is placed after the Entity curation operator in your data flow. transformed_entities is the output feature of the Entity curation operator and contains the extracted entities. invoice_date is the extracted entity that becomes an output feature of the Python snippet operator so that it can be mapped as a feature in the vector store.
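To see how the comprehension in Example 3 handles incomplete rows, it can be run on plain Python data. The rows below are hypothetical and only assume the shape implied by the snippet: each row is either None or a dictionary that maps a document class name to a list of entity dictionaries.

```python
# Hypothetical rows, shaped like table.column('transformed_entities').to_pylist()
rows = [
    {"Invoice": [{"invoice_date": "2024-01-15", "total": "100.00"}]},
    {"Invoice": []},                         # document class present, no entities
    None,                                    # missing value
    {"Receipt": [{"date": "2024-02-01"}]},   # different document class
]

# Same logic as Example 3, applied to the sample rows.
invoice_dates = [
    row.get("Invoice", [{}])[0].get("invoice_date", None)
    if row and isinstance(row, dict) and row.get("Invoice")
    else None
    for row in rows
]
print(invoice_dates)  # ['2024-01-15', None, None, None]
```

Rows without a usable Invoice entry yield None, so the new column always has one value per input row.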
Creating custom operators
To create your own custom operators, follow these steps:
- Create a new Python file (for example, custom_operator.py) to define your custom operator class.
- Define a class that inherits from AbstractOperator: In your new file, define a class that inherits from AbstractOperator. This base class provides the necessary structure for your custom operator. The AbstractOperator class comes from the Datasift core module. Import the class along with other required classes.

    from typing import Any, Dict, List, Tuple
    import pyarrow as pa
    from datasift_core.operators.abstract_operator import OperatorCategory, AbstractOperator
    from datasift_common.util.constants import OperatorConstants
    from datasift_common.util.log import get_logger

    logger = get_logger()

    class CustomOperator(AbstractOperator):
        """
        A custom operator that adds a new column on a PyArrow Table.
        """
        short_name = "custom_operator"
        category = OperatorCategory.Custom

        def __init__(self, config: Dict[str, Any]):
            """
            Initialize the operator with the provided configuration.
            Args:
                config (Dict[str, Any]): Configuration parameters for the operator.
            """
            super().__init__(config)
            self.sample_input = config.get("sample_input", "default_value")

- Implement the transform method: The transform method contains the core logic of your operator. It processes the input data and returns the transformed output along with any relevant metadata.

    def transform(self, table: pa.Table) -> Tuple[List[pa.Table], Dict[str, Any]]:
        """
        Transform the input table by adding a new column.
        Args:
            table (pa.Table): Input PyArrow Table to be transformed.
        Returns:
            Tuple[List[pa.Table], Dict[str, Any]]: A tuple containing a list of transformed tables
            and a dictionary of metadata.
        """
        logger.info(">>> Executing custom operator transformation")
        new_column_data = pa.array([self.sample_input] * table.num_rows)
        updated_table = table.append_column("new_column", new_column_data)
        metadata = {
            "num_rows": updated_table.num_rows,
            "num_columns": updated_table.num_columns
        }
        return [updated_table], metadata

- Implement the get_metadata method: The get_metadata method provides information about the operator's configurable attributes, which can be useful in the UI or SDK.

    def get_metadata(self) -> Dict[str, Any]:
        """
        Retrieve metadata about the operator's configurable attributes.
        Returns:
            Dict[str, Any]: A dictionary containing metadata about the operator's attributes.
        """
        return {
            OperatorConstants.SDK: True,
            OperatorConstants.CATEGORY: self.category.value,
            OperatorConstants.LABEL: "Name of Custom Operator",
            OperatorConstants.IS_OPERATOR_AVAILABLE: self.is_available(),
            OperatorConstants.ATTRIBUTES: {
                "sample_input": {
                    OperatorConstants.NAME: "Sample Input",
                    OperatorConstants.DESCRIPTION: "A sample input parameter for the custom operator",
                    OperatorConstants.DEFAULT: "default_value",
                    OperatorConstants.REQUIRED: True
                }
            }
        }

- Add any additional methods or attributes as needed: Depending on your operator's complexity, you might need additional methods or attributes to support its functionality.
Custom operators that involve dependencies
Operator files might depend on custom logic that, for modularity, is better kept in separate files. Python supports this through packages and modules. For example, consider the following folder structure:
DUMMY_WORKSPACE/
└── test_dir/
    ├── dir2/
    │   ├── dir2.py
    │   └── dir1.py
    └── sample_python_file.py
Here, sample_python_file.py is the file that contains the operator code, and it depends on a module from the test_dir package. The following example shows the imports in the main operator file:
import pyarrow as pa
from datasift.operators.abstract_operator import
from test_dir import
When you are ready to upload, compress the entire test_dir folder so that the folder structure is preserved. Send the compressed file in the custom_operator_dependency_archive file parameter.
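One way to build such an archive is with shutil.make_archive from the Python standard library, as sketched below. The function name archive_dependency_folder is hypothetical, and ZIP is assumed as the archive format. Setting root_dir to the workspace and base_dir to the folder name keeps test_dir/ as the top-level entry inside the archive.

```python
import shutil
from pathlib import Path


def archive_dependency_folder(workspace: Path, folder_name: str) -> Path:
    """Zip workspace/folder_name, keeping folder_name/ at the archive root."""
    archive = shutil.make_archive(
        base_name=str(workspace / folder_name),  # writes <workspace>/<folder_name>.zip
        format="zip",
        root_dir=str(workspace),   # paths in the archive are relative to this
        base_dir=folder_name,      # only this folder is included
    )
    return Path(archive)


# Example call (the workspace path is illustrative):
# archive_dependency_folder(Path("/path/to/DUMMY_WORKSPACE"), "test_dir")
```

The resulting file is written next to the folder, not inside it, so the archive does not end up containing itself.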
Custom operators that involve custom packages
The standard installation comes with a robust set of the most commonly used Python packages. However, you might need packages that are not published to PyPI, or that are published to PyPI but not available in the default installation. In such cases, create a virtual environment and add the dependencies using pip. The following is the structure of a typical virtual environment folder:
bin/
include/
lib/
pyvenv.cfg
The site-packages folder within <venv_name>/lib/<python3.version>/ contains the installed Python libraries. Compress this folder and upload it. The archive is then uncompressed into a specific location on the runtime cluster so that the packages are available when the operator code is executed.
The compressed file can be sent in the file parameter custom_operator_package_archive.
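Locating and compressing the site-packages folder can be scripted as in the following sketch. It assumes the POSIX virtual environment layout shown above, a ZIP archive, and that the archive should contain site-packages/ as its top-level entry; the function name archive_site_packages is hypothetical.

```python
import shutil
from pathlib import Path


def archive_site_packages(venv_dir: Path, output_base: Path) -> Path:
    """Zip <venv_dir>/lib/python3.x/site-packages into <output_base>.zip."""
    matches = sorted(venv_dir.glob("lib/python3*/site-packages"))
    if not matches:
        raise FileNotFoundError(f"no site-packages folder found under {venv_dir}")
    site_packages = matches[0]
    archive = shutil.make_archive(
        str(output_base),
        "zip",
        root_dir=str(site_packages.parent),  # the python3.x folder
        base_dir=site_packages.name,         # "site-packages"
    )
    return Path(archive)


# Example call (paths are illustrative):
# archive_site_packages(Path("my_venv"), Path("/tmp/custom_packages"))
```

Build the virtual environment with the same Python version as the runtime where possible, because compiled packages are tied to the interpreter version.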
Example custom operator
from typing import Any, Dict, List, Tuple

import pyarrow as pa

from datasift_core.operators.abstract_operator import OperatorCategory, AbstractOperator
from datasift_common.util.constants import OperatorConstants
from datasift_common.util.log import get_logger

logger = get_logger()


class CustomOperator(AbstractOperator):
    """
    A custom operator that performs a specific transformation on a PyArrow Table.
    """
    short_name = "custom_operator"
    category = OperatorCategory.Custom

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the operator with the provided configuration.
        Args:
            config (Dict[str, Any]): Configuration parameters for the operator.
        """
        super().__init__(config)
        self.sample_input = config.get("sample_input", "default_value")

    def transform(self, table: pa.Table) -> Tuple[List[pa.Table], Dict[str, Any]]:
        """
        Transform the input table by adding a new column.
        Args:
            table (pa.Table): Input PyArrow Table to be transformed.
        Returns:
            Tuple[List[pa.Table], Dict[str, Any]]: A tuple containing a list of transformed tables
            and a dictionary of metadata.
        """
        logger.info(">>> Executing custom operator transformation")
        new_column_data = pa.array([self.sample_input] * table.num_rows)
        updated_table = table.append_column("new_column", new_column_data)
        metadata = {
            "num_rows": updated_table.num_rows,
            "num_columns": updated_table.num_columns
        }
        return [updated_table], metadata

    def get_metadata(self) -> Dict[str, Any]:
        """
        Retrieve metadata about the operator's configurable attributes.
        Returns:
            Dict[str, Any]: A dictionary containing metadata about the operator's attributes.
        """
        return {
            OperatorConstants.SDK: True,
            OperatorConstants.CATEGORY: self.category.value,
            OperatorConstants.LABEL: "Name of Custom Operator",
            OperatorConstants.IS_OPERATOR_AVAILABLE: True,
            OperatorConstants.ATTRIBUTES: {
                "sample_input": {
                    OperatorConstants.NAME: "Sample Input",
                    OperatorConstants.DESCRIPTION: "A sample input parameter for the custom operator",
                    OperatorConstants.DEFAULT: "default_value",
                    OperatorConstants.REQUIRED: True
                }
            }
        }
Adding and managing custom operators
To upload a custom operator:
- Navigate to Project Settings.
- Click Add Custom Operator.
- Provide the following details:
  - Name – The display name of your operator.
  - Description – A short explanation of what the operator does.
  - Operator File (.py) – Upload the Python file defining your custom operator.
  - Dependencies (.zip) – (Optional) Upload a ZIP file containing any dependencies required by your operator.
Use the refresh button to get the latest validation status (completed, failed, or queued).
Edit and save the files for the operator using the Edit button.
To delete the custom operator, use the Delete icon.
Verifying the uploaded custom operator
After uploading, you can verify your custom operator in the flow canvas:
- Open the User Generated section in the side panel. You will see your uploaded custom operator listed by the label name defined in your operator file. For example, if your operator defines:
    OperatorConstants.LABEL: "Hello World36"
  you will see Hello World36 displayed in the list along with its description.
- Drag the operator to the canvas to use it in the flow.
- Run the flow when ready.