Creating a Kafka channel for publishing MDM data

Before you can synchronize MDM data using the Kafka Processor, you must create and enable a channel for publishing messages to Kafka topics.

About this task

The InfoSphere® MDM Notification Framework uses the Apache Kafka API to enable publish/subscribe and point-to-point domains to send messages to listener applications or middleware. The framework provides a mechanism that enables you to define your own notifications with customized content.

The nt13 notification type exists in the NOTIFICATIONTYPE table for publishing InfoSphere MDM messages into Kafka entity topics.

You must create and enable a Kafka channel for publishing messages to Kafka topics.

Procedure

  1. Enable the notification framework by running the following query.
    update configelement set value='true', LAST_UPDATE_DT=CURRENT_TIMESTAMP where name='/IBM/DWLCommonServices/Notifications/enabled'
  2. Enable Kafka as a messaging application for InfoSphere MDM by running the following database scripts on the MDM database.
    1. update NOTIFICATIONTYPE set expiry_dt = null, last_update_dt= current_timestamp where NOTIF_TYPE = <'nt13'>;
    2. update notifchannel set expiry_dt = null, last_update_dt= current_timestamp where channel_id = <15>;
    3. update JMSChannel set expiry_dt =  null, last_update_dt= current_timestamp where channel_id = <15>;
    4. Optionally, if you want to update the default entity ID topic in the JMSChannel table (com.ibm.mdm.runtime.entityid), run the following query.
      update jmschannel set JMS_DESTINATION='kafka/com.ibm.mdm.runtime.entityid', LAST_UPDATE_DT=CURRENT_TIMESTAMP where channel_id=<15>;
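
The updates in steps 1 and 2 can be scripted so that they run together and report how many rows each one touched. The following is a minimal sketch, not an IBM-provided utility. It assumes a Python DB-API 2.0 connection whose driver accepts `?` placeholders (as `sqlite3` does; other drivers may use a different placeholder style). The function name `enable_kafka_channel` and its defaults (`nt13`, `15`) are illustrative and taken from the placeholder values shown above.

```python
import sqlite3  # stand-in driver only; the MDM database is not SQLite

NOTIFICATIONS_ENABLED = "/IBM/DWLCommonServices/Notifications/enabled"


def enable_kafka_channel(conn, notif_type="nt13", channel_id=15):
    """Run the step 1 and step 2 updates in one transaction.

    Returns a dict of table name -> number of rows updated, so the caller
    can confirm that every statement matched a row.
    """
    statements = [
        ("configelement",
         "update configelement set value='true', "
         "LAST_UPDATE_DT=CURRENT_TIMESTAMP where name=?",
         (NOTIFICATIONS_ENABLED,)),
        ("NOTIFICATIONTYPE",
         "update NOTIFICATIONTYPE set expiry_dt = null, "
         "last_update_dt = CURRENT_TIMESTAMP where NOTIF_TYPE = ?",
         (notif_type,)),
        ("notifchannel",
         "update notifchannel set expiry_dt = null, "
         "last_update_dt = CURRENT_TIMESTAMP where channel_id = ?",
         (channel_id,)),
        ("JMSChannel",
         "update JMSChannel set expiry_dt = null, "
         "last_update_dt = CURRENT_TIMESTAMP where channel_id = ?",
         (channel_id,)),
    ]
    counts = {}
    cur = conn.cursor()
    try:
        for table, sql, params in statements:
            cur.execute(sql, params)
            counts[table] = cur.rowcount
        conn.commit()
    except Exception:
        conn.rollback()  # leave the configuration untouched on any failure
        raise
    return counts
```

A count of 0 for any table suggests that the notification type or channel ID does not match your installation, so check the values before continuing.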
  3. Define the Kafka-specific properties in the WebSphere Application Server environment resource provider (Kafka resource environment provider).
    A default Kafka resource environment is created on WebSphere Application Server during InfoSphere MDM installation.
    Note: For information on how to create a custom Kafka resource environment provider, see the Apache Kafka documentation.

    The KafkaResourceEnvironmentProvider setting is in the WebSphere Application Server Integrated Solutions console (admin console) under Resources > Resource Environment > Resource environment providers > KafkaResourceEnvironmentProvider > Resource environment entries > KafkaResourceReference > Custom properties.

    Tip: You can define new Kafka connection properties as required by your runtime topology. Be sure to restart the server after editing the properties.
    There are two main configuration scenarios for the Kafka resource environment provider:
    • SSL security is enabled. This scenario requires certain properties to be defined in the Kafka resource environment provider. Additionally, the corresponding certificates must be installed on the server.
    • SSL security is not enabled. This scenario requires fewer properties to be defined in the Kafka resource environment provider.
    If SSL security is enabled:
    1. Create and install certificates on the given server, as appropriate for your SSL security configuration.
    2. Define the following key-value pairs for the Kafka resource environment provider. Provide values that match your particular deployment specifications.
      Table 1. Environment resource properties with SSL enabled

      Key | Values | Description
      acks | all, 1, 2 | The number of acknowledgments required from the available Kafka brokers and servers.
      enableSSLMessaging | true, false | Whether SSL messaging is enabled (true) or disabled (false).
      security.protocol | SASL_SSL | The security protocol to use with this deployment. Set it to SASL_SSL (SASL authentication over an SSL connection).
      ssl.truststore.location | /opt/IBM/kafka_211/bin/kafka.client.truststore.jks | The location of the SSL trust store.
      ssl.truststore.password | <trust store password> | The SSL trust store password.
      ssl.keystore.location | <path to SSL key store> | The location of the SSL key store file.
      ssl.keystore.password | <key store password> | The SSL key store password.
      ssl.key.password | <key password> | The password of the private key in the key store file.
      sasl.mechanism | PLAIN | Set the SASL mechanism type to PLAIN.
      bootstrap.servers | <host name or IP address>:<port> (for example: localhost:9092) | The Kafka server/broker for this deployment. If multiple servers/brokers are required, provide the host/port pairs as a comma-separated list. If the Kafka servers are running on the same system as the MDM instance of WebSphere Application Server, use localhost as the host name.
      key.serializer | com.ibm.mdm.common.kafka.RuntimeJacksonSerializer | The name of the runtime key serializer for this deployment.
      value.serializer | com.ibm.mdm.common.kafka.RuntimeJacksonSerializer | The name of the runtime value serializer for this deployment.
      syncMode | sync, async | Defines whether the Kafka brokers/servers publish messages synchronously or asynchronously.

      If the value is sync, then InfoSphere MDM requests are handled synchronously and will wait for the Kafka server to publish the message to the Kafka topic. If any errors occur during this time, then the request fails and the log is updated.

      If the value is async, then InfoSphere MDM requests are handled asynchronously and will not wait for the response. If any errors occur, the log will be updated, but the MDM request does not return a failure message.
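
The difference between the two syncMode values can be pictured with a small sketch. This is an illustration of the behavior described above under stated assumptions, not how InfoSphere MDM implements it: `publish` and the `send` callable are hypothetical names, and `send` is assumed to raise an exception when publishing fails.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

log = logging.getLogger("kafka-channel-sketch")
_executor = ThreadPoolExecutor(max_workers=1)


def publish(send, message, sync_mode):
    """Publish `message` with the hypothetical callable `send`.

    sync:  wait for the publish; a failure is logged and re-raised,
           so the calling request fails.
    async: hand the publish to a worker and return at once; a failure
           is logged but never reaches the calling request.
    """
    if sync_mode == "sync":
        try:
            send(message)
        except Exception:
            log.exception("publish failed (sync)")
            raise
        return None

    if sync_mode == "async":
        def task():
            try:
                send(message)
            except Exception:
                log.exception("publish failed (async)")

        return _executor.submit(task)  # caller does not wait for the result

    raise ValueError(f"syncMode must be 'sync' or 'async', got {sync_mode!r}")
```

The trade-off is the usual one: sync gives the caller a definite outcome at the cost of latency, while async keeps requests fast but makes the log the only place where a publish failure is visible.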

    If SSL security is not enabled:
    Define the following key-value pairs for the Kafka resource environment provider. Provide values that match your particular deployment specifications.
    Table 2. Environment resource properties with SSL disabled

    Key | Values | Description
    acks | all, 1, 2 | The number of acknowledgments required from the available Kafka brokers and servers.
    enableSSLMessaging | true, false | Whether SSL messaging is enabled (true) or disabled (false).
    bootstrap.servers | <host name or IP address>:<port> (for example: localhost:9092) | The Kafka server/broker for this deployment. If multiple servers/brokers are required, provide the host/port pairs as a comma-separated list. If the Kafka servers are running on the same system as the MDM instance of WebSphere Application Server, use localhost as the host name.
    key.serializer | com.ibm.mdm.common.kafka.RuntimeJacksonSerializer | The name of the runtime key serializer for this deployment.
    value.serializer | com.ibm.mdm.common.kafka.RuntimeJacksonSerializer | The name of the runtime value serializer for this deployment.
    syncMode | sync, async | Defines whether the Kafka brokers/servers publish messages synchronously or asynchronously.

    If the value is sync, then InfoSphere MDM requests are handled synchronously and will wait for the Kafka server to publish the message to the Kafka topic. If any errors occur during this time, then the request fails and the log is updated.

    If the value is async, then InfoSphere MDM requests are handled asynchronously and will not wait for the response. If any errors occur, the log will be updated, but the MDM request does not return a failure message.
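
Before restarting the server, it can help to check a set of custom properties against the two scenarios. The sketch below lists the keys from Table 1 and Table 2 and reports any that are absent or empty. It assumes that every key listed in the tables is required for its scenario and that the properties have already been collected into a plain dictionary; the function name `missing_properties` is hypothetical.

```python
# Keys listed for both scenarios (Table 2).
COMMON_KEYS = {
    "acks",
    "enableSSLMessaging",
    "bootstrap.servers",
    "key.serializer",
    "value.serializer",
    "syncMode",
}

# Additional keys listed only for the SSL-enabled scenario (Table 1).
SSL_KEYS = {
    "security.protocol",
    "ssl.truststore.location",
    "ssl.truststore.password",
    "ssl.keystore.location",
    "ssl.keystore.password",
    "ssl.key.password",
    "sasl.mechanism",
}


def missing_properties(props):
    """Return the sorted list of keys that are absent or blank.

    The SSL keys are checked only when enableSSLMessaging is 'true'.
    """
    required = set(COMMON_KEYS)
    if str(props.get("enableSSLMessaging", "")).strip().lower() == "true":
        required |= SSL_KEYS
    return sorted(k for k in required if not str(props.get(k, "")).strip())
```

For example, calling `missing_properties` on a dictionary that sets `enableSSLMessaging` to `true` but omits the key store entries returns those `ssl.keystore.*` keys, which points directly at what still needs to be defined in the admin console.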

  4. Verify that the shared library for the Kafka channel is properly defined.
    1. Open the WebSphere Application Server Integrated Solutions console (admin console).
    2. Navigate to Environment > Shared library > kafkaConnector.
      The JAR file defined in the classpath contains two classes, KafkaConfigFactory.class and KafkaConfig.class. The Kafka channel uses these classes to pick up the configuration defined in the Kafka resource environment provider, and uses it for Kafka notifications.
    3. Validate this configuration from the admin console by navigating to Resources > Resource environment entries > KafkaResourceReference, and checking the value of the Referenceables field.
  5. Test the Kafka channel's ability to publish InfoSphere MDM messages to an entity topic.

    Test the end-to-end flow for one InfoSphere MDM transaction such as addPerson.

    1. Ensure that the required database queries run properly, the Kafka resource environment provider is present, and all of the mandatory properties are defined with valid values.
    2. Ensure that the ZooKeeper server and all Kafka brokers are up and running.
    3. Run the following commands, for example, to enable Kafka message broadcasts for party entities:
      INSERT INTO CDCONDITIONVALTP VALUES (600,3,'addPerson','action type addPerson', CURRENT_TIMESTAMP);
      INSERT INTO EXTENSIONSET VALUES (500,'AddKafkaBroadcastExt for addParty','Broadcast add transaction','com.ibm.mdm.common.coreextensionset.AddKafkaBroadcastExt',null,1,'N',2,'N',1,CURRENT_TIMESTAMP,null);
      INSERT INTO EXTSETCONDVAL VALUES (550,21,500, CURRENT_TIMESTAMP, NULL);
      INSERT INTO EXTSETCONDVAL VALUES (551,600,500, CURRENT_TIMESTAMP, NULL);
      

      These commands enable AddKafkaBroadcastExt to create and publish messages on party persistence transactions. The queries show an example of how the Kafka message broadcast is enabled for the addPerson transaction.

      Tip: Using similar queries, you can enable broadcasts for other party-related transactions such as updateParty, addPersonName, addOrganizationName, addPartyIdentification, addPartyContactMethod, addPartyAddress, addAddress, updatePersonName, updateOrganizationName, updatePartyIdentification, updatePartyContactMethod, updatePartyAddress, correctAddress, correctPartyAddress, inactivateParty, and updatePartyPendingCDCRequest.
    4. Use a REST client to submit the request. In the URL, provide the MDM REST endpoint, such as
      http://<MDM host or IP>:<port>/com.ibm.mdm.server.ws.restful/resources/MDMWSRESTful
      Note: In the WebSphere Application Server Integrated Solutions console (admin console), under the ports link, ensure that WC_defaulthost is set.
    5. Provide proper authentication in the REST client.
    6. Send a PUT request with the addPerson transaction request.
    7. Verify that the response returns with a valid entity ID. Compare it to the entity ID value in the entity ID topic.
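
The request in substeps 4 to 6 can also be prepared from a script instead of an interactive REST client. The following sketch only builds the PUT request; it does not send it. Several details are assumptions that the steps above do not specify: HTTP Basic authentication, an `application/xml` content type, and the function name `build_add_person_request`. The addPerson request body itself is not shown in this topic, so it is passed in as an opaque string.

```python
import base64
import urllib.request

# Path taken from the endpoint shown in substep 4.
MDM_REST_PATH = "/com.ibm.mdm.server.ws.restful/resources/MDMWSRESTful"


def build_add_person_request(host, port, user, password, body,
                             content_type="application/xml"):
    """Build (but do not send) a PUT request for the MDM REST endpoint.

    Assumptions: Basic authentication and the default content type.
    `body` is the addPerson transaction request, supplied by the caller.
    """
    url = f"http://{host}:{port}{MDM_REST_PATH}"
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        method="PUT",
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": content_type,
        },
    )
```

To submit the request, pass the returned object to `urllib.request.urlopen` and read the response, then look for the entity ID in the response as described in substep 7. If your deployment uses a different authentication scheme or content type, adjust the headers accordingly.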

What to do next

If you encounter problems with the Kafka channel, review the following troubleshooting tips:
  • Ensure that the ZooKeeper and Kafka servers are up and running.
  • Ensure that the Kafka resource environment provider has the correct host name and port number for the Kafka server. This value is held in the bootstrap.servers property.
  • Ensure that the SSL security settings are correct:
    • If SSL security is enabled, then ensure that the necessary certificates are in place and the mandatory properties are all configured with valid values (as described earlier in this topic).
    • If SSL security is not enabled, then ensure that the mandatory properties are configured with valid values (as described earlier in this topic).
  • Ensure that the entity topic has been created with the correct configuration, including replication factor and partitions. For details on creating topics, see Synchronizing MDM data at runtime.
  • Ensure that the shared library is in place. Check this in the WebSphere Application Server Integrated Solutions console (admin console), under Environment > Shared library.
  • For information about specific Kafka errors, see the Apache Kafka documentation.
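
The first two tips can be checked quickly from the MDM host. The sketch below parses a bootstrap.servers value (a comma-separated list of host:port pairs) and tries to open a TCP connection to each entry. It is a rough connectivity probe under stated assumptions, not a Kafka health check: a successful connection shows only that something is listening on the port. The function names are hypothetical, and bracketed IPv6 addresses are not handled.

```python
import socket


def parse_bootstrap_servers(value):
    """Split 'host1:9092,host2:9093' into [(host, port), ...]."""
    pairs = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


def unreachable_servers(value, timeout=3.0):
    """Return the (host, port) pairs that refuse or time out on connect."""
    failed = []
    for host, port in parse_bootstrap_servers(value):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                pass  # connection opened; close it immediately
        except OSError:
            failed.append((host, port))
    return failed
```

An empty list from `unreachable_servers` means every listed broker accepted a connection. Any pair that is returned is worth checking for a wrong host name, a wrong port, or a broker that is not running.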