Exploring the Message and Ack Mechanism of the Benthos Framework
When an Input or batchInput component in Benthos Reads a message, it must also return an ack func, which is executed after the message has passed through the output component. In normal usage the following points deserve attention:
- No matter what happens along the way, the message will always reach the output component, and the input's ack is executed after the output finishes processing, even if an intermediate processing step returned a non-nil error
- For an input component, the Ack returned alongside each message is called once after that message has been fully processed
- For a batchInput component, the ack returned with a batch is called only after every message of that batch has been processed
- Errors returned during processing, or by the output component, are passed into the ackFunc as its argument
- Intermediate processing steps must not create brand-new messages, i.e. do not use service.NewMessage; derive new messages with the message's own Copy instead
- Whether the input component produced a single message or a batch, and regardless of any aggregation or splitting in between, the ack is executed only once every message from that Read has been handled by the output
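The last point, one ack per Read that survives splitting, can be illustrated with a small self-contained sketch. This is not Benthos' actual internals, just the reference-counting idea; `ackFunc` and `shareAck` are simplified stand-ins defined here (the context parameter of the real service.AckFunc is dropped for brevity):

```go
package main

import (
	"fmt"
	"sync"
)

// ackFunc is a simplified stand-in for Benthos' service.AckFunc.
type ackFunc func(err error)

// shareAck fans a single upstream ack out across n derived messages:
// the upstream ack fires exactly once, only after all n per-message acks
// have been called, and it receives the first non-nil error observed.
// Conceptually, this is how an input-level ack can survive a split.
func shareAck(n int, upstream ackFunc) ackFunc {
	var mu sync.Mutex
	var firstErr error
	pending := n
	return func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if err != nil && firstErr == nil {
			firstErr = err
		}
		pending--
		if pending == 0 {
			upstream(firstErr)
		}
	}
}

func main() {
	done := false
	ack := shareAck(3, func(err error) {
		done = true
		fmt.Println("upstream acked, err =", err)
	})
	ack(nil)
	ack(nil)
	fmt.Println("before last ack:", done) // still false: one child pending
	ack(nil)
	fmt.Println("after last ack:", done) // true: all three children acked
}
```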
To verify how splitting and aggregation interact with acks, we can run the following experiment with a single Input doing the splitting. The Input component produces one message per second, and the Processor component splits each message into one or more messages while the ack func logs which message was confirmed. To keep the whole pipeline serial, the input's max_in_flight is limited to 1, which allows at most one unacknowledged message in the system: no Read is performed until the previous message has been acknowledged.
In the examples and verifications below, the raw data are the digit strings 1-100. The aggregation operation joins multiple strings with +, and the splitting operation turns a string x into x_0, x_1, x_2, … according to the split count.
The input code is as follows:
```go
func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	if ctx.Err() != nil {
		return nil, nil, ctx.Err()
	}

	d, err := r.l.Pop(ctx)
	if err != nil {
		return nil, nil, err
	}

	msg := service.NewMessage(d)
	return msg, func(ctx context.Context, err error) error {
		if err != nil {
			r.log.Infof("get err %v", err)
		}
		fmt.Printf("ack input: %s\n", string(d))
		return nil
	}, nil
}
```
The splitting Processor code is as follows:
```go
func (m *myUnArchive) Process(_ context.Context, message *service.Message) (service.MessageBatch, error) {
	d, err := message.AsBytes()
	if err != nil {
		return nil, err
	}

	var newMsg msgType
	err = json.Unmarshal(d, &newMsg)
	if err != nil {
		return nil, err
	}

	res := make(service.MessageBatch, 0)
	for i := 0; i < m.split; i++ {
		other := newMsg
		other.Message += "_" + strconv.Itoa(i)
		otherMsg := message.Copy()
		otherMsg.SetStructuredMut(other)
		res = append(res, otherMsg)
	}

	return res, nil
}
```
The configuration file is as follows:
```yaml
input:
  rdb_input:
    max_in_flight: 1
  processors:
    - sleep:
        duration: 1s
    - bloblang: |
        root.message = this.string()
        root.my_meta.get_time = now()
        meta ttime = now()

pipeline:
  processors:
    - my_unarchive:
        split: 3

output:
  my_stdout:
    max_in_flight: 100
```
The result shows that no matter how many pieces a message is split into, the ack is executed only after all of the split messages have been emitted by the output:
```
{"message":"1_0","my_meta":{"get_time":"2025-05-13T17:29:46.270812981+08:00"}}
{"message":"1_2","my_meta":{"get_time":"2025-05-13T17:29:46.270812981+08:00"}}
{"message":"1_1","my_meta":{"get_time":"2025-05-13T17:29:46.270812981+08:00"}}
ack input: 1
{"message":"2_0","my_meta":{"get_time":"2025-05-13T17:29:47.271699651+08:00"}}
{"message":"2_2","my_meta":{"get_time":"2025-05-13T17:29:47.271699651+08:00"}}
{"message":"2_1","my_meta":{"get_time":"2025-05-13T17:29:47.271699651+08:00"}}
ack input: 2
{"message":"3_0","my_meta":{"get_time":"2025-05-13T17:29:48.272672903+08:00"}}
{"message":"3_1","my_meta":{"get_time":"2025-05-13T17:29:48.272672903+08:00"}}
{"message":"3_2","my_meta":{"get_time":"2025-05-13T17:29:48.272672903+08:00"}}
ack input: 3
```
The aggregation operation is implemented as follows:
```go
func (m *myArchive) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) {
	var archiveMsg msgType
	msgs := make([]string, 0)
	err := batch.WalkWithBatchedErrors(func(i int, message *service.Message) error {
		d, err := message.AsBytes()
		if err != nil {
			return err
		}

		var newMsg msgType
		err = json.Unmarshal(d, &newMsg)
		if err != nil {
			return err
		}

		msgs = append(msgs, newMsg.Message)
		archiveMsg.Meta = newMsg.Meta
		return nil
	})
	if err != nil {
		return nil, err
	}

	res := strings.Join(msgs, "+")
	archiveMsg.Message = res
	batch[0].SetStructured(archiveMsg)
	return []service.MessageBatch{{batch[0]}}, nil
}
```
Combining the aggregation and splitting operations:
```yaml
pipeline:
  processors:
    - my_unarchive:
        split: 3
    - my_archive: {}
    - my_unarchive:
        split: 2
```
The output is as follows:
```
{"message":"1_0+1_1+1_2_0","my_meta":{"get_time":"2025-05-13 17:43:46"}}
{"message":"1_0+1_1+1_2_1","my_meta":{"get_time":"2025-05-13 17:43:46"}}
ack input: 1
{"message":"2_0+2_1+2_2_0","my_meta":{"get_time":"2025-05-13 17:43:47"}}
{"message":"2_0+2_1+2_2_1","my_meta":{"get_time":"2025-05-13 17:43:47"}}
ack input: 2
{"message":"3_0+3_1+3_2_0","my_meta":{"get_time":"2025-05-13 17:43:48"}}
{"message":"3_0+3_1+3_2_1","my_meta":{"get_time":"2025-05-13 17:43:48"}}
ack input: 3
```
We can see that no matter what happens in between, the ack always fires only after everything produced by that Read of the input has been fully processed.
For batchInput, the configuration can be adjusted as follows:
```yaml
input:
  rdb_input_batch:
    max_in_flight: 1
    each: 5
  processors:
    - sleep:
        duration: 1s
    - bloblang: |
        root.message = this.string()
        root.my_meta.get_time = now()
        meta ttime = now()

pipeline:
  processors:
    - my_archive: {}

output:
  my_stdout:
    max_in_flight: 100
```
The final output follows the same rule as above: everything produced by a single Read of the input is processed before the ack fires:
```
{"message":"1+2+3+4+5","my_meta":{"get_time":"2025-05-13 17:46:12"}}
ack inputs: 1 2 3 4 5
{"message":"6+7+8+9+10","my_meta":{"get_time":"2025-05-13 17:46:17"}}
ack inputs: 6 7 8 9 10
{"message":"11+12+13+14+15","my_meta":{"get_time":"2025-05-13 17:46:22"}}
ack inputs: 11 12 13 14 15
{"message":"16+17+18+19+20","my_meta":{"get_time":"2025-05-13 17:46:27"}}
ack inputs: 16 17 18 19 20
```
With the following splitting configuration, each message is split into two:
```yaml
pipeline:
  processors:
    - my_unarchive:
        split: 2
```
The output is:
```
{"message":"1_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"3_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"4_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"4_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"5_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"5_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"1_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"2_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"2_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"3_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
ack inputs: 1 2 3 4 5
{"message":"7_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"9_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"6_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"9_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"10_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"7_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"8_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"6_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"8_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"10_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
ack inputs: 6 7 8 9 10
```
Combining aggregation and splitting once more:
```yaml
pipeline:
  processors:
    - my_unarchive:
        split: 2
    - my_archive: {}
    - my_unarchive:
        split: 3
```
The output is as follows:
```
{"message":"1_0+1_1+2_0+2_1+3_0+3_1+4_0+4_1+5_0+5_1_0","my_meta":{"get_time":"2025-05-13 17:51:34"}}
{"message":"1_0+1_1+2_0+2_1+3_0+3_1+4_0+4_1+5_0+5_1_1","my_meta":{"get_time":"2025-05-13 17:51:34"}}
{"message":"1_0+1_1+2_0+2_1+3_0+3_1+4_0+4_1+5_0+5_1_2","my_meta":{"get_time":"2025-05-13 17:51:34"}}
ack inputs: 1 2 3 4 5
{"message":"6_0+6_1+7_0+7_1+8_0+8_1+9_0+9_1+10_0+10_1_1","my_meta":{"get_time":"2025-05-13 17:51:39"}}
{"message":"6_0+6_1+7_0+7_1+8_0+8_1+9_0+9_1+10_0+10_1_2","my_meta":{"get_time":"2025-05-13 17:51:39"}}
{"message":"6_0+6_1+7_0+7_1+8_0+8_1+9_0+9_1+10_0+10_1_0","my_meta":{"get_time":"2025-05-13 17:51:39"}}
ack inputs: 6 7 8 9 10
```
This demonstrates that no matter how the messages are split and recombined along the way, as long as they are handled the Benthos way (i.e. the processing steps never create brand-new messages), Benthos guarantees that the ack is called only after every message produced by the input has been fully processed.