Set up IBM Event Streams
Learn how to set up the solution, including IBM Event Streams, the Kafka cluster, Kafka Connect, and the connectors.
There are different options available to create a Kafka environment with IBM Event Streams:
- Use the Operator details view of the IBM Event Streams operator to create the required resources.
- Create a new Integration instance of type Kafka cluster via the IBM Cloud Pak for Integration UI (Platform UI).
After creating the Kafka cluster (EventStreams resource) you can either:
- Manage resources in the IBM Event Streams Operator overview
- Use the IBM Event Streams UI
In the following procedures both user interfaces are used to create and manage the Kafka cluster.
Set up Kafka Connect
- Create a Kafka cluster instance (also known as an EventStreams resource).
  - Go to the Cloud Pak for Integration UI.
    Click Integration instances.
    Click Create an instance, select Event Streams, click Next, select Development, click Next, accept the license, and click Create.
    Note: Do not select the option minimal without security, as this will lead to connection issues when following this guide.
  - Alternative workflow:
    - As Administrator, go to Operators > Installed Operators and find IBM Event Streams. Several blue links are displayed in the Provided APIs column. Click the Event Streams link.
    - Click Create EventStreams.
    - Switch from Form view to YAML view and select the Samples tab in the right panel.
    - Click Development Sample > Try it.
      Note: The options presented under Samples are similar to the options you get when you create the EventStreams resource via the Cloud Pak for Integration UI.
    - Click Create.
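  Whichever workflow you use, it helps to confirm that the EventStreams instance reports Ready before you continue. A minimal sketch, assuming the instance is named development and runs in the cloudpack4integration namespace used later in this guide:

  ```shell
  # List the EventStreams instances and wait for the Ready condition
  # (instance name and namespace are assumptions; adjust them to your environment)
  oc get eventstreams -n cloudpack4integration
  oc wait eventstreams/development --for=condition=Ready --timeout=300s -n cloudpack4integration
  ```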
- Create a KafkaConnect environment.
  Kafka Connect needs to be able to connect to MongoDB, so it must include the libraries to do so. To make this work, a custom Docker image is built that includes the required .jar files for the MongoSourceConnector and MongoSinkConnector. The steps in this procedure require that you already have an internal image registry set up within OpenShift and that you can push and pull images to it from the bastion node.
  - The Dockerfile for the Kafka Connect environment can be downloaded from the IBM Event Streams UI Toolbox. To get there, open the IBM Event Streams UI and go to Toolbox.
    Select Set up a Kafka Connect environment and proceed with the Set up.
    Download the Kafka Connect ZIP file, move it to the bastion, and unzip it to a folder called kafkaconnect:

    ```shell
    unzip kafkaconnect.zip -d kafkaconnect
    ```
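    As a quick sanity check, you can list the unpacked folder. The exact contents depend on the Event Streams release, but based on the following steps it should contain at least the Dockerfile, a kafka-connect.yaml file, and a my-plugins directory:

    ```shell
    # Contents vary by release; the Dockerfile, kafka-connect.yaml, and my-plugins/
    # are the pieces used in the next steps
    ls kafkaconnect/
    ```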
  - Download the mongo-kafka-connect-1.10.1-all.jar file from org/mongodb/kafka/mongo-kafka-connect/1.10.1 to the bastion:

    ```shell
    curl https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.1/mongo-kafka-connect-1.10.1-all.jar --output mongo-kafka-connect-1.10.1-all.jar
    ```

    Note: When a newer version is available, you might want to try it.
    Note: Check the instructions in the Connector catalog about which .jar file to download. When we tried the .jar file from there, Kafka Connect did not find the required library methods.
  - Copy the mongo-kafka-connect-1.10.1-all.jar file to the my-plugins folder of the unpacked ZIP archive:

    ```shell
    cp mongo-kafka-connect-1.10.1-all.jar kafkaconnect/my-plugins/
    ```
  - As the Dockerfile pulls an image from cp.icr.io, make sure that you have access to this public container registry. You should be able to log in to the container registry with the entitlement key used during the Cloud Pak for Integration setup (see the IBM Cloud Pak for Data instructions). Also make sure that you are able to log in to the internal image registry of your OpenShift deployment.
  - Build and tag the image:

    ```shell
    # Authenticate against the IBM container registry
    podman login cp.icr.io -u cp -p "${ENTITLEMENTKEY}"

    # cd into the directory with the Dockerfile, then build and tag the image
    podman build . -t my-connect-cluster-image-1
    ```
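    Optionally, confirm that the connector .jar ended up inside the built image. A small sketch; the plugin path /opt/kafka/plugins is an assumption about the toolbox Dockerfile, so adjust it to the COPY target used in your Dockerfile:

    ```shell
    # Override the entrypoint to list the plugin directory of the freshly built image
    # (/opt/kafka/plugins is an assumed path; check the COPY line in your Dockerfile)
    podman run --rm --entrypoint ls localhost/my-connect-cluster-image-1 /opt/kafka/plugins
    ```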
  - Push the new image to the internal image registry. If the oc get route command is not working, check out How to expose the registry:

    ```shell
    # Get the route to the internal OpenShift image registry
    HOST=$(oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}')

    # Log in to the internal image registry
    podman login -u admin -p $(oc whoami -t) ${HOST}

    # Push the image
    NAMESPACE="openshift"
    IMAGE_NAME="my-connect-cluster-image-1:latest"
    LOCAL_IMAGE_NAME="localhost/${IMAGE_NAME}"
    REMOTE_IMAGE_NAME="${IMAGE_NAME}"
    podman push ${LOCAL_IMAGE_NAME} ${HOST}/${NAMESPACE}/${REMOTE_IMAGE_NAME}
    ```
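    Pushing to the internal registry in the openshift namespace should create a matching image stream. A quick check, assuming the names used above:

    ```shell
    # The image stream name matches the pushed image name
    oc get imagestream my-connect-cluster-image-1 -n openshift
    ```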
- Generate credentials so that the KafkaConnect environment can connect to the Kafka cluster. In the IBM Event Streams UI, click the tile Connect to this cluster.
- Then click Generate SCRAM credentials.
  Select the most liberal options. The credential name used in the following is my-credentials.
  For the access, select All topics and click Next.
  Select All consumer groups and click Next.
  Select All transactional IDs and click Next.
  Note: This automatically creates a Kafka user.
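  Behind the scenes this creates a KafkaUser resource and a secret holding the SCRAM password, which the Kafka Connect configuration below references. A quick check, assuming the credential name my-credentials and the cloudpack4integration namespace (the KafkaUser is typically named after the credentials):

  ```shell
  # The generated credentials show up as a KafkaUser and a secret of the same name
  oc get kafkauser my-credentials -n cloudpack4integration
  oc get secret my-credentials -n cloudpack4integration
  ```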
- The connection to the Kafka cluster is secured by TLS, so you need to trust the CA certificate used by the Kafka cluster (Event Streams cluster).
  - To find the certificate, go to Installed Operators, find IBM Event Streams, and click Event Streams. Click the EventStreams resource and go to the YAML view. The certificate is listed under kafkaListeners. Copy the certificate to a local plain-text file.
  - Create a new secret by clicking Workloads > Secrets. Create a Key/value secret and enter the following information:
    - Name: tls-cert-of-development-external
    - Key: tls.crt
    - Value: Drag and drop the plain-text file containing the certificate here.
  - Click Create.
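  If you prefer the CLI, the same secret can be created without the console. A sketch, assuming the Kafka cluster is named development, the namespace is cloudpack4integration, and the external listener uses the default cluster CA (stored in the <cluster-name>-cluster-ca-cert secret):

  ```shell
  # Extract the cluster CA certificate (secret name pattern is an assumption)
  oc get secret development-cluster-ca-cert -n cloudpack4integration \
    -o jsonpath='{.data.ca\.crt}' | base64 -d > es-ca.crt

  # Create the secret that kafka-connect.yaml references later
  oc create secret generic tls-cert-of-development-external \
    --from-file=tls.crt=es-ca.crt -n cloudpack4integration
  ```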
In the folder with the Dockerfile you also find a
kafka-connect.yamlfile. Make a backup of the file and edit it. Change it according to your environment. Compare your edits with the following file to match the details:apiVersion: eventstreams.ibm.com/v1beta2 kind: KafkaConnect metadata: name: my-kafka-connect-external-bootstrap annotations: eventstreams.ibm.com/use-connector-resources: "true" spec: replicas: 1 bootstrapServers: development-kafka-bootstrap-cloudpack4integration.apps.ocp0.sa.boe:443 image: default-route-openshift-image-registry.apps.ocp0.sa.boe/openshift/my-connect-cluster-image-1:latest template: pod: imagePullSecrets: [] metadata: annotations: eventstreams.production.type: CloudPakForIntegrationNonProduction productID: 2a79e49111f44ec3acd89608e56138f5 productName: IBM Event Streams for Non Production productVersion: 11.2.1 productMetric: VIRTUAL_PROCESSOR_CORE productChargedContainers: my-connect-cluster-connect cloudpakId: c8b82d189e7545f0892db9ef2731b90d cloudpakName: IBM Cloud Pak for Integration productCloudpakRatio: "2:1" config: group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs status.storage.topic: connect-cluster-status config.storage.replication.factor: 3 offset.storage.replication.factor: 3 status.storage.replication.factor: 3 tls: trustedCertificates: - secretName: tls-cert-of-development-external certificate: tls.crt authentication: type: scram-sha-512 username: my-credentials passwordSecret: secretName: my-credentials password: password -
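  The two values you will almost always have to change are bootstrapServers and image. A sketch for looking them up, assuming the instance name development; the jsonpath field names under status are an assumption, so compare with the YAML view in the console if the command returns nothing:

  ```shell
  # External bootstrap address of the Event Streams instance
  oc get eventstreams development -n cloudpack4integration \
    -o jsonpath='{.status.kafkaListeners[?(@.type=="external")].bootstrapServers}'

  # Hostname of the internal image registry route used in the image field
  oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}'
  ```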
- Apply the edited YAML file with:

  ```shell
  oc project cloudpack4integration
  oc apply -f kafka-connect.yaml
  ```

  Note: If the image couldn't be pulled because of permissions, refer to the official OpenShift documentation for information about how to allow the image pull.
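  Before creating connectors, it is worth confirming that the Kafka Connect workers are running and that the MongoDB connector plugin was picked up from the custom image. A sketch; the deployment name <kafka-connect-name>-connect follows the usual operator convention and is an assumption:

  ```shell
  # Wait for the Kafka Connect pods to come up
  oc get pods -n cloudpack4integration | grep my-kafka-connect-external-bootstrap

  # Check that the MongoDB connector classes were loaded from the custom image
  # (deployment name is an assumption based on the KafkaConnect name)
  oc logs deployment/my-kafka-connect-external-bootstrap-connect -n cloudpack4integration | grep -i mongodb
  ```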
- To create the Source Connector that listens for changes, apply the source-connector.yaml with oc apply -f source-connector.yaml:

  ```yaml
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: KafkaConnector
  metadata:
    name: my-source-connector
    labels:
      eventstreams.ibm.com/cluster: my-kafka-connect-external-bootstrap
  spec:
    class: com.mongodb.kafka.connect.MongoSourceConnector
    tasksMax: 3
    config:
      connection.uri: mongodb://myUserAdmin:password@ibm-mongodb-enterprise-helm-service.mongodb-test-1.svc.cluster.local:27017
      database: "warehouse"
      collection: "inventory"
      topic.prefix: "mongo"
      copy.existing: true
      key.converter: org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable: false
      value.converter: org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable: false
      publish.full.document.only: true
      pipeline: >
        [{"$match":{"operationType":{"$in":["insert","update","replace"]}}},{"$project":{"_id":1,"fullDocument":1,"ns":1,"documentKey":1}}]
  ```
- To create the Sink Connector that writes the changes to an external MongoDB instance, apply the sink-connector.yaml with oc apply -f sink-connector.yaml:

  ```yaml
  apiVersion: eventstreams.ibm.com/v1beta2
  kind: KafkaConnector
  metadata:
    name: my-sink-connector
    labels:
      eventstreams.ibm.com/cluster: my-kafka-connect-external-bootstrap
  spec:
    class: com.mongodb.kafka.connect.MongoSinkConnector
    tasksMax: 3
    config:
      connection.uri: 'mongodb://mongodb.example.com:27017'
      database: "shop"
      collection: "inventory"
      # comma separated list
      topics: "mongo.warehouse.inventory"
      post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
      key.converter: org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable: false
      value.converter: org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable: false
  ```

  Note: If you don't have an external MongoDB instance, you can also use the same MongoDB that was used in the SourceConnector to test if the connectors work properly. Change the connection.uri according to your needs.
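  Once both KafkaConnector resources are applied, they should eventually report Ready; if they do not, the status section usually contains the connector error. A quick check, assuming the names and namespace used above:

  ```shell
  # Both connectors should eventually report the Ready condition
  oc get kafkaconnector -n cloudpack4integration

  # Inspect the reported connector and task state for troubleshooting
  oc describe kafkaconnector my-source-connector -n cloudpack4integration
  oc describe kafkaconnector my-sink-connector -n cloudpack4integration
  ```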
- Verify that the connectors are working properly.
  - Connect to the MongoDB database on the OpenShift side and run these commands to test if the connectors work:

    ```shell
    mongo -u myUserAdmin -p password --authenticationDatabase admin
    use warehouse
    db.inventory.insertOne( { name:"game-console-1"} )
    db.inventory.insertOne( { name:"game-console-2"} )
    ```

    After the insert command you should get an acknowledgement.
    The topics are created in the IBM Event Streams UI under Topics.
    You should be able to see the messages from above when clicking on the topic.
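    You can also check the topic from the CLI. A sketch; whether auto-created topics are mirrored as KafkaTopic resources depends on the Topic Operator being enabled, so treat this as optional:

    ```shell
    # Auto-created topics may show up as KafkaTopic resources
    oc get kafkatopics -n cloudpack4integration | grep mongo
    ```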

  - On the remote MongoDB side, check that the entries are created:

    ```shell
    mongo -u myUserAdmin -p password --authenticationDatabase admin
    use shop
    show collections
    db.inventory.find()
    ```

    The output should list the documents that were inserted on the source side.