0%

Go语言数据转发处理框架Benthos基础结构及配置

Go语言数据转发处理框架Benthos基础结构及配置

benthos基本配置

benthos的配置文件格式为yaml, 按照逻辑基本可以划分为三个段落, input, processor, output, 数据自input接收, 经过 processor处理, 通过output产出, 另外还有一些logger等其余配置, 各个常用内容大致如下:

  • input: 数据的输入源
  • processors: 一系列处理器
  • caches: 一种键/值存储,可供某些组件用于诸如重复数据删除或数据连接等应用, 一般使用 cache_resources 配置
  • rate limits: 限速器, 一般使用 rate_limit_resources 配置
  • buffers: 缓冲区, 一般紧接着 input, 用于缓冲输入到下游之间的数据
  • metrics: 审计内容
  • tracers: 追踪器, 用于消息的跟踪

一个基础的benthos配置文件可以参考如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
http: # 为benthos开启一个HTTP服务, 包含一些基础的请求访问, 例如 /ping /ready /version三个api, 可以通过访问/endpoints获取所有api
address: 0.0.0.0:4195
debug_endpoints: false # 是否开启debug模式

# 数据的输入, 这里使用了kafka组件
input:
kafka:
addresses: [ TODO ]
topics: [ foo, bar ]
consumer_group: foogroup

# buffer用于在input之后的缓冲, 可以在此对数据分组或其他操作
buffer:
none: {}

# 管道, 表示数据的处理流程, 数据将会按照processors中数组定义的顺序依次执行
pipeline:
processors:
- mapping: |
root.message = this
root.meta.link_count = this.links.length()
- custom_process: {}
- resources: some_processor

# 输出器, 数据最终将通过此组件输出
output:
aws_s3:
bucket: TODO
path: '${! meta("kafka_topic") }/${! json("message.id") }.json'

# 各种resources定义, 以下的命名是benthos的关键字, 不能随便改
# 在这里可以预先定义好input, processor等, 并使用label标记, 随后在对应组件位置使用
input_resources: []
cache_resources: []
processor_resources:
- lable: some_processor
do_something: {}
rate_limit_resources: []
output_resources: []

logger:
level: INFO
static_fields:
'@service': benthos

metrics:
prometheus: {}

shutdown_timeout: 20s
shutdown_delay: ""

http.debug_endpoints 开关开启后, 将包含一下接口

  • /debug/config/json以 JSON 形式返回已加载的配置。
  • /debug/config/yaml以 YAML 形式返回已加载的配置。
  • /debug/pprof/block以 pprof 格式的块配置文件进行响应。
  • /debug/pprof/heap以 pprof 格式的堆配置文件进行响应。
  • /debug/pprof/mutex以 pprof 格式的互斥配置文件进行响应。
  • /debug/pprof/profile以 pprof 格式的 CPU 配置文件进行响应。
  • /debug/pprof/goroutine以 pprof 格式的 goroutine 配置文件进行响应。
  • /debug/pprof/symbol查找请求中列出的程序计数器,并以将程序计数器映射到函数名称的表进行响应。
  • /debug/pprof/trace以二进制形式响应执行跟踪。跟踪持续时间(以秒为单位,由 GET 参数指定),如果未指定,则为 1 秒。
  • /debug/stack返回当前服务堆栈跟踪的快照。

input组件

input组件分为inputbatchInput, 定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// AckFunc 是一个通用函数,由输入组件返回,用于消费消息的确认,每个消息被消费后必须调用一次。
// 这个函数的作用是确保消息来源接收到要么是确认(err 为 nil),要么是一个错误,错误要么作为 nack 传播到上游,要么触发重新投递相同消息的机制。
// 如果你的输入组件实现中没有具体处理 nack 的机制,那么你可以包装你的输入组件实现 AutoRetryNacks,以获得自动重试。
type AckFunc func(ctx context.Context, err error) error

//---------------------------------------------------------------

// Closer 由支持停止和清理其底层资源的组件实现
type Closer interface {
// 关闭组件,直到底层资源被清理或上下文被取消时阻塞。若上下文被取消,则返回一个错误。
Close(ctx context.Context) error
}

//---------------------------------------------------------------

// Input 是由 Benthos 输入实现的接口。对 Read 的调用应当阻塞,直到接收到消息、连接丢失或提供的上下文被取消。
type Input interface {
// 建立与上游服务的连接。在实例化读取器时,将始终首先调用 Connect,并将连续调用,直到返回 nil 错误。
//
// 提供的上下文仅在连接阶段期间保持打开,不应用于建立连接本身的生命周期。
//
// 一旦 Connect 返回 nil 错误,将调用 Read 方法,直到返回 ErrNotConnected 或读取器被关闭。
Connect(context.Context) error

// 从源中读取单个消息,以及在消息可以被确认(成功发送或故意过滤)或否定(未能处理或发送到输出)时调用的函数。
//
// AckFunc 将至少为每条消息调用一次,但没有保证会在何时发生。如果您的输入实现没有特定的机制来处理 nack,您可以将输入实现包装在 AutoRetryNacks 中,以获得自动重试。
//
// 如果此方法返回 ErrNotConnected,则在 Connect 返回 nil 错误之前将不会再次调用 Read。如果返回 ErrEndOfInput,则不再调用 Read,处理管道将优雅地终止。
Read(context.Context) (*Message, AckFunc, error)

Closer
}

//---------------------------------------------------------------

// BatchInput 是一个接口,由 Benthos 输入实现,该输入以批量方式生成
// 消息,在处理和发送整个批次时希望作为一个逻辑组,而不是单个消息。
//
// ReadBatch 的调用应该阻塞,直到准备好处理一批消息、连接丢失或
// 提供的上下文被取消。
type BatchInput interface {
// 建立与上游服务的连接。Connect 在读取器实例化时总是
// 首先被调用,并将持续调用,采用退避策略,直到返回 nil 错误。
//
// 提供的上下文仅在连接阶段持续有效,不应用于建立
// 连接本身的生命周期。
//
// 一旦 Connect 返回 nil 错误,将会调用 Read 方法,直到
// 返回 ErrNotConnected,或关闭读取器。
Connect(context.Context) error

// 从源读取一批消息,以及一个函数,该函数将在整个批次
// 可以被成功确认(已成功发送或故意过滤)或被否认(处理失败或
// 无法发送到输出)时被调用。
//
// AckFunc 将至少为每个消息批次调用一次,但没有保证
// 何时会发生。如果您的输入实现没有特定的机制来处理 nack,
// 则可以使用 AutoRetryNacksBatched 来包装您的输入实现
// 以获得自动重试。
//
// 如果此方法返回 ErrNotConnected,则 ReadBatch 不会
// 再次被调用,直到 Connect 返回 nil 错误。如果返回 ErrEndOfInput,
// 则将不再调用 Read,管道将优雅地终止。
ReadBatch(context.Context) (MessageBatch, AckFunc, error)

Closer
}

自定义的Input组件应该实现以上两种方法之一, 并在包的init方法中注册, 使用 RegisterInputRegisterBatchInput, Input组件的工作流程大致如下:

  1. 不断调用Connect方法, 直到其返回nil
  2. 不断调用Read/ReadBatch方法, 直到其返回 ErrNotConnected, 此时将会重新从第一步开始, 返回ErrEndOfInput则会终止整个流水线, 或者当ctx超时
  3. 在退出时调用 Close方法

一个input组件可参考如下, 其从Redisstream中接收数据并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input:
label: my_redis_input # 这个标签和input_resources中不是一个东西, 这个是方便做metric用的标签

# 使用benthos自带的redis_streams组件
redis_streams:
url: tcp://localhost:6379
streams:
- benthos_stream
body_key: body
consumer_group: benthos_group

# Optional list of processing steps
processors:
- mapping: |
root.document = this.without("links")
root.link_count = this.links.length()

关于input组件所返回的ackfun

ackfunc的调用时机

在Benthos中,消息确认函数(ackfunc)的调用时机与消息处理流程密切相关。当消息成功到达输出目的地并被确认后,ackfunc才会被调用, 这遵循Benthos的严格传递保证机制。

例如,在同步响应场景中,文档明确指出:

然而,重要的是要记住,由于Redpanda Connect的严格传递保证,响应消息实际上不会被返回,直到消息已经到达其输出目的地并且可以进行确认。

错误处理与ackfunc的关系

当处理器中发生错误时,Benthos不会简单地丢弃消息,而是会标记这些消息并继续尝试发送它们:

一些处理器有可能失败的条件, Benthos不会丢弃失败的消息,而是仍然尝试将这些消息发送出去,并且有过滤、恢复或死信队列处理失败消息的机制。

处理器错误与ackfunc

当处理器返回非nil的错误时,消息会被标记为失败,但仍会继续通过管道传递。这些错误不会直接传递到input的ackfunc中,而是会:

  1. 增加相应处理器的错误指标
  2. 生成描述错误的调试级别日志
  3. 为消息添加错误标志
ackfunc捕获链路中错误信息

需要保证一下几点

  1. 中间processor不重置消息
  2. 在output组件中将message中的错误信息返回

broker包装器

在正常情况下, 一个配置文件只允许存在一个Input组件, 如果有多个, 需要使用包装器broker来组合多个输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# All config fields, showing default values
input:
label: ""
broker:
copies: 1 # 如果副本数大于零,则列表将被复制该次数。例如,如果您的输入类型为 foo 和 bar,且“copies”设置为“2”,则最终将得到两个“foo”输入和两个“bar”输入
inputs:
- amqp_0_9: # 这个是官网配置, 总是这就是一个组件
urls:
- amqp://guest:guest@localhost:5672/
consumer_tag: benthos-consumer
queue: benthos-queue
# Optional list of input specific processing steps
processors: # 这里可以紧接一个processor来对数据做处理, 不影响接下来的流程
- mapping: |
root.message = this
root.meta.link_count = this.links.length()
root.user.age = this.user.age.number()
- kafka: # 第二个输入源, kafka
addresses:
- localhost:9092
client_id: benthos_kafka_input
consumer_group: benthos_consumer_group
topics: [ benthos_stream:0 ]
batching: # 可以使用批处理字段为代理配置批处理策略。执行此作时,将合并来自所有子 Importing 的源。某些内置的 inputs 组件不支持基于代理的批处理,并在其文档中指定了这一点。
count: 0 # 指定批次应刷新的消息数量。如果设置为 0 则禁用基于计数的批次。
byte_size: 0 # 指定批次刷新的字节数。如果设置为 0 则禁用基于大小的批次刷新。
period: "" # 无论批次大小如何,都应刷新不完整批次的时间段。其实就是超时时间, 可以使1s 1m, 15s这样的字符串
check: "" # Bloblang 查询应返回一个布尔值,指示消息是否应结束批处理。
processors: [] # No default (optional) # 刷新批次时应用的处理器列表。这允许您以合适的方式聚合和归档批次。请注意,所有生成的消息都将作为单个批次刷新,因此使用这些处理器将批次拆分为更小的批次是无操作的。这里可以接一些简短的处理组件, 例如format之类的

grnrtate组件

使用无需上下文即可执行的Bloblang映射,按给定间隔生成消息

1
2
3
4
5
6
7
8
9
# Config fields, showing default values
input:
label: ""
generate:
mapping: root = "hello world" # No default (required), 用于生成消息
interval: 1s # 时间间隔, 还可以是 1m, 1h, 0,30 */2 * * * *, '@every 1s' TZ=Europe/London 30 3-6,20-23 * * * 的形式
count: 0 # 可选生成的消息数量,如果设置为 0 以上,则会生成指定数量的消息,然后输入将关闭。
batch_size: 1 # 按照指定的时间间隔刷新到每批中应累积的生成消息的数量
auto_replay_nacks: true # 是否应无限期地自动重放输出级别被拒绝(nack)的消息,如果拒绝的原因持续存在,最终会导致背压。如果设置为“否”,false这些消息将被删除。禁用自动重放可以大大提高高吞吐量流的内存效率,因为数据的原始形状可以在消费和更改时立即丢弃。

更多组件参考 https://docs.redpanda.com/redpanda-connect/components/inputs/about/

processor组件

processor组件分为ProcessorBatchProcessor, 定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Processor 是 Benthos 处理器的一种实现,用于处理单个消息。
type Processor interface {
// Process 将一个消息处理成一个或多个结果消息,或在消息无法被处理时返回错误。若返回零条消息且错误为 nil,则表明该消息已被过滤。
//
// 当返回错误时,输入消息将继续沿管道传递但会带有错误标记(通过 *message.SetError),同时会记录指标和日志。失败的消息可以按照 https://docs.redpanda.com/redpanda-connect/configuration/error_handling 中的模式进行处理。
//
// 返回的 Message 类型必须来源于提供的消息,且不可自定义 Message 实例。如需复制提供的消息,请使用 Copy 方法。
Process(context.Context, *Message) (MessageBatch, error)

Closer
}

//---------------------------------------------------------------

// BatchProcessor 是 Benthos 处理器的一种实现,用于处理消息批次,这使得滑动窗口处理得以实现。
//
// 消息批次必须由上游组件(如输入组件、缓冲组件等)创建,否则该处理器将只能处理包含单个消息的批次。
type BatchProcessor interface {
// Process a batch of messages into one或多个结果 batches,若整个批次无法被处理则返回错误。若返回零条消息且错误为 nil,则表明所有消息均被过滤。
//
// 提供的 MessageBatch 不得被修改,如需返回修改后的批次 Tortoise,必须创建一个切片的副本。
//
// 当返回错误时,所有输入消息将继续沿管道传递但会带有错误标记(通过 *message.SetError),同时会记录指标和日志。
//
// 如需向批次中的单个消息添加供下游处理的错误,使用 *message.SetError(err) 方法,并在结果批次中将其返回,同时错误参数为 nil。
//
// 返回的 Message 类型必须来源于所提供的消息,不可自定义 Message 实例。如需复制提供的消息,请使用 Copy 方法。
ProcessBatch(context.Context, MessageBatch) ([]MessageBatch, error)

Closer
}

如果想要实现自定义processor直接实现以上两种方法之一即可, 需要注意的一些点为

  1. 整个流程中, 如果某个或多个ProcessProcessBatch返回了非nilerror, 不会导致数据处理流程中断, 而是会继续流转, 直到最终走到output组件后, 层层返回Ack, 如果想要实现中间一个流程失败, 后续处理器都不处理或做其他处理等可以参考以下写法

    1. Process方法中处理, 存在错误则直接跳过消息处理

      1
      2
      3
      4
      5
      6
      func (p *myPerssor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
      err := msg.GetError() // 从消息中获取错误, 如果获取到的非空, 就直接返回, 跳过该消息的处理
      if err != nil {
      return nil, err
      }
      }
    2. 在配置中通过catchtry处理, 通过catch捕获失败的处理, 或通过try尝试处理

      1
      2
      3
      4
      5
      6
      7
      pipeline:
      processors:
      - resource: foo # 先使用 foo 处理器处理
      - catch: # 如果上一步处理错误, 将会执行 bar 和 baz
      - resource: bar
      - resource: baz
      #当消息离开 catch 块时, 错误标记将会被清除
      1
      2
      3
      4
      5
      6
      pipeline:
      processors:
      - try:
      - resource: processor_1
      - resource: processor_2 # Skip if processor_1 fails
      - resource: processor_3 # Skip if processor_1 or processor_2 fails
      1
      2
      3
      4
      5
      6
      7
      8
      9
      pipeline:
      processors:
      - try:
      - resource: processor_1 # Processor that might fail
      - resource: processor_2 # Processor that might fail
      - resource: processor_3 # Processor that might fail
      - catch:
      - log:
      message: "Processor ${!error_source_label()} failed due to: ${!error()}"
      1
      2
      3
      4
      5
      6
      7
      8
      9
      pipeline:
      processors:
      - resource: processor_1 # Processor that might fail
      - resource: processor_2 # Processor that might fail
      - resource: processor_3 # Processor that might fail
      - catch:
      - mapping: |
      root = this
      root.meta.error = error()
    3. 使用 switch处理器检查错误

      1
      2
      3
      4
      5
      6
      7
      pipeline:
      processors:
      - resource: processor_1 # Processor that might fail
      - switch:
      - check: errored()
      processors:
      - resource: processor_2 # Processes rerouted messages
    4. 更错错误处理等参考 https://docs.redpanda.com/redpanda-connect/configuration/error_handling/

  2. 在整个流程中, 消息始终存在一份, 如果没有特殊情况, 应该将输入修改后直接传出, 不能在消息内直接新建消息返回

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (p *myPerssor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
    err := msg.GetError()
    if err != nil {
    return nil, err
    }

    // 返回消息中存储的结构化数据, 其中 Mut 后缀代表你可以直接对返回值进行修改
    tmpTask, err := msg.AsStructuredMut()
    if err != nil {
    return nil, err
    }

    task := tmpTask.(*types.TaskProcess)
    task.SomeVal = 123

    // 直接返回原始消息
    return service.MessageBatch{msg}, nil

    // 重新set数据
    msg.SetStructuredMut(newTask())
    msg.SetStructuredMut(task)
    return service.MessageBatch{msg}, nil

    // !!! 不能这么干 !!!
    return service.MessageBatch{service.NewMessage([]byte("xxx"))}, nil
    }

processor常见配置

处理器是通过 config 设置的,根据它们在配置中的位置,它们将在特定输入之后(在 input 部分中设置)、所有消息(在 pipeline 部分中设置)或特定输出之前(在 output 部分中设置)立即运行。大多数处理器适用于所有消息,并且可以放置在 pipeline 部分中

1
2
3
4
5
6
7
pipeline:
threads: 1
processors:
- label: my_cool_mapping
mapping: |
root.message = this
root.meta.link_count = this.links.length()

关于 pipeline:

在 Redpanda Connect 配置中,inputoutput之间是一个pipeline部分。本节介绍将应用于所有消息且不绑定到任何特定 input 或 output 的处理器数组。

如果处理器占用大量 CPU,并且不依赖于某个特定的输入或输出,则它们最适合 pipeline 部分。使用 pipeline 部分是有利的,因为它允许您设置并行执行线程的显式数量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
input:
resource: foo

pipeline:
threads: 4 # 如果字段 threads 设置为 -1(默认值),它将自动匹配可用的逻辑 CPU 数量
processors:
- mapping: |
root = this
fans = fans.map_each(match {
this.obsession > 0.5 => this
_ => deleted()
})

output:
resource: bar

也可以使用processor作为输出并配合reject, 前提是processor可以返回错误并且是唯一错误, 例如Redis的插入失败等, 配置如

1
2
3
4
5
6
7
8
9
output:
reject: 'failed to send data: ${! error() }'
processors:
- try:
- redis:
url: tcp://localhost:6379
command: sadd
args_mapping: 'root = [ this.key, this.value ]'
- mapping: root = deleted()

以上配置描述的执行流程为

  1. 如果redis处理器执行成功, 则执行 mapping处理器
  2. 如果redis处理器执行失败, mapping处理器将不会执行, 消息路由到reject输出, 其将会返回一个Nack

switch组件

根据消息的内容有条件地处理消息, 对于每个 switch case,将执行一个 Bloblang查询 ,如果结果为 true(或检查为空),则对消息执行子处理器, 其字段如下

  • [].check 一个bloblang查询, 应返回 true/false, 如果留空,则 case 始终通过。如果检查映射引发错误,则消息将被标记为失败 ,并且不会针对任何其他case进行测试。
  • [].processors 处理器集合, 将要对消息执行处理的处理器列表, 将会按顺序处理
  • [].fallthrough 布尔值 如果此 case 通过消息,是否还应执行下一个 case
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pipeline:
processors:
- switch:
- check: this.user.name.first != "George"
processors:
- metric:
type: counter
name: MessagesWeCareAbout

- processors:
- metric:
type: gauge
name: GeorgesAnger
value: ${! json("user.anger") }
- mapping: root = deleted()

branch组件

branch组件可以通过bloblang mapping创建新的请求消息, 对请求消息执行一些列处理器后, 将结果映射回原始消息, 可以用于在替换内容时需要保留原始消息内容, 字段如下

  • request_map 字符串, 描述如何创建分支内的消息, 如果留空则表示将原始消息复制一份传进来
  • processors array, 处理器列表
  • result_map 字符串, 用于描述将branch处理器中一系列processor处理完毕后的消息映射回原始消息, 如果留空则原始消息不会改变

如果 request_map 失败,则不会执行子处理器。如果子处理器本身导致 (未捕获的) 错误,则不会执行 result_map。如果 result_map 失败,则消息将保持不变


更多组件参照

https://docs.redpanda.com/redpanda-connect/components/processors/about/

Output组件

output组件为消息最终的输出端, 在 Redpanda Connect 配置的根目录中仅配置了一个输出。但是,输出可以是在所选代理模式下组合多个输出的broker ,也可以是用于多路复用不同输出的 switch, 其结构定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Output 是一个由 Benthos 实现的接口,用于支持单条消息的写入操作。每个调用 Write 的次数
// 安排应阻塞直到消息成功或不成功发送,或上下文被取消。
//
// 支持并行调用的最大并发数由输出组件的构造函数提供的 MaxInFlight 参数
// 指示。
type Output interface {
// 与下游服务建立连接。Connect 总是会在输出组件实例化时被首次调用,
// 并会循环调用(带有回退机制)直到返回 nil 错误。
//
// 提供的上下文仅在连接阶段有效,不应用于控制连接本身的生命周期。
//
// 当 Connect 返回 nil 错误后,Write 方法会被调用,直到返回 ErrNotConnected
// 错误或输出组件被关闭。
Connect(context.Context) error

// Write 将消息写入目标,若传递失败则返回相应的错误。
//
// 若该方法返回 ErrNotConnected,则表示输出组件已断开连接,此时 Write
// 方法不会再被调用,直到 Connect 返回 nil 错误。
Write(context.Context, *Message) error

Closer
}

// -----------------------------------------

// BatchOutput 是一个由 Benthos 实现的接口,用于支持批量消息的写入操作。每个调用 WriteBatch 的次数
// 安排应阻塞直到该批次中的所有消息成功或不成功发送,或上下文被取消。
//
// 支持并行调用的最大并发数由输出组件的构造函数提供的 MaxInFlight 参数
// 指示。
type BatchOutput interface {
// 连接方法,如返回 nil 则表示连接成功,否则可能会重复调用并会进行回退
// 尝试,直到成功结束连接操作
Connect(context.Context) error

// WriteBatch 将消息批次写入目标,若传递失败则返回相应的错误。
//
// 若该方法返回 ErrNotConnected,则表示输出组件已断开连接,此时 WriteBatch
// 方法不会再被调用,直到 Connect 返回 nil 错误。
WriteBatch(context.Context, MessageBatch) error

Closer
}

output的一些机制

背压机制 (Back pressure)

输出端的压力将会传递到上游, 例如把水管的出口堵死, 入口就冲不进水, 当output被阻塞时(可能是网络问题或者是什么怪问题), 整个流水线将会停止, 直到output变得正常

重试机制

output无法发送消息时(可以是output本身的问题, 或者上游某个步骤失败走到了这里),错误将传递回input,根据协议,它将被作为 Nack 推送回源(例如 AMQP),或者将无限期地重新尝试提交直到成功(例如 Kafka)。也可以使用retey包装器让其无限重试, 直到成功, broker也可以这么操作, 常见用法如

1
2
3
4
5
6
7
8
9
10
# All config fields, showing default values
output:
label: ""
retry:
max_retries: 0 # 最大重试次数
backoff:
initial_interval: 500ms # 退避时间的初始等待时间
max_interval: 3s # 最大退避时间
max_elapsed_time: 0s # 整个退避操作的整体超时时间, 如果为0则不限制
output: null # No default (required) # output组件

死信队列 (Dead letter queues)

可以使用fallback包装器指定当原输出目标失败时的回退输出, 并按照给定好的顺序依次执行, 例如

1
2
3
4
5
6
7
8
9
output:
fallback: # 表示如果 aws_sqs 失败, 则使用 http_client 输出
- aws_sqs:
url: https://sqs.us-west-2.amazonaws.com/TODO/TODO
max_in_flight: 20

- http_client:
url: http://backup:1234/dlq
verb: POST

多路复用输出 (Multiplexing outputs)

插值多路复用

配置中支持直接使用配置插值, 例如

1
2
3
4
output:
kafka:
addresses: [ TODO:6379 ]
topic: ${! meta("target_topic") } # 使用消息的 meta 中存放的 target_topic 作为 kafka 输出 topic
switch多路复用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
output:
switch:
cases:
- check: this.type == "foo"
output:
amqp_1:
urls: [ amqps://guest:guest@localhost:5672/ ]
target_address: queue:/the_foos

- check: this.type == "bar"
output:
gcp_pubsub:
project: dealing_with_mike
topic: mikes_bars

# 上面都没匹配上, 就会走到这里
- output:
redis_streams:
url: tcp://localhost:6379
stream: everything_else
processors:
- mapping: |
root = this
root.type = this.type.not_null() | "unknown"

output相关组件及包装器参照 https://docs.redpanda.com/redpanda-connect/components/outputs/about/

-------------本文结束感谢您的阅读-------------