0%

Pulsar消费者提交与消费机制小结

Pulsar消费者提交与消费机制小结

消费偏移量与提交偏移量

Pulsar中消费者的消费偏移量(consumption offset)和提交偏移量(committed offset)并不是同一个概念。

  1. 消费偏移量(Consumption Offset)
    • 消费偏移量指的是消费者当前正在消费的消息的偏移量。它表示消费者已经从消息流中读取到的位置。
    • 消费偏移量是消费者在消费消息时记录的当前位置,但它并不一定意味着这些消息已经被处理完毕或确认。
  2. 提交偏移量(Committed Offset)
    • 提交偏移量是消费者已经成功处理并确认的消息的偏移量。它表示消费者已经处理完这些消息,并且可以安全地认为这些消息不会被再次消费。
    • 提交偏移量通常是通过消费者显式地调用提交偏移量的API来完成的,例如acknowledgecommit方法。

区别:

  • 消费偏移量是消费者当前正在处理的消息的位置,而提交偏移量是消费者已经处理完毕并确认的消息的位置。
  • 提交偏移量通常是消费偏移量的一个子集,表示已经处理完毕的消息。

在Pulsar默认情况下, 消费者进程不重启, 无论消费者以怎样的顺序提交消息, 都不会影响当前消费的偏移量

ackTimeout与DLQ

1、ackTimeout - Pulsar消费者的配置项, 默认情况下是关闭状态

  • ackTimeout 默认值为 0
    • 如果 ackTimeout 未设置或设置为 0,Pulsar 不会自动重新投递未确认的消息。
    • 这意味着,如果消费者消费了消息但没有提交确认(ack),这些消息会一直保留在消费者的未确认队列中,直到消费者显式地提交确认或重新投递。
  • 未确认消息的处理
    • 如果消费者一直不提交确认,未确认的消息会占用 Pulsar 的内存或磁盘资源(取决于 Pulsar 的存储模式)。
    • 如果未确认的消息过多,可能会导致资源耗尽,影响系统性能。

go语言的Pulsar客户端不支持该选项

git Issues:

  1. Go客户端不支持ack timeout功能,这是有意为之的。
  2. negativeAck被认为是解决问题的首选方法,而ackTimeout被视为遗留功能。
  3. 未明确说明未来会支持该选项

2、DLQ - Dead Letter Queue 死信队列

死信队列用于处理那些多次重试后仍然无法成功处理的消息。当消息的重试次数超过配置的最大重试次数时,Pulsar 会将这些消息发送到死信队列,避免无限重试。

go语言启用死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
// 配置死信队列策略
deadLetterPolicy := pulsar.DeadLetterPolicy{
MaxRedeliverCount: 3, // 最大重试次数, 如果消息的重试次数超过此值,Pulsar 会将消息发送到死信队列。
DeadLetterTopic: "my-dead-letter-topic", // 死信队列的 Topic, 如果未指定,Pulsar 会自动生成一个死信队列 Topic。
}

// 创建消费者
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
DeadLetterPolicy: &deadLetterPolicy, // 启用死信队列
})

死信队列的工作流程

  1. 消息重试
    • 消费者消费消息后,如果处理失败,可以使用 NackackTimeout 触发消息重试。
    • 消息会被重新投递给消费者,直到达到 MaxRedeliverCount 指定的最大重试次数。
  2. 发送到死信队列
    • 如果消息的重试次数超过 MaxRedeliverCount,Pulsar 会将消息发送到指定的死信队列 Topic。
    • 死信队列中的消息可以被单独处理,例如记录日志、人工干预等。
  3. 避免无限重试
    • 通过死信队列,可以避免消息无限重试,防止资源浪费。

git Issues: https://github.com/streamnative/pulsar-client-go/issues/173

go客户端与Java客户端对死信队列中 MaxRedeliverCount 配置存在差异:

在Pulsar的DLQPolicy中,MaxDeliveries属性的含义不明确,导致Java和Go客户端的实现和测试中存在差异。

关键点讨论:

  1. Java客户端定义MaxDeliveries表示消息在被发送到死信队列之前将重新传递的最大次数。
  2. Go客户端定义MaxDeliveries表示消息在被发送到死信队列之前将传递的最大次数。
  3. 测试中的差异
    • Java客户端测试中,NackreconsumeLater的预期结果不同,Nack测试期望得到redelivery+1条消息。
    • Go客户端测试中,Nack测试的预期结果是总传递次数,与Java客户端的reconsumeLater测试预期不同。
  4. 命名问题:Go客户端的命名可能存在问题,因为MaxDeliveries实际上用于提供最大重新传递次数。

总结:
MaxDeliveries属性的含义在不同客户端和测试中不一致,导致混淆。需要明确其定义并统一各客户端的实现和测试预期。

Java客户端的例子

假设我们设置MaxDeliveries = 3,即消息最多重新传递3次。

  1. 第一次传递:消息被传递给消费者。
  2. 第一次重试:消费者Nack消息,消息被重新传递。
  3. 第二次重试:消费者再次Nack消息,消息再次被重新传递。
  4. 第三次重试:消费者再次Nack消息,消息再次被重新传递。
  5. 第四次传递:如果消费者再次Nack消息,消息将被发送到死信队列(DLQ)。

总结

  • MaxDeliveries = 3表示消息最多被重新传递3次。
  • 总共传递次数为4次(1次原始传递 + 3次重试)。

Go客户端的例子

同样设置MaxDeliveries = 3,即消息最多传递3次。

  1. 第一次传递:消息被传递给消费者。
  2. 第二次传递:消费者Nack消息,消息被重新传递。
  3. 第三次传递:消费者再次Nack消息,消息被重新传递。
  4. 第四次传递:如果消费者再次Nack消息,消息将被发送到死信队列(DLQ)。

总结

  • MaxDeliveries = 3表示消息最多被传递3次。
  • 总共传递次数为3次(1次原始传递 + 2次重试)。

差异对比

客户端 MaxDeliveries 含义 总共传递次数
Java 重新传递次数 4次
Go 总传递次数 3次

结论

Java客户端的MaxDeliveries表示“重新传递的次数”,而Go客户端的MaxDeliveries表示“总传递的次数”。这种差异导致了在测试和实际使用中的混淆。

消费者的消费行为

1、消费者的消费模式

在默认情况下,Pulsar 消费者的工作模式是基于 消息确认(Acknowledgment, Ack) 的。消费者的消费行为可以分为以下几个阶段:

  1. 消费消息:消费者从 Pulsar 的 Topic 中拉取消息。
  2. 处理消息:消费者处理消息。
  3. 确认消息:消费者调用 Ack 确认消息,表示消息已被成功处理。

2、消息确认机制

  • 已确认的消息:消费者调用 Ack 的消息会被标记为已确认,Pulsar 会从消费者的未确认队列中移除这些消息。
  • 未确认的消息:消费者未调用 Ack 的消息会保留在未确认队列中。

3、 程序崩溃后的行为

当消费者程序崩溃后重新启动时,Pulsar 消费者会从 未确认的消息 开始消费。

例如

  • 生产者生产了 1, 2, 3, 4, 5消费者全部消费成功, 但仅仅提交了 1, 3, 4, 5,
  • 没有提交2的时候, 程序崩溃重启, 同时假设生产者在5之后并未生产消息
  • 消费者重新启动之后消费的顺序将是2, 不会重新消费3, 4, 5, 因为他们是已经确认的消息

  • 生产者生产了 1, 2, 3, 4, 5消费者全部消费成功, 但仅仅提交了 1, 3, 4, 5,
  • 没有提交2的时候, 程序崩溃重启, 同时假设生产者在5之后生产了6, 7, 8
  • 消费者重新启动之后消费的顺序将是2, 6, 7, 8, 不会重新消费3, 4, 5, 因为他们是已经确认的消息

可能会影响以上行为的配置项如下

配置项 作用 默认值 影响
ackTimeout 未确认消息的超时时间,触发自动重新投递。 0 如果设置为非零值,未确认的消息会在超时后自动重新投递。
ackTimeoutTickTime 检查未确认消息的时间间隔。 1 秒 影响 ackTimeout 的检查频率。
negativeAckRedeliveryDelay Nack 触发重新投递的延迟时间。 1 分钟 影响 Nack 触发重新投递的时间。
replicateSubscriptionState 是否将订阅状态复制到其他集群。 false 影响跨集群故障切换时的未确认消息处理。如果设置为 false,订阅状态不会复制,重启后只会从本地未确认的消息开始消费
maxUnackedMessagesPerConsumer 每个消费者允许的最大未确认消息数量。 50000 影响消费者的未确认消息数量限制。
enableBatchIndexAcknowledgment 是否启用批量消息的索引级别确认。 false 影响批量消息的确认粒度。如果设置为 true,消费者可以确认批量消息中的部分消息,而不是整个批量消息
autoUpdatePartitions 是否自动更新分区信息。 true 影响消费者对新分区的订阅行为。如果设置为 false,消费者不会订阅新创建的分区,重启后可能会丢失新分区的消息。
-------------本文结束感谢您的阅读-------------