Sample

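The following sample shows how to integrate the StructureParse operator with an spl.adapter::FileSource operator. A DirectoryScan operator feeds file names to the FileSource operator, which reads each file in binary blocks. The StructureParse operator parses these blocks and emits data tuples on its first output port and metric tuples on its second output port; each stream is written to its own text file by a FileSink operator.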


namespace test;
use com.ibm.streams.teda.parser.binary::StructureParse;
/*
 * Main: scans the "input" directory for files matching *.dat, reads each
 * file in binary blocks, hands the blocks to StructureParse, and writes
 * the two resulting streams (data and metrics) to separate text files.
 */
composite Main
{
	type
		// Schema of the first StructureParse output port.
		Data = tuple<
			uint8   RECORD_TYPE,
			rstring ORIG_ADDR,
			rstring DEST_ADDR,
			rstring SMS_CENTRE,
			rstring INCOMING_TIME,
			rstring IMSI_ADDR,
			// Not assigned explicitly below; presumably forwarded from the
			// same-named attributes of the input tuple -- TODO confirm.
			rstring filename,
			int64   tupleNo
		>;

		// Schema of the second StructureParse output port.
		Metric = tuple<
			// Not assigned explicitly below; presumably forwarded from the
			// same-named attribute of the input tuple -- TODO confirm.
			rstring filename,

			// operator lifetime metrics
			uint64 nTuplesReceivedTotal,
			uint64 nTuplesSentTotal,
			uint64 nBytesReceivedTotal,
			uint64 nBytesDroppedTotal,

			// metrics, which are reset after each metrics tuple
			uint64 nTuplesReceived,
			uint64 nTuplesSent,
			uint64 nBytesReceived,
			uint64 nBytesDropped
		>;

	graph
		// Stage 1: emit the name of every file in "input" whose name ends in ".dat".
		stream<rstring filename> Files = DirectoryScan()
		{
			param
				directory: "input";
				pattern: "^.*\\.dat$";
		}

		// Stage 2: read each announced file as raw blocks and tag every block
		// with the originating file name and the tuple number.
		stream<rstring filename, int64 tupleNo, blob payload> SourceData = FileSource(Files)
		{
			param
				format: block;
				blockSize: 1048576u; // 1024 * 1024 bytes per block
			output SourceData:
				filename = FileName(),
				tupleNo = TupleNumber();
		}

		// Stage 3: parse the binary blocks. First port carries the parsed
		// records, second port carries the operator's metric counters.
		(
			stream<Data> Output;
			stream<Metric> Metrics as Stats
		) = StructureParse(SourceData)
		{
			param
				structure: "etc/structure.xml";
				mapping: "etc/mapping.xml";
			output Stats:
				// lifetime counters
				nTuplesReceivedTotal = nTuplesReceivedTotal(),
				nTuplesSentTotal = nTuplesSentTotal(),
				nBytesReceivedTotal = nBytesReceivedTotal(),
				nBytesDroppedTotal = nBytesDroppedTotal(),
				// counters reset after each metrics tuple
				nTuplesReceived = nTuplesReceived(),
				nTuplesSent = nTuplesSent(),
				nBytesReceived = nBytesReceived(),
				nBytesDropped = nBytesDropped();
		}

		// Stage 4a: persist parsed records, flushing after every tuple.
		() as OutputSink = FileSink(Output)
		{
			param
				file: "output.txt";
				format: txt;
				flush: 1u;
		}

		// Stage 4b: persist metric tuples, flushing after every tuple.
		() as MetricsSink = FileSink(Metrics)
		{
			param
				file: "metrics.txt";
				format: txt;
				flush: 1u;
		}
}