为Kafka客户端库指定自定义 classpath

如果需要使用不同版本的 "Kafka客户端,或添加更多由 "Kafka客户端执行的库,如 OAuth 和拦截器处理程序,可以为 "Kafka客户端库指定自定义类路径。

程序

  1. Create the kafka-client-classloader.cp file in the CDC_Kafka_instance_directory/conf directory.
  2. 打开文件并提供自定义 classpath。 格式必须与您的平台相匹配。 例如,在Linux上,您需要提供一个以冒号分隔的列表:
    library1.jar:library2.jar:...:libraryX.jar

    重启 CDCKafka复制引擎实例。

    示例:
    为IBM事件流启用 "OAuth
    1. 使用 CDC 的 Java 版本或更早版本,构建必要的Kafka客户端库和 "OAuth处理程序。 有关详细信息,请参阅 Java 8 中的IBM事件流示例
    2. Modify kafkaconsumer.properties and kafkaproducer.properties in the instance conf directory of CDC to include OAuth connection options for the Kafka cluster. 确保使用Apache Kafka消费者或生产者测试了这些参数。
      示例:
      cat <CDC instance dir>/conf/kafkaconsumer.properties 
      
      bootstrap.servers=<bootstrap servers>
      sasl.mechanism=OAUTHBEARER
      sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="<api key>";
      sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
      sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
      sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
      security.protocol=SASL_SSL
      ssl.protocol=TLSv1.2
      ssl.enabled.protocols=TLSv1.2
      ssl.endpoint.identification.algorithm=HTTPS
      对 "kafkaproducer.properties也使用相同的连接属性。
    3. 在 CDC 的实例 "conf目录下,创建一个名为 "kafka-client-classloader.cp的文件。 包括所有 "OAuth处理程序和 "kafka-client libraries/dependencies处理程序。 完整路径名用分号分隔。
      示例:
      cat <CDC instance dir>/conf/kafka-client-classloader.cp
      <path>/jackson-annotations-2.14.2.jar:<path>/jackson-databind-2.14.2.jar:<path>kafka-clients-3.3.2.jar:<path>/oauth-client-0.1.2.jar:<path>/slf4j-simple-1.7.36.jar:<path>/zstd-jni-1.5.2-1.jar:<path>/jackson-core-2.14.2.jar:<path>/jose4j-0.9.3.jar:<path>/lz4-java-1.8.0.jar:<path>/slf4j-api-1.7.36.jar:<path>/snappy-java-1.1.8.4.jar
    4. 如果启用 "KCOP以 "Avro格式写入审计记录(请参阅启用 KCOP 以 Avro 格式写入审计记录),请更新 "kcop.properties文件以包括 "schema.registry.url和所有以 "schema.registry.property开头的 "serializer.property连接参数。 如果在实例 "conf目录下的 "user-classloader.cp文件中包含模式注册表和Kafka客户端库,那么也应在 "user-classloader.cp而不是 "kafka-client-classloader.cp中包含Kafka客户端库和其他库。
      示例:
      cat <path>/kcop.properties
      audit.jcfs=ENTTYP,CCID
      before.image.prefix=B_
      before.update.record.mode=ALWAYS
      schema.registry.url=<schema registry url>
      serializer.property.basic.auth.credentials.source=USER_INFO
      serializer.property.schema.registry.basic.auth.user.info=<user>:<password>
      注意:

      要传递序列化器属性,请使用 "serializer.property.propertyName。 有关详细信息,请参阅使用IBM Event StreamsVersion 10

      要添加 "OAuth库以连接到 "Avro KCOP的模式注册表,请参阅为 KCOPs 指定自定义类路径

  3. 重启 CDCKafka复制引擎实例。