Implementing a sink operator using the Java Operator API
A sink operator transmits information or events to an external system, such as a dashboard, web server, mail server, or database. When such a system provides a Java™ API, a sink operator to that system can be implemented in Java.
About this task
For example, a database sink operator might populate a PreparedStatement based on the tuple's attribute values, and then run the PreparedStatement to update the database. A simple model is that the arrival of each incoming tuple results in data being transmitted to the external system in the Operator.process() method. This model is often insufficient for performance reasons; instead, batching is used so that a batch of information that represents multiple tuples is transmitted to the external system in a single operation. For more information about batching, see Batching in a sink operator.
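To make the contrast between the two models concrete, here is a minimal plain-Java sketch of count-based batching that does not depend on the Streams API. CountBatcher and its transmit callback are hypothetical names used only for illustration; with a batch size of 1, the sketch reduces to the simple one-transmission-per-tuple model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical count-based batcher: tuples are buffered and handed to the
// transmit callback as one list once batchSize of them have arrived.
final class CountBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> transmit;
    private final List<T> buffer = new ArrayList<>();

    CountBatcher(int batchSize, Consumer<List<T>> transmit) {
        if (batchSize < 1)
            throw new IllegalArgumentException("batchSize must be >= 1");
        this.batchSize = batchSize;
        this.transmit = transmit;
    }

    // Called once per incoming tuple, in the role Operator.process() plays.
    void add(T tuple) {
        buffer.add(tuple);
        if (buffer.size() >= batchSize)
            flush();
    }

    // Sends whatever is buffered (one external operation), then clears it.
    void flush() {
        if (buffer.isEmpty())
            return;
        transmit.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Feeding five tuples through a CountBatcher with a batch size of 2 results in two transmissions during add() and a third, containing the remaining tuple, when flush() is called. A real sink would flush on final punctuation so that no tuples are left behind.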
The general approach for writing a sink operator in Java is described here. Some steps include example information for a Java sink operator that implements HTTP POST, where incoming tuples result in HTTP POST operations, such as submitting a web order form. The sample class com.ibm.streams.operator.samples.sinks.HttpPOST is provided.
Example
com.ibm.streams.operator.samples.sinks.HttpPOST shows the logic specific to creating an HTTP POST request from tuples. com.ibm.streams.operator.samples.patterns.TupleConsumer is an abstract class that demonstrates flexible batching of tuples and is intended for use in a sink operator.
HttpPost extends TupleConsumer. HttpPost has two key methods: initialize() and processBatch().
The initialize() method uses the SPL parameter url to determine the URL for the POST request. No connection attempt is made to validate the URL: the only thing known about the URL is that it is suitable for a POST request, and a POST request with no data might have unknown effects on the server.
/**
 * URL of the HTTP server we will be posting data to.
 */
private URL url;

/**
 * Initialize by setting the URL and the batch size.
 */
@Override
public void initialize(OperatorContext context) throws Exception {
    super.initialize(context);
    url = getURL();
    setBatchSize(batchSize());
}

/**
 * Get the URL for the POST requests from the required parameter url.
 * Sub-classes may override this to set the URL another way.
 *
 * @return URL for POST requests
 * @throws MalformedURLException if the parameter value is not a valid URL
 */
protected URL getURL() throws MalformedURLException {
    String urlPath = getOperatorContext().getParameterValues("url").get(0);
    // Only parses the URL; no connection is opened here.
    return new URL(urlPath);
}

/**
 * Get the batch size to use from the parameter batchSize, using 1 if that
 * parameter is not set. Sub-classes may override this to set the batch size
 * another way.
 *
 * @return batchSize to use
 */
protected int batchSize() {
    List<String> bp = getOperatorContext().getParameterValues("batchSize");
    if (bp.isEmpty())
        return 1;
    // parseInt converts the parameter text; Integer.getInteger would instead
    // look up a system property with that name and usually return null.
    return Integer.parseInt(bp.get(0));
}
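Because this parameter handling does not depend on the Streams runtime, it can be sketched and tested in plain Java. In this sketch, SinkParams is a hypothetical class, and the parameter values are passed in as a List<String> in place of getOperatorContext().getParameterValues(...). Rejecting batch sizes below 1 is an extra check that is not part of the sample.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;

// Hypothetical helper mirroring HttpPost's parameter handling, with the
// parameter values passed in directly instead of read from an OperatorContext.
final class SinkParams {
    private SinkParams() {}

    // Returns 1 when the batchSize parameter is absent; otherwise parses it.
    static int batchSize(List<String> values) {
        if (values.isEmpty())
            return 1;
        int size = Integer.parseInt(values.get(0).trim());
        // Extra validation (assumption, not in the sample).
        if (size < 1)
            throw new IllegalArgumentException("batchSize must be >= 1: " + size);
        return size;
    }

    // Builds the URL object only; the server is never contacted, matching
    // the decision not to validate the URL with a connection attempt.
    static URL url(List<String> values) throws MalformedURLException {
        return new URL(values.get(0));
    }
}
```

A syntactically invalid value such as "not a url" fails fast with MalformedURLException at initialization, while a well-formed URL for an unreachable server is only discovered when the first batch is posted.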
The processBatch() method is an abstract method that the superclass TupleConsumer requires; its purpose is to take a batch of tuples and transmit them to the external system. For HttpPost, each attribute in the tuple is converted into a URL-encoded name-value pair. A single HTTP POST request might contain multiple tuples, in which case each name-value pair is repeated once per tuple.
@Override
protected final boolean processBatch(Queue<BatchedTuple> batch)
        throws Exception {
    // Build a URL-encoded form body: one name=value pair per attribute,
    // repeated for every tuple in the batch.
    StringBuilder data = new StringBuilder(1024);
    for (BatchedTuple item : batch) {
        StreamSchema schema = item.getStream().getStreamSchema();
        for (Attribute attribute : schema) {
            if (data.length() != 0)
                data.append('&');
            data.append(URLEncoder.encode(attribute.getName(), "UTF-8"));
            data.append('=');
            data.append(URLEncoder.encode(item.getTuple().getString(
                    attribute.getName()), "UTF-8"));
        }
    }

    // Send the data; setDoOutput(true) turns the HTTP request into a POST.
    URLConnection conn = url.openConnection();
    conn.setDoOutput(true);
    try (OutputStreamWriter wr = new OutputStreamWriter(
            conn.getOutputStream(), "UTF-8")) {
        wr.write(data.toString());
    }

    // Read the response and log it at debug level. trace is the class's
    // logger field, which is not shown in this excerpt.
    try (BufferedReader rd = new BufferedReader(new InputStreamReader(
            conn.getInputStream()))) {
        String line;
        while ((line = rd.readLine()) != null) {
            trace.log(TraceLevel.DEBUG, line);
        }
    }
    return false;
}
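The body-building loop in processBatch() can be isolated and checked without a Streams runtime. In this sketch, FormBody is a hypothetical class and each tuple is modeled as an ordered map from attribute name to string value, standing in for BatchedTuple and its StreamSchema. It uses the URLEncoder.encode(String, Charset) overload, which requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone version of the body-building loop in processBatch:
// each tuple is an ordered map of attribute name to string value.
final class FormBody {
    private FormBody() {}

    static String encode(List<? extends Map<String, String>> tuples) {
        StringBuilder data = new StringBuilder();
        for (Map<String, String> tuple : tuples) {
            for (Map.Entry<String, String> attr : tuple.entrySet()) {
                if (data.length() != 0)
                    data.append('&');
                // URLEncoder produces application/x-www-form-urlencoded text:
                // spaces become '+', reserved characters become %XX escapes.
                data.append(URLEncoder.encode(attr.getKey(), StandardCharsets.UTF_8));
                data.append('=');
                data.append(URLEncoder.encode(attr.getValue(), StandardCharsets.UTF_8));
            }
        }
        return data.toString();
    }
}
```

For a batch of two tuples with attributes item and qty, the body is item=red+shirt&qty=2&item=A%26B&qty=1: the attribute names repeat once per tuple, and the & inside a value is escaped so that it cannot be mistaken for a pair separator.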
TupleConsumer implements generic batching of tuples as follows:
- Batches that are defined by a number of tuples, including support for a batch of a single tuple.
- Asynchronous processing of batches, so that processing a batch does not affect the submission of tuples to the operator.
- An optional timeout: if a set time elapses since the last tuple arrived, the batch is processed regardless of how full it is, as long as it is not empty.
- An option to preserve tuple arrival order when processing batches. If order preservation is not required, multiple threads might process batches asynchronously.
- The ability for the subclass to process partial batches, leaving unprocessed tuples to be processed in a future batch.
- Final punctuation handling, which ensures that all batches are processed before processPunctuation() returns.
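The timeout, partial-batch, and final-punctuation behaviors in this list can be illustrated with a small single-threaded sketch. TimedBatcher is hypothetical and is not the TupleConsumer implementation: it omits asynchronous and multi-threaded processing, takes the current time as an argument instead of using a timer, and its processor contract (remove the tuples you handled from the queue; anything left stays for the next batch) is an assumption chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single-threaded sketch: size-triggered batches, a timeout that
// flushes a non-empty batch, partial processing, and a final drain.
final class TimedBatcher<T> {
    private final int batchSize;
    private final long timeoutMillis;
    private final Consumer<Queue<T>> processor; // removes the tuples it handled
    private final Queue<T> batch = new ArrayDeque<>();
    private long lastArrival;

    TimedBatcher(int batchSize, long timeoutMillis, Consumer<Queue<T>> processor) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.processor = processor;
    }

    // Adds a tuple and processes the batch once it reaches batchSize.
    void add(T tuple, long nowMillis) {
        batch.add(tuple);
        lastArrival = nowMillis;
        if (batch.size() >= batchSize)
            processor.accept(batch);
    }

    // Polled periodically: processes a non-empty batch if no tuple has
    // arrived for at least timeoutMillis, however full the batch is.
    void tick(long nowMillis) {
        if (!batch.isEmpty() && nowMillis - lastArrival >= timeoutMillis)
            processor.accept(batch);
    }

    // Final punctuation: keep processing until every tuple is handled.
    void drain() {
        while (!batch.isEmpty()) {
            int before = batch.size();
            processor.accept(batch);
            if (batch.size() == before)
                throw new IllegalStateException("processor made no progress");
        }
    }

    int pending() {
        return batch.size();
    }
}
```

With a processor that handles at most two tuples per call, a full batch of three leaves one tuple queued, and drain() processes it before returning, which mirrors the guarantee that all batches are processed on final punctuation.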
What to do next
- Study the code that implements HttpPost (com.ibm.streams.operator.samples.sinks.HttpPOST and com.ibm.streams.operator.samples.patterns.TupleConsumer).
- Implement a sink operator in Java.
- Study the code for other sample Java sink operators.