Package com.ibm.streams.operator.control

Java primitive operator interface to the Job Control Plane.

Package com.ibm.streams.operator.control Description

Java primitive operator interface to the Job Control Plane. The Job Control Plane supports the exchange of control or management information between operators in an SPL application. It is a job-scoped JMX (Java Management Extensions) MBean server. Java primitive operators can create management beans, known as MBeans, in this server to exchange control information. Once an operator creates an MBean, other operators and external JMX clients can interact with it through its attributes, operations and notifications.

An MBean is defined by a Java interface that provides the definitions of its attributes and operations. MBeans are fully described in the Javadoc overview for the javax.management package provided by the Java platform.
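As a minimal sketch of an MBean defined by an interface, the following uses only the standard javax.management API. The names ThresholdExample, ThresholdMXBean and Threshold and the object name com.example:type=Threshold are hypothetical, and a private in-process MBeanServer stands in for the Job Control Plane, which an operator would instead reach through an MBeanServerConnection.

```java
import javax.management.JMException;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

class ThresholdExample {

    /** Hypothetical MXBean interface: one read/write attribute and one operation. */
    public interface ThresholdMXBean {
        int getThreshold();            // getter exposes the attribute "Threshold"
        void setThreshold(int value);  // setter makes the attribute writable
        void reset();                  // any other method is an operation
    }

    /** Hypothetical implementation of the interface. */
    public static class Threshold implements ThresholdMXBean {
        private volatile int threshold = 10;
        @Override public int getThreshold() { return threshold; }
        @Override public void setThreshold(int value) { threshold = value; }
        @Override public void reset() { threshold = 10; }
    }

    /** Registers the bean, then returns {value after set, value after reset}. */
    static int[] demo() {
        try {
            // Assumption: a local MBean server stands in for the Job Control Plane.
            MBeanServer mbs = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("com.example:type=Threshold");
            mbs.registerMBean(new Threshold(), name);

            // A typed proxy forwards calls to the registered MBean.
            ThresholdMXBean proxy = JMX.newMXBeanProxy(mbs, name, ThresholdMXBean.class);
            proxy.setThreshold(25);

            // The same attribute can be read generically by name.
            int afterSet = (Integer) mbs.getAttribute(name, "Threshold");

            proxy.reset();
            return new int[] { afterSet, proxy.getThreshold() };
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] values = demo();
        System.out.println("after set: " + values[0] + ", after reset: " + values[1]);
    }
}
```

The interface alone determines what a client sees: the getter and setter pair becomes one attribute, and reset() becomes an operation.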

Operator Use of MBeans

MBeans created by operators are used for two purposes:
  • External management and monitoring of an application.
  • Exchange of control information between operators.
It is recommended that management beans not be used for application data, such as storing data arriving in streams.

Connecting an Operator to the Job Control Plane

A Java primitive operator connects to the Job Control Plane using the ControlPlaneContext.connect(Controllable) method and the Controllable interface. In the setup() method, the primitive operator uses the Job Control Plane MBean server through the passed-in MBeanServerConnection reference. Typically, operators create management beans and add notification handlers against management beans created by the Job Control Plane or other operators. The management beans are typically specific to the operator or the application, but may be any compliant management bean.

External Management and Monitoring of an Application

To allow external management, an operator implements a management bean that provides control over the operator's or the application's behavior. This bean will typically have attributes or operations that external JMX clients can modify or invoke, which then result in notifications. The operator creates an instance of the management bean in the Job Control Plane and handles the notifications to control the operator's behavior. For example, a switch operator could create a FlowMXBean that allows external control of whether the switch is open or closed.
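The switch scenario might be sketched as follows, using only the standard javax.management API. This is an illustration under assumptions, not the toolkit's implementation: the shape of FlowMXBean (a single boolean Open attribute), the Flow class, the object name and the use of a local MBeanServer in place of the Job Control Plane are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.Attribute;
import javax.management.AttributeChangeNotification;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class FlowExample {

    /** Hypothetical management interface for a switch operator. */
    public interface FlowMXBean {
        boolean isOpen();
        void setOpen(boolean open);
    }

    /** Bean that issues a notification whenever the Open attribute is set. */
    public static class Flow extends NotificationBroadcasterSupport implements FlowMXBean {
        private final AtomicLong sequence = new AtomicLong();
        private boolean open = true;

        @Override public synchronized boolean isOpen() { return open; }

        @Override public void setOpen(boolean newValue) {
            boolean oldValue;
            synchronized (this) {
                oldValue = open;
                open = newValue;
            }
            sendNotification(new AttributeChangeNotification(this,
                    sequence.incrementAndGet(), System.currentTimeMillis(),
                    "Open changed", "Open", "boolean", oldValue, newValue));
        }
    }

    /** Returns the operator-side switch state after an external client closes it. */
    static boolean demo() {
        try {
            // Assumption: a local MBean server stands in for the Job Control Plane.
            MBeanServer mbs = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("com.example:type=Flow");
            mbs.registerMBean(new Flow(), name);

            // Operator side: state consulted when deciding whether to forward tuples.
            AtomicBoolean switchOpen = new AtomicBoolean(true);
            NotificationListener handler = (notification, handback) -> {
                if (notification instanceof AttributeChangeNotification) {
                    AttributeChangeNotification change = (AttributeChangeNotification) notification;
                    if ("Open".equals(change.getAttributeName())) {
                        switchOpen.set((Boolean) change.getNewValue());
                    }
                }
            };
            mbs.addNotificationListener(name, handler, null, null);

            // External client side: closes the switch by setting the attribute.
            mbs.setAttribute(name, new Attribute("Open", false));
            return switchOpen.get();
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("switch open after external update: " + demo());
    }
}
```

Driving the operator from the notification rather than polling the attribute keeps the operator's tuple-processing path free of JMX calls.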

For monitoring, the operator updates the MBean when its state changes, by updating attributes or invoking operations to reflect the new state. For example, an operator that selects its own analysis algorithm based upon various factors (such as load) might create an MBean that indicates the current algorithm. When the operator decides to change its algorithm, it issues an update to the MBean to indicate the new algorithm. The MBean may also issue a notification when its state is updated, to allow any monitoring application to react to the operator's state change.

Exchange of Control Information Between Operators

Operators within a job may exchange information using MBeans instead of using streams as a control mechanism. Some example uses are:
  • An MBean issues a notification when invoked by an operator that is overloaded. This indicates to upstream operators that they need to produce less data by increasing the threshold for data of interest.
  • A set of coordinating source operators only start submitting tuples when they have all been correctly initialized. Since operators may be distributed across multiple hosts, an MBean is used to provide a check-in location and issues a notification when all expected operators have checked in. This allows a stateful operator to be preloaded from an input stream (for example, a lookup operator preloaded from a database) and to accept tuples for lookup only when its lookup table is fully loaded.
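
The check-in pattern in the second example could look roughly like the sketch below. CheckInMXBean, CheckIn, the notification type string and the operator names are hypothetical, and a local MBeanServer again stands in for the Job Control Plane; only standard javax.management calls are used.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMException;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class CheckInExample {

    /** Hypothetical check-in location shared by cooperating operators. */
    public interface CheckInMXBean {
        int getRemaining();
        void checkIn(String operatorName);
    }

    /** Issues a single notification once every expected operator has checked in. */
    public static class CheckIn extends NotificationBroadcasterSupport implements CheckInMXBean {
        static final String ALL_READY = "com.example.checkin.allReady";
        private final Set<String> waiting;

        public CheckIn(Set<String> expected) { waiting = new HashSet<>(expected); }

        @Override public synchronized int getRemaining() { return waiting.size(); }

        @Override public void checkIn(String operatorName) {
            boolean last;
            synchronized (this) {
                // Only the check-in that empties the set triggers the notification;
                // repeated or unknown names are ignored.
                last = waiting.remove(operatorName) && waiting.isEmpty();
            }
            if (last) {
                sendNotification(new Notification(ALL_READY, this, 1L, "all operators checked in"));
            }
        }
    }

    /** Returns how many "all ready" notifications a listener observed. */
    static int demo() {
        try {
            // Assumption: a local MBean server stands in for the Job Control Plane.
            MBeanServer mbs = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("com.example:type=CheckIn");
            Set<String> expected = new HashSet<>(Arrays.asList("src1", "src2", "src3"));
            mbs.registerMBean(new CheckIn(expected), name);

            // Each source operator would start submitting tuples in this handler.
            AtomicInteger readyCount = new AtomicInteger();
            NotificationListener handler = (notification, handback) -> {
                if (CheckIn.ALL_READY.equals(notification.getType())) {
                    readyCount.incrementAndGet();
                }
            };
            mbs.addNotificationListener(name, handler, null, null);

            CheckInMXBean bean = JMX.newMXBeanProxy(mbs, name, CheckInMXBean.class);
            bean.checkIn("src1");
            bean.checkIn("src1"); // e.g. a restarted operator checking in again
            bean.checkIn("src2");
            bean.checkIn("src3");
            bean.checkIn("src3");
            return readyCount.get();
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("all-ready notifications: " + demo());
    }
}
```

Tracking operator names rather than a bare counter means an operator that restarts and checks in twice does not cause a premature notification.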

Tips on using the Job Control Plane

Creating MBeans

If a single operator is responsible for creating an MBean then the code will usually check to see if the MBean is already registered. This is to support the case where the operator restarts after a failure, leaving the MBean in the Job Control Plane.
  
  public void setup(MBeanServerConnection mbs, OperatorContext context) throws Exception {
      ObjectName name = ...;
      // Create the instance of com.example.MyOp.MyControlMXBean but
      // only if it is not already registered
      if (!mbs.isRegistered(name))
          mbs.createMBean("com.example.MyOp.MyControl", name);
  }
  
 

In some cases, multiple cooperating operators require a single MBean for coordination. Since the order of initialization is not defined across operators and may be asynchronous in a distributed application, one approach is for all operators to try to create the MBean, with only the first being successful. For example:

  
  public void setup(MBeanServerConnection mbs, OperatorContext context) throws Exception {
      ObjectName name = ...;
      // Create the instance of com.example.MyOp.CoordinateMXBean but
      // only if it is not already registered
      if (!mbs.isRegistered(name)) {
          // may still fail as multiple operators may see it
          // as not registered and then go on to try to register it.
          try {
              mbs.createMBean("com.example.MyOp.Coordinate", name);
          } catch (InstanceAlreadyExistsException e) {
              // another operator managed to create it first
              // that is ok, just use that one.
          }
      }

      // Use the CoordinateMXBean through a proxy, whether this
      // operator created it or found it already registered
      CoordinateMXBean cm = JMX.newMXBeanProxy(mbs, name, CoordinateMXBean.class);
      ...
  }
  
 

Since:
InfoSphere® Streams Version 4.0