Configuring an Apache Kafka SASL Consumer Connection

About this task

When you configure Adapter for Apache Kafka, you specify information that Integration Server uses to connect to an Apache Kafka system. You can configure Adapter for Apache Kafka connections manually using the Integration Server Administrator screen.

To configure a SASL Consumer connection:

Procedure

  1. In the Adapters menu of Integration Server Administrator's navigation area, click IBM webMethods Adapter for Apache Kafka.
  2. In the Connections screen, click Configure New Connection.
  3. In the Connection Types screen, click Apache Kafka® SASL Consumer Connection to display the Configure Connection Type screen.
  4. In the IBM webMethods Adapter for Apache Kafka section, use the following fields:
    Field: Description/Action
    Package: Package in which to create the connection. You must create the package using Designer before you can specify it in this field. For general information about creating packages, see the IBM webMethods Service Development Help for your release.
    Note: Configure the connection in a user-defined package rather than in the adapter's package. For other important considerations when creating packages for Adapter for Apache Kafka, see Overview of Package Management.
    Folder Name: Folder in which to create the connection.
    Connection Name: Name you want to give to the connection. Connection names cannot contain spaces or special characters reserved by Integration Server and Designer. For more information about the use of special characters in package, folder, and element names, see the IBM webMethods Service Development Help for your release.
  5. In the Connection Properties section, use the following fields:
    Field: Description/Action
    Bootstrap Servers: The list of host and port pairs used to establish the initial connection to the Apache Kafka cluster. Specify the list in the form host1:port1,host2:port2, and so on.
    groupId: Defines a unique string that identifies the consumer group to which this consumer connection belongs.
    Client ID: Defines the ID string to pass to the Kafka server when a request is made.
    Key Deserializer Class: Defines the deserializer class for keys. The class must implement the org.apache.kafka.common.serialization.Deserializer interface.
    Value Deserializer Class: Defines the deserializer class for values. The class must implement the org.apache.kafka.common.serialization.Deserializer interface. The default value is com.wm.adapter.wmkafka.idata.IDataDeserializer.
    Deserializer Error Handler: Specifies the custom flow or Java service that receives consumer records for which an exception occurs during deserialization. This field is used only when the watt.adapter.kafka.use.error.handling.deserializer configuration parameter is set to true. For more information, see watt.adapter.kafka.use.error.handling.deserializer and wm.adapter.wmkafka.serde.specifications:deserializerErrorHandler.
    Enable Auto Commit: Specifies whether consumer offsets are committed automatically. Select one of the following values:
    • true - The consumer’s offset is periodically committed.
    • false - The adapter commits the consumer’s offsets.
    Auto Commit Interval(ms): Specifies how often, in milliseconds, consumer offsets are auto-committed to Kafka when enable.auto.commit is set to true.
    Security Protocol: Defines the protocol used to communicate with brokers. The valid values are:
    • SASL_PLAINTEXT
    • SASL_SSL
    • SSL
    Kerberos Service Name: Defines the Kerberos principal name that Apache Kafka runs as. The name can be defined either in Apache Kafka's JAAS Config or in Apache Kafka's configuration.
    JAAS Config: Specifies the JAAS login context parameters for SASL connections in the format used by JAAS configuration files.
    Retype JAAS Config: Retype the JAAS login context parameters for SASL connections in the format used by JAAS configuration files.
    Truststore Alias: Specifies the alias name created for the truststore using the path IS/security/keystore/Create Truststore Alias.
    Keystore Alias: Specifies the alias name created for the keystore using the path IS/security/keystore/Create Keystore Alias.
    Session TimeOut(ms): Specifies the timeout used to detect consumer failures when using Kafka's group management facility.
    Auto Offset Reset: Specifies what happens when no initial offset is set in Apache Kafka. Select one of the following values:
    • earliest - Automatically resets the offset to the earliest offset.
    • latest - Automatically resets the offset to the latest offset.
    • none - Throws an exception to the consumer if no previous offset is found for the consumer's group.
    Kafka Version: Apache Kafka version to which the consumer connects. The supported values are:
    • v9 - Apache Kafka 0.9.X.XX.
    • v10 - Apache Kafka 0.10.X.XX.
    • v11+ - Apache Kafka 0.11.X.XX and later.
    Other Properties: Specifies additional Kafka properties for the connection. Use the following format: propertyName1=value;propertyName2=value
    • Be sure to add a JAAS entry named KafkaClient for Kerberos authentication if JAAS configuration is not defined at the connection level and the Security Protocol is SASL_PLAINTEXT or SASL_SSL. The following example adds the JAAS entry to the Integration Server_directory/instances/instance_name/config/is_jaas.cnf file:
      KafkaClient { 
      com.sun.security.auth.module.Krb5LoginModule required 
      useKeyTab=true 
      useTicketCache=true 
      renewTicket=true 
      keyTab="/directory1/directory2/Keytab_file.keytab" 
      serviceName="kafka" 
      principal="xxxxxxxxxxxxxxxxxxxxxx"; 
      }; 
    • If JAAS Config is configured both at the connection level and in the is_jaas.cnf file, the connection-level JAAS Config takes precedence. For example, keyTab="/directory1/directory2/Keytab_file.keytab".
  6. In the Connection Management Properties section, use the following fields:
    Field: Description/Action
    Enable Connection Pooling: Enables the connection to use connection pooling. For more information about connection pooling, see Adapter Connections.
    Note: If you plan to enable connection pooling in a clustered environment, consider the connection pool size.
    Minimum Pool Size: If connection pooling is enabled, this field specifies the number of connections to create when the connection is enabled. The adapter keeps this number of connections open regardless of whether they become idle.
    Maximum Pool Size: If connection pooling is enabled, this field specifies the maximum number of connections that can exist at one time in the connection pool.
    Pool Increment Size: If connection pooling is enabled, this field specifies the number of connections by which the pool is incremented when connections are needed, up to the maximum pool size.
    Block Timeout: If connection pooling is enabled, this field specifies the number of milliseconds that Integration Server waits to obtain a connection before it times out and returns an error.

    For example, you have a pool with a Maximum Pool Size of 20. If you receive 30 simultaneous requests for a connection, 10 requests wait for a connection from the pool. If you set the Block Timeout to 5000, the 10 requests wait for a connection for 5 seconds before they time out and return an error. If the services using the connections require 10 seconds to complete and return connections to the pool, the pending requests fail and return an error message stating that no connections are available.

    If you set the Block Timeout value too high, you may encounter problems during error conditions. If a request contains errors that delay the response, other requests will not be sent. This setting should be tuned in conjunction with the Maximum Pool Size to accommodate such bursts in processing.
    Expire Timeout: If connection pooling is enabled, this field specifies the number of milliseconds that an inactive connection can remain in the pool before it is closed and removed from the pool. The connection pool removes inactive connections until the number of connections in the pool equals the Minimum Pool Size. The inactivity timer for a connection is reset when the connection is used by the adapter.

    If you set the Expire Timeout value too high, you may have a number of unused inactive connections in the pool. This consumes local memory and a connection on your backend resource. This could have an adverse effect if your resource has a limited number of connections.

    If you set the Expire Timeout value too low, performance could degrade because of the increased activity of creating and closing connections. This setting should be tuned in conjunction with the Minimum Pool Size to avoid excessive opening and closing of connections during normal processing.
    Startup Retry Count: Number of times that the system attempts to initialize the connection pool at startup if the initial attempt fails. The default value is 0.
    Startup Backoff Timeout: Number of seconds that the system waits between attempts to initialize the connection pool.
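The Block Timeout example in this step can be worked through numerically. The following Python sketch is a simplified model only, not the adapter's pooling implementation: it assumes that all requests arrive at the same moment and that every service holds its connection for the same duration. The function name requests_timing_out and its parameters are hypothetical names chosen for illustration.

```python
def requests_timing_out(simultaneous_requests, max_pool_size,
                        block_timeout_ms, service_duration_ms):
    """Count the requests that time out while waiting for a pooled connection.

    Simplified model (an assumption, not adapter behavior): every request
    arrives at once, the first max_pool_size requests get a connection
    immediately, and each connection returns to the pool after
    service_duration_ms.
    """
    waiting = max(0, simultaneous_requests - max_pool_size)
    timed_out = 0
    for position in range(waiting):
        # Waiters are served in batches of max_pool_size; batch k has to
        # wait for k full service durations before a connection frees up.
        batch = position // max_pool_size + 1
        wait_ms = batch * service_duration_ms
        if wait_ms > block_timeout_ms:
            timed_out += 1
    return timed_out


# Values from the example: pool of 20, 30 requests, Block Timeout of 5000 ms,
# services that take 10 seconds to return their connections.
print(requests_timing_out(30, 20, 5000, 10000))
```

Under this model, raising the Block Timeout above the service duration, or raising the Maximum Pool Size to cover the burst, brings the count to zero, which is why the two settings are tuned together.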
  7. Click Save Connection.
    The new connection appears in the adapter's Connections screen and in Designer. You can enable a connection only if the parameters for the connection are valid.
    Note: This connection type is available in Fix 2 and later.
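Example: checking Connection Properties values before you enter them

The Bootstrap Servers and Other Properties formats described in step 5 can be checked before you type them into the Configure Connection Type screen. The following Python sketch is an illustration only and is not part of the adapter. The function names and the FIELD_TO_KAFKA_PROPERTY table are hypothetical. The property names on the right-hand side are standard Apache Kafka consumer settings, but the assumption that each adapter field maps to the setting shown, and that Other Properties entries are applied on top of the field values, is not confirmed by this documentation.

```python
def parse_bootstrap_servers(value):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


def parse_other_properties(value):
    """Split 'propertyName1=value;propertyName2=value' into a dict.

    Limitation of this sketch: a value that itself contains a semicolon
    cannot be expressed in this format and would be split incorrectly.
    """
    props = {}
    for pair in value.split(";"):
        pair = pair.strip()
        if not pair:
            continue  # tolerate an empty string or a trailing semicolon
        name, sep, val = pair.partition("=")
        if not sep or not name.strip():
            raise ValueError(f"expected name=value, got {pair!r}")
        props[name.strip()] = val.strip()
    return props


# Assumed mapping from adapter field labels to Kafka consumer settings.
FIELD_TO_KAFKA_PROPERTY = {
    "Bootstrap Servers": "bootstrap.servers",
    "groupId": "group.id",
    "Client ID": "client.id",
    "Enable Auto Commit": "enable.auto.commit",
    "Auto Commit Interval(ms)": "auto.commit.interval.ms",
    "Security Protocol": "security.protocol",
    "Kerberos Service Name": "sasl.kerberos.service.name",
    "Session TimeOut(ms)": "session.timeout.ms",
    "Auto Offset Reset": "auto.offset.reset",
}


def to_kafka_properties(fields, other_properties=""):
    """Build a Kafka-style property dict from adapter field values."""
    props = {FIELD_TO_KAFKA_PROPERTY[name]: str(value)
             for name, value in fields.items()}
    # Assumption: entries from Other Properties are applied last.
    props.update(parse_other_properties(other_properties))
    return props


print(parse_bootstrap_servers("host1:9092,host2:9093"))
print(to_kafka_properties(
    {"groupId": "orders", "Security Protocol": "SASL_SSL"},
    "max.poll.records=100",
))
```

A value such as host1 with no port, or an Other Properties entry with no equals sign, raises ValueError in this sketch, which makes a malformed entry visible before the connection is saved.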