0%

Benthos 框架下的限速器降级方案探索

Benthos 框架下的限速器降级方案探索

rate_limit限速器

用于限制Benthos中并行组件(或跨实例)之间的共享资源使用, 一般使用resources配置, 如下

1
2
3
4
5
rate_limit_resources:  # 固有字段, 表示限速器的资源
- label: my_limite # 限速器资源标签
local: # 本地限速器, 只能在单个实例内部生效
count: 500 # 表示每秒500次处理
interval: 1s

一些内部组件支持直接在配置中带上rate_limit配置, 例如http_client, 其原理是在组件内部通过*service.ResourcesAccessRateLimit方法直接获取对应资源

1
2
3
4
5
input:
http_client:
url: TODO
verb: GET
rate_limit: my_limite

通过这种方式使用速率限制,可以保证输入仅以每秒 500 个请求的速率轮询 HTTP 源, 其内部实现大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
func init() {
service.RegisterBatchInput("http_client", httpClientInputSpec(), newHttpClient)
}

func httpClientInputSpec() *service.ConfigSpec {
// 给出一个接受 rate_limit 配置的字段
return service.NewConfigSpec().service.NewStringField("rate_limit")
}

func newHtpClient(conf *service.ParsedConfig, mgr *service.Resources) (*httpClientInput, error) {
// 拿到限速器的标签
rate_limit, _ := conf.FieldString("rate_time")

// 检查是否在 rate_limit_resources 中注册对应标签的限速器
if rate_limit != "" {
if !mgr.HasRateLimit(rate_limit) {
return nil, fmt.Errorf("rate_limit resources %v not found", rate_limit)
}
}
}

// 假设这里是发送HTTP请求的方法
func (h *httpClient) Send(ctx context.Content) ([]byte, error) {
// ...
if !h.waitForAccess(ctx) {
if ctx.Err != nil {
return nil, ctx.Err
}
return nil, errTimeout
}
// ...
}

// 真正调用限速器
func (h *httpClient) waitForAccess(ctx) bool {
if h.rateLimit == "" {
return true
}
for {
var period time.Duration
var err error
// 获取限速器实例
if rerr := h.mgr.AccessRateLimit(ctx, h.rateLimit, func(rl service.RateLimit) {
period, err = rl.Access(ctx)
}); rerr != nil {
err = rerr
}
if err != nil {
h.log.Errorf("Rate limit error: %v\n", err)
period = time.Second
}

if period > 0 { // 等待一定时间
select {
case <-time.After(period):
case <-ctx.Done():
return false
}
} else {
return true
}
}
}

如果组件内本身不提供rate_limit, 或不想在组件内写上这个配置并在初始化的时候使用, 可以通过使用processor结合限速器, 利用背压机制, 即下游阻塞上游也阻塞, 配置如下

1
2
3
4
5
6
7
input:
csv:
paths:
- ./foo.csv
processors: # 在Input组件csv之后紧接着一个 rate_limit 处理器, 利用背压机制间接限速 input
- rate_limit:
resource: my_limit

以上所有限速器均为本地实现, 无法跨实例使用, 如果有次需求需要使用Redis或其他方案做限速器, 例如基于Benthosconnect的实现 https://github.com/redpanda-data/connect/blob/main/internal/impl/redis/rate_limit.go, 不过需要注意其限速器只是限制速率, 功能上类似漏桶, 如果需要控制QPS需要另行实现令牌桶版本的限速器

如何动态调整限速器的限速值

一个基本的限速器配置如下(框架原生)

1
2
3
4
5
rate_limit_resources:  # 固有字段, 表示限速器的资源
- label: my_limite # 限速器资源标签
local: # 本地限速器, 只能在单个实例内部生效
count: 500 # 一下表示每秒500次处理
interval: 1s

如果想要动态的修改其中的 count字段, 可以考虑一下方法

  1. 使用配置热重载的方式

    启动Benthos二进制时为框架加上 -w--watcher, 让其可以在配置文件发生变化的时候自动重载配置, 使用此种方式时, 将会全量读取新的配置文件, 并且等待所有未确认消息处理完毕并确认后, 重新载入配置并重载组件. 但是需要注意, 如果更新配置文件后存在配置错误将会导致Benthos服务不可用, 其将会持续读取配置文件并解析配置错误, 并不会沿用原有配置文件

  2. 使用流模式 streams 启动, 通过 HTTP 动态修改配置

    配置中设置好api的端口, 或使用默认端口 4195

    1
    2
    3
    http: # 为benthos开启一个HTTP服务, 包含一些基础的请求访问, 例如 /ping /ready /version三个api, 可以通过访问/endpoints获取所有api
    address: 0.0.0.0:4195
    debug_endpoints: false # 是否开启debug模式

    使用流模式启动

    1
    ./BenthosApp streams ./streams_config/*.yaml

    获取所有流

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    curl http://localhost:4195/streams # 获取所有流, 一个配置文件为一个流

    # 更新指定流的配置, 需要传入完整的配置
    curl -X PUT http://localhost:4195/streams/one_stream --data-binary @- <<EOF
    input:
    xxx
    output:
    xxx
    EOF

    # 查看流的配置
    curl http://localhost:4195/streams/one_stream
    # 修正指定流的配置
    curl -X PATCH http://localhost:4195/streams/one_stream \
    --data-binary @- <<EOF
    {
    "input": {
    "my_input": {
    "timeout": "92s"
    }
    }
    }
    EOF

以上方法, 均是通过修改配置的方式执行, 也就是说如果想要实现降级, 需要配合外部监控平台, 通过监控pod状态或一些指标, 来从外部对Benthos的限速器进行修改, 那是否可能存在一种方法可以在运行时在程序内部自行修改呢?

通过查阅Benthos相关文档可以得知, 当组件作为资源被引用时, 具备可重用性, 每种命名资源只会创建一个实例, 可以在多个位置使用, 原文如下 https://docs.redpanda.com/redpanda-connect/configuration/resources/

1
2
3
4
5
Resources are components within Redpanda Connect that are declared with a unique label and can be referenced any number of times within a configuration. Only one instance of each named resource is created, but it is safe to use it in multiple places as they can be shared without consequence.
资源是 Redpanda Connect 中的组件,它们使用唯一标签声明,并且可以在配置中引用任意次数。每个命名资源只会创建一个实例,但可以安全地在多个位置使用,因为它们可以共享而不会产生任何后果。

Some components such as caches and rate limits can only be created as a resource.
某些组件,如缓存和速率限制,只能作为资源创建。

以上说明可以简单的通过编写一个Benthos框架的代码验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
input:
broker:
inputs: # 使用 broker 包裹两个完全一致的输入, 使用同一个标签的限速器
- rdb_input: {}
processors:
- rate_limit:
resource: mu_local_limiter
- bloblang: | # 给每条消息加上来源和当前时间
root.message = this.string()
root.source = "input_11111"
root.meta.process_time = now()
- rdb_input: {}
processors:
- rate_limit:
resource: mu_local_limiter
- bloblang: |
root.message = this.string()
root.source = "input_22222"
root.meta.process_time = now()

# 自定义了一个 mu_local_limiter, 实现完全与框架原生的一致, 但是在初始化阶段打印了一个日志
rate_limit_resources:
- label: mu_local_limiter
mut_rate_limit_local: # 限速为每秒一条
maxcount: 1
interval: 1s

其中 input 组件为简单的从Redis中RPop内容, 大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}

// 这里直接没有考虑Redis为空的情况, 在程序启动前给Redis提前填充了数据
d, err := r.l.Pop(ctx)
if err != nil {
return nil, nil, err
}

msg := service.NewMessage(d)
return msg, func(ctx context.Context, err error) error {
return nil
}, nil
}

限速器组件实现大致如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func init() {
err := service.RegisterRateLimit("mut_rate_limit_local",
limiterConfig(), newMutRateLimitLocal)
if err != nil {
panic(err)
}
}

func newMutRateLimitLocal(conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error) {
// ...
limiter := &my_limit{
log: mgr.Logger(),
}
// 这里输出一下日志
limiter.log.Infof("I am Init !!!!!!")
return limiter, nil
}

主程序启动前提前填充redis

1
2
3
4
5
6
7
8
9
func main() {
l := lists.GetLists()
// 按顺序填充 1-100
for i := 0; i < 100; i++ {
_ = l.Push(context.Background(), []byte(strconv.Itoa(i)))
}

service.RunCLI(context.Background())
}

程序启动后观察日志输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{"@service":"benthos","benthos_version":"v4.48.0","level":"info","msg":"Running main config from specified file","path":"config.yaml"}
{"@service":"benthos","label":"mu_local_limiter","level":"info","msg":"I am Init !!!!!!","path":"root.rate_limit_resources"}
{"@service":"benthos","level":"info","msg":"Listening for HTTP requests at: http://0.0.0.0:4195"}
{"@service":"benthos","label":"","level":"info","msg":"Input type rdb_input is now active","path":"root.input.broker.inputs.0"}
{"@service":"benthos","label":"","level":"info","msg":"Input type rdb_input is now active","path":"root.input.broker.inputs.1"}
{"@service":"benthos","label":"","level":"info","msg":"Output type stdout is now active","path":"root.output"}
{"@service":"benthos","level":"info","msg":"Launching a Benthos instance, use CTRL+C to close"}
{"message":"1","meta":{"process_time":"2025-04-28T20:04:51.181221041+08:00"},"source":"input_11111"}
{"message":"3","meta":{"process_time":"2025-04-28T20:04:52.181650517+08:00"},"source":"input_11111"}
{"message":"4","meta":{"process_time":"2025-04-28T20:04:53.181898361+08:00"},"source":"input_11111"}
{"message":"0","meta":{"process_time":"2025-04-28T20:04:54.18314283+08:00"},"source":"input_22222"}
{"message":"5","meta":{"process_time":"2025-04-28T20:04:55.183107806+08:00"},"source":"input_11111"}
{"message":"6","meta":{"process_time":"2025-04-28T20:04:56.183515111+08:00"},"source":"input_11111"}
{"message":"2","meta":{"process_time":"2025-04-28T20:04:57.183863343+08:00"},"source":"input_22222"}
{"message":"7","meta":{"process_time":"2025-04-28T20:04:58.184574384+08:00"},"source":"input_22222"}
{"message":"8","meta":{"process_time":"2025-04-28T20:04:59.18518018+08:00"},"source":"input_11111"}

可以得出以下现象:

  1. 根据消息的乱序输出, 且来源都不一致, 说明两个input在同时工作
  2. 在乱序的情况下, 消息保持每秒一条的速率
  3. rate_limit组件的I am Init !!!!!! 日志仅输出了一次

    由此说明rate_limit组件作为资源加载时, 全局单例, 根据这个特性, 可以设计出一个速率可变的限速器, 其应该继承框架原本的rate_limit方法, 同时额外暴露一些修改限速值的方法, 包内直接单例模式, 例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
var gLimit mutRateLimitLocal

type mutRateLimitLocal struct {
log *service.Logger
mut sync.Mutex
curSize int
lastRefresh time.Time

minSize int
benchSize int
maxSize int
period time.Duration
}

// 继承自 Benthos 的 RateLimit
func (m *mutRateLimitLocal) Access(ctx context.Context) (time.Duration, error) {
// ...
}

// 继承自 Benthos 的 RateLimit
func (m *mutRateLimitLocal) Close(_ context.Context) error {
return nil
}

// 增加限速值
func Upgrade() {
gLimit.mut.Lock()
defer gLimit.mut.Unlock()

ori := gLimit.benchSize
gLimit.benchSize *= 2
if gLimit.benchSize > gLimit.maxSize {
gLimit.benchSize = gLimit.maxSize
}

gLimit.log.Infof("limiter upgrade from %d to %d", ori, gLimit.benchSize)
}

// 降低限速值
func DownGrade() {
gLimit.mut.Lock()
defer gLimit.mut.Unlock()

ori := gLimit.benchSize
gLimit.benchSize /= 2
if gLimit.benchSize < gLimit.minSize {
gLimit.benchSize = gLimit.minSize
}
gLimit.log.Infof("limiter downGrade from %d to %d", ori, gLimit.benchSize)
}

现在进行验证, 将input组件修改为单个组件, 保证数据的顺序性, 并在读取到特定内容时进行升级降级操作, 并观察输出, 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}

d, err := r.l.Pop(ctx)
if err != nil {
return nil, nil, err
}

if string(d) == "5" {
mut_rate_limit_local.Upgrade()
}

if string(d) == "10" {
mut_rate_limit_local.DownGrade()
}

msg := service.NewMessage(d)
return msg, func(ctx context.Context, err error) error {
return nil
}, nil
}

input组件相关配置修改为

1
2
3
4
5
6
7
8
input:
rdb_input:
processors:
- rate_limit:
resource: mu_local_limiter
- bloblang: |
root.message = this.string()
root.meta.process_time = now()

由于数据是顺序输出的, 所以一定会先出现5, 后出现10, 此时启动程序观察输出为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{"message":"1","meta":{"process_time":"2025-04-29T09:45:49.100212661+08:00"}}
{"message":"2","meta":{"process_time":"2025-04-29T09:45:49.100586456+08:00"}}
{"message":"3","meta":{"process_time":"2025-04-29T09:45:50.096771614+08:00"}}
{"message":"4","meta":{"process_time":"2025-04-29T09:45:50.096806286+08:00"}}
{"@service":"benthos","label":"mu_local_limiter","level":"info","msg":"limiter upgrade from 2 to 4","path":"root.rate_limit_resources"}
{"message":"5","meta":{"process_time":"2025-04-29T09:45:51.097508536+08:00"}}
{"message":"6","meta":{"process_time":"2025-04-29T09:45:51.097554408+08:00"}}
{"message":"7","meta":{"process_time":"2025-04-29T09:45:51.097980122+08:00"}}
{"message":"8","meta":{"process_time":"2025-04-29T09:45:51.098075563+08:00"}}
{"@service":"benthos","label":"mu_local_limiter","level":"info","msg":"limiter downGrade from 4 to 2","path":"root.rate_limit_resources"}
{"message":"9","meta":{"process_time":"2025-04-29T09:45:52.098415253+08:00"}}
{"message":"10","meta":{"process_time":"2025-04-29T09:45:52.09846423+08:00"}}
{"message":"11","meta":{"process_time":"2025-04-29T09:45:53.09906257+08:00"}}
{"message":"12","meta":{"process_time":"2025-04-29T09:45:53.099122656+08:00"}}
{"message":"13","meta":{"process_time":"2025-04-29T09:45:54.099771808+08:00"}}
{"message":"14","meta":{"process_time":"2025-04-29T09:45:54.099812208+08:00"}}

可以观察到当读取到5之后, 限速从2升级为4, 消息速率有每秒两条变为每秒四条, 读取到10的时候降级为2, 此后维持2的限速运行.

背压机制对消息处理速率的影响

在Benthos中, 限速器组件能够限速的本质原理是, 通过一个给定的时间间隔, 以及在此时间间隔内, Input组件调用ReadReadBatch(如果是batchInput)的次数, 来限制整体速率, 例如以下限速器配置

1
2
3
4
5
rate_limit_resources:
- label: local_limiter
local:
count: 1
interval: 1s

表示限制每秒调用1次Input组件的Read方法或ReadBatch, 其原理是当调用次数超过一次后, 下次调用将会强制等待到大于给定等待时间间隔为止., 本质上其是依赖Benthos的背压机制来达到对消息进行限速的目的, 那如何验证这一机制? 可以简单的写一段Benthos配置来对其进行验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input:
rdb_input:
max_in_flight: 0 # 表示不限制待确认消息的数量
processors:
- bloblang: |
root.message = this.string()
root.meta.process_time = now()

pipeline:
threads: 1 # 保持每次只处理一个数据
processors:
- sleep: # 任何类型消息在这里将会 sleep 一秒钟
duration: 1s

output:
my_stdout: {}

以上配置下, Benthos的输出为

1
2
3
4
5
6
7
8
{"message":"1","meta":{"process_time":"2025-04-29T14:27:12.227711335+08:00"}}
{"message":"2","meta":{"process_time":"2025-04-29T14:27:12.22788404+08:00"}}
{"message":"3","meta":{"process_time":"2025-04-29T14:27:13.22813145+08:00"}}
{"message":"4","meta":{"process_time":"2025-04-29T14:27:14.228667279+08:00"}}
{"message":"5","meta":{"process_time":"2025-04-29T14:27:15.229376735+08:00"}}
{"message":"6","meta":{"process_time":"2025-04-29T14:27:16.229656828+08:00"}}
{"message":"7","meta":{"process_time":"2025-04-29T14:27:17.230157291+08:00"}}
{"message":"8","meta":{"process_time":"2025-04-29T14:27:18.230574692+08:00"}}

可以观察到, 从第二条消息开始, 每条消息间隔了一秒钟才被取出来, 那如果将 pipeline.threads 参数变大, 则能代表每秒可以取出来更多消息, 这些消息等待一秒后, 才能取出来下一批消息

1
2
pipeline:
threads: 5 # 每次同時处理5个数据

修改后输出同理论一致, 第一秒取出了六条数据, 在第六条数据进入processor阶段时, 前五条数据都在sleep中, 整条链路被阻塞了, 直到下一秒, 前五条消息处理完毕后, 继续处理下五条

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{"message":"2","meta":{"process_time":"2025-04-29T14:30:15.214659776+08:00"}}
{"message":"1","meta":{"process_time":"2025-04-29T14:30:15.214329751+08:00"}}
{"message":"3","meta":{"process_time":"2025-04-29T14:30:15.214761512+08:00"}}
{"message":"4","meta":{"process_time":"2025-04-29T14:30:15.214990298+08:00"}}
{"message":"5","meta":{"process_time":"2025-04-29T14:30:15.215094723+08:00"}}
{"message":"6","meta":{"process_time":"2025-04-29T14:30:15.215260697+08:00"}}
{"message":"7","meta":{"process_time":"2025-04-29T14:30:16.214789636+08:00"}}
{"message":"8","meta":{"process_time":"2025-04-29T14:30:16.215151017+08:00"}}
{"message":"9","meta":{"process_time":"2025-04-29T14:30:16.215217328+08:00"}}
{"message":"10","meta":{"process_time":"2025-04-29T14:30:16.215294076+08:00"}}
{"message":"15","meta":{"process_time":"2025-04-29T14:30:17.215655737+08:00"}}
{"message":"11","meta":{"process_time":"2025-04-29T14:30:16.215366317+08:00"}}
{"message":"12","meta":{"process_time":"2025-04-29T14:30:17.215124804+08:00"}}
{"message":"13","meta":{"process_time":"2025-04-29T14:30:17.215432139+08:00"}}
{"message":"14","meta":{"process_time":"2025-04-29T14:30:17.215565551+08:00"}}
{"message":"20","meta":{"process_time":"2025-04-29T14:30:18.216655332+08:00"}}
{"message":"19","meta":{"process_time":"2025-04-29T14:30:18.216587464+08:00"}}
{"message":"18","meta":{"process_time":"2025-04-29T14:30:18.216449435+08:00"}}
{"message":"17","meta":{"process_time":"2025-04-29T14:30:18.216183862+08:00"}}
{"message":"16","meta":{"process_time":"2025-04-29T14:30:17.215772375+08:00"}}

那这与Input组件的max_in_flight参数有什么关系? Input组件中 max_in_flight 参数表示系统可接受的未确认消息的数量, 可以简单比喻成一个萝卜一个坑, 每次调用ReadReadBatch都会得到一根萝卜, max_in_flight 表示有多少个坑, 当坑里的萝卜没有被拔出来时(消息未被确认), 无法把新的萝卜放进去, 可以通过以下方式简单验证

保持pipeline.threads不变, 修改Input组件的max_in_flight为1, 表示只接受一个未确认的消息, 配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
input:
rdb_input:
max_in_flight: 1
processors:
- bloblang: |
root.message = this.string()
root.meta.process_time = now()

pipeline:
threads: 5
processors:
- sleep:
duration: 1s

此时输出将会是每秒钟一条消息, 因为只能接受一条未确认消息, 而每条消息又会等待1秒钟

1
2
3
4
5
6
7
{"message":"1","meta":{"process_time":"2025-04-29T15:05:40.014691582+08:00"}}
{"message":"2","meta":{"process_time":"2025-04-29T15:05:41.015494086+08:00"}}
{"message":"3","meta":{"process_time":"2025-04-29T15:05:42.016571178+08:00"}}
{"message":"4","meta":{"process_time":"2025-04-29T15:05:43.017364452+08:00"}}
{"message":"5","meta":{"process_time":"2025-04-29T15:05:44.017900917+08:00"}}
{"message":"6","meta":{"process_time":"2025-04-29T15:05:45.018384+08:00"}}
{"message":"7","meta":{"process_time":"2025-04-29T15:05:46.019051633+08:00"}}

同样的, output组件如果出现阻塞, 也会由于背压机制影响到上游的处理效率, 例如以下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
input:
rdb_input:
max_in_flight: 1
processors:
- bloblang: |
root.message = this.string()
root.meta.process_time = now()

pipeline:
threads: 10
processors:
- sleep:
duration: 1s
output:
my_stdout_batch:
max_in_flight: 10
batching:
count: 100 # 当收集到100条消息时处理批次
byte_size: 1048576 # 或当批次大小达到1MB时处理
period: "1s" # 或每秒至少处理一次批次
check: "" # 可选的条件检查
processors: [] # 可选的批次处理器

由于output组件存在1s的超时, 所以整个链路将会变为两每秒钟一条消息

1
2
3
{"message":"1","meta":{"process_time":"2025-04-29T16:04:43.781875208+08:00"}}
{"message":"2","meta":{"process_time":"2025-04-29T16:04:45.782621816+08:00"}}
{"message":"3","meta":{"process_time":"2025-04-29T16:04:47.783167403+08:00"}}

max_in_flight参数在 Input组件和output组件中的差异

输入(Input)组件中的max_in_flight

在输入组件中,max_in_flight参数:

  • 设置一个限制,控制在任何给定时间内可以在Benthos流中流动的待确认消息数量
  • 默认值为0,表示没有限制
  • 一旦消息被确认(acknowledged)或拒绝(nacked),它就不再被视为待处理
  • 如果输入产生逻辑批次,则每个批次被视为对最大值的单次计数, 即限制的是ReadReadBatch的调用次数而并非具体几条消息
  • 警告:如果此字段限制的消息数量低于输出级别的批处理阈值,则输出级别的批处理策略将会停滞
输出(Output)组件中的max_in_flight

在输出组件中,max_in_flight参数:

  • 控制在给定时间内可以有多少消息或消息批次同时处理
  • 增加此值可以提高吞吐量

利用背压机制制作限速器

在上面的探索中, 已经大致搞清了Benthos框架原生的限速器的工作原理, 即利用框架的背压机制, 在下游制造压力, 传导到上游进而限制上游消息的读取速率, 根据这一原理, 可以实现这样一个processor组件:

  1. 不处理消息, 将消息原样返回
  2. 记录消息个数或Input组件调用次数
  3. 如果大于指定速率, 就sleep, 将压力传导至上游Input组件
  4. 监测一些指标, 当达到某些条件时降低限速, 反之提高限速

而这个组件可以放在

  1. 紧跟着Input组件之后, 可以通过消息数, ReadReadBatch的调用数, 系统的CPU占用/内存占用等不依赖具体消息内容的指标来进行降级升级
  2. 放在Processor组件的末端或中端, 可以通过消息设置的meta信息中处理耗时, 失败次数等与消息内容相关的指标进行升降级
  3. 同时放在InputProcessor中, 对多个指标进行监测

而为了保证限速器的可重用性, 最好是将其放在resources中, 通过 label在需要使用的地方引用, 同时通过这种方式也可以保证限速器全局唯一, 如果需要多个限速器, 则创建多个label即可, 限速器应该使用Processor实现而非batchProcessor来确保其对消息限速的粒度为单条消息

Benthos框架在batch组件和非batch组件会自动转换, 例如当Input是batch, Processor为非batch的时候, 此时Input每次都会读取一批消息, 一条一条的交给Processor, 直到所有消息都处理完毕, Input才会读取下一批消息, 所以使用非batch的Processor可以更好的控制单条消息速率

Example: 利用背压机制结合对消息或CPU的监控制作的动态限速器

利用背压机制, 结合 golang.org/x/time/rate 制作一个基于令牌桶的限速器, 通过监测过去一段时间内的消息中特定错误出现频率或CPU使用率动态调节限速值

选择golang.org/x/time/rate有一个比较重要的原因是其支持运行中修改限速值

结构定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import (
"benthos_test/lib/consts/errorx"
"context"
"github.com/pkg/errors"
"github.com/redpanda-data/benthos/v4/public/service"
"golang.org/x/time/rate"
"math"
"sync"
"sync/atomic"
"time"
)

type (
limitProcess struct {
log *service.Logger
limit *rate.Limiter
minEach rate.Limit
maxEach rate.Limit

wg sync.WaitGroup
shutdownChan chan struct{}
cpuC *cpuChecker
msgC *msgChecker
}

cpuChecker struct {
checkInterval time.Duration
cpuMax float64
}

msgChecker struct {
checkInterval time.Duration
failureMax float64
sucCnt atomic.Int32
falCnt atomic.Int32
}
)

在Benthos中注册时使用如下配置配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
limit_process:
minEach: 1 # 最低速率, 每秒钟 1 调小
each: 2 # 初始速率
maxEach: 10 # 最大速率
capacity: 1 # 令牌桶容量, 默认为 1 即可
msgChecker: # 启用消息检查
checkInterval: 5s # 每5s检查一次
failureMax: 0.1 # 允许的消息出现特定错误频率为10%, 超过则需要降级, 反之升级
specialErr: "xxx" # 这里可以在加个这个配置, 可以识别指定错误
func limitProcessConf() *service.ConfigSpec {
res := service.NewConfigSpec()
res.Stable().Summary("limit processor")
res.Fields(
service.NewFloatField("each"),
service.NewFloatField("minEach").Optional(),
service.NewFloatField("maxEach").Optional(),
service.NewIntField("capacity"),
service.NewObjectField("cpuChecker",
service.NewDurationField("checkInterval"),
service.NewFloatField("cpuMax"),
).Optional(),
service.NewObjectField("msgChecker",
service.NewDurationField("checkInterval"),
service.NewFloatField("failureMax"),
).Optional(),
)
return res
}

构造Processor

1
2
3
4
5
6
7
8
9
10
func newLimitProcess(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
// ... 读取配置
res := &limitProcess{
// ... 初始化
}

// 核心机制: 启动监测器
res.checkerStart()
return res, nil
}

在处理消息时, 记录消息错误, 并从令牌桶中请求令牌

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (l *limitProcess) Process(ctx context.Context, message *service.Message) (service.MessageBatch, error) {
// 记录消息中的错误
l.recordMsg(message.GetError())
err := l.limit.Wait(ctx)
if err != nil {
l.log.Errorf("limit process wait error, msg %v", err)
}
return service.MessageBatch{message}, nil
}

func (l *limitProcess) recordMsg(err error) {
if l.msgC == nil {
return
}

// 这里使用了一个特定的错误, 例如 ErrGptLimit, 也可以添加配置在配置中设置, 再通过检查错误来记录
if err != nil && errors.Is(err, errorx.ErrGptLimit) {
l.msgC.falCnt.Add(1)
} else {
l.msgC.sucCnt.Add(1)
}
}

在启动的监测器中, 定时监测状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (l *limitProcess) cpuCheckerLoop() {
defer l.wg.Done()
// 检查CPU使用率的代码...
}

func (l *limitProcess) msgCheckerLoop() {
defer l.wg.Done()
tc := time.NewTicker(l.msgC.checkInterval)
defer tc.Stop()

for {
select {
case <-l.shutdownChan:
return
case <-tc.C:
sucCnt := l.msgC.sucCnt.Swap(0)
falCnt := l.msgC.falCnt.Swap(0)
allCnt := sucCnt + falCnt
if allCnt == 0 {
continue
}

failRate := float64(falCnt) / float64(allCnt)
if failRate > l.msgC.failureMax { // 超过给定频率就降级
l.downgrade()
} else {
l.upgrade() // 反之升级
}
}
}
}

最后在Benthos中使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
input:
rdb_input:
max_in_flight: 1
processors:
- bloblang: |
root.message = this.string()
root.meta.get_time = now()

pipeline:
threads: 2
processors:
- resource: error_process_re # 这个插件经过调整, 会对前20条消息抛出错误 ErrGPTLimit
- resource: limit_process_re # 接近着会产生特定错误的组件
- bloblang: |
root = this
root.meta.processed = now()

output:
my_stdout:
max_in_flight: 100

processor_resources:
- label: limit_process_re
limit_process:
minEach: 1
each: 2
maxEach: 10
capacity: 1
msgChecker:
checkInterval: 5s
failureMax: 0.1
- label: error_process_re
error_process: {}

最终达成的效果是, 在前期会进行降级, 后期升级, 直到到达最大限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
{"@service":"benthos","label":"","level":"error","msg":"Failed to send message to my_stdout: gpt returns code 429","path":"root.output"}
{"@service":"benthos","label":"limit_process_re","level":"info","msg":"limit processor downgrade, each from 2.000 to 1.000, minEach 1.000","path":"root.processor_resources"}
...
{"@service":"benthos","label":"","level":"error","msg":"Failed to send message to my_stdout: gpt returns code 429","path":"root.output"}
...
{"message":"24","meta":{"get_time":"2025-04-30T15:28:21.266977435+08:00","processed":"2025-04-30T15:28:21.267019259+08:00"}}
{"message":"25","meta":{"get_time":"2025-04-30T15:28:22.266920475+08:00","processed":"2025-04-30T15:28:22.266979673+08:00"}}
{"@service":"benthos","label":"limit_process_re","level":"info","msg":"limit processor upgrade, each from 1.000 to 2.000, maxEach 10.000","path":"root.processor_resources"}
...
{"message":"55","meta":{"get_time":"2025-04-30T15:28:33.766272232+08:00","processed":"2025-04-30T15:28:33.766325021+08:00"}}
{"@service":"benthos","label":"limit_process_re","level":"info","msg":"limit processor upgrade, each from 4.000 to 8.000, maxEach 10.000","path":"root.processor_resources"}
{"message":"56","meta":{"get_time":"2025-04-30T15:28:34.016153723+08:00","processed":"2025-04-30T15:28:34.016195951+08:00"}}
{"message":"57","meta":{"get_time":"2025-04-30T15:28:34.265191395+08:00","processed":"2025-04-30T15:28:34.265236555+08:00"}}
...
{"message":"95","meta":{"get_time":"2025-04-30T15:28:39.015952884+08:00","processed":"2025-04-30T15:28:39.016003045+08:00"}}
{"@service":"benthos","label":"limit_process_re","level":"info","msg":"limit processor upgrade, each from 8.000 to 10.000, maxEach 10.000","path":"root.processor_resources"}
{"message":"96","meta":{"get_time":"2025-04-30T15:28:39.141389556+08:00","processed":"2025-04-30T15:28:39.141453408+08:00"}}

小结

利用Benthos本身背压机制所制作的限速器, 较为灵活, 适用性强, 可以放在处理链路中的任意环节, 通过对其上游施加压力达到限速效果, 但是需要注意以下几点

  1. 限速逻辑需要可以支持运行时调整限速值
  2. 明确模块的升级/降级触发条件, 可以使资源的占用, 特定的错误, 处理的耗时等
  3. 如果需要分布式下的多副本共同限速, 需要使用支持分布式的限速器库或代码

过程中的一些代码: benthos_test.zip

-------------本文结束感谢您的阅读-------------