Operator InetSource

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.ibm.streamsx.inet/op$com.ibm.streamsx.inet$InetSource.svg

The InetSource operator periodically retrieves data from network servers and generates a stream from those contents. This operator can also be used to stream the contents of local files. The InetSource operator shares some characteristics with the FileSource operator from the IBM Streams Processing Language (SPL) standard toolkit, but it provides more functions. The two operators differ in the following ways:

  • The FileSource operator reads from local files. In addition to reading local files, the InetSource operator supports remote reading of files by using HTTP, HTTPS, and FTP.
  • By default, the FileSource operator reads its target input only once. By default, the InetSource operator periodically checks the target files for updates on a user-controllable schedule. In addition, you can choose to use the InetSource operator to either stream out of the entire target file contents on each read, or stream out only the additions to the contents of the target file since the previous read.

  • The FileSource operator supports a single target for reading, while the InetSource operator can support multiple targets. The URI of each target can use any of the supported protocols (HTTP, HTTPS, FTP, FTPS, or FILE) and a mix of these protocols. An example of multi-target reading is in the WeatherConditions.spl file within the Commodity Purchasing Sample Application, where 24 HTTP-based URLs are simultaneously monitored for changes.

The data retrieved is implicitly assigned to the first output attribute that does not have explicit output assignments. The data attribute must be of type rstring, list<rstring>, blob, or xml:

  • When the data attribute is of type rstring or list<rstring>, the retrieved data is assumed to contain text: the operator parses the data into lines and assigns them to the attribute in one or more output tuples, according to the emitTuplePerXxxx parameters.
  • When the data attribute is of type blob or xml, all of the retrieved data is assigned to the attribute in one output tuple, and the emitTuplePerXxxx parameters are ignored. Note that if the attribute is of type xml and the retrieved data contains invalid XML, an error is logged and no output is produced.

Other output attributes may be assigned values explicitly using the operator's output assignment functions. These functions assign the requested and responding URLs, the response code, the data type, and the elapsed time for fetching the data.

The output stream of the InetSource operator can be fed into a Functor operator with custom logic to parse the data into any number of SPL types. For more information and examples, see the WeatherConditions.spl and GetHeadlines.spl sample applications.

By default, there is no time limit for fetching data from remote servers; servers that accept a request but do not return a response can block the operator indefinitely. Optionally, a time limit for fetching data can be specified. If the time limit is exceeded, the fetch is canceled, an error is logged, and no output is produced.

Behavior in a consistent region

This operator cannot be used in a consistent region.

Note: The InetSource operator uses the open source libcurl library to enable data transfer for various protocols, including HTTP, HTTPS, FTP, and FTPS. Proxy support can be enabled by configuring the http_proxy, HTTPS_PROXY, FTP_PROXY, and ALL_PROXY environment variables as described on the curl man page at the following link: http://curl.haxx.se/docs/manpage.html.

Examples

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 15 parameters.

Required: URIList

Optional: doNotStreamInitialFetch, emitTuplePerFetch, emitTuplePerRecordCount, emitTuplePerURI, fetchInterval, ignoreURIcheckException, incrementalFetch, initDelay, inputLinesPerRecord, intraRecordPadValue, iterations, punctPerFetch, timeout, unconditionalFetch

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
InetSourceOutputAssignments
rstring TargetURL()

This function assigns the URL that was requested when fetching the data assigned to the data attribute. This is one of the URLs specified in the 'URLList' parameter.

rstring EffectiveURL()

This function assigns the URL that responded when fetching the data assigned to the data attribute. This may be one of the URLs specified in the 'URLList' parameter, or it may be different if the requested URL was redirected elsewhere.

rstring ContentType()

This function assigns the value of the 'ContentType' header from an HTTP response, if the data fetched was an HTTP response and a 'ContentType' header was inclued, or else an empty string if not.

int32 ResponseCode()

This function assigns the value of the response code.

float64 FetchTime()

This function assigns the elapsed time the fetch took to complete, in seconds

<any T> T copy()

Ports (0)

Properties

Parameters

This operator supports 15 parameters.

Required: URIList

Optional: doNotStreamInitialFetch, emitTuplePerFetch, emitTuplePerRecordCount, emitTuplePerURI, fetchInterval, ignoreURIcheckException, incrementalFetch, initDelay, inputLinesPerRecord, intraRecordPadValue, iterations, punctPerFetch, timeout, unconditionalFetch

URIList

This parameter specifies a comma-separated list of quoted strings that contain Uniform Resource Identifiers (URIs) from which you want to retrieve data. A fetch request is sent to each URI in the list and processing occurs on any data that is returned. Each URI in the list must include the protocol to be used when making a retrieval request to that URI; supported protocols include HTTP, HTTPS, FTP, FTPS, and file. For the file protocol, there is support for accessing the input file by using either absolute paths or relative paths. Absolute paths must be preceded by a slash (/), denoting the top-most directory where the file is located. Relative paths are rooted at the data directory which is created automatically by the SPL compiler under the current application directory. Thus, an absolute path access will result in four slashes (////) after the file protocol, that is, file:////, while a relative path access will result in three slashes (///) after the file protocol, that is,file:///. If there is more than one URI in the list, each URI can use any of the supported protocols. This parameter is required and must have at least one quoted URI rstring in its value list.

Properties

doNotStreamInitialFetch

By default, the InetSource operator will start streaming out the contents of its target URI(s) on the very first fetch interval. If you want to suppress the streaming out of the first fetch results, use the doNotStreamInitialFetch parameter. This modifier is especially useful when monitoring the state of files that might have leftover data in them from a previous application run, where you do not want to reprocess that data again. Note that this modifier is not very useful unless you also specify the incrementalFetch parameter with it. The effect of specifying both modifiers is to instruct the InetSource operator to load its cache on the initial fetch, then start streaming out additions to the target URIs on subsequent fetches. This parameter is optional; if present, it must have exactly one value that is a boolean constant.

Properties

emitTuplePerFetch

This parameter controls how frequently a tuple is emitted from the InetSource operator. It is mutually exclusive with the emitTuplePerURI and emitTuplePerRecordCount parameters. When this parameter is specified, the output schema must be of type list<rstring> because the data from each fetch is contained in an element of the attribute. This parameter is optional; if present, it must have exactly one value that is a boolean constant.

Properties

emitTuplePerRecordCount

This parameter controls how frequently a tuple is emitted from the InetSource operator. It is mutually exclusive with the emitTuplePerFetch and emitTuplePerURI paramters. When this parameter is specified, a tuple is emitted each time the specified number of records are received. The output schema must be of type list<rstring> because string data that exceeds the SPL string length limit is broken up into multiple elements in the output list. By default, a line is a record. If you change the inputLinesPerRecord parameter, this changes. The emit parameters control how those records are added to the output tuple. If inputLinesPerRecord is higher than the number of lines read, then you only get as many lines as were read. With multiple URIs, records never span content from URIs, however the emit parameters control how the records are put into tuples and when tuples are submitted (as described in each emit parameter section). This parameter is optional; if present, it must have exactly one value of type uint32 that is an expression which can be evaluated at compile time.

Properties

emitTuplePerURI

This parameter controls how frequently a tuple is emitted from the InetSource operator. It is mutually exclusive with the emitTuplePerFetch and emitTuplePerRecordCount parameters. When this parameter is specified, the output schema must be of type list<rstring> because the data from each URI is contained in an element of the attribute. This parameter is optional; if present, it must have exactly one value that is a boolean constant.

Properties

fetchInterval

One of the important differences between the InetSource and the SPL Standard Toolkit source operators is the ability of the InetSource operator to periodically refetch the data from the target URI or list of URIs. By default, the InetSource operator reexamines each URI in the target every ten minutes (600 seconds), but this interval can be customized by using the fetchInterval parameter. Note that the interval begins immediately after the final piece of data from a given fetch is processed, so the next fetch will begin at (processing_time + fetchInterval) after the start of the current fetch. This parameter is optional; if present, it must have exactly one value of type uint32 that is an expression which can be evaluated at compile time.

Properties

ignoreURIcheckException

This optional parameter has the default value false. In the default case the supplied URI value is checked against RFC 2396. If the validation fails, an dynamically supplied URI is ignored and a statically supplied URI stops execution. If this parameter is set to true the result of the URI check is ignored and the URI is passed unchecked. In this case no canonical path extension and no adaptation to the data directory is available for file schema.

Properties

incrementalFetch

By default, the InetSource operator streams out the entire contents of each URI (remote server, or local file) as it exists at the time of each fetch, unless the URI's content timestamp is unchanged from the previous fetch (in which case the InetSource operator does not stream out any content). If you want the InetSource operator to stream out the URI contents starting at the first line that is different since the previous fetch, use the incrementalFetch parameter. This parameter is optional; if present, it must have exactly one value that is a boolean constant.

Note: The InetSource operator uses the open-source libcurl package for performing its content retrievals. The precise mechanism that libcurl uses to determine whether the timestamp on the target content has changed, is not documented in the libcurl API specification. For reference, see http://curl.haxx.se/libcurl/c/curl_easy_setopt.html#CURLOPTTIMECONDITION.

Properties

initDelay

This parameter specifies an initial processing delay, in seconds, before the InetSource operator begins emitting tuples. This parameter is optional; if present, it must have exactly one value of type float64 that is an expression which can be evaluated at operator startup time.

Properties

inputLinesPerRecord

The InetSource operator starts its processing by formatting the data that is retrieved from the source URI into individual records. Each record is then copied into the attribute of the output stream tuple (if the attribute type is an rstring) or appended as a member to the end of the attribute (if the attribute type is a list<rstring> data type). By default, each line in the source URI is considered to be a single record and is carried into the tuple attribute directly. However, it is also fairly common for the data in the source URI to be laid out as multiple lines per logical record. For example, each logical record might be formatted within the raw input data as two non-blank lines followed by a blank line, for a total record length of three consecutive lines of text within the raw input data. For such cases, the inputLinesPerRecord modifier is used. If this modifier is specified, the operator reads n lines of the input file and concatenates them into a single logical record, placing a blank character as a pad value between them. Note that the pad character can be changed to a different value by using the intraRecordPadValue parameter, which is specified by using a double-quoted string of zero or more characters. Both parameters are optional; if present, inputLinesPerRecord must have exactly one value of type uint32 that is an expression which can be evaluated at compile time; and intraRecordPadValue must have exactly one value of type rstring that is an expression which can be evaluated at compile time.

Properties

intraRecordPadValue

The InetSource operator starts its processing by formatting the data that is retrieved from the source URI into individual records. Each record is then copied into the attribute of the output stream tuple (if the attribute type is an rstring) or appended as a member to the end of the attribute (if the attribute type is a list<rstring> data type). By default, each line in the source URI is considered to be a single record and is carried into the tuple attribute directly. However, it is also fairly common for the data in the source URI to be laid out as multiple lines per logical record. For example, each logical record might be formatted within the raw input data as two non-blank lines followed by a blank line, for a total record length of three consecutive lines of text within the raw input data. For such cases, the inputLinesPerRecord modifier is used. If this modifier is specified, the operator reads n lines of the input file and concatenates them into a single logical record, placing a blank character as a pad value between them. Note that the pad character can be changed to a different value by using the intraRecordPadValue parameter, which is specified by using a double-quoted string of zero or more characters. Both parameters are optional; if present, inputLinesPerRecord must have exactly one value of type uint32 that is an expression which can be evaluated at compile time; and intraRecordPadValue must have exactly one value of type rstring that is an expression which can be evaluated at compile time.

Properties

iterations

This parameter specifies how many times the operator will fetch data from the URLs specified by the URLList parameter. If the iterations parameter is not specified, or if a value of zero is specified, the operator will iterate endlessly, until the PE terminates. In any case, the operator pauses after each fetch, as specified by the fetchInterval parameter, except the last one.

Properties

punctPerFetch

This parameter determines whether window punctuation is emitted after each fetch. A fetch consists of retrieving data from all of the URIs in the list. One window marker is submitted after each fetch, including a fetch that returns no data (or no updated data, depending on the parameters). So with 3 URIs and a period of 10 minutes, you would get a window marker every 10 minutes or so, depending on how long it takes to fetch, even if no output tuple is submitted as a result of that 10-minute fetch cycle. This parameter is optional; if present, it must have exactly one value that is a boolean constant.

Properties

timeout

This parameter specifies a timeout, in seconds, for fetching data from the URLs specified by the 'URLList' parameter. If not specified, no timeout is set, and remote servers that accept requests but do not respond quickly will block the operator.

Properties

unconditionalFetch

As mentioned in the previous paragraph, the default behavior of the InetSource operator is to query the target device for the last-modified timestamp on each URI in the URIList parameter. The last-modified timestamp is used to determine whether to retrieve and stream out the data for that URI during the current retrieval cycle. If you want the InetSource operator to disregard this timestamp and stream out the entire contents on every retrieval, use the unconditionalFetch modifier. This parameter is optional; if present, it must have exactly one value that is a boolean constant.

Properties

Code Templates

InetSource

stream<${streamType}> ${streamName} = InetSource() {
            param
                URIList : ["${urilistitem}"];
        }
      

Libraries

curl-library
Library Name: curl
messages
Include Path: ../../impl/include