Selectively retrieving task results
Selective task output retrieval enables a IBM® Spectrum Symphony client to request task results by specifying task IDs, in order to control the amount of memory consumed on the client host.
Limitations
- This feature only applies to clients that receive results synchronously.
- This feature is only available if the session is created with the FetchResultsDirectly session flag.
- COM API is not supported.
- Multi-threaded use of this feature is not recommended unless the client implements a synchronization scheme to prevent overlapping criteria from being specified in concurrently executing threads; otherwise the client may hang or falsely validate the task output retrieval.
About selective task result retrieval
Before learning how this feature can impact memory management on the client host, it might be helpful to know how results are processed in IBM Spectrum Symphony's default model.
- Default task output retrieval
- In the default model, the IBM Spectrum Symphony Session Manager sends results to the client as soon as they are available. If the client makes a request for results before they arrive, the client code is blocked until the results arrive or the wait period expires. If the results arrive before the client makes a request, the results are queued (cached) in the local session's result set. Since the delivery of results from the Session Manager is not in sync with the retrieval of results, it is possible that too many results could be queued in the API layer; this could cause the client to run out of memory and eventually terminate abnormally. One possible solution is to have the client request a specific number of results directly from the Session Manager; this feature is explained in On-demand results retrieval. This can help to manage memory usage on the client host but the requested results may not be the ones the client is really interested in.
IBM Spectrum Symphonyoffers another option, which enables a client to request specific tasks results via selection criteria. Using this feature, the client gets only the results it wants.
- Selective task output retrieval
- The purpose of selective task output retrieval is twofold: it controls the retrieval of results so that the client is not overloaded with results and runs out of memory, and it allows the client to retrieve only the results it is interested in. This is achieved by having the client retrieve data directly from the Session Manager instead of from the local result cache. To better understand the interaction between the client and the Session Manager, look at the sequence of events:
- Session Manager gets the result from the service and defers dispatching it to the client.
- The client makes an API call to selectively retrieve task results from the Session Manager by supplying the task IDs for the results it wants to process.
- In the non-blocking model (zero timeout), the Session Manager returns all the results (that match the task IDs) available at the time of the API call. If no matching task IDs are found, no results are returned. In the blocking model (infinite timeout), the API call does not return until all the results with matching task IDs are available.
Note: Task results returned to the client are not sorted. For example, if the client retrieves the output of tasks with IDs 1, 2, 3, the results may be returned in a different order.
Client API
Selective task results retrieval can only be achieved through the client API. To use this feature, the client application must perform the following sequence:- Create a session using the FetchResultsDirectly flag to inform the API of the client's intent to retrieve results directly from the IBM Spectrum Symphony Session Manager.
- Send the tasks.
- Retrieve results by specifying the associated task IDs in a selection filter.
- Code samples for the blocking model
- The following code samples demonstrate selective task result retrieval for the blocking model in each supported language. Refer to API reference for more information.
//C++ sample try { ...... // Set up session creation attributes and create the session attributes.setSessionFlags(Session::ReceiveSync|Session::FetchResultsDirectly); SessionPtr sesPtr = conPtr->createSession(attributes); //Create a task ID filter object TaskIdFilter filter; // Now we will send some messages to our service for (int taskCount = 0; taskCount < tasksToSend; taskCount++) { ...... // send tasks TaskInputHandlePtr input = sesPtr->sendTaskInput(attrTask); // add specific task ID into task ID filter if (taskCount == 5) { filter.addId(input->getId()); } } //Block until all filtered task outputs are ready. EnumItemsPtr enumOutput = sesPtr->fetchTaskOutput(filter); //handle the task outputs ...... }//Java sample try { ...... // Set up session creation attributes and create the session attributes.setSessionFlags(Session.RECEIVE_SYNC | Session.FETCH_RESULTS_DIRECTLY); session = connection.createSession(attributes); //Create a task ID filter object TaskIdFilter filter = new TaskIdFilter(); // Now we will send some messages to our service for (int taskCount = 0; taskCount < tasksToSend; taskCount++) { ...... // send tasks TaskInputHandle input = session.sendTaskInput(attrTask); // add specific task ID into task ID filter if (taskCount == 5) { filter.addId(input.getId()); } } //Block until all filtered task outputs are ready. EnumItems enumOutput = session.fetchTaskOutput(filter); //handle the task outputs ...... }//C# sample try { ...... // Set up session creation attributes and create the session attributes.SessionFlags = SessionFlags.ReceiveSync | SessionFlags.FetchResultsDirectly; session = connection.CreateSession(attributes); //Create a task ID filter object TaskIdFilter filter = new TaskIdFilter(); // Now we will send some messages to our service for (int taskCount = 0; taskCount < tasksToSend; taskCount++) { ...... // send tasks TaskInputHandle input = session.SendTaskInput(attrTask); // add specific task ID into task ID filter if (taskCount == 5) { filter.AddId(input.Id); } } //Block until all filtered task outputs are ready. EnumItems enumOutput = session.FetchTaskOutput(filter); //handle the task outputs ......} - Code samples for the non-blocking model
- The following code samples demonstrate selective task result retrieval for the non-blocking model in each supported language. Refer to API reference for more information.
//C++ sample try { ...... // Set up session creation attributes and create the session attributes.setSessionFlags(Session::ReceiveSync|Session::FetchResultsDirectly); SessionPtr sesPtr = conPtr->createSession(attributes); //Create a task ID filter object TaskIdFilter filter; // Now we will send some messages to our service for (int taskCount = 0; taskCount < tasksToSend; taskCount++) { ...... // send tasks TaskInputHandlePtr input = sesPtr->sendTaskInput(attrTask); // add specific task ID into task ID filter if (taskCount == 5) { filter.addId(input->getId()); } } while (!filter.isSatisfied()) { EnumItemsPtr enumOutput = sesPtr->fetchTaskOutput(filter, 0 /* non-blocking */); //handle the task output ...... // Since the current thread is not blocked waiting for all results, we can do // something else in between fetch attempts to make use of the current thread. } }//Java sample try { ...... // Set up session creation attributes and create the session attributes.setSessionFlags(Session.RECEIVE_SYNC | Session.FETCH_RESULTS_DIRECTLY); session = connection.createSession(attributes); //Create a task ID filter object TaskIdFilter filter = new TaskIdFilter(); // Now we will send some messages to our service for (int taskCount = 0; taskCount < tasksToSend; taskCount++) { ...... // send tasks TaskInputHandle input = session.sendTaskInput(attrTask); // add specific task ID into task ID filter if (taskCount == 5) { filter.addId(input.getId()); } } while (!filter.isSatisfied()) { EnumItems enumOutput = session.fetchTaskOutput(filter, 0 /* non-blocking */); //handle the task output ...... // Since the current thread is not blocked waiting for all results, we can do // something else in between fetch attempts to make use of the current thread. } }//C# sample try { ...... // Set up session creation attributes and create the session attributes.SessionFlags = SessionFlags.ReceiveSync | SessionFlags.FetchResultsDirectly; session = connection.CreateSession(attributes); //Create a task ID filter object TaskIdFilter filter = new TaskIdFilter(); // Now we will send some messages to our service for (int taskCount = 0; taskCount < tasksToSend; taskCount++) { ...... // send tasks TaskInputHandle input = session.SendTaskInput(attrTask); // add specific task ID into task ID filter if (taskCount == 5) { filter.AddId(input.Id); } } while (!filter.IsSatisfied) { EnumItems enumOutput = session.FetchTaskOutput(filter, 0 /* non-blocking */); //handle the task output ...... // Since the current thread is not blocked waiting for all results, we can do // something else in between fetch attempts to make use of the current thread. } }