Go语言数据转发处理框架Benthos
基础结构及配置
benthos基本配置
benthos的配置文件格式为yaml
, 按照逻辑基本可以划分为三个段落, input
, processor
, output
, 数据自input
接收, 经过 processor
处理, 通过output
产出, 另外还有一些logger
等其余配置, 各个常用内容大致如下:
input
: 数据的输入源processors
: 一系列处理器caches
: 一种键/值存储,可供某些组件用于诸如重复数据删除或数据连接等应用, 一般使用cache_resources
配置rate limits
: 限速器, 一般使用rate_limit_resources
配置buffers
: 缓冲区, 一般紧接着input
, 用于缓冲输入到下游之间的数据metrics
: 审计内容tracers
: 追踪器, 用于消息的跟踪
一个基础的benthos
配置文件可以参考如下
1 | http: # 为benthos开启一个HTTP服务, 包含一些基础的请求访问, 例如 /ping /ready /version三个api, 可以通过访问/endpoints获取所有api |
http.debug_endpoints 开关开启后, 将包含一下接口
/debug/config/json
以 JSON 形式返回已加载的配置。/debug/config/yaml
以 YAML 形式返回已加载的配置。/debug/pprof/block
以 pprof 格式的块配置文件进行响应。/debug/pprof/heap
以 pprof 格式的堆配置文件进行响应。/debug/pprof/mutex
以 pprof 格式的互斥配置文件进行响应。/debug/pprof/profile
以 pprof 格式的 CPU 配置文件进行响应。/debug/pprof/goroutine
以 pprof 格式的 goroutine 配置文件进行响应。/debug/pprof/symbol
查找请求中列出的程序计数器,并以将程序计数器映射到函数名称的表进行响应。/debug/pprof/trace
以二进制形式响应执行跟踪。跟踪持续时间(以秒为单位,由 GET 参数指定),如果未指定,则为 1 秒。/debug/stack
返回当前服务堆栈跟踪的快照。
input
组件
input
组件分为input
和batchInput
, 定义如下
1 | // AckFunc 是一个通用函数,由输入组件返回,用于消费消息的确认,每个消息被消费后必须调用一次。 |
自定义的Input
组件应该实现以上两种方法之一, 并在包的init
方法中注册, 使用 RegisterInput
或 RegisterBatchInput
, Input
组件的工作流程大致如下:
- 不断调用
Connect
方法, 直到其返回nil
- 不断调用
Read
/ReadBatch
方法, 直到其返回ErrNotConnected
, 此时将会重新从第一步开始, 返回ErrEndOfInput
则会终止整个流水线, 或者当ctx超时 - 在退出时调用
Close
方法
一个input
组件可参考如下, 其从Redis
的stream
中接收数据并处理
1 | input: |
关于
input
组件所返回的ackfunackfunc的调用时机
在Benthos中,消息确认函数(ackfunc)的调用时机与消息处理流程密切相关。当消息成功到达输出目的地并被确认后,ackfunc才会被调用, 这遵循Benthos的严格传递保证机制。
例如,在同步响应场景中,文档明确指出:
然而,重要的是要记住,由于Redpanda Connect的严格传递保证,响应消息实际上不会被返回,直到消息已经到达其输出目的地并且可以进行确认。
错误处理与ackfunc的关系
当处理器中发生错误时,Benthos不会简单地丢弃消息,而是会标记这些消息并继续尝试发送它们:
一些处理器有可能失败的条件, Benthos不会丢弃失败的消息,而是仍然尝试将这些消息发送出去,并且有过滤、恢复或死信队列处理失败消息的机制。
处理器错误与ackfunc
当处理器返回非nil的错误时,消息会被标记为失败,但仍会继续通过管道传递。这些错误不会直接传递到input的ackfunc中,而是会:
- 增加相应处理器的错误指标
- 生成描述错误的调试级别日志
- 为消息添加错误标志
ackfunc捕获链路中错误信息
需要保证一下几点
- 中间processor不重置消息
- 在output组件中将message中的错误信息返回
broker
包装器
在正常情况下, 一个配置文件只允许存在一个Input
组件, 如果有多个, 需要使用包装器broker
来组合多个输入
1 | # All config fields, showing default values |
grnrtate
组件
使用无需上下文即可执行的Bloblang映射,按给定间隔生成消息
1 | # Config fields, showing default values |
更多组件参考 https://docs.redpanda.com/redpanda-connect/components/inputs/about/
processor
组件
processor组件分为Processor
和BatchProcessor
, 定义如下
1 | // Processor 是 Benthos 处理器的一种实现,用于处理单个消息。 |
如果想要实现自定义processor
直接实现以上两种方法之一即可, 需要注意的一些点为
整个流程中, 如果某个或多个
Process
或ProcessBatch
返回了非nil
的error
, 不会导致数据处理流程中断, 而是会继续流转, 直到最终走到output
组件后, 层层返回Ack
, 如果想要实现中间一个流程失败, 后续处理器都不处理或做其他处理等可以参考以下写法在
Process
方法中处理, 存在错误则直接跳过消息处理1
2
3
4
5
6func (p *myPerssor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
err := msg.GetError() // 从消息中获取错误, 如果获取到的非空, 就直接返回, 跳过该消息的处理
if err != nil {
return nil, err
}
}在配置中通过
catch
或try
处理, 通过catch
捕获失败的处理, 或通过try
尝试处理1
2
3
4
5
6
7pipeline:
processors:
- resource: foo # 先使用 foo 处理器处理
- catch: # 如果上一步处理错误, 将会执行 bar 和 baz
- resource: bar
- resource: baz
#当消息离开 catch 块时, 错误标记将会被清除1
2
3
4
5
6pipeline:
processors:
- try:
- resource: processor_1
- resource: processor_2 # Skip if processor_1 fails
- resource: processor_3 # Skip if processor_1 or processor_2 fails1
2
3
4
5
6
7
8
9pipeline:
processors:
- try:
- resource: processor_1 # Processor that might fail
- resource: processor_2 # Processor that might fail
- resource: processor_3 # Processor that might fail
- catch:
- log:
message: "Processor ${!error_source_label()} failed due to: ${!error()}"1
2
3
4
5
6
7
8
9pipeline:
processors:
- resource: processor_1 # Processor that might fail
- resource: processor_2 # Processor that might fail
- resource: processor_3 # Processor that might fail
- catch:
- mapping: |
root = this
root.meta.error = error()使用
switch
处理器检查错误1
2
3
4
5
6
7pipeline:
processors:
- resource: processor_1 # Processor that might fail
- switch:
- check: errored()
processors:
- resource: processor_2 # Processes rerouted messages更错错误处理等参考 https://docs.redpanda.com/redpanda-connect/configuration/error_handling/
在整个流程中, 消息始终存在一份, 如果没有特殊情况, 应该将输入修改后直接传出, 不能在消息内直接新建消息返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26func (p *myPerssor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
err := msg.GetError()
if err != nil {
return nil, err
}
// 返回消息中存储的结构化数据, 其中 Mut 后缀代表你可以直接对返回值进行修改
tmpTask, err := msg.AsStructuredMut()
if err != nil {
return nil, err
}
task := tmpTask.(*types.TaskProcess)
task.SomeVal = 123
// 直接返回原始消息
return service.MessageBatch{msg}, nil
// 重新set数据
msg.SetStructuredMut(newTask())
msg.SetStructuredMut(task)
return service.MessageBatch{msg}, nil
// !!! 不能这么干 !!!
return service.MessageBatch{service.NewMessage([]byte("xxx"))}, nil
}
processor
常见配置
处理器是通过 config 设置的,根据它们在配置中的位置,它们将在特定输入之后(在 input 部分中设置)、所有消息(在 pipeline 部分中设置)或特定输出之前(在 output 部分中设置)立即运行。大多数处理器适用于所有消息,并且可以放置在 pipeline 部分中
1 | pipeline: |
关于
pipeline
:在 Redpanda Connect 配置中,
input
和output
之间是一个pipeline
部分。本节介绍将应用于所有消息且不绑定到任何特定 input 或 output 的处理器数组。如果处理器占用大量 CPU,并且不依赖于某个特定的输入或输出,则它们最适合 pipeline 部分。使用 pipeline 部分是有利的,因为它允许您设置并行执行线程的显式数量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 input:
resource: foo
pipeline:
threads: 4 # 如果字段 threads 设置为 -1(默认值),它将自动匹配可用的逻辑 CPU 数量
processors:
- mapping: |
root = this
fans = fans.map_each(match {
this.obsession > 0.5 => this
_ => deleted()
})
output:
resource: bar
也可以使用processor
作为输出并配合reject
, 前提是processor
可以返回错误并且是唯一错误, 例如Redis
的插入失败等, 配置如
1 | output: |
以上配置描述的执行流程为
- 如果
redis
处理器执行成功, 则执行mapping
处理器 - 如果
redis
处理器执行失败,mapping
处理器将不会执行, 消息路由到reject
输出, 其将会返回一个Nack
switch
组件
根据消息的内容有条件地处理消息, 对于每个 switch case,将执行一个 Bloblang
查询 ,如果结果为 true(或检查为空),则对消息执行子处理器, 其字段如下
[].check
一个bloblang
查询, 应返回true
/false
, 如果留空,则 case 始终通过。如果检查映射引发错误,则消息将被标记为失败 ,并且不会针对任何其他case进行测试。[].processors
处理器集合, 将要对消息执行处理的处理器列表, 将会按顺序处理[].fallthrough
布尔值 如果此 case 通过消息,是否还应执行下一个 case
1 | pipeline: |
branch
组件
branch
组件可以通过bloblang mapping
创建新的请求消息, 对请求消息执行一些列处理器后, 将结果映射回原始消息, 可以用于在替换内容时需要保留原始消息内容, 字段如下
request_map
字符串, 描述如何创建分支内的消息, 如果留空则表示将原始消息复制一份传进来processors
array, 处理器列表result_map
字符串, 用于描述将branch
处理器中一系列processor
处理完毕后的消息映射回原始消息, 如果留空则原始消息不会改变
如果 request_map
失败,则不会执行子处理器。如果子处理器本身导致 (未捕获的) 错误,则不会执行 result_map
。如果 result_map
失败,则消息将保持不变
更多组件参照
https://docs.redpanda.com/redpanda-connect/components/processors/about/
Output
组件
output
组件为消息最终的输出端, 在 Redpanda Connect 配置的根目录中仅配置了一个输出。但是,输出可以是在所选代理模式下组合多个输出的broker
,也可以是用于多路复用不同输出的 switch
, 其结构定义如下
1 | // Output 是一个由 Benthos 实现的接口,用于支持单条消息的写入操作。每个调用 Write 的次数 |
output
的一些机制
背压机制 (Back pressure)
输出端的压力将会传递到上游, 例如把水管的出口堵死, 入口就冲不进水, 当output
被阻塞时(可能是网络问题或者是什么怪问题), 整个流水线将会停止, 直到output
变得正常
重试机制
当output
无法发送消息时(可以是output
本身的问题, 或者上游某个步骤失败走到了这里),错误将传递回input
,根据协议,它将被作为 Nack 推送回源(例如 AMQP),或者将无限期地重新尝试提交直到成功(例如 Kafka)。也可以使用retey
包装器让其无限重试, 直到成功, broker
也可以这么操作, 常见用法如
1 | # All config fields, showing default values |
死信队列 (Dead letter queues)
可以使用fallback
包装器指定当原输出目标失败时的回退输出, 并按照给定好的顺序依次执行, 例如
1 | output: |
多路复用输出 (Multiplexing outputs)
插值多路复用
配置中支持直接使用配置插值, 例如
1 | output: |
switch
多路复用
1 | output: |
output
相关组件及包装器参照 https://docs.redpanda.com/redpanda-connect/components/outputs/about/