Benthos 框架下的限速器降级方案探索 rate_limit
限速器用于限制Benthos中并行组件(或跨实例)之间的共享资源使用, 一般使用resources
配置, 如下
1 2 3 4 5 rate_limit_resources: - label: my_limite local: count: 500 interval: 1s
一些内部组件支持直接在配置中带上rate_limit
配置, 例如http_client
, 其原理是在组件内部通过*service.Resources
的AccessRateLimit
方法直接获取对应资源
1 2 3 4 5 input: http_client: url: TODO verb: GET rate_limit: my_limite
通过这种方式使用速率限制,可以保证输入仅以每秒 500 个请求的速率轮询 HTTP 源, 其内部实现大致如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 func init () { service.RegisterBatchInput("http_client" , httpClientInputSpec(), newHttpClient) } func httpClientInputSpec () *service.ConfigSpec { return service.NewConfigSpec().service.NewStringField("rate_limit" ) } func newHtpClient (conf *service.ParsedConfig, mgr *service.Resources) (*httpClientInput, error ) { rate_limit, _ := conf.FieldString("rate_time" ) if rate_limit != "" { if !mgr.HasRateLimit(rate_limit) { return nil , fmt.Errorf("rate_limit resources %v not found" , rate_limit) } } } func (h *httpClient) Send(ctx context.Content) ([]byte , error ) { if !h.waitForAccess(ctx) { if ctx.Err != nil { return nil , ctx.Err } return nil , errTimeout } } func (h *httpClient) waitForAccess(ctx) bool { if h.rateLimit == "" { return true } for { var period time.Duration var err error if rerr := h.mgr.AccessRateLimit(ctx, h.rateLimit, func (rl service.RateLimit) { period, err = rl.Access(ctx) }); rerr != nil { err = rerr } if err != nil { h.log.Errorf("Rate limit error: %v\n" , err) period = time.Second } if period > 0 { select { case <-time.After(period): case <-ctx.Done(): return false } } else { return true } } }
如果组件内本身不提供rate_limit
, 或不想在组件内写上这个配置并在初始化的时候使用, 可以通过使用processor
结合限速器, 利用背压机制, 即下游阻塞上游也阻塞, 配置如下
1 2 3 4 5 6 7 input: csv: paths: - ./foo.csv processors: - rate_limit: resource: my_limit
以上所有限速器均为本地实现, 无法跨实例使用, 如果有次需求需要使用Redis
或其他方案做限速器, 例如基于Benthos
的connect
的实现 https://github.com/redpanda-data/connect/blob/main/internal/impl/redis/rate_limit.go , 不过需要注意其限速器只是限制速率, 功能上类似漏桶, 如果需要控制QPS需要另行实现令牌桶版本的限速器
如何动态调整限速器的限速值 一个基本的限速器配置如下(框架原生)
1 2 3 4 5 rate_limit_resources: - label: my_limite local: count: 500 interval: 1s
如果想要动态的修改其中的 count
字段, 可以考虑一下方法
使用配置热重载的方式
启动Benthos
二进制时为框架加上 -w
或 --watcher
, 让其可以在配置文件发生变化的时候自动重载配置, 使用此种方式时, 将会全量读取新的配置文件, 并且等待所有未确认消息处理完毕并确认后, 重新载入配置并重载组件. 但是需要注意, 如果更新配置文件后存在配置错误将会导致Benthos服务不可用, 其将会持续读取配置文件并解析配置错误, 并不会沿用原有配置文件
使用流模式 streams
启动, 通过 HTTP 动态修改配置
配置中设置好api的端口, 或使用默认端口 4195
1 2 3 http: address: 0.0 .0 .0 :4195 debug_endpoints: false
使用流模式启动
1 ./BenthosApp streams ./streams_config/*.yaml
获取所有流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 curl http://localhost:4195/streams curl -X PUT http://localhost:4195/streams/one_stream --data-binary @- <<EOF input: xxx output: xxx EOF curl http://localhost:4195/streams/one_stream curl -X PATCH http://localhost:4195/streams/one_stream \ --data-binary @- <<EOF { "input": { "my_input": { "timeout": "92s" } } } EOF
以上方法, 均是通过修改配置的方式执行, 也就是说如果想要实现降级, 需要配合外部监控平台 , 通过监控pod状态或一些指标, 来从外部对Benthos的限速器进行修改 , 那是否可能存在一种方法可以在运行时在程序内部自行修改呢?
通过查阅Benthos相关文档可以得知, 当组件作为资源被引用时, 具备可重用性, 每种命名资源只会创建一个实例, 可以在多个位置使用, 原文如下 https://docs.redpanda.com/redpanda-connect/configuration/resources/
1 2 3 4 5 Resources are components within Redpanda Connect that are declared with a unique label and can be referenced any number of times within a configuration. Only one instance of each named resource is created, but it is safe to use it in multiple places as they can be shared without consequence. 资源是 Redpanda Connect 中的组件,它们使用唯一标签声明,并且可以在配置中引用任意次数。每个命名资源只会创建一个实例,但可以安全地在多个位置使用,因为它们可以共享而不会产生任何后果。 Some components such as caches and rate limits can only be created as a resource. 某些组件,如缓存和速率限制,只能作为资源创建。
以上说明可以简单的通过编写一个Benthos框架的代码验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 input: broker: inputs: - rdb_input: {} processors: - rate_limit: resource: mu_local_limiter - bloblang: | root.message = this.string() root.source = "input_11111" root.meta.process_time = now() - rdb_input: {} processors: - rate_limit: resource: mu_local_limiter - bloblang: | root.message = this.string() root.source = "input_22222" root.meta.process_time = now() rate_limit_resources: - label: mu_local_limiter mut_rate_limit_local: maxcount: 1 interval: 1s
其中 input
组件为简单的从Redis中RPop
内容, 大致如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error ) { if ctx.Err() != nil { return nil , nil , ctx.Err() } d, err := r.l.Pop(ctx) if err != nil { return nil , nil , err } msg := service.NewMessage(d) return msg, func (ctx context.Context, err error ) error { return nil }, nil }
限速器组件实现大致如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func init () { err := service.RegisterRateLimit("mut_rate_limit_local" , limiterConfig(), newMutRateLimitLocal) if err != nil { panic (err) } } func newMutRateLimitLocal (conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error ) { limiter := &my_limit{ log: mgr.Logger(), } limiter.log.Infof("I am Init !!!!!!" ) return limiter, nil }
主程序启动前提前填充redis
1 2 3 4 5 6 7 8 9 func main () { l := lists.GetLists() for i := 0 ; i < 100 ; i++ { _ = l.Push(context.Background(), []byte (strconv.Itoa(i))) } service.RunCLI(context.Background()) }
程序启动后观察日志输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 {"@service" :"benthos" ,"benthos_version" :"v4.48.0" ,"level" :"info" ,"msg" :"Running main config from specified file" ,"path" :"config.yaml" } {"@service" :"benthos" ,"label" :"mu_local_limiter" ,"level" :"info" ,"msg" :"I am Init !!!!!!" ,"path" :"root.rate_limit_resources" } {"@service" :"benthos" ,"level" :"info" ,"msg" :"Listening for HTTP requests at: http://0.0.0.0:4195" } {"@service" :"benthos" ,"label" :"" ,"level" :"info" ,"msg" :"Input type rdb_input is now active" ,"path" :"root.input.broker.inputs.0" } {"@service" :"benthos" ,"label" :"" ,"level" :"info" ,"msg" :"Input type rdb_input is now active" ,"path" :"root.input.broker.inputs.1" } {"@service" :"benthos" ,"label" :"" ,"level" :"info" ,"msg" :"Output type stdout is now active" ,"path" :"root.output" } {"@service" :"benthos" ,"level" :"info" ,"msg" :"Launching a Benthos instance, use CTRL+C to close" } {"message" :"1" ,"meta" :{"process_time" :"2025-04-28T20:04:51.181221041+08:00" },"source" :"input_11111" } {"message" :"3" ,"meta" :{"process_time" :"2025-04-28T20:04:52.181650517+08:00" },"source" :"input_11111" } {"message" :"4" ,"meta" :{"process_time" :"2025-04-28T20:04:53.181898361+08:00" },"source" :"input_11111" } {"message" :"0" ,"meta" :{"process_time" :"2025-04-28T20:04:54.18314283+08:00" },"source" :"input_22222" } {"message" :"5" ,"meta" :{"process_time" :"2025-04-28T20:04:55.183107806+08:00" },"source" :"input_11111" } {"message" :"6" ,"meta" :{"process_time" :"2025-04-28T20:04:56.183515111+08:00" },"source" :"input_11111" } {"message" :"2" ,"meta" :{"process_time" :"2025-04-28T20:04:57.183863343+08:00" },"source" :"input_22222" } {"message" :"7" ,"meta" :{"process_time" :"2025-04-28T20:04:58.184574384+08:00" },"source" :"input_22222" } {"message" :"8" ,"meta" :{"process_time" :"2025-04-28T20:04:59.18518018+08:00" },"source" :"input_11111" }
可以得出以下现象:
根据消息的乱序输出, 且来源都不一致, 说明两个input
在同时工作
在乱序的情况下, 消息保持每秒一条的速率
rate_limit
组件的I am Init !!!!!!
日志仅输出了一次
由此说明rate_limit
组件作为资源加载时, 全局单例 , 根据这个特性, 可以设计出一个速率可变的限速器, 其应该继承框架原本的rate_limit
方法, 同时额外暴露一些修改限速值的方法, 包内直接单例模式, 例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 var gLimit mutRateLimitLocaltype mutRateLimitLocal struct { log *service.Logger mut sync.Mutex curSize int lastRefresh time.Time minSize int benchSize int maxSize int period time.Duration } func (m *mutRateLimitLocal) Access(ctx context.Context) (time.Duration, error ) { } func (m *mutRateLimitLocal) Close(_ context.Context) error { return nil } func Upgrade () { gLimit.mut.Lock() defer gLimit.mut.Unlock() ori := gLimit.benchSize gLimit.benchSize *= 2 if gLimit.benchSize > gLimit.maxSize { gLimit.benchSize = gLimit.maxSize } gLimit.log.Infof("limiter upgrade from %d to %d" , ori, gLimit.benchSize) } func DownGrade () { gLimit.mut.Lock() defer gLimit.mut.Unlock() ori := gLimit.benchSize gLimit.benchSize /= 2 if gLimit.benchSize < gLimit.minSize { gLimit.benchSize = gLimit.minSize } gLimit.log.Infof("limiter downGrade from %d to %d" , ori, gLimit.benchSize) }
现在进行验证, 将input
组件修改为单个组件, 保证数据的顺序性, 并在读取到特定内容时进行升级降级操作, 并观察输出, 代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error ) { if ctx.Err() != nil { return nil , nil , ctx.Err() } d, err := r.l.Pop(ctx) if err != nil { return nil , nil , err } if string (d) == "5" { mut_rate_limit_local.Upgrade() } if string (d) == "10" { mut_rate_limit_local.DownGrade() } msg := service.NewMessage(d) return msg, func (ctx context.Context, err error ) error { return nil }, nil }
input
组件相关配置修改为
1 2 3 4 5 6 7 8 input: rdb_input: processors: - rate_limit: resource: mu_local_limiter - bloblang: | root.message = this.string() root.meta.process_time = now()
由于数据是顺序输出的, 所以一定会先出现5, 后出现10, 此时启动程序观察输出为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 {"message" :"1" ,"meta" :{"process_time" :"2025-04-29T09:45:49.100212661+08:00" }} {"message" :"2" ,"meta" :{"process_time" :"2025-04-29T09:45:49.100586456+08:00" }} {"message" :"3" ,"meta" :{"process_time" :"2025-04-29T09:45:50.096771614+08:00" }} {"message" :"4" ,"meta" :{"process_time" :"2025-04-29T09:45:50.096806286+08:00" }} {"@service" :"benthos" ,"label" :"mu_local_limiter" ,"level" :"info" ,"msg" :"limiter upgrade from 2 to 4" ,"path" :"root.rate_limit_resources" } {"message" :"5" ,"meta" :{"process_time" :"2025-04-29T09:45:51.097508536+08:00" }} {"message" :"6" ,"meta" :{"process_time" :"2025-04-29T09:45:51.097554408+08:00" }} {"message" :"7" ,"meta" :{"process_time" :"2025-04-29T09:45:51.097980122+08:00" }} {"message" :"8" ,"meta" :{"process_time" :"2025-04-29T09:45:51.098075563+08:00" }} {"@service" :"benthos" ,"label" :"mu_local_limiter" ,"level" :"info" ,"msg" :"limiter downGrade from 4 to 2" ,"path" :"root.rate_limit_resources" } {"message" :"9" ,"meta" :{"process_time" :"2025-04-29T09:45:52.098415253+08:00" }} {"message" :"10" ,"meta" :{"process_time" :"2025-04-29T09:45:52.09846423+08:00" }} {"message" :"11" ,"meta" :{"process_time" :"2025-04-29T09:45:53.09906257+08:00" }} {"message" :"12" ,"meta" :{"process_time" :"2025-04-29T09:45:53.099122656+08:00" }} {"message" :"13" ,"meta" :{"process_time" :"2025-04-29T09:45:54.099771808+08:00" }} {"message" :"14" ,"meta" :{"process_time" :"2025-04-29T09:45:54.099812208+08:00" }}
可以观察到当读取到5之后, 限速从2升级为4, 消息速率有每秒两条变为每秒四条, 读取到10的时候降级为2, 此后维持2的限速运行.
背压机制对消息处理速率的影响 在Benthos中, 限速器组件能够限速的本质原理是, 通过一个给定的时间间隔, 以及在此时间间隔内, Input
组件调用Read
或ReadBatch
(如果是batchInput
)的次数, 来限制整体速率, 例如以下限速器配置
1 2 3 4 5 rate_limit_resources: - label: local_limiter local: count: 1 interval: 1s
表示限制每秒调用1次Input
组件的Read
方法或ReadBatch
, 其原理是当调用次数超过一次后, 下次调用将会强制等待到大于给定等待时间间隔为止., 本质上其是依赖Benthos
的背压机制来达到对消息进行限速的目的, 那如何验证这一机制? 可以简单的写一段Benthos配置来对其进行验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 input: rdb_input: max_in_flight: 0 processors: - bloblang: | root.message = this.string() root.meta.process_time = now() pipeline: threads: 1 processors: - sleep: duration: 1s output: my_stdout: {}
以上配置下, Benthos的输出为
1 2 3 4 5 6 7 8 { "message" : "1" , "meta" : { "process_time" : "2025-04-29T14:27:12.227711335+08:00" } } { "message" : "2" , "meta" : { "process_time" : "2025-04-29T14:27:12.22788404+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T14:27:13.22813145+08:00" } } { "message" : "4" , "meta" : { "process_time" : "2025-04-29T14:27:14.228667279+08:00" } } { "message" : "5" , "meta" : { "process_time" : "2025-04-29T14:27:15.229376735+08:00" } } { "message" : "6" , "meta" : { "process_time" : "2025-04-29T14:27:16.229656828+08:00" } } { "message" : "7" , "meta" : { "process_time" : "2025-04-29T14:27:17.230157291+08:00" } } { "message" : "8" , "meta" : { "process_time" : "2025-04-29T14:27:18.230574692+08:00" } }
可以观察到, 从第二条消息开始, 每条消息间隔了一秒钟才被取出来, 那如果将 pipeline.threads
参数变大, 则能代表每秒可以取出来更多消息, 这些消息等待一秒后, 才能取出来下一批消息
修改后输出同理论一致, 第一秒取出了六条数据, 在第六条数据进入processor
阶段时, 前五条数据都在sleep中, 整条链路被阻塞了, 直到下一秒, 前五条消息处理完毕后, 继续处理下五条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 { "message" : "2" , "meta" : { "process_time" : "2025-04-29T14:30:15.214659776+08:00" } } { "message" : "1" , "meta" : { "process_time" : "2025-04-29T14:30:15.214329751+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T14:30:15.214761512+08:00" } } { "message" : "4" , "meta" : { "process_time" : "2025-04-29T14:30:15.214990298+08:00" } } { "message" : "5" , "meta" : { "process_time" : "2025-04-29T14:30:15.215094723+08:00" } } { "message" : "6" , "meta" : { "process_time" : "2025-04-29T14:30:15.215260697+08:00" } } { "message" : "7" , "meta" : { "process_time" : "2025-04-29T14:30:16.214789636+08:00" } } { "message" : "8" , "meta" : { "process_time" : "2025-04-29T14:30:16.215151017+08:00" } } { "message" : "9" , "meta" : { "process_time" : "2025-04-29T14:30:16.215217328+08:00" } } { "message" : "10" , "meta" : { "process_time" : "2025-04-29T14:30:16.215294076+08:00" } } { "message" : "15" , "meta" : { "process_time" : "2025-04-29T14:30:17.215655737+08:00" } } { "message" : "11" , "meta" : { "process_time" : "2025-04-29T14:30:16.215366317+08:00" } } { "message" : "12" , "meta" : { "process_time" : "2025-04-29T14:30:17.215124804+08:00" } } { "message" : "13" , "meta" : { "process_time" : "2025-04-29T14:30:17.215432139+08:00" } } { "message" : "14" , "meta" : { "process_time" : "2025-04-29T14:30:17.215565551+08:00" } } { "message" : "20" , "meta" : { "process_time" : "2025-04-29T14:30:18.216655332+08:00" } } { "message" : "19" , "meta" : { "process_time" : "2025-04-29T14:30:18.216587464+08:00" } } { "message" : "18" , "meta" : { "process_time" : "2025-04-29T14:30:18.216449435+08:00" } } { "message" : "17" , "meta" : { "process_time" : "2025-04-29T14:30:18.216183862+08:00" } } { "message" : "16" , "meta" : { "process_time" : "2025-04-29T14:30:17.215772375+08:00" } }
那这与Input
组件的max_in_flight
参数有什么关系? 在Input
组件中 max_in_flight
参数表示系统可接受的未确认消息的数量 , 可以简单比喻成一个萝卜一个坑, 每次调用Read
或ReadBatch
都会得到一根萝卜, max_in_flight
表示有多少个坑, 当坑里的萝卜没有被拔出来时(消息未被确认), 无法把新的萝卜放进去, 可以通过以下方式简单验证
保持pipeline.threads
不变, 修改Input
组件的max_in_flight
为1, 表示只接受一个未确认的消息, 配置如下
1 2 3 4 5 6 7 8 9 10 11 12 13 input: rdb_input: max_in_flight: 1 processors: - bloblang: | root.message = this.string() root.meta.process_time = now() pipeline: threads: 5 processors: - sleep: duration: 1s
此时输出将会是每秒钟一条消息, 因为只能接受一条未确认消息, 而每条消息又会等待1秒钟
1 2 3 4 5 6 7 { "message" : "1" , "meta" : { "process_time" : "2025-04-29T15:05:40.014691582+08:00" } } { "message" : "2" , "meta" : { "process_time" : "2025-04-29T15:05:41.015494086+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T15:05:42.016571178+08:00" } } { "message" : "4" , "meta" : { "process_time" : "2025-04-29T15:05:43.017364452+08:00" } } { "message" : "5" , "meta" : { "process_time" : "2025-04-29T15:05:44.017900917+08:00" } } { "message" : "6" , "meta" : { "process_time" : "2025-04-29T15:05:45.018384+08:00" } } { "message" : "7" , "meta" : { "process_time" : "2025-04-29T15:05:46.019051633+08:00" } }
同样的, output
组件如果出现阻塞, 也会由于背压机制影响到上游的处理效率, 例如以下配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 input: rdb_input: max_in_flight: 1 processors: - bloblang: | root.message = this.string() root.meta.process_time = now() pipeline: threads: 10 processors: - sleep: duration: 1s output: my_stdout_batch: max_in_flight: 10 batching: count: 100 byte_size: 1048576 period: "1s" check: "" processors: []
由于output
组件存在1s
的超时, 所以整个链路将会变为两每秒钟一条消息
1 2 3 { "message" : "1" , "meta" : { "process_time" : "2025-04-29T16:04:43.781875208+08:00" } } { "message" : "2" , "meta" : { "process_time" : "2025-04-29T16:04:45.782621816+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T16:04:47.783167403+08:00" } }
在输入组件中,max_in_flight
参数:
设置一个限制,控制在任何给定时间内可以在Benthos流中流动的待确认消息数量
默认值为0
,表示没有限制
一旦消息被确认(acknowledged)或拒绝(nacked),它就不再被视为待处理
如果输入产生逻辑批次,则每个批次被视为对最大值的单次计数, 即限制的是Read
和ReadBatch
的调用次数而并非具体几条消息
警告:如果此字段限制的消息数量低于输出级别的批处理阈值,则输出级别的批处理策略将会停滞
输出(Output)组件中的max_in_flight
在输出组件中,max_in_flight
参数:
控制在给定时间内可以有多少消息或消息批次同时处理
增加此值可以提高吞吐量
利用背压机制制作限速器 在上面的探索中, 已经大致搞清了Benthos框架原生的限速器的工作原理, 即利用框架的背压机制, 在下游制造压力, 传导到上游进而限制上游消息的读取速率, 根据这一原理, 可以实现这样一个processor
组件:
不处理消息, 将消息原样返回
记录消息个数或Input
组件调用次数
如果大于指定速率, 就sleep
, 将压力传导至上游Input
组件
监测一些指标, 当达到某些条件时降低限速, 反之提高限速
而这个组件可以放在
紧跟着Input
组件之后, 可以通过消息数, Read
或ReadBatch
的调用数, 系统的CPU占用/内存占用等不依赖具体消息内容的指标来进行降级升级
放在Processor
组件的末端或中端, 可以通过消息设置的meta
信息中处理耗时, 失败次数等与消息内容相关的指标进行升降级
同时放在Input
和Processor
中, 对多个指标进行监测
而为了保证限速器的可重用性, 最好是将其放在resources
中, 通过 label
在需要使用的地方引用, 同时通过这种方式也可以保证限速器全局唯一, 如果需要多个限速器, 则创建多个label
即可, 限速器应该使用Processor
实现而非batchProcessor
来确保其对消息限速的粒度为单条消息
Benthos框架在batch组件和非batch组件会自动转换, 例如当Input
是batch, Processor
为非batch的时候, 此时Input
每次都会读取一批消息, 一条一条的交给Processor
, 直到所有消息都处理完毕, Input
才会读取下一批消息, 所以使用非batch的Processor
可以更好的控制单条消息速率
Example: 利用背压机制结合对消息或CPU的监控制作的动态限速器 利用背压机制, 结合 golang.org/x/time/rate
制作一个基于令牌桶的限速器, 通过监测过去一段时间内的消息中特定错误出现频率或CPU使用率动态调节限速值
选择golang.org/x/time/rate
有一个比较重要的原因是其支持运行中修改限速值
结构定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import ( "benthos_test/lib/consts/errorx" "context" "github.com/pkg/errors" "github.com/redpanda-data/benthos/v4/public/service" "golang.org/x/time/rate" "math" "sync" "sync/atomic" "time" ) type ( limitProcess struct { log *service.Logger limit *rate.Limiter minEach rate.Limit maxEach rate.Limit wg sync.WaitGroup shutdownChan chan struct {} cpuC *cpuChecker msgC *msgChecker } cpuChecker struct { checkInterval time.Duration cpuMax float64 } msgChecker struct { checkInterval time.Duration failureMax float64 sucCnt atomic.Int32 falCnt atomic.Int32 } )
在Benthos中注册时使用如下配置配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 limit_process: minEach: 1 each: 2 maxEach: 10 capacity: 1 msgChecker: checkInterval: 5s failureMax: 0.1 specialErr: "xxx" func limitProcessConf() *service.ConfigSpec { res := service.NewConfigSpec() res.Stable().Summary("limit processor") res.Fields( service.NewFloatField("each") , service.NewFloatField("minEach").Optional() , service.NewFloatField("maxEach").Optional() , service.NewIntField("capacity") , service.NewObjectField("cpuChecker" , service.NewDurationField("checkInterval") , service.NewFloatField("cpuMax") , ).Optional() , service.NewObjectField("msgChecker" , service.NewDurationField("checkInterval") , service.NewFloatField("failureMax") , ).Optional() , ) return res }
构造Processor
1 2 3 4 5 6 7 8 9 10 func newLimitProcess (conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error ) { res := &limitProcess{ } res.checkerStart() return res, nil }
在处理消息时, 记录消息错误, 并从令牌桶中请求令牌
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (l *limitProcess) Process(ctx context.Context, message *service.Message) (service.MessageBatch, error ) { l.recordMsg(message.GetError()) err := l.limit.Wait(ctx) if err != nil { l.log.Errorf("limit process wait error, msg %v" , err) } return service.MessageBatch{message}, nil } func (l *limitProcess) recordMsg(err error ) { if l.msgC == nil { return } if err != nil && errors.Is(err, errorx.ErrGptLimit) { l.msgC.falCnt.Add(1 ) } else { l.msgC.sucCnt.Add(1 ) } }
在启动的监测器中, 定时监测状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (l *limitProcess) cpuCheckerLoop() { defer l.wg.Done() } func (l *limitProcess) msgCheckerLoop() { defer l.wg.Done() tc := time.NewTicker(l.msgC.checkInterval) defer tc.Stop() for { select { case <-l.shutdownChan: return case <-tc.C: sucCnt := l.msgC.sucCnt.Swap(0 ) falCnt := l.msgC.falCnt.Swap(0 ) allCnt := sucCnt + falCnt if allCnt == 0 { continue } failRate := float64 (falCnt) / float64 (allCnt) if failRate > l.msgC.failureMax { l.downgrade() } else { l.upgrade() } } } }
最后在Benthos中使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 input: rdb_input: max_in_flight: 1 processors: - bloblang: | root.message = this.string() root.meta.get_time = now() pipeline: threads: 2 processors: - resource: error_process_re - resource: limit_process_re - bloblang: | root = this root.meta.processed = now() output: my_stdout: max_in_flight: 100 processor_resources: - label: limit_process_re limit_process: minEach: 1 each: 2 maxEach: 10 capacity: 1 msgChecker: checkInterval: 5s failureMax: 0.1 - label: error_process_re error_process: {}
最终达成的效果是, 在前期会进行降级, 后期升级, 直到到达最大限制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ... {"@service" :"benthos" ,"label" :"" ,"level" :"error" ,"msg" :"Failed to send message to my_stdout: gpt returns code 429" ,"path" :"root.output" } {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor downgrade, each from 2.000 to 1.000, minEach 1.000" ,"path" :"root.processor_resources" } ... {"@service" :"benthos" ,"label" :"" ,"level" :"error" ,"msg" :"Failed to send message to my_stdout: gpt returns code 429" ,"path" :"root.output" } ... {"message" :"24" ,"meta" :{"get_time" :"2025-04-30T15:28:21.266977435+08:00" ,"processed" :"2025-04-30T15:28:21.267019259+08:00" }} {"message" :"25" ,"meta" :{"get_time" :"2025-04-30T15:28:22.266920475+08:00" ,"processed" :"2025-04-30T15:28:22.266979673+08:00" }} {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor upgrade, each from 1.000 to 2.000, maxEach 10.000" ,"path" :"root.processor_resources" } ... {"message" :"55" ,"meta" :{"get_time" :"2025-04-30T15:28:33.766272232+08:00" ,"processed" :"2025-04-30T15:28:33.766325021+08:00" }} {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor upgrade, each from 4.000 to 8.000, maxEach 10.000" ,"path" :"root.processor_resources" } {"message" :"56" ,"meta" :{"get_time" :"2025-04-30T15:28:34.016153723+08:00" ,"processed" :"2025-04-30T15:28:34.016195951+08:00" }} {"message" :"57" ,"meta" :{"get_time" :"2025-04-30T15:28:34.265191395+08:00" ,"processed" :"2025-04-30T15:28:34.265236555+08:00" }} ... {"message" :"95" ,"meta" :{"get_time" :"2025-04-30T15:28:39.015952884+08:00" ,"processed" :"2025-04-30T15:28:39.016003045+08:00" }} {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor upgrade, each from 8.000 to 10.000, maxEach 10.000" ,"path" :"root.processor_resources" } {"message" :"96" ,"meta" :{"get_time" :"2025-04-30T15:28:39.141389556+08:00" ,"processed" :"2025-04-30T15:28:39.141453408+08:00" }}
小结 利用Benthos本身背压机制所制作的限速器, 较为灵活, 适用性强, 可以放在处理链路中的任意环节, 通过对其上游施加压力达到限速效果, 但是需要注意以下几点
限速逻辑需要可以支持运行时调整限速值
明确模块的升级/降级触发条件, 可以使资源的占用, 特定的错误, 处理的耗时等
如果需要分布式下的多副本共同限速, 需要使用支持分布式的限速器库或代码
过程中的一些代码: benthos_test.zip