交易信息传递 Kafka
Kafka 支持以事务方式共同运行的消费者和生产者,这种功能允许使用 Kafka 节点的消息流提供完全相同的消息传递语义。
交易性和 Kafka 生产者
当事务性 Kafka 生产者将信息发布到 Kafka 时,并不能立即提供给所有消费者。 除非发布信息的交易已提交,否则所有消费者都无法使用该信息。 如果事务没有提交,而是回滚了,则消息会被丢弃。 消息流中的多个 Kafka 生产者节点可通过提供匹配的连接和事务属性,在同一个 Kafka 事务中发布消息,可能发布到不同的主题。 然后可以提交事务,使所有报文都可用;或者回滚事务,即所有报文都不可用。 在 IBM® App Connect Enterprise 环境中,当消息流调用成功结束时,事务会自动提交;如果消息流以异常结束,则事务会回滚。 要使 Kafka 生产节点以事务方式运行,您必须在 KafkaProducer 节点的 “事务性 ”选项卡上设置以下属性。
- 事务方式
将此值设为 "是", 可在 Kafka 事务中以事务方式发布信息。 将此值设为 " 否 ",报文将以非交易方式发布。 将此值设为 " 自动" ,可根据正在发布的邮件的 "属性 "文件夹中的 " 事务 "属性值在事务性和非事务性之间动态切换。 有关属性文件夹的更多信息,请参阅信息树。
- 交易 ID
如果要以事务方式发布信息,则必须使用此值。 它唯一标识了发布报文的事务。 如果在同一流程中使用多个 KafkaProducer 节点,并希望在同一事务中发布所有消息,则所有 KafkaProducer 节点必须使用相同的 Transactional Id 属性值。 如果 KafkaProducer 节点使用不同的关键配置属性值,如 bootstrap servers 或 client Id ,则第二个 KafkaProducer 节点就会失败,并抛出异常,说明不兼容的细节。 您不能在不同的消息流中为 Transactional Id 使用相同的值,因为事务的作用域在消息流实例中。 如果您的消息流使用了其他实例,那么您指定的 Transactional Id 会以线程标识符为后缀,以区分同一消息流的两个线程。 如果在同一流程中有两个或多个 KafkaProducerTransactional Id 节点,则会为每个 Transactional Id 创建一个新事务。 每个事务在流程结束时都会独立提交或回滚。
- 事务超时
Kafka 如果从事务开始到最终提交或回滚之间的时间超过此值,就会自动将事务标记为回滚。
交易性和 Kafka 消费者
- 提交信息偏移量
当 KafkaConsumer 节点收到信息时,通常会将其位置保存到 Kafka ,因此如果节点停止并重新启动,它可以在停止的位置继续处理信息。 节点上的 Commmit message offset in Kafka 属性 KafkaConsumer 节点上的属性可控制保存消费者位置的方式和时间。
- 无
选择 "无 "时 KafkaConsumer 节点不会将其位置保存回 Kafka。 每次启动信息流时,都会根据节点的 " 基本 "选项卡上 " 默认信息偏移 "属性的配置重新确定消费者位置。
- 关于传播
当选择 "On propagate(传播时 )"时,节点会将消费者位置保存回。 KafkaConsumer 节点会将消费者位置保存回 Kafka ,然后才会将收到的信息传播到信息流中的下一个信息流节点。 如果流程调用成功完成,或以错误结束,或服务器重新启动,收到的报文不会被流程重新处理。 该选项提供 "最多一次 "级别的信息可靠性。
- 关于同步传播
该选项与 "On propagate(传播 )"类似,但在将接收到的信息传播到信息流中的下一个节点之前,无需等待消费者位置保存完成。 这可以显著提高信息吞吐量,尤其是在网络延迟成为问题的情况下,因为消费者位置的保存是与信息流对信息的处理同时进行的。 该选项不再保证 "最多一次 "级别的信息可靠性,但仍能提供较高的故障恢复能力。
- 交易方面
该选项可将消费者位置保存为交易的一部分。 启用该选项后,在信息传播到信息流之前,消费者位置会保存到事务中的 Kafka。 然后,在流程结束时提交事务,或在发生故障时回滚事务。 如果交易回滚,同样的信息会重新发送给消费者。
- 无
- ISOLATION LEVELKafkaConsumer 节点有一个名为 Isolation level 的属性,它可以控制向消费者发送哪些信息。
- 读取未提交
此值是 IBM App Connect Enterprise KafkaConsumer 节点上的默认值。 使用此值的消费者会在信息发布时收到信息,而无需等待发布者事务提交。 这意味着,如果事务被回滚, KafkaConsumer 节点可能会收到一条随后被丢弃的消息。
- 已读取
KafkaConsumer 配置此值的节点不会接收尚未提交的报文。 如果到达主题分区中未提交的报文,它们就会阻塞,直到报文提交或回滚。
Isolation Level 属性也可用于 KafkaRead 节点上,操作方式相同。
- 读取未提交
事务性信息传递注意事项
- 处理中毒信息
Kafka 没有处理中毒报文的内置支持,因此报文会不断地重新传送给消费者,直到成功处理为止。 如果一个 KafkaConsumer 节点从一个以上的分区接收报文,那么在回滚之后,就不能保证下一个传送到流中的报文就是被回滚的报文。 下一条信息可能来自不同的分区。
您可以通过配置重试选项卡上的值,在尝试重新处理报文前引入延迟,从而控制回滚报文的重新处理速度。 配置适当的重试值可以防止因临时问题无法处理报文而消耗过多的 CPU。
- 请求 → 回复信息传递模式
对于请求→回复等信息传递模式,信息流通常以一个 KafkaConsumer 节点开始,以 KafkaProducer 节点完成。 在这种情况下,最好是消费和产生后续信息都发生,或者两者都不发生。 在 Transactional Id 中使用相同的值 KafkaConsumer 节点和 KafkaProducer 节点上使用相同的值,可确保两者都出现或两者都不出现。 对于 KafkaConsumer 节点和 KafkaProducer 节点,如果它们对``属性使用 Transactional Id 相同的值,系统会对其进行验证;若发现这些值不兼容, IBM App Connect Enterprise 将抛出异常。
- 在同一信息流中混合 Kafka 和非 Kafka 事务
IBM App Connect Enterprise Kafka 节点不支持两阶段提交。 如果消息流同时使用了事务性 Kafka 节点和其他事务性节点(例如 IBM MQ 或数据库交互),则会创建多个事务。 每个事务都会在消息流结束时独立提交或回滚。