Code operator examples
The Code operator can be of type Source, Processing and Analytics, or Target. You select the desired script from the scripts folder.
Important
- Your code must use the same package versions that are listed here.
- Regardless of the operator type, you must declare all output attributes in the Edit Schema window.
Restriction
Passed `datetime` objects must not be timezone-aware.
When you return or submit event tuples with `datetime` objects, make sure that they are “naive”, that is, timezone-unaware. For more information about “aware” and “naive” objects in Python, see the Python documentation for the datetime module (Basic date and time types).
Passing `datetime` objects that are timezone-aware results in runtime errors, and such events are ignored.
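The Python documentation defines a `datetime` object `d` as aware when `d.tzinfo` is not `None` and `d.tzinfo.utcoffset(d)` does not return `None`; otherwise it is naive. The following sketch applies that rule to an event dictionary before you return or submit it. The helper name `find_aware_datetimes` is hypothetical and is not part of the streams flow API.

```python
from datetime import datetime, timedelta, timezone

def find_aware_datetimes(event):
    """Return the keys of event whose values are timezone-aware datetimes.

    Hypothetical helper that uses the definition of "aware" from the
    Python datetime documentation.
    """
    aware_keys = []
    for key, value in event.items():
        if (isinstance(value, datetime)
                and value.tzinfo is not None
                and value.tzinfo.utcoffset(value) is not None):
            aware_keys.append(key)
    return aware_keys

event = {
    "naive_dt": datetime(2024, 1, 1, 12, 0),
    "aware_dt": datetime(2024, 1, 1, 12, 0,
                         tzinfo=timezone(timedelta(hours=5, minutes=30))),
    "name": "sensor-1",
}
print(find_aware_datetimes(event))  # ['aware_dt']
```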
Tip
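In Python, you can convert a timezone-aware `datetime` object to a “naive”, UTC-based `datetime` object. The following code snippet is an example, where `dt` is a timezone-aware `datetime` object:
```python
from dateutil.tz import tzutc

# Convert dt to UTC, then drop the tzinfo so that the result is naive
event['dt_utc'] = dt.astimezone(tzutc()).replace(tzinfo=None)
```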
See Example 2 for a full example.
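If you prefer not to depend on `dateutil`, the standard library's `datetime.timezone.utc` gives the same result. A minimal sketch; the function name `to_naive_utc` is illustrative only:

```python
from datetime import datetime, timedelta, timezone

def to_naive_utc(dt):
    """Convert a timezone-aware datetime to a naive datetime in UTC."""
    if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
        raise ValueError("expected a timezone-aware datetime")
    # Shift to UTC first, then drop the tzinfo so that the object is naive
    return dt.astimezone(timezone.utc).replace(tzinfo=None)

ist = timezone(timedelta(hours=5, minutes=30))
local = datetime(2024, 3, 15, 14, 30, tzinfo=ist)
print(to_naive_utc(local))  # 2024-03-15 09:00:00
```

The function rejects naive input on purpose. If you pass a naive value, `astimezone()` interprets it as local system time, and that is rarely what you want.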
Code as a Source operator
You might use Code as a Source operator when you want to generate some test data without having to set up a Kafka instance. Another use is when you want to bring in data from a web socket or from your proprietary data source.
A. Asynchronous or synchronous cases: produce
The `produce` approach, with its background queue, is suitable for asynchronous use cases (such as websockets) as well as synchronous ones.
The following code shows an example of `produce`. The code generates test data and sends it to the output at a rate of two events per second.
```python
import time

def produce(submit, state):
    counter = 0
    while counter <= 10000:
        # Submit a tuple in each iteration:
        submit({"number": counter, "square": counter * counter})
        counter += 1
        time.sleep(0.5)  # Simulates a delay of 0.5 seconds between emitted events
```
The `produce()` function is called when the job starts to run. It runs on a background thread and typically invokes the `submit()` callback whenever a tuple of data is ready to be emitted from this operator. This design supports asynchronous data services as well as synchronous data generation or retrieval.
@submit is a Python callback function that takes one argument, a dictionary that represents a single tuple.
@state is a Python dictionary object for keeping state.
In the Edit Output Schema window, you declare two output attributes, `number` and `square`, of type Number.
When the streams flow is running, the Flow of Events shows the generated test data.
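You can try a `produce()` function before you deploy a flow by calling it yourself with a stand-in for `submit`. The harness below is a sketch under assumptions: `run_produce_locally` is a hypothetical name, and a `queue.Queue` fed from a background thread only approximates the background queue that the operator uses.

```python
import queue
import threading

def produce(submit, state):
    # Shortened variant of the example above: five events, no delay
    for counter in range(5):
        submit({"number": counter, "square": counter * counter})

def run_produce_locally(produce_fn, timeout=5.0):
    """Hypothetical local harness, not the streams flow runtime.

    Runs produce_fn on a background thread and collects every submitted
    tuple through a thread-safe queue.
    """
    events = queue.Queue()
    state = {}
    worker = threading.Thread(target=produce_fn, args=(events.put, state), daemon=True)
    worker.start()
    worker.join(timeout)  # Wait for produce_fn to finish, or stop waiting after timeout
    collected = []
    while not events.empty():
        collected.append(events.get_nowait())
    return collected

collected_events = run_produce_locally(produce)
print(collected_events[-1])  # {'number': 4, 'square': 16}
```

With the original example, the loop runs for more than an hour because of `time.sleep(0.5)`. For local tests, shorten the loop or the delay.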
B. Optimized for synchronous cases: generate_sync
The interthread communication through the background queue of the `produce` approach has a performance cost. Therefore, for synchronous use cases with high data throughput, use the faster `generate_sync` function approach.
To use it, implement the `generate_sync` function instead of the `produce` function. The code structure can look very similar. The difference is that the code uses a `yield` statement for each emitted event instead of calling `submit`.
The following code shows an example of `generate_sync`:
```python
import time

# The generate_sync() function will be called when the job starts to run.
# It emits events by yielding the output event dictionary.
# @state a Python dictionary object for keeping state
# You must declare all output attributes in the Edit Schema window.
def generate_sync(state):
    counter = 0
    while True:
        # Emit a tuple in each iteration:
        yield {"number": counter, "square": counter * counter}
        counter += 1
        time.sleep(0.5)  # Simulates a delay of 0.5 seconds between emitted events
```
The output schema and runtime output data are the same as in the example for `produce`.
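`generate_sync()` is an ordinary Python generator, so you can check its output locally by taking a finite slice of it with `itertools.islice`. This is a minimal sketch. Passing an empty dictionary as `state` is an assumption about how the runtime initializes it.

```python
import itertools

def generate_sync(state):
    # Same logic as the example above, without time.sleep() so the check runs instantly
    counter = 0
    while True:
        yield {"number": counter, "square": counter * counter}
        counter += 1

# Take only the first three events from the infinite generator
first_three = list(itertools.islice(generate_sync({}), 3))
print(first_three)
# [{'number': 0, 'square': 0}, {'number': 1, 'square': 1}, {'number': 2, 'square': 4}]
```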
Comparison of produce and generate_sync
The following table compares the `produce` and `generate_sync` approaches:
Aspect | produce | generate_sync |
---|---|---|
Use cases | Async, sync | Sync |
Performance | Slower, due to interthread communication | Faster |
Function signature | def produce(submit, state) | def generate_sync(state) |
Emitting event | submit(…) | yield … |
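Both approaches emit the same dictionaries, so you can keep both options open. Write the event logic once as a generator. If you ever need the `produce` approach, wrap the generator in a thin adapter. This is a sketch of a coding pattern, not a documented streams flow feature. Here, `list.append` stands in for the runtime's `submit` callback.

```python
def generate_sync(state):
    # Finite variant of the generate_sync() example, for illustration
    for counter in range(3):
        yield {"number": counter, "square": counter * counter}

def produce(submit, state):
    # Hypothetical adapter: reuse the generator logic in the produce() style
    for event in generate_sync(state):
        submit(event)

emitted = []
produce(emitted.append, {})
print(emitted)
# [{'number': 0, 'square': 0}, {'number': 1, 'square': 1}, {'number': 2, 'square': 4}]
```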
Code as a Processing and Analytics operator
As a Processing and Analytics operator, the Code operator receives each input event as a parameter and returns an output event.
Example 1
Goal: You need to return two new attributes that are not present in the input schema.
The following code snippet shows an example with two attributes, “friendly_greeting” and “formal_greeting”, in the output schema of the Code operator.
```python
def process(event):
    # Use a default name when the input event has no 'name' attribute
    if 'name' in event:
        name = event['name']
    else:
        name = 'stranger'
    friendly_greeting = 'Hey ' + name + '!'
    formal_greeting = 'Dear ' + name + ','
    # The returned dictionary becomes the output event
    return {'friendly_greeting': friendly_greeting,
            'formal_greeting': formal_greeting}
```
The returned attributes ‘friendly_greeting’ and ‘formal_greeting’ must be included in the output schema of the Code operator.
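You can catch a mismatch between the returned dictionary and the declared attributes with a quick local check before you run the flow. In this sketch, `undeclared_attributes` is a hypothetical helper. The `declared` set is assumed to mirror the attributes that you enter in the Edit Schema window.

```python
def undeclared_attributes(output, declared_attributes):
    """Return the attribute names in output that are not declared in the schema."""
    return sorted(set(output) - set(declared_attributes))

# Hypothetical output schema for Example 1, as declared in the Edit Schema window
declared = {"friendly_greeting", "formal_greeting"}

output = {"friendly_greeting": "Hey Ada!", "formal_greeting": "Dear Ada,"}
print(undeclared_attributes(output, declared))  # []
print(undeclared_attributes({**output, "extra": 1}, declared))  # ['extra']
```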
Example 2
Goal: You need to convert the time of an event from the IST time zone to UTC and return it in the output schema.
```python
from dateutil.parser import parse
from dateutil import tz

def process(event):
    # dateutil's parse() doesn't recognize "IST" as a time zone indicator,
    # so replace it with the India Standard Time offset, +05:30
    dt = parse(event['event_time'].replace('IST', '+05:30'))
    # Convert to UTC, then drop the tzinfo: passed datetime objects must be naive
    event['dt_utc'] = dt.astimezone(tz.gettz('UTC')).replace(tzinfo=None)
    return event
```
For more information about date formats, see Date formats.
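If the input always has a fixed layout, the standard library alone can do the same conversion. This sketch assumes a hypothetical `event_time` format such as `2023-03-15 14:30:00 IST`. Like Example 2, it treats IST as India Standard Time (UTC+05:30).

```python
from datetime import datetime, timezone

def ist_to_naive_utc(text):
    """Parse 'YYYY-MM-DD HH:MM:SS IST' and return a naive UTC datetime.

    Assumes that IST means India Standard Time (UTC+05:30) and that the
    input always uses this exact format.
    """
    # strptime's %z accepts offsets such as +0530
    dt = datetime.strptime(text.replace("IST", "+0530"), "%Y-%m-%d %H:%M:%S %z")
    # Convert to UTC, then drop the tzinfo so that the result is naive
    return dt.astimezone(timezone.utc).replace(tzinfo=None)

print(ist_to_naive_utc("2023-03-15 14:30:00 IST"))  # 2023-03-15 09:00:00
```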
Example 3
Goal: You need to emit only every other tuple.
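```python
# Module-level counter that persists across calls to process()
counter = 0

def process(event):
    global counter
    counter += 1
    event['counter'] = counter
    # Returning None drops the event, so only every other tuple is emitted
    if counter % 2 == 0:
        return None
    return event
```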
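Example 3 keeps its counter in a module-level global. Example 4 uses the `init(state)` and `process(event, state)` signatures, which let you keep the counter in `state` instead. The sketch below assumes that the runtime calls `init()` once before the first event. `KEEP_EVERY` is a hypothetical setting that generalizes "every other" to "every Nth".

```python
KEEP_EVERY = 2  # Hypothetical setting: 2 keeps every other tuple

def init(state):
    state["counter"] = 0

def process(event, state):
    state["counter"] += 1
    event["counter"] = state["counter"]
    # Returning None drops the tuple; with KEEP_EVERY = 2 this keeps the 1st, 3rd, 5th, ...
    if (state["counter"] - 1) % KEEP_EVERY != 0:
        return None
    return event

# Local usage: one shared state dictionary across six events
state = {}
init(state)
kept = [e for e in (process({"value": i}, state) for i in range(6)) if e is not None]
print([e["value"] for e in kept])  # [0, 2, 4]
```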
Example 4
Goal: You want to report the geographic movement of mobile GPS-enabled devices (phones, tablets, cars). This scenario is an example of a use case that requires `state`.
```python
def init(state):
    # Nothing to initialize, in this example
    pass

def rememberDeviceLocation(event, state):
    # One possible implementation of the helper that process() calls:
    # store the latest location so that it can be compared with future locations
    state[event['deviceId']] = {'last_lat': float(event['lat']),
                                'last_long': float(event['long'])}

def process(event, state):
    deviceId = event['deviceId']  # Extract the device ID from the event tuple
    if deviceId not in state:
        # No previous record for this device ID, meaning it's the first event for it
        message = "Detected initial location"
    else:
        record = state[deviceId]  # Extract the device record of last (previous) location
        # Coordinates can arrive as strings, so compare them as numbers
        lat, long = float(event['lat']), float(event['long'])
        directions = []  # List for gathering directions: north or south, east or west
        if lat > record['last_lat']:
            directions.append("north")
        elif lat < record['last_lat']:
            directions.append("south")
        if long > record['last_long']:
            directions.append("east")
        elif long < record['last_long']:
            directions.append("west")
        message = "Moved " + "-".join(directions)
    output = {
        'deviceId': deviceId,
        'message': message
    }
    rememberDeviceLocation(event, state)  # For comparing with future locations
    return output
```
Example output
Input event 1: {'deviceId': 'p008', 'lat': '32.678611', 'long': '35.576944'}
Resulting output: {'deviceId': 'p008', 'message': 'Detected initial location'}

Input event 2: {'deviceId': 'p008', 'lat': '32.67862', 'long': '35.576944'}
Resulting output: {'deviceId': 'p008', 'message': 'Moved north'}

Input event 3: {'deviceId': 'p008', 'lat': '32.6786', 'long': '35.57694'}
Resulting output: {'deviceId': 'p008', 'message': 'Moved south-west'}

Input event 4: {'deviceId': 'x123', 'lat': '32.6786', 'long': '35.57694'}
Resulting output: {'deviceId': 'x123', 'message': 'Detected initial location'}
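You can reproduce an example output like this one without deploying a flow. Drive `process()` from a short script that reuses one `state` dictionary across calls. The `replay()` driver below is hypothetical and only approximates how the runtime passes state. To check Example 4, pass its `init` and `process` functions instead of the small counting function shown here.

```python
def replay(process, init, events):
    """Hypothetical local test driver: feed events through one shared state dict."""
    state = {}
    init(state)
    return [process(event, state) for event in events]

# A tiny stateful process() used only to exercise the driver:
# it counts how many events each device has sent so far.
def init(state):
    pass

def process(event, state):
    device_id = event["deviceId"]
    state[device_id] = state.get(device_id, 0) + 1
    return {"deviceId": device_id, "seen": state[device_id]}

outputs = replay(process, init, [{"deviceId": "p008"}, {"deviceId": "p008"}, {"deviceId": "x123"}])
print(outputs)
# [{'deviceId': 'p008', 'seen': 1}, {'deviceId': 'p008', 'seen': 2}, {'deviceId': 'x123', 'seen': 1}]
```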
Example 5
Goal: A Watson IoT operator ingests electricity usage readings from smart meters, which are used to bill users. You need to take that data and implement time-based billing. You also want to send an email alert to customers whose usage for the current month is high.
Watch the video Use Python code in a streams flow.
Code as a Target operator
As a Target operator, the Code operator lets you write Python code in a target node, for example to send events to an external system.
The following code shows an example:
```python
# process() function will be invoked on every event tuple
# @event a Python dictionary object representing the input event tuple as defined by the input schema
# @state a Python dictionary object for keeping state over subsequent function calls
def process(event, state):
    # Do something with the event, for example use a complex filtering condition and send the event to a database.
    pass
```
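The following sketch shows a target that applies a filtering condition and writes matching events to a database. It assumes events with hypothetical `deviceId` and `value` attributes and uses Python's built-in `sqlite3` module as a stand-in for your real database. Because the connection is kept in `state`, it is opened only once, on the first event.

```python
import sqlite3

DB_PATH = ":memory:"  # Hypothetical database location; replace it with your real target

def process(event, state):
    # Open the connection on the first event and reuse it on later calls
    if "conn" not in state:
        conn = sqlite3.connect(DB_PATH)
        conn.execute("CREATE TABLE IF NOT EXISTS readings (device_id TEXT, value REAL)")
        state["conn"] = conn
    conn = state["conn"]
    # Hypothetical filtering condition: store only readings above 100
    if event.get("value", 0) > 100:
        conn.execute("INSERT INTO readings VALUES (?, ?)", (event["deviceId"], event["value"]))
        conn.commit()

# Local usage: the second event does not pass the filter
state = {}
process({"deviceId": "a", "value": 150}, state)
process({"deviceId": "b", "value": 50}, state)
print(state["conn"].execute("SELECT device_id FROM readings").fetchall())  # [('a',)]
```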