为Kafka客户端库指定自定义 classpath
如果需要使用不同版本的 "Kafka客户端,或添加更多由 "Kafka客户端执行的库,如 OAuth 和拦截器处理程序,可以为 "Kafka客户端库指定自定义类路径。
程序
- Create the
kafka-client-classloader.cp
file in theCDC_Kafka_instance_directory/conf
directory. - 打开文件并提供自定义 classpath。 格式必须与您的平台相匹配。 例如,在Linux上,您需要提供一个以冒号分隔的列表:
library1.jar:library2.jar:...:libraryX.jar
重启 CDCKafka复制引擎实例。
示例:- 为IBM事件流启用 "
OAuth
库 - 使用 CDC 的 Java 版本或更早版本,构建必要的Kafka客户端库和 "
OAuth
处理程序。 有关详细信息,请参阅 Java 8 中的IBM事件流示例。 - Modify
kafkaconsumer.properties
andkafkaproducer.properties
in the instanceconf
directory of CDC to includeOAuth
connection options for the Kafka cluster. 确保使用Apache Kafka消费者或生产者测试了这些参数。示例:对 "cat <CDC instance dir>/conf/kafkaconsumer.properties bootstrap.servers=<bootstrap servers> sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="<api key>"; sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys security.protocol=SASL_SSL ssl.protocol=TLSv1.2 ssl.enabled.protocols=TLSv1.2 ssl.endpoint.identification.algorithm=HTTPS
kafkaproducer.properties
也使用相同的连接属性。 - 在 CDC 的实例 "
conf
目录下,创建一个名为 "kafka-client-classloader.cp
的文件。 包括所有 "OAuth
处理程序和 "kafka-client libraries/dependencies
处理程序。 完整路径名用分号分隔。示例:cat <CDC instance dir>/conf/kafka-client-classloader.cp <path>/jackson-annotations-2.14.2.jar:<path>/jackson-databind-2.14.2.jar:<path>kafka-clients-3.3.2.jar:<path>/oauth-client-0.1.2.jar:<path>/slf4j-simple-1.7.36.jar:<path>/zstd-jni-1.5.2-1.jar:<path>/jackson-core-2.14.2.jar:<path>/jose4j-0.9.3.jar:<path>/lz4-java-1.8.0.jar:<path>/slf4j-api-1.7.36.jar:<path>/snappy-java-1.1.8.4.jar
- 如果启用 "
KCOP
以 "Avro
格式写入审计记录(请参阅启用 KCOP 以 Avro 格式写入审计记录),请更新 "kcop.properties
文件以包括 "schema.registry.url
和所有以 "schema.registry.property
开头的 "serializer.property
连接参数。 如果在实例 "conf
目录下的 "user-classloader.cp
文件中包含模式注册表和Kafka客户端库,那么也应在 "user-classloader.cp
而不是 "kafka-client-classloader.cp
中包含Kafka客户端库和其他库。示例:cat <path>/kcop.properties audit.jcfs=ENTTYP,CCID before.image.prefix=B_ before.update.record.mode=ALWAYS schema.registry.url=<schema registry url> serializer.property.basic.auth.credentials.source=USER_INFO serializer.property.schema.registry.basic.auth.user.info=<user>:<password>
注意:要传递序列化器属性,请使用 "
serializer.property.propertyName
。 有关详细信息,请参阅使用IBM Event StreamsVersion 10。要添加 "
OAuth
库以连接到 "Avro KCOP
的模式注册表,请参阅为 KCOPs 指定自定义类路径。
- 使用 CDC 的 Java 版本或更早版本,构建必要的Kafka客户端库和 "
- 为IBM事件流启用 "
- 重启 CDCKafka复制引擎实例。