Benthos框架消息与Ack机制探索
Benthos中Input
组件或batchInput
组件在Read消息时均会要求返回一个ackfunc, 用于在消息通过output
组件后执行ack, 在正常使用的场景下存在以下需要注意的点
- 无论中间发生了什么, 消息一定会抵达
output
组件, 在其处理完毕后执行input
, 及时中间处理流程返回了非nil的error - 对于
input
组件, 每个消息都会在处理完毕后调用一次跟随消息一起返回的Ack - 对于
batchInput
组件, 将会等待到所返回的那一批消息都处理完毕后, 调用跟随那批消息返回的ack - 处理过程中返回的错误或
output
组件返回的错误都会传递到ackFunc中, 作为参数 - 消息处理的中间过程不能生成新的消息, 即不能使用
service.NewMessage
来创建新消息,使用消息自带的Copy
来创建新的消息 - 无论
input
组件生产了一个消息还是一批消息, 无论中间发生了聚合还是拆分, 只有那次生产的消息都被output处理了, ack才会被执行
针对处理过程中消息的拆分和聚合, 可以做如下验证, 单Input
做拆分, Input组件每秒产生一条消息为数字字符串1-100
Processor
组件将每条消息拆分成一个或多个, 同时在ackfunc中输出确认的消息, 同时为了保证整个处理串行, 将会限制input
组件的max_in_flight
为1, 这将会导致系统只允许存在一个未确认消息, 上一个消息没有被确认前, 不会进行Read操作