Pulsar消费者提交与消费机制小结
消费偏移量与提交偏移量
Pulsar中消费者的消费偏移量(consumption offset)和提交偏移量(committed offset)并不是同一个概念。
- 消费偏移量(Consumption Offset)
- 消费偏移量指的是消费者当前正在消费的消息的偏移量。它表示消费者已经从消息流中读取到的位置。
- 消费偏移量是消费者在消费消息时记录的当前位置,但它并不一定意味着这些消息已经被处理完毕或确认。
- 提交偏移量(Committed Offset)
- 提交偏移量是消费者已经成功处理并确认的消息的偏移量。它表示消费者已经处理完这些消息,并且可以安全地认为这些消息不会被再次消费。
- 提交偏移量通常是通过消费者显式地调用提交偏移量的API来完成的,例如
acknowledge
或commit
方法。
区别:
- 消费偏移量是消费者当前正在处理的消息的位置,而提交偏移量是消费者已经处理完毕并确认的消息的位置。
- 提交偏移量通常是消费偏移量的一个子集,表示已经处理完毕的消息。
在Pulsar默认情况下, 消费者进程不重启, 无论消费者以怎样的顺序提交消息, 都不会影响当前消费的偏移量
ackTimeout与DLQ
1、ackTimeout - Pulsar消费者的配置项, 默认情况下是关闭状态
ackTimeout
默认值为0
- 如果
ackTimeout
未设置或设置为0
,Pulsar 不会自动重新投递未确认的消息。 - 这意味着,如果消费者消费了消息但没有提交确认(
ack
),这些消息会一直保留在消费者的未确认队列中,直到消费者显式地提交确认或重新投递。
- 如果
- 未确认消息的处理
- 如果消费者一直不提交确认,未确认的消息会占用 Pulsar 的内存或磁盘资源(取决于 Pulsar 的存储模式)。
- 如果未确认的消息过多,可能会导致资源耗尽,影响系统性能。
go语言的Pulsar客户端不支持该选项
git Issues:
- https://github.com/streamnative/pulsar-client-go/issues/44
- https://github.com/apache/pulsar-client-go/pull/197
- Go客户端不支持ack timeout功能,这是有意为之的。
- negativeAck被认为是解决问题的首选方法,而ackTimeout被视为遗留功能。
- 未明确说明未来会支持该选项
2、DLQ - Dead Letter Queue 死信队列
死信队列用于处理那些多次重试后仍然无法成功处理的消息。当消息的重试次数超过配置的最大重试次数时,Pulsar 会将这些消息发送到死信队列,避免无限重试。
go语言启用死信队列
1 | // 配置死信队列策略 |
死信队列的工作流程
- 消息重试
- 消费者消费消息后,如果处理失败,可以使用
Nack
或ackTimeout
触发消息重试。 - 消息会被重新投递给消费者,直到达到
MaxRedeliverCount
指定的最大重试次数。
- 消费者消费消息后,如果处理失败,可以使用
- 发送到死信队列
- 如果消息的重试次数超过
MaxRedeliverCount
,Pulsar 会将消息发送到指定的死信队列 Topic。 - 死信队列中的消息可以被单独处理,例如记录日志、人工干预等。
- 如果消息的重试次数超过
- 避免无限重试
- 通过死信队列,可以避免消息无限重试,防止资源浪费。
git Issues: https://github.com/streamnative/pulsar-client-go/issues/173
go客户端与Java客户端对死信队列中 MaxRedeliverCount 配置存在差异:
在Pulsar的DLQPolicy中,
MaxDeliveries
属性的含义不明确,导致Java和Go客户端的实现和测试中存在差异。关键点讨论:
- Java客户端定义:
MaxDeliveries
表示消息在被发送到死信队列之前将重新传递的最大次数。- Go客户端定义:
MaxDeliveries
表示消息在被发送到死信队列之前将传递的最大次数。- 测试中的差异
- Java客户端测试中,
Nack
和reconsumeLater
的预期结果不同,Nack
测试期望得到redelivery+1
条消息。- Go客户端测试中,
Nack
测试的预期结果是总传递次数,与Java客户端的reconsumeLater
测试预期不同。- 命名问题:Go客户端的命名可能存在问题,因为
MaxDeliveries
实际上用于提供最大重新传递次数。总结:
MaxDeliveries
属性的含义在不同客户端和测试中不一致,导致混淆。需要明确其定义并统一各客户端的实现和测试预期。Java客户端的例子
假设我们设置
MaxDeliveries = 3
,即消息最多重新传递3次。
- 第一次传递:消息被传递给消费者。
- 第一次重试:消费者
Nack
消息,消息被重新传递。- 第二次重试:消费者再次
Nack
消息,消息再次被重新传递。- 第三次重试:消费者再次
Nack
消息,消息再次被重新传递。- 第四次传递:如果消费者再次
Nack
消息,消息将被发送到死信队列(DLQ)。总结:
MaxDeliveries = 3
表示消息最多被重新传递3次。- 总共传递次数为4次(1次原始传递 + 3次重试)。
Go客户端的例子
同样设置
MaxDeliveries = 3
,即消息最多传递3次。
- 第一次传递:消息被传递给消费者。
- 第二次传递:消费者
Nack
消息,消息被重新传递。- 第三次传递:消费者再次
Nack
消息,消息被重新传递。- 第四次传递:如果消费者再次
Nack
消息,消息将被发送到死信队列(DLQ)。总结:
MaxDeliveries = 3
表示消息最多被传递3次。- 总共传递次数为3次(1次原始传递 + 2次重试)。
差异对比
客户端 MaxDeliveries
含义总共传递次数 Java 重新传递次数 4次 Go 总传递次数 3次
结论
Java客户端的
MaxDeliveries
表示“重新传递的次数”,而Go客户端的MaxDeliveries
表示“总传递的次数”。这种差异导致了在测试和实际使用中的混淆。
消费者的消费行为
1、消费者的消费模式
在默认情况下,Pulsar 消费者的工作模式是基于 消息确认(Acknowledgment, Ack) 的。消费者的消费行为可以分为以下几个阶段:
- 消费消息:消费者从 Pulsar 的 Topic 中拉取消息。
- 处理消息:消费者处理消息。
- 确认消息:消费者调用
Ack
确认消息,表示消息已被成功处理。
2、消息确认机制
- 已确认的消息:消费者调用
Ack
的消息会被标记为已确认,Pulsar 会从消费者的未确认队列中移除这些消息。 - 未确认的消息:消费者未调用
Ack
的消息会保留在未确认队列中。
3、 程序崩溃后的行为
当消费者程序崩溃后重新启动时,Pulsar 消费者会从 未确认的消息 开始消费。
例如
- 生产者生产了
1, 2, 3, 4, 5
消费者全部消费成功, 但仅仅提交了1, 3, 4, 5
, - 没有提交
2
的时候, 程序崩溃重启, 同时假设生产者在5
之后并未生产消息 - 消费者重新启动之后消费的顺序将是
2
, 不会重新消费3, 4, 5
, 因为他们是已经确认的消息
或
- 生产者生产了
1, 2, 3, 4, 5
消费者全部消费成功, 但仅仅提交了1, 3, 4, 5
, - 没有提交
2
的时候, 程序崩溃重启, 同时假设生产者在5
之后生产了6, 7, 8
- 消费者重新启动之后消费的顺序将是
2, 6, 7, 8
, 不会重新消费3, 4, 5
, 因为他们是已经确认的消息
可能会影响以上行为的配置项如下
配置项 | 作用 | 默认值 | 影响 |
---|---|---|---|
ackTimeout |
未确认消息的超时时间,触发自动重新投递。 | 0 | 如果设置为非零值,未确认的消息会在超时后自动重新投递。 |
ackTimeoutTickTime |
检查未确认消息的时间间隔。 | 1 秒 | 影响 ackTimeout 的检查频率。 |
negativeAckRedeliveryDelay |
Nack 触发重新投递的延迟时间。 |
1 分钟 | 影响 Nack 触发重新投递的时间。 |
replicateSubscriptionState |
是否将订阅状态复制到其他集群。 | false | 影响跨集群故障切换时的未确认消息处理。如果设置为 false ,订阅状态不会复制,重启后只会从本地未确认的消息开始消费 |
maxUnackedMessagesPerConsumer |
每个消费者允许的最大未确认消息数量。 | 50000 | 影响消费者的未确认消息数量限制。 |
enableBatchIndexAcknowledgment |
是否启用批量消息的索引级别确认。 | false | 影响批量消息的确认粒度。如果设置为 true ,消费者可以确认批量消息中的部分消息,而不是整个批量消息 |
autoUpdatePartitions |
是否自动更新分区信息。 | true | 影响消费者对新分区的订阅行为。如果设置为 false ,消费者不会订阅新创建的分区,重启后可能会丢失新分区的消息。 |