Configuring Kafka

This topic provides prerequisites and configuration information to enable Kafka Visibility in Sterling B2B Integrator.

You must install and configure the Kafka and Zookeeper clusters externally to enable the Kafka Visibility feature. The number of nodes and the configuration of the Kafka and Zookeeper clusters depend on your organization's load requirements.

Prerequisites

The following are the prerequisite tasks to enable Kafka:
  • Zookeeper cluster installation - To be installed externally on a minimum of 3 nodes. For more information, see ZooKeeper Getting Started Guide.
  • Kafka cluster installation - To be installed externally with a minimum of 3 brokers in the cluster. For more information, see Quickstart.

After installing the Zookeeper and Kafka clusters, you must create a Kafka Topic to which Sterling B2B Integrator publishes EDI events.

Creating Kafka Topic

You must create a Kafka Topic with a replication factor and partition value that are appropriate for the number of nodes in the cluster. The recommended partition value is 20 or more.

Example:
Create a topic:
./kafka-topics.sh --create --bootstrap-server <ip>:<port> --replication-factor 3 --partitions 20 --topic <topic_name>

Verify the topic:
./kafka-topics.sh --list --bootstrap-server <ip>:<port>
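
To confirm that the topic was created with the intended partition value and replication factor, the --describe option of kafka-topics.sh can be used in addition to --list. The following is a minimal sketch and not part of the product: check_topic_layout is a hypothetical helper name, and it assumes that the describe output contains a summary line with PartitionCount: and ReplicationFactor: each followed by a number, which may vary between Kafka versions.

```shell
#!/bin/sh
# Sketch: validate the summary line printed by "kafka-topics.sh --describe".
# Assumption: the summary line contains "PartitionCount:" and
# "ReplicationFactor:" followed by a number; the layout may differ by version.
#
# Example use (replace the placeholders with real values):
#   ./kafka-topics.sh --describe --bootstrap-server <ip>:<port> --topic <topic_name> | check_topic_layout 20 3

# check_topic_layout MIN_PARTITIONS EXPECTED_REPLICATION_FACTOR
# Reads the describe output on stdin.
check_topic_layout() {
    min_partitions="$1"
    expected_rf="$2"
    # Keep only the first line that carries the topic summary.
    summary=$(grep 'PartitionCount:' | head -n 1)
    partitions=$(printf '%s\n' "$summary" | sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p')
    rf=$(printf '%s\n' "$summary" | sed -n 's/.*ReplicationFactor:[[:space:]]*\([0-9][0-9]*\).*/\1/p')
    if [ -z "$partitions" ] || [ -z "$rf" ]; then
        echo "could not parse describe output" >&2
        return 2
    fi
    if [ "$partitions" -lt "$min_partitions" ] || [ "$rf" -ne "$expected_rf" ]; then
        echo "unexpected layout: partitions=$partitions replication-factor=$rf" >&2
        return 1
    fi
    echo "ok: partitions=$partitions replication-factor=$rf"
}
```

The helper only inspects text, so it can be tried against saved describe output before it is used against a live cluster.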

Configuring

To enable the Kafka Visibility feature, configure the Kafka properties and Kafka brokers in the following property files:
  • eventsnosql.properties.in - contains the list of Kafka visibility properties.
    The following properties are used:
    • visibility.nosql.enabled - set this to true to enable Kafka. The default value is false.
    • topic - name of the topic to which the Sterling B2B Integrator cluster publishes messages.
    
    /* Sample eventsnosql.properties.in file */
    
    ## PROPERTY_FILE_NAME
    ## eventsnosql.properties.in (for initialization)
    ## eventsnosql.properties (for operations)
    ## PROPERTY_FILE_DESCRIPTION
    ##
    ## PROPERTY_START
    ## PROPERTY_NAME: visibility.nosql.enabled
    ## PROPERTY_TYPE: boolean
    ## PROPERTY_DESCRIPTION
    ## Enable visibility based on external nosql store events. 
    ## True is to enable and false to disable. Default is false
    ## 
    visibility.nosql.enabled=false
    ## PROPERTY_END
    ## PROPERTY_START
    ## PROPERTY_NAME: adapters.enabled
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Comma separated list of adapters to enable nosql visibility
    adapters.enabled=
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: correlation.enabled
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Comma separated list of components whose correlation to enable nosql visibility
    correlation.enabled=EDI_X12,EDI_EDIFACT,CUSTOM
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: messagingsystem.type
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Type of the messaging system. E.g. kafka
    messagingsystem.type=kafka
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: messagingsystem.producerclass
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Fully qualified name of the producer class for configured messagingsystem type
    ## This class should implement the interface INoSqlProducer
    messagingsystem.producerclass=com.sterlingcommerce.woodstock.visibility.KafkaNoSqlProducer
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: num.of.producers
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## Number of producers to create. Default 1
    num.of.producers=1
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: num.of.sync.producers 
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## Number of synchronous producers to create. Default 1
    num.of.sync.producers=5
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: send.correlation.async 
    ## PROPERTY_TYPE: boolean
    ## PROPERTY_DESCRIPTION
    ## Correlation should be sent using async API true/false
    send.correlation.async=true
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: events.serialization.format
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## JSON/XML.  Serialization to be used for events class
    events.serialization.format=JSON
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: topic
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Value to be used for defining topics unique to this B2BI cluster
    topic=b2bivisibility
    ## PROPERTY_END
    
  • kafka.properties - contains properties used by the Kafka producer when connecting to Kafka brokers, such as the Kafka broker list, key/value serializers, and so on.

    The bootstrap.servers property provides the initial list of Kafka brokers to connect to.

    
    /* Sample kafka.properties file */
    
    ## PROPERTY_FILE_NAME
    ## kafka.properties.in (for initialization)
    ## kafka.properties (for operations)
    ## PROPERTY_FILE_DESCRIPTION
    ## All properties (property names, value types) in this file are specific to Kafka
    ## PROPERTY_START
    ## PROPERTY_NAME: bootstrap.servers
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Comma separated list of initial kafka brokers to connect
    bootstrap.servers=&KAFKA_BROKER_INITIAL_LIST;
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: key.serializer
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Key serializer class to be used
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: value.serializer
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Value serializer class to be used
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: client.id
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## An id string to pass to the server when making requests. Application name that will be
    ## used by server side for logging
    client.id=B2BEventsVisibility
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: acks
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Producer will wait for the message to be acknowledged by the broker.
    ## 0 - producer will not wait for an acknowledgement
    ## 1 - producer will wait for a response from the leader
    ## all (-1) - producer will wait until the message is acknowledged by all in-sync replicas
    acks=1
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: retries
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## number of times the producer should retry sending a message before throwing an exception
    retries=3
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: retry.backoff.ms
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## Delay between each retry of message. Default is 100ms. Set this parameter to change the
    ## default value
    ##retry.backoff.ms=100
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: batch.size
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## The amount of memory in bytes that will be used for each batch
    ## Default from Kafka is 16 KB (16384). Current value is set to 256 KB
    batch.size=262144
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: linger.ms
    ## PROPERTY_TYPE: long
    ## PROPERTY_DESCRIPTION
    ## The amount of time to wait for additional messages before sending the current batch
    ## Default is 0, setting it to 1 second
    linger.ms=1000
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: buffer.memory
    ## PROPERTY_TYPE: long
    ## PROPERTY_DESCRIPTION
    ## The total bytes of memory the producer can use to buffer records waiting to be sent to the
    ## broker (server)
    buffer.memory=33554432
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: max.block.ms
    ## PROPERTY_TYPE: long
    ## PROPERTY_DESCRIPTION
    ## Kafka producer will block for this time after the buffer.memory limit is reached
    ## uncomment this to change the default value of 60000
    #max.block.ms=60000
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: retry.backoff.ms
    ## PROPERTY_TYPE: long
    ## PROPERTY_DESCRIPTION
    ## The amount of time to wait before attempting to retry a failed request
    ## uncomment this to change the default value of 100
    #retry.backoff.ms=100
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: delivery.timeout.ms
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## An upper bound on the time to report success or failure after a call to send() returns.
    ## The value of this config should be greater than or equal to the sum of request.timeout.ms
    ## and linger.ms. Uncomment this to change the default
    #delivery.timeout.ms=120000
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: request.timeout.ms
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## The maximum amount of time the client will wait for the response of a request
    ## uncomment this to change the default value of 30000
    #request.timeout.ms=30000
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: max.in.flight.requests.per.connection
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## The maximum number of unacknowledged requests the client will send on a single connection
    ## before blocking. Uncomment this to change the default
    #max.in.flight.requests.per.connection=5
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME: max.request.size
    ## PROPERTY_TYPE: int
    ## PROPERTY_DESCRIPTION
    ## The maximum size of a request in bytes. Uncomment this to change the default
    #max.request.size=1048576
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:security.protocol 
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## To enable SSL communication between kafka client and kafka broker
    #security.protocol=SSL
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.truststore.location
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Path for ssl truststore
    #ssl.truststore.location=
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.truststore.password
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## password for ssl truststore
    #ssl.truststore.password=
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.keystore.location
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Path for ssl keystore
    #ssl.keystore.location=
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.keystore.password
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Password for SSL key store
    #ssl.keystore.password=
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.key.password
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## (optional) Password of the private key in the ssl keystore
    #ssl.key.password=
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.enabled.protocols
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## List down all supported TLS versions
    #ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.truststore.type
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Format of truststore file
    #ssl.truststore.type=JKS
    ## PROPERTY_END
    
    ## PROPERTY_START
    ## PROPERTY_NAME:ssl.keystore.type
    ## PROPERTY_TYPE: String
    ## PROPERTY_DESCRIPTION
    ## Format of keystore file
    #ssl.keystore.type=JKS
    ## PROPERTY_END
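
The description of delivery.timeout.ms in the sample states that its value should be greater than or equal to the sum of request.timeout.ms and linger.ms. A minimal sketch of that check is shown here. The helper names get_prop and check_delivery_timeout are hypothetical, and the fallback values (0, 30000, and 120000) are the values shown in the sample file, assumed to apply when a property is commented out.

```shell
#!/bin/sh
# Sketch: verify delivery.timeout.ms >= request.timeout.ms + linger.ms
# for a kafka.properties file. Helper names are illustrative only.

# get_prop FILE KEY DEFAULT
# Prints the last uncommented value of KEY in FILE, or DEFAULT if none is set.
get_prop() {
    # Escape the dots in the key so that sed treats them literally.
    key=$(printf '%s' "$2" | sed 's/\./\\./g')
    value=$(sed -n "s/^[[:space:]]*${key}[[:space:]]*=[[:space:]]*\([^[:space:]]*\).*/\1/p" "$1" | tail -n 1)
    printf '%s\n' "${value:-$3}"
}

# check_delivery_timeout FILE
# Returns 0 when the timeout relationship holds, 1 otherwise.
check_delivery_timeout() {
    file="$1"
    linger=$(get_prop "$file" linger.ms 0)
    request=$(get_prop "$file" request.timeout.ms 30000)
    delivery=$(get_prop "$file" delivery.timeout.ms 120000)
    sum=$((request + linger))
    if [ "$delivery" -ge "$sum" ]; then
        echo "ok: delivery.timeout.ms=$delivery >= $sum"
    else
        echo "invalid: delivery.timeout.ms=$delivery < $sum" >&2
        return 1
    fi
}
```

With the sample values (linger.ms=1000 and the other two properties commented out), the check compares 120000 against 31000 and passes.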
    
Note: To change the settings in the kafka.properties file, edit the corresponding kafka.properties.in file and then apply the changes:
  • Stop Sterling B2B Integrator.
  • Run the setupfiles script with one of the following commands.
    
    UNIX or Linux:
    From the install_dir/install/bin directory, run setupfiles.sh.
    
    Windows:
    From the install_dir\install\bin directory, run setupfiles.cmd.
    
Alternatively, you can enable Kafka in two other ways:
  • Editing customer_overrides.properties file:

    Override the following properties to enable Kafka visibility and set the Kafka broker configuration. The property key for eventsnosql.properties is events_nosql, and the property key for kafka.properties is nosql_prop.

    events_nosql.visibility.nosql.enabled=true

    nosql_prop.bootstrap.servers={broker host:port list}

  • Using Customization UI or REST APIs:
    1. Log in to the Customization UI.
    2. Click the PropertyFile tab and click the customer_overrides link.
    3. Click the Property tab and click Create Property.
    4. Add the following Property Key and its corresponding value.
      
      events_nosql.visibility.nosql.enabled=true
      nosql_prop.bootstrap.servers={broker host:port list}
      Property Key for Kafka properties (kafka.properties) is nosql_prop.
      Property Key for eventsnosql.properties is events_nosql.
      
    5. Click Save Property.

    The Kafka Visibility feature is now enabled in Sterling B2B Integrator.
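
For the customer_overrides.properties approach, the two override lines follow the pattern of property key, then a dot, then the property name. The following sketch appends them to a file. It is an illustration under assumptions: add_kafka_overrides is a hypothetical helper name, the broker host names are placeholders, and the path to customer_overrides.properties must be supplied for your installation. The sketch does not check whether the keys are already present in the file.

```shell
#!/bin/sh
# Sketch: append the Kafka visibility overrides to a properties file.
# add_kafka_overrides is an illustrative helper, not a product command.

# add_kafka_overrides FILE BROKERS
# FILE    - path to customer_overrides.properties (supplied by the caller)
# BROKERS - comma-separated host:port list, for example
#           "broker1.example.com:9092,broker2.example.com:9092" (placeholder hosts)
add_kafka_overrides() {
    overrides_file="$1"
    brokers="$2"
    {
        # events_nosql is the property key for eventsnosql.properties.
        echo "events_nosql.visibility.nosql.enabled=true"
        # nosql_prop is the property key for kafka.properties.
        echo "nosql_prop.bootstrap.servers=$brokers"
    } >> "$overrides_file"
}
```

Because the helper only appends, review the file afterwards and remove any earlier entries for the same keys.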

Other configurations

Setting tracking level

To enable Kafka, you must set enveloping.TRACKING_LEVEL=full. The default tracking_level is full. If the tracking_level is set to none or basic, the events are not pushed to Kafka.

If the tracking_level is not set correctly, the system writes the following error message to the logs:

Kafka visibility is enabled and TRACKING_LEVEL = None or Basic
Kafka visibility requires TRACKING_LEVEL = Full. EDI Events will not be pushed to Kafka

If Kafka is disabled, this message does not appear when the tracking_level is set to none or basic. Because the tracking_level is evaluated for every workflow execution, the message appears in the logs on every execution.

Note: When Kafka is enabled, setting tracking_level to none or basic is an invalid configuration.
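
Because the message is written on every workflow execution, a simple text search of the log is enough to detect a wrong tracking level. The following is a minimal sketch: check_tracking_level_log is a hypothetical helper name, and the log file to search is an assumption that you must supply, because this topic does not name the file that receives the message.

```shell
#!/bin/sh
# Sketch: look for the tracking level error message in a log file.
# check_tracking_level_log is an illustrative helper, not a product command.

# check_tracking_level_log LOGFILE
# Returns 1 if the message is present, 0 if it is not.
check_tracking_level_log() {
    if grep -q 'Kafka visibility requires TRACKING_LEVEL = Full' "$1"; then
        echo "warning found: set enveloping.TRACKING_LEVEL=full"
        return 1
    fi
    echo "no tracking level warning found"
}
```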

Enabling Document Tracking

You must enable Document Tracking on EDI BPs. This enables you to correlate the events generated in the flow through the tracking ID. If custom correlations are defined in a BP without enabling Document Tracking, the correlation event does not have the tracking ID.