Creating Kafka integrations

A Kafka integration can provide either event or log data. Log data can be used to establish a baseline of normal behavior and then identify anomalies. Event data enables the analysis and processing of different types of alerts and events. Anomalies that are identified can be correlated with alerts and events and published to your ChatOps interface to help you determine the cause and resolution of a problem.

Custom integrations can route only a single source of data at a time. Unlike the custom integration type, you can use the Kafka integration type to collect log data and event data from different systems. Then, you can route that information through a forwarding agent, such as Sysdig or FluentBit, to an Apache Kafka topic.

You can also use the Kafka integration to enable training with an offline, historical data set. For more information about using Kafka topics for offline training, see Importing offline log data.

Notes:

  • The Kafka replication factor is set to one replica by default. If you are implementing a production deployment of IBM Cloud Pak for AIOps, you might lose data if your Kafka pods fail or restart. If data collection is enabled in your Kafka integration when the Kafka pods go down, you might experience a gap in the data that the integration collects during that period.
  • The maximum Kafka message size in IBM Cloud Pak for AIOps is 1048588 bytes.
  • If you frequently send messages that are close to the maximum size, performance can be adversely affected. A quick way to check record sizes before you publish is shown after these notes.
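
As a rough pre-publication check, you can list any records that exceed this limit before you send them. The following sketch assumes one JSON record per line in a file; records.json is a placeholder name:

    # Report records whose byte count exceeds the maximum Kafka message size
    LC_ALL=C awk 'length($0) > 1048588 { printf "record %d is %d bytes, over the limit\n", NR, length($0) }' records.json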

For more information about working with Kafka integrations, see the following sections:

Creating Kafka integrations

Unlike other integration types, you can use Kafka integrations to push properly formatted historical training data to IBM Cloud Pak for AIOps. To create a Kafka integration, complete the following steps:

  1. Log in to IBM Cloud Pak for AIOps console.

  2. Expand the navigation menu (four horizontal bars), then click Define > Integrations.

  3. On the Integrations page, click Add integration.

  4. From the list of available integrations, find and click the Kafka tile.

    Note: If you do not immediately see the integration that you want to create, you can filter the tiles by type of integration. Click the type of integration that you want in the Category section.

  5. On the side panel, review the instructions. When you are ready to continue, click Get started.

  6. On the Add integration page, define the general integration details:

    • Name: The display name of your Kafka integration.

    • Description: An optional description for the Kafka integration.

    • Kafka partitions: The number of Kafka partitions. The default value of 1 might be suitable for importing a few log records. However, if you intend to import many records, increase this number. The range is 1 to 500. For example, for a proof of concept (PoC) deployment where you need to import a large data set, you can use 48 Kafka partitions.

    • JSON processing option: Select a JSON processing option.

      • None: The default option. The JSON is not processed or modified.

      • Flatten: This option flattens the JSON object by removing the opening and closing braces.

      • Filter: This option extracts the JSON object and replaces it with an empty string.

    Figure. Create Kafka integration

    For more information about the options, see Managing embedded JSON. An illustration of how each option can change a message follows this procedure.

  7. Click Next.
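
The following lines illustrate, based on the option descriptions above, how each JSON processing option might change a log message that embeds a JSON object. The message text is a made-up example, and the exact output depends on your log structure; see Managing embedded JSON.

    Original message:  Request failed {"code": 500, "path": "/api/v1/items"}
    Flatten:           Request failed "code": 500, "path": "/api/v1/items"
    Filter:            Request failed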

Entering Field mapping information

Unlike other integration types, you can specify what kind of information you want to run through your Kafka integration. For example, if you want to collect data from multiple custom sources, you can set them up as separate Kafka integrations.

  1. In the Field mapping section, set the Data source to specify the type of data incoming from the Kafka integration. You can select Events or Logs.

  2. Specify the Mapping type, which creates the field mapping for your specified log type.

    • Mapping types for Events include None, PagerDuty, and NOI.

    • Mapping types for Logs are None, ELK, Falcon LogScale, Mezmo, Custom, and Splunk.

    • If you have log data that is already normalized, choose None.

    • If your log data is in one of the supported log formats, such as ELK or Falcon LogScale, choose the corresponding log format.

    • For field mapping for Custom types, see Creating custom integrations.

      Figure. Field mapping type

  3. The Topic name for a Kafka integration is predefined. Make a note of this topic name; this is the topic that you send your data to.

  4. For log integrations, complete the following settings:

    1. Enter the Maximum number of logs per second that are sent from Kafka to IBM Cloud Pak for AIOps. The value is set in increments of 1,000, up to a maximum of 25,000, and is rounded up to the nearest thousand.

    2. Set the Field mapping mode to Live data for initial AI training or to Live data for continuous AI training and anomaly detection.

      Figure. Field mapping

    3. In the Mapping section, if you use a Mapping type other than None, verify whether the JSON document that is automatically assigned to this field is valid. The validity depends on the structure of your log records. For example, for a PoC deployment, an example default Field mapping for Kafka can resemble the following mapping:

        {
          "codec": "elk",
          "message_field": "@message",
          "log_entity_types": "@hostname, @bundleName, @context.Environment",
          "instance_id_field": "@properties.processtype",
          "rolling_time": 10,
          "timestamp_field": "@timestamp"
        }
      

      If you chose None for the Mapping type, each log record in your log files must conform to the following format. Set both the application_id and application_group_id values to "1000", and make sure that a 13-digit timestamp value is assigned to the timestamp field:

      {
          "timestamp": 1581549409000,
          "utc_timestamp": "2020-02-12 23:16:49.000",
          "instance_id": "calico-node-jn2d2",
          "application_group_id": "1000",
          "application_id": "1000",
          "features": [],
          "meta_features": [],
          "level": 1,
          "message": "[64] ipsets.go 254: Resyncing ipsets with dataplane. family=\"inet\"",
          "entities": {
              "pod": "calico-node-jn2d2",
              "cluster": null,
              "container": "calico-node",
              "node": "kube-bmgcm5td0stjujtqv8m0-ai4itsimula-wp16cpu-00002c34"
          },
          "type": "StandardLog"
      }
      

      Important: The event data that is collected must follow the Kafka integration normalized event schema. For more information, see Normalized mapping rules.

  5. Click Next.

Note: When importing events, the JSON structure for each event must be on a single line in the file. For more information about the structure for importing event data, see Normalized mapping rules.
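
If your event file is pretty-printed, one way to compact it so that each JSON object occupies a single line is the jq utility. This is a sketch; the file names are placeholders:

    # Rewrite each top-level JSON object as a single compact line
    jq -c '.' events-pretty.json > events.json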

Entering AI training and log data information

  1. If you want to enable data collection for AI training and log data, switch the Data collection toggle to On.

    Figure. AI training

  2. Click Next.

  3. The UI displays the Resource requirements page.

    Figure. Resource requirements

    Note: If the Data Source is set to Events, you will see a message stating that you need to enable logs data collection to see the resource management overview. You can skip this page when events are enabled.

    On the Resource requirements page, you can review the slot usage for your log integrations to see if there are enough slots to fully support the integration for multizone high availability.

    If you set the Data collection toggle to On, you will see the resource management overview.

    • If your current usage and other usage are less than the provisioned slots, but the HA slots exceed the provisioned slots, you will be able to create the integration, but will see a warning that you do not have enough slots. The integration will not have multizone high availability.

    • If your projected usage exceeds the provisioned slots, you will not be able to create the integration because you do not have enough slots on your system for log data integrations.

    • If your total slots, including HA slots, are within the provisioned slots, the integration will have multizone high availability.

      Note: HA operation assumes high availability for three zones.

    If you set the Data collection toggle to Off, you will see a message stating that you need to enable logs data collection to see the resource management overview. When data collection is off, no slots are used by that integration.

  4. Click Done to save your integration.

You created a Kafka integration in your instance, and you can now use it as the basis for your offline AI model training. After you create your integration, enable data collection to connect the integration with the AI capabilities of IBM Cloud Pak for AIOps.

Importing offline log data

Before you proceed with the following steps, make sure that data flow is enabled for the Kafka integration that you previously defined. You can import offline log data into IBM Cloud Pak for AIOps in two ways. Both mechanisms publish the offline log data to the Kafka topic that is associated with the Kafka integration.

Using a Kafka utility to publish log data

  1. Install a Kafka utility, such as the kcat utility.

  2. Log in to the Red Hat® OpenShift® cluster where IBM Cloud Pak for AIOps is installed.

  3. Run the following commands from the namespace where IBM Cloud Pak for AIOps is installed:

    oc extract secret/iaf-system-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
    
    export sasl_password=$(oc get secret cp4waiops-cartridge-kafka-auth-0 --template={{.data.password}} | base64 --decode);
    
    export BROKER=$(oc get routes iaf-system-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'):443
    
  4. Publish the log data to the Kafka topic. This example uses kcat. If you also use kcat, update the variables and run the following commands (an optional check that the records were published follows this procedure):

    export KAFKA_TOPIC=<kafka topic>
    
    export LOG_FILE=<log file>
    
    kcat -X security.protocol=SASL_SSL -X ssl.ca.location=ca.crt -X sasl.mechanisms=SCRAM-SHA-512 -X sasl.username=cp4waiops-cartridge-kafka-auth-0 -X sasl.password=$sasl_password -X enable.ssl.certificate.verification=false -b $BROKER -P -t $KAFKA_TOPIC -l $LOG_FILE
    
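Optionally, you can confirm that the records were published by consuming a few messages back from the same topic. This check is a sketch that reuses the variables exported above; the -C flag switches kcat to consumer mode, -o beginning reads from the start of the partition, and -c 5 exits after five messages:

    kcat -X security.protocol=SASL_SSL -X ssl.ca.location=ca.crt -X sasl.mechanisms=SCRAM-SHA-512 -X sasl.username=cp4waiops-cartridge-kafka-auth-0 -X sasl.password=$sasl_password -X enable.ssl.certificate.verification=false -b $BROKER -C -t $KAFKA_TOPIC -o beginning -c 5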

Copying log data into the Kafka cluster

Use this option only if you cannot use a Kafka utility such as kcat in your environment. Complete the following steps to publish the log data to the corresponding Kafka topic by copying the log data into the Kafka cluster:

  1. Run the following commands from the namespace where IBM Cloud Pak for AIOps is installed to obtain the Kafka password and Kafka broker. Record both values; you need them in later steps.

    export kafka_password=$(oc get secret cp4waiops-cartridge-kafka-auth-0 --template={{.data.password}} | base64 --decode)
    
    export kafka_broker=$(oc get routes iaf-system-kafka-bootstrap -o=jsonpath='{.status.ingress[0].host}{"\n"}'):443
    
    echo $kafka_password
    
    echo $kafka_broker
    
  2. Copy the log file to the Kafka cluster:

    oc cp <log file> iaf-system-kafka-0:/tmp
    
  3. Check that the file is copied to the Kafka cluster from the namespace where IBM Cloud Pak for AIOps is installed:

    oc exec -it iaf-system-kafka-0 -- bash
    
    ls /tmp/<log file>
    
    exit
    
  4. Get the ca.cert certificate file:

    oc extract secret/iaf-system-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
    
  5. Convert ca.crt to Java keystore format (JKS):

    keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt
    
  6. Copy the truststore file to the Kafka cluster:

    oc cp truststore.jks iaf-system-kafka-0:/tmp
    
  7. Open a bash session in the Kafka pod again:

    oc exec -it iaf-system-kafka-0 -- bash
    
  8. Create a file called producer.properties within the Kafka pod. The contents of the file can resemble the following properties. Update the variable placeholders before you use this content. The ssl.truststore.location value points to the JKS file that you created earlier.

    security.protocol=SASL_SSL
    ssl.truststore.location=/tmp/truststore.jks
    ssl.truststore.password=password
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="cp4waiops-cartridge-kafka-auth-0" \
    password="<kafka password>";
    
  9. From the pod, run the Kafka producer script to publish the log data to the corresponding Kafka topic. Update the variable placeholders (an optional check that the records reached the topic follows this procedure):

    /opt/kafka/bin/kafka-console-producer.sh --broker-list <kafka broker> --producer.config producer.properties --topic <kafka topic> < <log file>
    
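Optionally, you can confirm that the records reached the topic by reading a few messages back from inside the pod. This sketch reuses producer.properties as the consumer configuration, because the same client security properties apply; update the placeholders as before:

    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <kafka broker> --consumer.config producer.properties --topic <kafka topic> --from-beginning --max-messages 5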

Enabling and disabling Kafka integrations

If you did not enable data collection when you created your integration, you can enable it afterward. You can also disable a previously enabled integration in the same way. If you selected Live data for initial AI training when you created your integration, disable the integration before AI model training. To enable or disable an integration, complete the following steps:

  1. Log in to IBM Cloud Pak for AIOps console.

  2. Expand the navigation menu (four horizontal bars), then click Define > Integrations.

  3. On the Manage integrations tab of the Integrations page, click the Kafka integration type.

  4. Click the integration that you want to enable or disable.

  5. Go to the AI training and log data section. Set Data collection to On or Off to enable or disable data collection. Disabling data collection for an integration does not delete the integration.

You enabled or disabled your integration. For more information about deleting an integration, see Deleting Kafka integrations.

Editing Kafka integrations

After you create your integration, you can edit it, for example, to refine the field mapping for a Kafka integration from a Falcon LogScale source. To edit an integration, complete the following steps:

  1. Log in to IBM Cloud Pak for AIOps console.

  2. Expand the navigation menu (four horizontal bars), then click Define > Integrations.

  3. Click the Kafka integration type on the Manage integrations tab of the Integrations page.

  4. On the Kafka integrations page, click the name of the integration that you want to edit. Alternatively, you can click the options menu (three vertical dots) for the integration and click Edit. The integration configuration opens.

  5. Edit your integration as required. Click Save when you are done editing.

Your integration is now edited. If you have not already enabled your integration, you can enable or disable it directly from the interface. For more information about enabling and disabling your integration, see Enabling and disabling Kafka integrations. For more information about deleting an integration, see Deleting Kafka integrations.

Deleting Kafka integrations

If you no longer need your Kafka integration and want to delete it entirely rather than only disable it, you can delete the integration from the console.

Note: You must disable data collection before you delete your integration. For more information about disabling data collection, see Enabling and disabling Kafka integrations.

To delete an integration, complete the following steps:

  1. Log in to IBM Cloud Pak for AIOps console.

  2. Expand the navigation menu (four horizontal bars), then click Define > Integrations.

  3. Click the Kafka integration type on the Manage integrations tab of the Integrations page.

  4. On the Kafka integrations page, click the options menu (three vertical dots) for the integration that you want to delete and click Delete.

  5. Enter the name of the integration to confirm that you want to delete your integration. Then, click Delete.

Your integration is deleted.