
Exploring Benthos Messages and the Ack Mechanism

In Benthos, both the Input component and the BatchInput component must return an ack func from Read, which is executed after the message has passed through the output component. Under normal usage, the following points are worth noting:

  1. No matter what happens in between, a message always reaches the output component, and the ack is executed after the output has finished with it, even if an intermediate processing step returned a non-nil error.
  2. For an Input component, the AckFunc returned alongside a message is called exactly once, after that message has been fully processed.
  3. For a BatchInput component, Benthos waits until every message in the returned batch has been processed, then calls the AckFunc returned with that batch.
  4. Errors returned during processing, or by the output component, are passed into the AckFunc as its argument.
  5. Intermediate processing steps must not create brand-new messages, i.e. do not use service.NewMessage; derive new messages with the message's own Copy method instead.
  6. Whether the input produced a single message or a batch, and regardless of any aggregation or splitting along the way, the ack fires only once everything produced by that read has been handled by the output.
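
Point 6 is the key behavior: when a message is split, its descendants effectively share a single ack that only fires once the last of them completes. A minimal self-contained sketch of that idea using a reference counter (illustrative only, not Benthos's actual implementation):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedAck fires the underlying ack exactly once,
// after all n derived messages have called done().
type sharedAck struct {
	remaining int32
	ack       func()
}

func newSharedAck(n int, ack func()) *sharedAck {
	return &sharedAck{remaining: int32(n), ack: ack}
}

func (s *sharedAck) done() {
	if atomic.AddInt32(&s.remaining, -1) == 0 {
		s.ack()
	}
}

func main() {
	var wg sync.WaitGroup
	// One input message split into three parts: the ack
	// only runs after every part has reached the "output".
	sa := newSharedAck(3, func() { fmt.Println("ack input: 1") })
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("output: 1_%d\n", i)
			sa.done()
		}(i)
	}
	wg.Wait()
}
```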

Splitting and aggregation during processing can be verified as follows. First, splitting with a single Input: the Input component produces one message per second, the numeric strings 1-100.

The Processor component splits each message into one or more messages, and the ack func prints which message was acknowledged. To keep the whole pipeline serial, the input's max_in_flight is limited to 1, so the system allows only one unacknowledged message at a time: Read is not called again until the previous message has been acknowledged.

In the examples and verification below, the original data is the numeric strings 1-100.

The aggregation operation joins multiple strings with +.

The split operation divides a string into x_0, x_1, x_2, ... according to the split count.
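
The two transforms can be sketched as plain string helpers (splitMsg and joinMsgs are hypothetical names, not part of the pipeline code below):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitMsg turns "x" into "x_0", "x_1", ... "x_{n-1}".
func splitMsg(s string, n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, s+"_"+strconv.Itoa(i))
	}
	return out
}

// joinMsgs joins multiple messages with "+".
func joinMsgs(parts []string) string {
	return strings.Join(parts, "+")
}

func main() {
	parts := splitMsg("1", 3)
	fmt.Println(parts)           // [1_0 1_1 1_2]
	fmt.Println(joinMsgs(parts)) // 1_0+1_1+1_2
}
```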

The input code is as follows

func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
	if ctx.Err() != nil {
		return nil, nil, ctx.Err()
	}

	d, err := r.l.Pop(ctx)
	if err != nil {
		return nil, nil, err
	}

	msg := service.NewMessage(d)
	return msg, func(ctx context.Context, err error) error {
		if err != nil {
			r.log.Infof("get err %v", err)
		}
		fmt.Printf("ack input: %s\n", string(d))
		return nil
	}, nil
}

The splitting Processor code is as follows

func (m *myUnArchive) Process(_ context.Context, message *service.Message) (service.MessageBatch, error) {
	d, err := message.AsBytes()
	if err != nil {
		return nil, err
	}

	var newMsg msgType
	err = json.Unmarshal(d, &newMsg)
	if err != nil {
		return nil, err
	}

	res := make(service.MessageBatch, 0)
	for i := 0; i < m.split; i++ {
		other := newMsg
		other.Message += "_" + strconv.Itoa(i)
		otherMsg := message.Copy() // !! derive the new message with Copy, not service.NewMessage
		otherMsg.SetStructuredMut(other)
		res = append(res, otherMsg)
	}

	return res, nil
}

The configuration file is as follows

input:
  rdb_input:
    max_in_flight: 1
  processors:
    - sleep:
        duration: 1s
    - bloblang: |
        root.message = this.string()
        root.my_meta.get_time = now()
        meta ttime = now()

pipeline:
  processors:
    - my_unarchive:
        split: 3

output:
  my_stdout:
    max_in_flight: 100

The result: no matter how many pieces a message is split into, the ack runs only after every one of the split messages has been written by the output

{"message":"1_0","my_meta":{"get_time":"2025-05-13T17:29:46.270812981+08:00"}}
{"message":"1_2","my_meta":{"get_time":"2025-05-13T17:29:46.270812981+08:00"}}
{"message":"1_1","my_meta":{"get_time":"2025-05-13T17:29:46.270812981+08:00"}}
ack input: 1
{"message":"2_0","my_meta":{"get_time":"2025-05-13T17:29:47.271699651+08:00"}}
{"message":"2_2","my_meta":{"get_time":"2025-05-13T17:29:47.271699651+08:00"}}
{"message":"2_1","my_meta":{"get_time":"2025-05-13T17:29:47.271699651+08:00"}}
ack input: 2
{"message":"3_0","my_meta":{"get_time":"2025-05-13T17:29:48.272672903+08:00"}}
{"message":"3_1","my_meta":{"get_time":"2025-05-13T17:29:48.272672903+08:00"}}
{"message":"3_2","my_meta":{"get_time":"2025-05-13T17:29:48.272672903+08:00"}}
ack input: 3

The aggregation operation is implemented as follows

func (m *myArchive) ProcessBatch(ctx context.Context, batch service.MessageBatch) ([]service.MessageBatch, error) {
	var archiveMsg msgType
	msgs := make([]string, 0)
	err := batch.WalkWithBatchedErrors(func(i int, message *service.Message) error {
		d, err := message.AsBytes()
		if err != nil {
			return err
		}

		var newMsg msgType
		err = json.Unmarshal(d, &newMsg)
		if err != nil {
			return err
		}

		msgs = append(msgs, newMsg.Message)
		archiveMsg.Meta = newMsg.Meta
		return nil
	})
	if err != nil {
		return nil, err
	}

	res := strings.Join(msgs, "+")
	archiveMsg.Message = res
	batch[0].SetStructured(archiveMsg) // !! reuse the original batch[0] message as the carrier
	return []service.MessageBatch{{batch[0]}}, nil
}

Combining the aggregation and split operations

pipeline:
  processors:
    - my_unarchive: # split into three, then aggregate into one, then split into two
        split: 3
    - my_archive: {}
    - my_unarchive:
        split: 2

The output is as follows

{"message":"1_0+1_1+1_2_0","my_meta":{"get_time":"2025-05-13 17:43:46"}}
{"message":"1_0+1_1+1_2_1","my_meta":{"get_time":"2025-05-13 17:43:46"}}
ack input: 1
{"message":"2_0+2_1+2_2_0","my_meta":{"get_time":"2025-05-13 17:43:47"}}
{"message":"2_0+2_1+2_2_1","my_meta":{"get_time":"2025-05-13 17:43:47"}}
ack input: 2
{"message":"3_0+3_1+3_2_0","my_meta":{"get_time":"2025-05-13 17:43:48"}}
{"message":"3_0+3_1+3_2_1","my_meta":{"get_time":"2025-05-13 17:43:48"}}
ack input: 3

As the output shows, whatever happens along the way, the ack always fires only after every message produced by that read of the input has been fully processed

For a BatchInput, the configuration can be adjusted as follows

input:
  rdb_input_batch:
    max_in_flight: 1
    each: 5
  processors:
    - sleep:
        duration: 1s
    - bloblang: |
        root.message = this.string()
        root.my_meta.get_time = now()
        meta ttime = now()

pipeline:
  processors:
    - my_archive: {} # aggregate the batchInput's output into a single message

output:
  my_stdout:
    max_in_flight: 100

The final output is as follows; the rule above still holds: everything produced by that read of the input is processed before the ack

{"message":"1+2+3+4+5","my_meta":{"get_time":"2025-05-13 17:46:12"}}
ack inputs: 1 2 3 4 5
{"message":"6+7+8+9+10","my_meta":{"get_time":"2025-05-13 17:46:17"}}
ack inputs: 6 7 8 9 10
{"message":"11+12+13+14+15","my_meta":{"get_time":"2025-05-13 17:46:22"}}
ack inputs: 11 12 13 14 15
{"message":"16+17+18+19+20","my_meta":{"get_time":"2025-05-13 17:46:27"}}
ack inputs: 16 17 18 19 20

The split configuration is as follows; each message is split into two

pipeline:
  processors:
    - my_unarchive:
        split: 2

The output is

{"message":"1_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"3_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"4_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"4_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"5_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"5_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"1_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"2_1","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"2_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
{"message":"3_0","my_meta":{"get_time":"2025-05-13 17:49:31"}}
ack inputs: 1 2 3 4 5
{"message":"7_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"9_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"6_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"9_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"10_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"7_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"8_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"6_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"8_1","my_meta":{"get_time":"2025-05-13 17:49:36"}}
{"message":"10_0","my_meta":{"get_time":"2025-05-13 17:49:36"}}
ack inputs: 6 7 8 9 10

Combining aggregation and splitting once more

pipeline:
  processors:
    # split each message into two, aggregate all of them into one, then split that into three
    - my_unarchive:
        split: 2
    - my_archive: {}
    - my_unarchive:
        split: 3

The output is as follows

{"message":"1_0+1_1+2_0+2_1+3_0+3_1+4_0+4_1+5_0+5_1_0","my_meta":{"get_time":"2025-05-13 17:51:34"}}
{"message":"1_0+1_1+2_0+2_1+3_0+3_1+4_0+4_1+5_0+5_1_1","my_meta":{"get_time":"2025-05-13 17:51:34"}}
{"message":"1_0+1_1+2_0+2_1+3_0+3_1+4_0+4_1+5_0+5_1_2","my_meta":{"get_time":"2025-05-13 17:51:34"}}
ack inputs: 1 2 3 4 5
{"message":"6_0+6_1+7_0+7_1+8_0+8_1+9_0+9_1+10_0+10_1_1","my_meta":{"get_time":"2025-05-13 17:51:39"}}
{"message":"6_0+6_1+7_0+7_1+8_0+8_1+9_0+9_1+10_0+10_1_2","my_meta":{"get_time":"2025-05-13 17:51:39"}}
{"message":"6_0+6_1+7_0+7_1+8_0+8_1+9_0+9_1+10_0+10_1_0","my_meta":{"get_time":"2025-05-13 17:51:39"}}
ack inputs: 6 7 8 9 10

As you can see, however the messages are shuffled around in between, as long as they are handled the Benthos way (i.e. processing steps never create brand-new messages, only copies), Benthos guarantees the ack is called only after every message produced by a read has been processed
