Interface FunctionContext
-
public interface FunctionContext
Context for a function executing in an IBM Streams application.
-
-
Method Summary
void addClassLibraries(java.lang.String[] libraries)
    Add class libraries to the functional class loader.
void createCustomMetric(java.lang.String name, java.lang.String description, java.lang.String kind, java.util.function.LongSupplier value)
    Create a custom metric.
int getChannel()
    Get the index of the parallel channel the function is on.
FunctionContainer getContainer()
    Get the container hosting a function.
java.util.Set<java.lang.String> getCustomMetricNames()
    Get the set of custom metric names created in this context.
int getMaxChannels()
    Get the total number of parallel channels for the parallel region that the function is in.
java.util.concurrent.ScheduledExecutorService getScheduledExecutorService()
    Return a scheduler to execute background tasks.
java.util.concurrent.ThreadFactory getThreadFactory()
    Return a ThreadFactory that can be used by the function with the thread context class loader set correctly.
-
-
-
Method Detail
-
getContainer
FunctionContainer getContainer()
Get the container hosting a function.
- Returns:
- Container hosting a function.
-
getScheduledExecutorService
java.util.concurrent.ScheduledExecutorService getScheduledExecutorService()
Return a scheduler to execute background tasks. Functions should use this service or getThreadFactory() rather than creating their own threads, to ensure that the SPL runtime waits for any background work before completing an application.
The scheduler is shut down when the processing element is shut down. Once the scheduler is shut down, no new tasks are accepted. Existing scheduled tasks remain in the scheduler's queue, but periodic tasks are canceled.
Functions that implement AutoCloseable and wish to complete any outstanding tasks at close time can call ExecutorService.awaitTermination() to wait for outstanding tasks to complete, or wait on the specific Future reference for a task.
The returned scheduler service is guaranteed to be an instance of java.util.concurrent.ScheduledThreadPoolExecutor and initially has this configuration:
- corePoolSize set to Runtime.availableProcessors() with a minimum of 2 and maximum of 8
- allowsCoreThreadTimeOut() set to true
- keepAliveTime set to 5 seconds
Threads are created on demand: even if corePoolSize is eight, eight threads will only be created if there are eight concurrent tasks scheduled. Threads will be removed if they are not needed for the keepAliveTime value and allowsCoreThreadTimeOut() returns true.
- Returns:
- Scheduler service that can be used by the function.
- See Also:
- "java.util.concurrent.ExecutorService", "java.util.concurrent.Future", "java.util.concurrent.ScheduledThreadPoolExecutor"
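The configuration described above can be reproduced with plain JDK classes. The following is a minimal sketch, not the Streams implementation: newStandInScheduler() and runPeriodicTask() are hypothetical helpers, and the stand-in scheduler only mirrors the documented initial settings so the close-time pattern (cancel, shut down, awaitTermination) can be tried outside a Streams application. In a real function the scheduler would come from getScheduledExecutorService() and the function would not shut it down itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SchedulerSketch {

    /** Hypothetical stand-in for the scheduler a FunctionContext returns (assumption). */
    static ScheduledThreadPoolExecutor newStandInScheduler() {
        // corePoolSize: available processors, clamped to the range [2, 8].
        int core = Math.max(2, Math.min(8, Runtime.getRuntime().availableProcessors()));
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(core);
        // The keep-alive time must be positive before core thread time-out is enabled.
        scheduler.setKeepAliveTime(5, TimeUnit.SECONDS);
        scheduler.allowCoreThreadTimeOut(true);
        return scheduler;
    }

    /** Runs a periodic task until it has fired at least {@code ticks} times; returns the count seen. */
    static int runPeriodicTask(int ticks) {
        ScheduledThreadPoolExecutor scheduler = newStandInScheduler();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            counter.incrementAndGet();
            done.countDown();
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
            // Close-time pattern: stop the periodic task, then wait for outstanding work.
            task.cancel(false);
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("ticks observed: " + runPeriodicTask(3));
    }
}
```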
-
-
getThreadFactory
java.util.concurrent.ThreadFactory getThreadFactory()
Return a ThreadFactory that can be used by the function with the thread context class loader set correctly. Functions should use the returned factory to create threads.
Threads returned by the ThreadFactory have not been started and are set as daemon threads. Functions may set the threads as non-daemon before starting them. The SPL runtime will wait for non-daemon threads before terminating a processing element in standalone mode.
Any uncaught exception thrown by the Runnable passed to ThreadFactory.newThread(Runnable) will cause the processing element containing the function to terminate.
The ThreadFactory is shut down when the processing element is shut down. Once the ThreadFactory is shut down, a call to newThread() will return null.
- Returns:
- A ThreadFactory that can be used by the function.
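As an illustration of the contract above, the sketch below uses a hypothetical stand-in factory (standInFactory) that hands out unstarted daemon threads with a given context class loader. It is an assumption-based model for experimenting outside Streams, not the runtime's factory. startWorker() is also hypothetical and shows the caller-side pattern: check for null, optionally switch to non-daemon, then start.

```java
import java.util.concurrent.ThreadFactory;

public class ThreadFactorySketch {

    /** Hypothetical stand-in for the factory returned by getThreadFactory() (assumption). */
    static ThreadFactory standInFactory(ClassLoader functionLoader) {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);                       // documented default: daemon thread
            thread.setContextClassLoader(functionLoader); // documented: context class loader is set
            return thread;                                // returned unstarted
        };
    }

    /** Creates and starts a non-daemon worker, or returns null if the factory no longer creates threads. */
    static Thread startWorker(ThreadFactory factory, Runnable work) {
        Thread worker = factory.newThread(work);
        if (worker == null) {
            return null; // per the documentation, a shut-down factory returns null
        }
        worker.setDaemon(false); // must be changed before the thread is started
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = standInFactory(ThreadFactorySketch.class.getClassLoader());
        Thread worker = startWorker(factory, () -> System.out.println("background work running"));
        worker.join();
    }
}
```

Because an uncaught exception in the Runnable terminates the processing element, a function may want to catch and handle expected failures inside the Runnable itself.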
-
getChannel
int getChannel()
Get the index of the parallel channel the function is on.
If the function is in a parallel region, this method returns a value from 0 to N-1, where N is the number of channels in the parallel region; otherwise it returns -1.
- Returns:
- the index of the parallel channel if the function is executing in a parallel region, or -1 if the function is not executing in a parallel region.
-
getMaxChannels
int getMaxChannels()
Get the total number of parallel channels for the parallel region that the function is in. If the function is not in a parallel region, this method returns 0.
- Returns:
- the number of parallel channels for the parallel region that this function is in, or 0 if the function is not in a parallel region.
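The return values of getChannel() and getMaxChannels() are plain integers, so the "not in a parallel region" case has to be checked explicitly. The sketch below assumes the two values have already been read from the context. describe() is a hypothetical helper, for example for building a log prefix.

```java
public class ChannelSketch {

    /**
     * Builds a human-readable label from the values returned by
     * getChannel() and getMaxChannels(). Hypothetical helper.
     */
    static String describe(int channel, int maxChannels) {
        // Documented sentinel values: channel == -1 and maxChannels == 0 outside a parallel region.
        if (channel < 0 || maxChannels <= 0) {
            return "not in a parallel region";
        }
        // Channel indexes are zero-based: 0 to N-1.
        return "channel " + channel + " of " + maxChannels;
    }

    public static void main(String[] args) {
        System.out.println(describe(-1, 0));
        System.out.println(describe(2, 4));
    }
}
```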
-
addClassLibraries
void addClassLibraries(java.lang.String[] libraries) throws java.net.MalformedURLException
Add class libraries to the functional class loader. The functional class loader is set as the thread context class loader for the thread factory, the executor and any method invocation on the function instance.
Functions use this method to add class libraries specific to the invocation in a consistent manner. An example is defining the jar files that contain the JDBC driver to be used by the application.
Each element of libraries is trimmed and then converted into a java.net.URL. If the element cannot be converted to a URL then it is assumed to represent a file system path and is converted into a URL representing that path. If the file path is relative, the location it is resolved against is currently undefined, so use of relative paths is not recommended.
If a file path ends with /* then it is assumed to be a directory and all jar files in the directory with the extension .jar or .JAR are added to the functional class loader.
- Parameters:
- libraries - String representations of URLs and file paths to be added into the functional class loader. If null then no libraries are added to the class loader.
- Throws:
- java.net.MalformedURLException
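The element-by-element rules above (trim, try as a URL, fall back to a file path, expand a trailing /*) can be modeled roughly as follows. This is a sketch of the documented behavior under stated assumptions, not the library's code: the class and its resolve(), tryParseUrl() and toUrl() helpers are hypothetical. It also reports conversion failures as unchecked exceptions for brevity, whereas the real method declares MalformedURLException.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

public class LibraryResolutionSketch {

    /** Models how each element of libraries could be turned into URLs (assumption). */
    static List<URL> resolve(String[] libraries) {
        List<URL> urls = new ArrayList<>();
        if (libraries == null) {
            return urls; // documented: null adds no libraries
        }
        for (String library : libraries) {
            String element = library.trim();
            URL url = tryParseUrl(element);
            if (url != null) {
                urls.add(url);
            } else if (element.endsWith("/*")) {
                // A file path ending in /* is treated as a directory of jar files.
                File dir = new File(element.substring(0, element.length() - 2));
                File[] jars = dir.listFiles(
                        (d, name) -> name.endsWith(".jar") || name.endsWith(".JAR"));
                if (jars != null) {
                    for (File jar : jars) {
                        urls.add(toUrl(jar));
                    }
                }
            } else {
                urls.add(toUrl(new File(element)));
            }
        }
        return urls;
    }

    /** Returns the element as a URL, or null if it does not parse as one. */
    static URL tryParseUrl(String element) {
        try {
            return new URI(element).toURL();
        } catch (URISyntaxException | MalformedURLException | IllegalArgumentException e) {
            return null; // not a URL: the caller treats it as a file system path
        }
    }

    /** Converts a file to a file: URL. */
    static URL toUrl(File file) {
        try {
            return file.toURI().toURL();
        } catch (MalformedURLException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Illustrative paths only; they do not need to exist for the conversion to run.
        System.out.println(resolve(new String[] {" file:/opt/jdbc/driver.jar ", "/opt/jdbc/lib/*"}));
    }
}
```

An absolute path such as /opt/jdbc/lib/* avoids the undefined resolution of relative paths noted above.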
-
createCustomMetric
void createCustomMetric(java.lang.String name, java.lang.String description, java.lang.String kind, java.util.function.LongSupplier value)
Create a custom metric.
A custom metric allows monitoring of an application through IBM Streams monitoring APIs, including the Streams console and the REST APIs. A metric has a single long value and has a kind of:
- counter - A counter metric observes a value that represents a count of an occurrence.
- gauge - A gauge metric observes a value that is continuously variable with time.
- time - A time metric represents a point in time. It is recommended that the value represents the number of milliseconds since the 1970/01/01 epoch, i.e. a value consistent with System.currentTimeMillis().
The initial value of the metric is set from value.getAsLong() during this method call. Subsequently, value.getAsLong() is called periodically to get the current value of the metric so that it can be reported through the monitoring APIs.
A lambda expression can be used as the supplier. For example, to monitor the length of a queue (or any collection) items, a metric can be created from Initializable.initialize(FunctionContext) as:
    this.items = new PriorityQueue<>();
    functionContext.createCustomMetric("queuedItems", "Number of queued items.", "gauge", () -> this.items.size());
The metric will now automatically track the length of the queue (subject to the periodic collection cycle).
A java.util.concurrent.atomic.AtomicLong can be used as a metric's value, for example a counter where no other natural value exists, e.g.:
    this.nFailedRequests = new AtomicLong();
    functionContext.createCustomMetric("nFailedRequests", "Number of failed requests.", "counter", this.nFailedRequests::get);
Subsequently the counter is incremented using:
    this.nFailedRequests.incrementAndGet();
- Parameters:
- name - Name of the metric.
- description - Description of the metric.
- kind - Kind of the metric.
- value - Function that returns the value of the metric.
- Throws:
- java.lang.IllegalStateException - A metric with name already exists or kind is not valid.
- Since:
- 1.7
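The examples in the description cover the gauge and counter kinds. A time metric follows the same pattern, as sketched below. To keep the sketch runnable without a Streams installation, StandInMetrics is a hypothetical in-memory stand-in that mimics only the documented behavior (initial read of the supplier, IllegalStateException for a duplicate name or invalid kind). It is not the runtime's implementation, and read() is an invented helper that plays the role of the periodic collection cycle.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class MetricSketch {

    /** Hypothetical in-memory stand-in for the metric methods of FunctionContext (assumption). */
    static class StandInMetrics {
        private static final Set<String> KINDS =
                new HashSet<>(Arrays.asList("counter", "gauge", "time"));
        private final Map<String, LongSupplier> metrics = new LinkedHashMap<>();

        void createCustomMetric(String name, String description, String kind, LongSupplier value) {
            if (!KINDS.contains(kind)) {
                throw new IllegalStateException("Invalid kind: " + kind);
            }
            if (metrics.containsKey(name)) {
                throw new IllegalStateException("Metric already exists: " + name);
            }
            value.getAsLong(); // the initial value is read during the call, as documented
            metrics.put(name, value);
        }

        Set<String> getCustomMetricNames() {
            return metrics.keySet();
        }

        /** Invented helper standing in for one periodic collection of the metric. */
        long read(String name) {
            return metrics.get(name).getAsLong();
        }
    }

    public static void main(String[] args) {
        StandInMetrics functionContext = new StandInMetrics();

        // A "time" metric: milliseconds since the epoch, consistent with System.currentTimeMillis().
        AtomicLong lastSuccess = new AtomicLong(System.currentTimeMillis());
        functionContext.createCustomMetric("lastSuccess",
                "Time of the last successful request.", "time", lastSuccess::get);

        // Later, when a request succeeds, only the AtomicLong is updated.
        lastSuccess.set(System.currentTimeMillis());

        System.out.println(functionContext.getCustomMetricNames());
        System.out.println(functionContext.read("lastSuccess"));
    }
}
```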
-
getCustomMetricNames
java.util.Set<java.lang.String> getCustomMetricNames()
Get the set of custom metric names created in this context. The set may include additional metrics not created by this function, including metrics created by the topology framework.
- Returns:
- The set of custom metric names created in this context.
- Since:
- 1.7
-
-