Code operator examples

The Code operator can be of type Source, Processing and Analytics, or Target. You select the desired script from the scripts folder.

Important

Restriction

Passed datetime objects must not be timezone-aware.

When you return or submit event tuples with datetime objects, make sure that they are “naive”, that is “timezone-unaware”. For more information about “timezone aware” and “naive” in Python coding, see datetime — Basic date and time types.

Passing datetime objects that are “timezone-aware” results in runtime errors, and such events are ignored.

Tip

In Python, you can convert a timezone-aware datetime object to a “naive”, UTC-based datetime object. The following code snippet is an example.

from dateutil.tz import tzutc

event['dt_utc'] = dt.astimezone(tzutc()).replace(tzinfo=None)

See Example 2 for a full example.

 

Code as a Source operator

You might use Code as a Source operator when you want to generate some test data without having to set up a Kafka instance. Another use is when you want to bring in data from a web socket or from your proprietary data source.

A. Asynchronous or synchronous cases: produce

The produce approach with its background queue is adequate for asynchronous (such as websockets) as well as for synchronous use cases.

The following code shows an example of produce. The code generates some test data and sends it to the output, at a rate of two events per second.

def produce(submit, state):
    counter = 0
    while(counter <= 10000):
        # Submit a tuple in each iteration:
        submit({"number": counter, "square": counter * counter})
        counter += 1
        time.sleep(0.5) # Simulates a delay of 0.5 seconds between emitted events

 

The produce() function is called when the job starts to run. It is called on a background thread, and it typically invokes the submit() callback whenever a tuple of data is ready to be emitted from this operator. The produce() function allows for using asynchronous data services as well as synchronous data generation or retrieval.

@submit is a Python callback function that takes one argument. The agrument is a dictionary that represents a single tuple.

@state is a Python dictionary object for keeping state.

 

In the Edit Output Schema window, you declare two output attributes - number and square - of type Number.

Edit schema for code as source

When the streams flow is running, the Flow of Events shows the generated test data.

Flow of Events for code as source

B. Optimized for synchronous cases: generate_sync

The produce approach with its background queue’s interthread communication has its performance cost. Therefore, for synchronous use cases with high data throughput, it is recommended to use the faster generate_sync function approach.

To this end, instead of the produce function, your code needs to implement the generate_sync function. The code structure might look very similar, except that instead of calling submit for each emitted event, the code will use the yield directive for this purpose.

The following code shows an example of generate_sync:

# The generate_sync() function will be called when the job starts to run.
# It emits events by calling 'yield' on the output event dictionary.
# @state a Python dictionary object for keeping state
# You must declare all output attributes in the Edit Schema window.
def generate_sync(state):
    counter = 0
    while(True):
         # Submit a tuple in each iteration:
         yield {"number": counter, "square": counter * counter}
         counter += 1
         time.sleep(0.5)    # Simulates a delay of 0.5 seconds between emitted events

The output schema and runtime output data are the same as in the example for produce.

Comparison of produce and generate_sync

The following table compares the produce and generate_sync approaches:

Aspect produce generate_sync
Use cases Async, sync Sync
Performance Slower, due to interthread communication Faster
Function signature def produce(submit, state) def generate_sync(state)
Emitting event submit(…) yield …

Code as a Processing and Analytics operator

The Code as a Processing and Analytics operator has both an input parameter and a return value.

Example 1

Goal: You need to return to the output schema two new attributes that are not present in the input schema.

The following code snippet shows an example for two attributes, “friendly_greeting” and “formal_greeting”, in the output schema of the Code operator.

import sys
def process(event):
  if 'name' in event:
    name = event['name']
  else:
    name = 'stranger'
  friendly_greeting = 'Hey ' + name + '!'
  formal_greeting = "Dear ' + name + ','
  return
  {'friendly_greeting':friendly_greeting,
   'formal_greeting':formal_greeting}

The returned attributes ‘friendly_greeting’ and ‘formal_greeting’ must be included in the output schema of the Code operator.

Example 2

Goal: You need to return to the output schema the time of an event that was changed from IST to UTC time zone.

from dateutil.parser import parse
from dateutil import tz
def process(event):
  # datetime.parse doesn't understand "IST" as a time zone indicator, so swap for +05:30

  dt = parse(event['event_time'].replace('IST','+05:30'))

  # convert to UTC time zone too

  event['dt_utc'] = dt.astimezone(tz.gettz('UTC'))
  return event

For more information about date formats, see Date formats.

Example 3

Goal: You need to return every other tuple to the output schema.

import sys

counter=0

def process(event):
	global counter
	counter+=1
	event['counter'] = counter
	if counter%2 is 0:
		return None
	return event

Example 4

Goal: You want to report the geographic movement of mobile GPS-enabled devices (phones, tablets, cars). This scenario is an example for a use case that requires state.

def init(state):
    # Nothing to initialize, in this example
    pass


def process(event, state):
    deviceId = event['deviceId']  # Extract the device ID from the event tuple
    if deviceId not in state:
        # No previous record for this device ID, meaning it's the first event for it
        message = "Detected initial location"
    else:
        record = state[deviceId]  # Extract the device record of last (previous) location
        directions = []  # Array for gathering directions: north or south, east or west

        if event['lat'] > record['last_lat']:
            directions.append("north")
        elif event['lat'] < record['last_lat']:
            directions.append("south")

        if event['long'] > record['last_long']:
            directions.append("east")
        elif event['long'] < record['last_long']:
            directions.append("west")

        message = "Moved " + "-".join(directions)

    output = {
        'deviceId': deviceId,
        'message': message
    }
    rememberDeviceLocation(event, state)  # For comparing with future locations
    return output

Example output

Input event 1 {‘deviceId’: ‘p008’, ‘lat’: ‘32.678611’, ‘long’: ‘35.576944’}

Resulting output {‘deviceId’: ‘p008’, ‘message’: ‘Detected initial location’} ***

Input event 2 {‘deviceId’: ‘p008’, ‘lat’: ‘32.67862’, ‘long’: ‘35.576944’}

Resulting output {‘deviceId’: ‘p008’, ‘message’: ‘Moved north’} ***

Input event 3 {‘deviceId’: ‘p008’, ‘lat’: ‘32.6786’, ‘long’: ‘35.57694’}

Resulting output {‘deviceId’: ‘p008’, ‘message’: ‘Moved south-west’} ***

Input event 4 {‘deviceId’: ‘x123’, ‘lat’: ‘32.6786’, ‘long’: ‘35.57694’}

Resulting output {‘deviceId’: ‘x123’, ‘message’: ‘Detected initial location’}

Example 5

Goal: Watson IoT operator ingests electricity usage readings from smart meters to bill users. You need to take that data and implement time-based billing. You also want to send an email alert to customers whose usage for the current month is high.

Watch the video Use Python code in a streams flow.

Code as a Target operator

The Code as Target operator allows you to write Python code in a target node.

The following code shows an example:

# process() function will be invoked on every event tuple
# @event a Python dictionary object representing the input event tuple as defined by the input schema
# @state a Python dictionary object for keeping state over subsequent function calls
def process(event, state):
    # Do something with the event, for example use a complex filtering condition and send the event to a database.
    pass

Learn more