交易信息传递 Kafka

Kafka 支持以事务方式共同运行的消费者和生产者,这种功能允许使用 Kafka 节点的消息流提供完全相同的消息传递语义。

交易性和 Kafka 生产者

当信息以非事务方式发布到 Kafka 时,所有消费者都能立即看到,并且发布后无法撤销。
当事务性 Kafka 生产者将信息发布到 Kafka 时,并不能立即提供给所有消费者。 除非发布信息的交易已提交,否则所有消费者都无法使用该信息。 如果事务没有提交,而是回滚了,则消息会被丢弃。 消息流中的多个 Kafka 生产者节点可通过提供匹配的连接和事务属性,在同一个 Kafka 事务中发布消息,可能发布到不同的主题。 然后可以提交事务,使所有报文都可用;或者回滚事务,即所有报文都不可用。 在 IBM® App Connect Enterprise 环境中,当消息流调用成功结束时,事务会自动提交;如果消息流以异常结束,则事务会回滚。 要使 Kafka 生产节点以事务方式运行,您必须在 KafkaProducer 节点的 “事务性 ”选项卡上设置以下属性。
  • 事务方式

    将此值设为 "是", 可在 Kafka 事务中以事务方式发布信息。 将此值设为 " ",报文将以非交易方式发布。 将此值设为 " 自动" ,可根据正在发布的邮件的 "属性 "文件夹中的 " 事务 "属性值在事务性和非事务性之间动态切换。 有关属性文件夹的更多信息,请参阅信息树

  • 交易 ID

    如果要以事务方式发布信息,则必须使用此值。 它唯一标识了发布报文的事务。 如果在同一流程中使用多个 KafkaProducer 节点,并希望在同一事务中发布所有消息,则所有 KafkaProducer 节点必须使用相同的 Transactional Id 属性值。 如果 KafkaProducer 节点使用不同的关键配置属性值,如 bootstrap serversclient Id ,则第二个 KafkaProducer 节点就会失败,并抛出异常,说明不兼容的细节。 您不能在不同的消息流中为 Transactional Id 使用相同的值,因为事务的作用域在消息流实例中。 如果您的消息流使用了其他实例,那么您指定的 Transactional Id 会以线程标识符为后缀,以区分同一消息流的两个线程。 如果在同一流程中有两个或多个 KafkaProducerTransactional Id 节点,则会为每个 Transactional Id 创建一个新事务。 每个事务在流程结束时都会独立提交或回滚。

  • 事务超时

    Kafka 如果从事务开始到最终提交或回滚之间的时间超过此值,就会自动将事务标记为回滚。

交易性和 Kafka 消费者

Kafka 消费者交易性由两个不同的部分组成,这两个部分经过配置,可以独立使用,但通常一起使用,以提供最高级别的信息可靠性。
  • 提交信息偏移量

    KafkaConsumer 节点收到信息时,通常会将其位置保存到 Kafka ,因此如果节点停止并重新启动,它可以在停止的位置继续处理信息。 节点上的 Commmit message offset in Kafka 属性 KafkaConsumer 节点上的属性可控制保存消费者位置的方式和时间。

    • 选择 "无 "时 KafkaConsumer 节点不会将其位置保存回 Kafka。 每次启动信息流时,都会根据节点的 " 基本 "选项卡上 " 默认信息偏移 "属性的配置重新确定消费者位置。

    • 关于传播

      当选择 "On propagate(传播时 )"时,节点会将消费者位置保存回。 KafkaConsumer 节点会将消费者位置保存回 Kafka ,然后才会将收到的信息传播到信息流中的下一个信息流节点。 如果流程调用成功完成,或以错误结束,或服务器重新启动,收到的报文不会被流程重新处理。 该选项提供 "最多一次 "级别的信息可靠性。

    • 关于同步传播

      该选项与 "On propagate(传播 )"类似,但在将接收到的信息传播到信息流中的下一个节点之前,无需等待消费者位置保存完成。 这可以显著提高信息吞吐量,尤其是在网络延迟成为问题的情况下,因为消费者位置的保存是与信息流对信息的处理同时进行的。 该选项不再保证 "最多一次 "级别的信息可靠性,但仍能提供较高的故障恢复能力。

    • 交易方面

      该选项可将消费者位置保存为交易的一部分。 启用该选项后,在信息传播到信息流之前,消费者位置会保存到事务中的 Kafka。 然后,在流程结束时提交事务,或在发生故障时回滚事务。 如果交易回滚,同样的信息会重新发送给消费者。

  • ISOLATION LEVEL
    KafkaConsumer 节点有一个名为 Isolation level 的属性,它可以控制向消费者发送哪些信息。
    • 读取未提交

      此值是 IBM App Connect Enterprise KafkaConsumer 节点上的默认值。 使用此值的消费者会在信息发布时收到信息,而无需等待发布者事务提交。 这意味着,如果事务被回滚, KafkaConsumer 节点可能会收到一条随后被丢弃的消息。

    • 已读取

      KafkaConsumer 配置此值的节点不会接收尚未提交的报文。 如果到达主题分区中未提交的报文,它们就会阻塞,直到报文提交或回滚。

    Isolation Level 属性也可用于 KafkaRead 节点上,操作方式相同。

事务性信息传递注意事项

  • 处理中毒信息

    Kafka 没有处理中毒报文的内置支持,因此报文会不断地重新传送给消费者,直到成功处理为止。 如果一个 KafkaConsumer 节点从一个以上的分区接收报文,那么在回滚之后,就不能保证下一个传送到流中的报文就是被回滚的报文。 下一条信息可能来自不同的分区。

    您可以通过配置重试选项卡上的值,在尝试重新处理报文前引入延迟,从而控制回滚报文的重新处理速度。 配置适当的重试值可以防止因临时问题无法处理报文而消耗过多的 CPU。

  • 请求 → 回复信息传递模式

    对于请求→回复等信息传递模式,信息流通常以一个 KafkaConsumer 节点开始,以 KafkaProducer 节点完成。 在这种情况下,最好是消费和产生后续信息都发生,或者两者都不发生。 在 Transactional Id 中使用相同的值 KafkaConsumer 节点和 KafkaProducer 节点上使用相同的值,可确保两者都出现或两者都不出现。 对于 KafkaConsumer 节点和 KafkaProducer 节点,如果它们对``属性使用 Transactional Id 相同的值,系统会对其进行验证;若发现这些值不兼容, IBM App Connect Enterprise 将抛出异常。

  • 在同一信息流中混合 Kafka 和非 Kafka 事务

    IBM App Connect Enterprise Kafka 节点不支持两阶段提交。 如果消息流同时使用了事务性 Kafka 节点和其他事务性节点(例如 IBM MQ 或数据库交互),则会创建多个事务。 每个事务都会在消息流结束时独立提交或回滚。