Enabling InfoSphere MDM to send notifications to Apache Kafka

As an alternative to the standard Notification Framework, which uses either the default WebSphere JMS or WebSphere MQ to send notifications, you can enable InfoSphere® MDM to send notifications to Apache Kafka topics.

Before you begin

Important: If you install InfoSphere MDM 11.6.0.11 or later, Kafka is installed as part of the InfoSphere MDM installation.

After installation, you can enable InfoSphere MDM 11.6.0.11 to support Kafka versions that are newer than the one included in the base InfoSphere MDM installation. Support for Kafka 2.6, 2.7, or 2.8 requires additional configuration changes. For more information, see Configuring InfoSphere MDM to support Apache Kafka 2.6, 2.7, or 2.8.

  • Notifications must be enabled in the MDM operational server. To enable notifications, set the value of the /IBM/DWLCommonServices/Notifications/enabled configuration element to true:
    update configelement set value='true',last_update_dt=current_timestamp where name='/IBM/DWLCommonServices/Notifications/enabled';
  • Apache ZooKeeper must be running in the InfoSphere MDM environment.

About this task

Restriction: In the current implementation of this feature within InfoSphere MDM, the Apache Kafka listener can listen to only one Kafka topic at a time. To work around this limitation, publish all suspected duplicate-related notifications (nt1, nt2, and nt3) to a single Kafka topic.
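
The workaround in this restriction can be sketched as a small shell script that prints jmschannel update statements for the three suspected duplicate notifications, all pointing at one topic. The statements mirror the jmschannel updates in step 1 of the procedure. This is a sketch under stated assumptions, not a product-supplied script: it assumes that nt1, nt2, and nt3 correspond to channel_id 1, 2, and 3, and the topic name com.ibm.mdm.notification.topic.SuspectDuplicates is hypothetical. The script only prints SQL, so you can review the output before you run it against the MDM database.

```shell
#!/bin/sh
# Hypothetical topic name (not taken from the product); replace it with your own.
SUSPECT_TOPIC="${SUSPECT_TOPIC:-com.ibm.mdm.notification.topic.SuspectDuplicates}"

# Assumption: notifications nt1, nt2, and nt3 map to channel_id 1, 2, and 3.
SUSPECT_CHANNEL_IDS="1 2 3"

# Print one jmschannel update statement per channel, all with the same destination.
print_single_topic_sql() {
  topic="$1"
  for channel_id in $SUSPECT_CHANNEL_IDS; do
    printf "update jmschannel set jms_destination='%s' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=%s;\n" "$topic" "$channel_id"
  done
}

print_single_topic_sql "$SUSPECT_TOPIC"
```

If you use a single topic in this way, that topic must also exist in Kafka. You can create it with kafka-topics.sh in the same way as the topics in step 2 of the procedure.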

Procedure

  1. Update the MDM database to send notifications to Kafka topics.
    The following example commands assume that you want to send all notifications (nt1 to nt11) to Kafka topics, with each notification type sent to a different topic.
    Tip: If you prefer, you can configure the database to send all notifications to the same topic instead.
    update notifchannel set implem_classname='com.dwl.base.notification.KafkaChannel',last_update_dt=current_timestamp where channel_id in (1,2,3,4,5,6,7,8,9,10,11);
    
    
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.A1PartySelected' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=1;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.SuspectIdentification' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=2;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.AutoSuspectReIdentification' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=3;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.ManualSuspectEntryAdjustment' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=4;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.ElementChange' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=5;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.DeletePartyHistory' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=6;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.PendingCDCRequest' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=7;
    update jmschannel set jms_destination='com.ibm.mdm.notification.EMTopic' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=8;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.UpdateProductSuspects' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=9;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.DeleteProductSuspects' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=10;
    update jmschannel set jms_destination='com.ibm.mdm.notification.topic.AddProductSuspects' , jms_conn_factory='kafka/KafkaSAMResourceReference' , last_update_dt=current_timestamp where channel_id=11;
    
  2. Create the Kafka topics.
    The following example commands assume that this implementation will use a separate topic for each notification type.
    1. Go to the <KAFKA_HOME_FOLDER>/bin folder.
      Note: The <KAFKA_HOME_FOLDER> is created as part of the InfoSphere MDM installation.
    2. If you have not already done so, define a host name and port number for ZooKeeper other than the default (localhost:2181).
    3. Run the following commands to create the Kafka topics:
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.A1PartySelected
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.SuspectIdentification
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.AutoSuspectReIdentification
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.ManualSuspectEntryAdjustment
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.ElementChange
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.DeletePartyHistory
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.PendingCDCRequest
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.EMTopic
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.UpdateProductSuspects
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.DeleteProductSuspects
      kafka-topics.sh --create --zookeeper <zookeeper_host>:<zookeeper_port> --replication-factor 1 --partitions 4 --topic com.ibm.mdm.notification.topic.AddProductSuspects
      
    The MDM database is now configured to send notifications to Kafka topics.
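
The eleven kafka-topics.sh commands in step 2 differ only in the topic name, so they can be generated from a list. The following is a minimal sketch rather than a product-supplied script: the variable names (ZOOKEEPER_HOST, ZOOKEEPER_PORT, KAFKA_TOPICS_CMD, DRY_RUN) are assumptions for illustration. By default, the script only prints the commands so that you can review them first.

```shell
#!/bin/sh
# Assumed placeholders; set them to match your environment.
ZOOKEEPER_HOST="${ZOOKEEPER_HOST:-localhost}"
ZOOKEEPER_PORT="${ZOOKEEPER_PORT:-2181}"
# Assumes that the script runs from the <KAFKA_HOME_FOLDER>/bin folder.
KAFKA_TOPICS_CMD="${KAFKA_TOPICS_CMD:-./kafka-topics.sh}"
# DRY_RUN=1 (the default) prints the commands; DRY_RUN=0 runs them.
DRY_RUN="${DRY_RUN:-1}"

# Topic names copied from the commands in step 2.
TOPICS="
com.ibm.mdm.notification.topic.A1PartySelected
com.ibm.mdm.notification.topic.SuspectIdentification
com.ibm.mdm.notification.topic.AutoSuspectReIdentification
com.ibm.mdm.notification.topic.ManualSuspectEntryAdjustment
com.ibm.mdm.notification.topic.ElementChange
com.ibm.mdm.notification.topic.DeletePartyHistory
com.ibm.mdm.notification.topic.PendingCDCRequest
com.ibm.mdm.notification.EMTopic
com.ibm.mdm.notification.topic.UpdateProductSuspects
com.ibm.mdm.notification.topic.DeleteProductSuspects
com.ibm.mdm.notification.topic.AddProductSuspects
"

# Print (or run) one create command per topic, with the same options as step 2.
create_topics() {
  for topic in $TOPICS; do
    if [ "$DRY_RUN" = "1" ]; then
      echo "$KAFKA_TOPICS_CMD" --create --zookeeper "$ZOOKEEPER_HOST:$ZOOKEEPER_PORT" --replication-factor 1 --partitions 4 --topic "$topic"
    else
      "$KAFKA_TOPICS_CMD" --create --zookeeper "$ZOOKEEPER_HOST:$ZOOKEEPER_PORT" --replication-factor 1 --partitions 4 --topic "$topic"
    fi
  done
}

create_topics
```

To confirm the result, you can list the topics with kafka-topics.sh --list --zookeeper <zookeeper_host>:<zookeeper_port> and check that all of the names appear.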

What to do next

After you enable InfoSphere MDM to send notifications to Kafka, you must also configure the notification subscribers, such as IBM Stewardship Center (through IBM Business Process Manager), to receive them. For information about configuring IBM Stewardship Center and BPM to receive notifications through Kafka, see Enabling BPM to receive InfoSphere MDM notifications through Apache Kafka.