Benthos 框架下的限速器降级方案探索 rate_limit限速器用于限制Benthos中并行组件(或跨实例)之间的共享资源使用, 一般使用resources配置, 如下
1 2 3 4 5 rate_limit_resources: - label: my_limite local: count: 500 interval: 1s
一些内部组件支持直接在配置中带上rate_limit配置, 例如http_client, 其原理是在组件内部通过*service.Resources的AccessRateLimit方法直接获取对应资源
1 2 3 4 5 input: http_client: url: TODO verb: GET rate_limit: my_limite
通过这种方式使用速率限制,可以保证输入仅以每秒 500 个请求的速率轮询 HTTP 源, 其内部实现大致如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 func init () { service.RegisterBatchInput("http_client" , httpClientInputSpec(), newHttpClient) } func httpClientInputSpec () *service.ConfigSpec { return service.NewConfigSpec().service.NewStringField("rate_limit" ) } func newHtpClient (conf *service.ParsedConfig, mgr *service.Resources) (*httpClientInput, error ) { rate_limit, _ := conf.FieldString("rate_time" ) if rate_limit != "" { if !mgr.HasRateLimit(rate_limit) { return nil , fmt.Errorf("rate_limit resources %v not found" , rate_limit) } } } func (h *httpClient) Send(ctx context.Content) ([]byte , error ) { if !h.waitForAccess(ctx) { if ctx.Err != nil { return nil , ctx.Err } return nil , errTimeout } } func (h *httpClient) waitForAccess(ctx) bool { if h.rateLimit == "" { return true } for { var period time.Duration var err error if rerr := h.mgr.AccessRateLimit(ctx, h.rateLimit, func (rl service.RateLimit) { period, err = rl.Access(ctx) }); rerr != nil { err = rerr } if err != nil { h.log.Errorf("Rate limit error: %v\n" , err) period = time.Second } if period > 0 { select { case <-time.After(period): case <-ctx.Done(): return false } } else { return true } } }
如果组件内本身不提供rate_limit, 或不想在组件内写上这个配置并在初始化的时候使用, 可以通过使用processor结合限速器, 利用背压机制, 即下游阻塞上游也阻塞, 配置如下
1 2 3 4 5 6 7 input: csv: paths: - ./foo.csv processors: - rate_limit: resource: my_limit
以上所有限速器均为本地实现, 无法跨实例使用, 如果有次需求需要使用Redis或其他方案做限速器, 例如基于Benthos的connect的实现 https://github.com/redpanda-data/connect/blob/main/internal/impl/redis/rate_limit.go , 不过需要注意其限速器只是限制速率, 功能上类似漏桶, 如果需要控制QPS需要另行实现令牌桶版本的限速器
如何动态调整限速器的限速值 一个基本的限速器配置如下(框架原生)
1 2 3 4 5 rate_limit_resources: - label: my_limite local: count: 500 interval: 1s
如果想要动态的修改其中的 count字段, 可以考虑一下方法
使用配置热重载的方式
启动Benthos二进制时为框架加上 -w 或 --watcher, 让其可以在配置文件发生变化的时候自动重载配置, 使用此种方式时, 将会全量读取新的配置文件, 并且等待所有未确认消息处理完毕并确认后, 重新载入配置并重载组件. 但是需要注意, 如果更新配置文件后存在配置错误将会导致Benthos服务不可用, 其将会持续读取配置文件并解析配置错误, 并不会沿用原有配置文件
使用流模式 streams 启动, 通过 HTTP 动态修改配置
配置中设置好api的端口, 或使用默认端口 4195
1 2 3 http: address: 0.0 .0 .0 :4195 debug_endpoints: false
使用流模式启动
1 ./BenthosApp streams ./streams_config/*.yaml
获取所有流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 curl http://localhost:4195/streams curl -X PUT http://localhost:4195/streams/one_stream --data-binary @- <<EOF input: xxx output: xxx EOF curl http://localhost:4195/streams/one_stream curl -X PATCH http://localhost:4195/streams/one_stream \ --data-binary @- <<EOF { "input": { "my_input": { "timeout": "92s" } } } EOF
以上方法, 均是通过修改配置的方式执行, 也就是说如果想要实现降级, 需要配合外部监控平台 , 通过监控pod状态或一些指标, 来从外部对Benthos的限速器进行修改 , 那是否可能存在一种方法可以在运行时在程序内部自行修改呢?
通过查阅Benthos相关文档可以得知, 当组件作为资源被引用时, 具备可重用性, 每种命名资源只会创建一个实例, 可以在多个位置使用, 原文如下 https://docs.redpanda.com/redpanda-connect/configuration/resources/
1 2 3 4 5 Resources are components within Redpanda Connect that are declared with a unique label and can be referenced any number of times within a configuration. Only one instance of each named resource is created, but it is safe to use it in multiple places as they can be shared without consequence. 资源是 Redpanda Connect 中的组件,它们使用唯一标签声明,并且可以在配置中引用任意次数。每个命名资源只会创建一个实例,但可以安全地在多个位置使用,因为它们可以共享而不会产生任何后果。 Some components such as caches and rate limits can only be created as a resource. 某些组件,如缓存和速率限制,只能作为资源创建。
以上说明可以简单的通过编写一个Benthos框架的代码验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 input: broker: inputs: - rdb_input: {} processors: - rate_limit: resource: mu_local_limiter - bloblang: | root.message = this.string() root.source = "input_11111" root.meta.process_time = now() - rdb_input: {} processors: - rate_limit: resource: mu_local_limiter - bloblang: | root.message = this.string() root.source = "input_22222" root.meta.process_time = now() rate_limit_resources: - label: mu_local_limiter mut_rate_limit_local: maxcount: 1 interval: 1s
其中 input 组件为简单的从Redis中RPop内容, 大致如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error ) { if ctx.Err() != nil { return nil , nil , ctx.Err() } d, err := r.l.Pop(ctx) if err != nil { return nil , nil , err } msg := service.NewMessage(d) return msg, func (ctx context.Context, err error ) error { return nil }, nil }
限速器组件实现大致如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func init () { err := service.RegisterRateLimit("mut_rate_limit_local" , limiterConfig(), newMutRateLimitLocal) if err != nil { panic (err) } } func newMutRateLimitLocal (conf *service.ParsedConfig, mgr *service.Resources) (service.RateLimit, error ) { limiter := &my_limit{ log: mgr.Logger(), } limiter.log.Infof("I am Init !!!!!!" ) return limiter, nil }
主程序启动前提前填充redis
1 2 3 4 5 6 7 8 9 func main () { l := lists.GetLists() for i := 0 ; i < 100 ; i++ { _ = l.Push(context.Background(), []byte (strconv.Itoa(i))) } service.RunCLI(context.Background()) }
程序启动后观察日志输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 {"@service" :"benthos" ,"benthos_version" :"v4.48.0" ,"level" :"info" ,"msg" :"Running main config from specified file" ,"path" :"config.yaml" } {"@service" :"benthos" ,"label" :"mu_local_limiter" ,"level" :"info" ,"msg" :"I am Init !!!!!!" ,"path" :"root.rate_limit_resources" } {"@service" :"benthos" ,"level" :"info" ,"msg" :"Listening for HTTP requests at: http://0.0.0.0:4195" } {"@service" :"benthos" ,"label" :"" ,"level" :"info" ,"msg" :"Input type rdb_input is now active" ,"path" :"root.input.broker.inputs.0" } {"@service" :"benthos" ,"label" :"" ,"level" :"info" ,"msg" :"Input type rdb_input is now active" ,"path" :"root.input.broker.inputs.1" } {"@service" :"benthos" ,"label" :"" ,"level" :"info" ,"msg" :"Output type stdout is now active" ,"path" :"root.output" } {"@service" :"benthos" ,"level" :"info" ,"msg" :"Launching a Benthos instance, use CTRL+C to close" } {"message" :"1" ,"meta" :{"process_time" :"2025-04-28T20:04:51.181221041+08:00" },"source" :"input_11111" } {"message" :"3" ,"meta" :{"process_time" :"2025-04-28T20:04:52.181650517+08:00" },"source" :"input_11111" } {"message" :"4" ,"meta" :{"process_time" :"2025-04-28T20:04:53.181898361+08:00" },"source" :"input_11111" } {"message" :"0" ,"meta" :{"process_time" :"2025-04-28T20:04:54.18314283+08:00" },"source" :"input_22222" } {"message" :"5" ,"meta" :{"process_time" :"2025-04-28T20:04:55.183107806+08:00" },"source" :"input_11111" } {"message" :"6" ,"meta" :{"process_time" :"2025-04-28T20:04:56.183515111+08:00" },"source" :"input_11111" } {"message" :"2" ,"meta" :{"process_time" :"2025-04-28T20:04:57.183863343+08:00" },"source" :"input_22222" } {"message" :"7" ,"meta" :{"process_time" :"2025-04-28T20:04:58.184574384+08:00" },"source" :"input_22222" } {"message" :"8" ,"meta" :{"process_time" :"2025-04-28T20:04:59.18518018+08:00" },"source" :"input_11111" }
可以得出以下现象:
根据消息的乱序输出, 且来源都不一致, 说明两个input在同时工作
在乱序的情况下, 消息保持每秒一条的速率
rate_limit组件的I am Init !!!!!! 日志仅输出了一次
由此说明rate_limit组件作为资源加载时, 全局单例 , 根据这个特性, 可以设计出一个速率可变的限速器, 其应该继承框架原本的rate_limit方法, 同时额外暴露一些修改限速值的方法, 包内直接单例模式, 例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 var gLimit mutRateLimitLocaltype mutRateLimitLocal struct { log *service.Logger mut sync.Mutex curSize int lastRefresh time.Time minSize int benchSize int maxSize int period time.Duration } func (m *mutRateLimitLocal) Access(ctx context.Context) (time.Duration, error ) { } func (m *mutRateLimitLocal) Close(_ context.Context) error { return nil } func Upgrade () { gLimit.mut.Lock() defer gLimit.mut.Unlock() ori := gLimit.benchSize gLimit.benchSize *= 2 if gLimit.benchSize > gLimit.maxSize { gLimit.benchSize = gLimit.maxSize } gLimit.log.Infof("limiter upgrade from %d to %d" , ori, gLimit.benchSize) } func DownGrade () { gLimit.mut.Lock() defer gLimit.mut.Unlock() ori := gLimit.benchSize gLimit.benchSize /= 2 if gLimit.benchSize < gLimit.minSize { gLimit.benchSize = gLimit.minSize } gLimit.log.Infof("limiter downGrade from %d to %d" , ori, gLimit.benchSize) }
现在进行验证, 将input组件修改为单个组件, 保证数据的顺序性, 并在读取到特定内容时进行升级降级操作, 并观察输出, 代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (r *rdbInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error ) { if ctx.Err() != nil { return nil , nil , ctx.Err() } d, err := r.l.Pop(ctx) if err != nil { return nil , nil , err } if string (d) == "5" { mut_rate_limit_local.Upgrade() } if string (d) == "10" { mut_rate_limit_local.DownGrade() } msg := service.NewMessage(d) return msg, func (ctx context.Context, err error ) error { return nil }, nil }
input组件相关配置修改为
1 2 3 4 5 6 7 8 input: rdb_input: processors: - rate_limit: resource: mu_local_limiter - bloblang: | root.message = this.string() root.meta.process_time = now()
由于数据是顺序输出的, 所以一定会先出现5, 后出现10, 此时启动程序观察输出为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 {"message" :"1" ,"meta" :{"process_time" :"2025-04-29T09:45:49.100212661+08:00" }} {"message" :"2" ,"meta" :{"process_time" :"2025-04-29T09:45:49.100586456+08:00" }} {"message" :"3" ,"meta" :{"process_time" :"2025-04-29T09:45:50.096771614+08:00" }} {"message" :"4" ,"meta" :{"process_time" :"2025-04-29T09:45:50.096806286+08:00" }} {"@service" :"benthos" ,"label" :"mu_local_limiter" ,"level" :"info" ,"msg" :"limiter upgrade from 2 to 4" ,"path" :"root.rate_limit_resources" } {"message" :"5" ,"meta" :{"process_time" :"2025-04-29T09:45:51.097508536+08:00" }} {"message" :"6" ,"meta" :{"process_time" :"2025-04-29T09:45:51.097554408+08:00" }} {"message" :"7" ,"meta" :{"process_time" :"2025-04-29T09:45:51.097980122+08:00" }} {"message" :"8" ,"meta" :{"process_time" :"2025-04-29T09:45:51.098075563+08:00" }} {"@service" :"benthos" ,"label" :"mu_local_limiter" ,"level" :"info" ,"msg" :"limiter downGrade from 4 to 2" ,"path" :"root.rate_limit_resources" } {"message" :"9" ,"meta" :{"process_time" :"2025-04-29T09:45:52.098415253+08:00" }} {"message" :"10" ,"meta" :{"process_time" :"2025-04-29T09:45:52.09846423+08:00" }} {"message" :"11" ,"meta" :{"process_time" :"2025-04-29T09:45:53.09906257+08:00" }} {"message" :"12" ,"meta" :{"process_time" :"2025-04-29T09:45:53.099122656+08:00" }} {"message" :"13" ,"meta" :{"process_time" :"2025-04-29T09:45:54.099771808+08:00" }} {"message" :"14" ,"meta" :{"process_time" :"2025-04-29T09:45:54.099812208+08:00" }}
可以观察到当读取到5之后, 限速从2升级为4, 消息速率有每秒两条变为每秒四条, 读取到10的时候降级为2, 此后维持2的限速运行.
背压机制对消息处理速率的影响 在Benthos中, 限速器组件能够限速的本质原理是, 通过一个给定的时间间隔, 以及在此时间间隔内, Input组件调用Read或ReadBatch(如果是batchInput)的次数, 来限制整体速率, 例如以下限速器配置
1 2 3 4 5 rate_limit_resources: - label: local_limiter local: count: 1 interval: 1s
表示限制每秒调用1次Input组件的Read方法或ReadBatch, 其原理是当调用次数超过一次后, 下次调用将会强制等待到大于给定等待时间间隔为止., 本质上其是依赖Benthos的背压机制来达到对消息进行限速的目的, 那如何验证这一机制? 可以简单的写一段Benthos配置来对其进行验证
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 input: rdb_input: max_in_flight: 0 processors: - bloblang: | root.message = this.string() root.meta.process_time = now() pipeline: threads: 1 processors: - sleep: duration: 1s output: my_stdout: {}
以上配置下, Benthos的输出为
1 2 3 4 5 6 7 8 { "message" : "1" , "meta" : { "process_time" : "2025-04-29T14:27:12.227711335+08:00" } } { "message" : "2" , "meta" : { "process_time" : "2025-04-29T14:27:12.22788404+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T14:27:13.22813145+08:00" } } { "message" : "4" , "meta" : { "process_time" : "2025-04-29T14:27:14.228667279+08:00" } } { "message" : "5" , "meta" : { "process_time" : "2025-04-29T14:27:15.229376735+08:00" } } { "message" : "6" , "meta" : { "process_time" : "2025-04-29T14:27:16.229656828+08:00" } } { "message" : "7" , "meta" : { "process_time" : "2025-04-29T14:27:17.230157291+08:00" } } { "message" : "8" , "meta" : { "process_time" : "2025-04-29T14:27:18.230574692+08:00" } }
可以观察到, 从第二条消息开始, 每条消息间隔了一秒钟才被取出来, 那如果将 pipeline.threads 参数变大, 则能代表每秒可以取出来更多消息, 这些消息等待一秒后, 才能取出来下一批消息
修改后输出同理论一致, 第一秒取出了六条数据, 在第六条数据进入processor阶段时, 前五条数据都在sleep中, 整条链路被阻塞了, 直到下一秒, 前五条消息处理完毕后, 继续处理下五条
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 { "message" : "2" , "meta" : { "process_time" : "2025-04-29T14:30:15.214659776+08:00" } } { "message" : "1" , "meta" : { "process_time" : "2025-04-29T14:30:15.214329751+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T14:30:15.214761512+08:00" } } { "message" : "4" , "meta" : { "process_time" : "2025-04-29T14:30:15.214990298+08:00" } } { "message" : "5" , "meta" : { "process_time" : "2025-04-29T14:30:15.215094723+08:00" } } { "message" : "6" , "meta" : { "process_time" : "2025-04-29T14:30:15.215260697+08:00" } } { "message" : "7" , "meta" : { "process_time" : "2025-04-29T14:30:16.214789636+08:00" } } { "message" : "8" , "meta" : { "process_time" : "2025-04-29T14:30:16.215151017+08:00" } } { "message" : "9" , "meta" : { "process_time" : "2025-04-29T14:30:16.215217328+08:00" } } { "message" : "10" , "meta" : { "process_time" : "2025-04-29T14:30:16.215294076+08:00" } } { "message" : "15" , "meta" : { "process_time" : "2025-04-29T14:30:17.215655737+08:00" } } { "message" : "11" , "meta" : { "process_time" : "2025-04-29T14:30:16.215366317+08:00" } } { "message" : "12" , "meta" : { "process_time" : "2025-04-29T14:30:17.215124804+08:00" } } { "message" : "13" , "meta" : { "process_time" : "2025-04-29T14:30:17.215432139+08:00" } } { "message" : "14" , "meta" : { "process_time" : "2025-04-29T14:30:17.215565551+08:00" } } { "message" : "20" , "meta" : { "process_time" : "2025-04-29T14:30:18.216655332+08:00" } } { "message" : "19" , "meta" : { "process_time" : "2025-04-29T14:30:18.216587464+08:00" } } { "message" : "18" , "meta" : { "process_time" : "2025-04-29T14:30:18.216449435+08:00" } } { "message" : "17" , "meta" : { "process_time" : "2025-04-29T14:30:18.216183862+08:00" } } { "message" : "16" , "meta" : { "process_time" : "2025-04-29T14:30:17.215772375+08:00" } }
那这与Input组件的max_in_flight参数有什么关系? 在Input组件中 max_in_flight 参数表示系统可接受的未确认消息的数量 , 可以简单比喻成一个萝卜一个坑, 每次调用Read或ReadBatch都会得到一根萝卜, max_in_flight 表示有多少个坑, 当坑里的萝卜没有被拔出来时(消息未被确认), 无法把新的萝卜放进去, 可以通过以下方式简单验证
保持pipeline.threads不变, 修改Input组件的max_in_flight为1, 表示只接受一个未确认的消息, 配置如下
1 2 3 4 5 6 7 8 9 10 11 12 13 input: rdb_input: max_in_flight: 1 processors: - bloblang: | root.message = this.string() root.meta.process_time = now() pipeline: threads: 5 processors: - sleep: duration: 1s
此时输出将会是每秒钟一条消息, 因为只能接受一条未确认消息, 而每条消息又会等待1秒钟
1 2 3 4 5 6 7 { "message" : "1" , "meta" : { "process_time" : "2025-04-29T15:05:40.014691582+08:00" } } { "message" : "2" , "meta" : { "process_time" : "2025-04-29T15:05:41.015494086+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T15:05:42.016571178+08:00" } } { "message" : "4" , "meta" : { "process_time" : "2025-04-29T15:05:43.017364452+08:00" } } { "message" : "5" , "meta" : { "process_time" : "2025-04-29T15:05:44.017900917+08:00" } } { "message" : "6" , "meta" : { "process_time" : "2025-04-29T15:05:45.018384+08:00" } } { "message" : "7" , "meta" : { "process_time" : "2025-04-29T15:05:46.019051633+08:00" } }
同样的, output组件如果出现阻塞, 也会由于背压机制影响到上游的处理效率, 例如以下配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 input: rdb_input: max_in_flight: 1 processors: - bloblang: | root.message = this.string() root.meta.process_time = now() pipeline: threads: 10 processors: - sleep: duration: 1s output: my_stdout_batch: max_in_flight: 10 batching: count: 100 byte_size: 1048576 period: "1s" check: "" processors: []
由于output组件存在1s的超时, 所以整个链路将会变为两每秒钟一条消息
1 2 3 { "message" : "1" , "meta" : { "process_time" : "2025-04-29T16:04:43.781875208+08:00" } } { "message" : "2" , "meta" : { "process_time" : "2025-04-29T16:04:45.782621816+08:00" } } { "message" : "3" , "meta" : { "process_time" : "2025-04-29T16:04:47.783167403+08:00" } }
在输入组件中,max_in_flight参数:
设置一个限制,控制在任何给定时间内可以在Benthos流中流动的待确认消息数量
默认值为0,表示没有限制
一旦消息被确认(acknowledged)或拒绝(nacked),它就不再被视为待处理
如果输入产生逻辑批次,则每个批次被视为对最大值的单次计数, 即限制的是Read和ReadBatch的调用次数而并非具体几条消息
警告:如果此字段限制的消息数量低于输出级别的批处理阈值,则输出级别的批处理策略将会停滞
输出(Output)组件中的max_in_flight 在输出组件中,max_in_flight参数:
控制在给定时间内可以有多少消息或消息批次同时处理
增加此值可以提高吞吐量
利用背压机制制作限速器 在上面的探索中, 已经大致搞清了Benthos框架原生的限速器的工作原理, 即利用框架的背压机制, 在下游制造压力, 传导到上游进而限制上游消息的读取速率, 根据这一原理, 可以实现这样一个processor组件:
不处理消息, 将消息原样返回
记录消息个数或Input组件调用次数
如果大于指定速率, 就sleep, 将压力传导至上游Input组件
监测一些指标, 当达到某些条件时降低限速, 反之提高限速
而这个组件可以放在
紧跟着Input组件之后, 可以通过消息数, Read或ReadBatch的调用数, 系统的CPU占用/内存占用等不依赖具体消息内容的指标来进行降级升级
放在Processor组件的末端或中端, 可以通过消息设置的meta信息中处理耗时, 失败次数等与消息内容相关的指标进行升降级
同时放在Input和Processor中, 对多个指标进行监测
而为了保证限速器的可重用性, 最好是将其放在resources中, 通过 label在需要使用的地方引用, 同时通过这种方式也可以保证限速器全局唯一, 如果需要多个限速器, 则创建多个label即可, 限速器应该使用Processor实现而非batchProcessor来确保其对消息限速的粒度为单条消息
Benthos框架在batch组件和非batch组件会自动转换, 例如当Input是batch, Processor为非batch的时候, 此时Input每次都会读取一批消息, 一条一条的交给Processor, 直到所有消息都处理完毕, Input才会读取下一批消息, 所以使用非batch的Processor可以更好的控制单条消息速率
Example: 利用背压机制结合对消息或CPU的监控制作的动态限速器 利用背压机制, 结合 golang.org/x/time/rate 制作一个基于令牌桶的限速器, 通过监测过去一段时间内的消息中特定错误出现频率或CPU使用率动态调节限速值
选择golang.org/x/time/rate有一个比较重要的原因是其支持运行中修改限速值
结构定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import ( "benthos_test/lib/consts/errorx" "context" "github.com/pkg/errors" "github.com/redpanda-data/benthos/v4/public/service" "golang.org/x/time/rate" "math" "sync" "sync/atomic" "time" ) type ( limitProcess struct { log *service.Logger limit *rate.Limiter minEach rate.Limit maxEach rate.Limit wg sync.WaitGroup shutdownChan chan struct {} cpuC *cpuChecker msgC *msgChecker } cpuChecker struct { checkInterval time.Duration cpuMax float64 } msgChecker struct { checkInterval time.Duration failureMax float64 sucCnt atomic.Int32 falCnt atomic.Int32 } )
在Benthos中注册时使用如下配置配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 limit_process: minEach: 1 each: 2 maxEach: 10 capacity: 1 msgChecker: checkInterval: 5s failureMax: 0.1 specialErr: "xxx" func limitProcessConf() *service.ConfigSpec { res := service.NewConfigSpec() res.Stable().Summary("limit processor") res.Fields( service.NewFloatField("each") , service.NewFloatField("minEach").Optional() , service.NewFloatField("maxEach").Optional() , service.NewIntField("capacity") , service.NewObjectField("cpuChecker" , service.NewDurationField("checkInterval") , service.NewFloatField("cpuMax") , ).Optional() , service.NewObjectField("msgChecker" , service.NewDurationField("checkInterval") , service.NewFloatField("failureMax") , ).Optional() , ) return res }
构造Processor
1 2 3 4 5 6 7 8 9 10 func newLimitProcess (conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error ) { res := &limitProcess{ } res.checkerStart() return res, nil }
在处理消息时, 记录消息错误, 并从令牌桶中请求令牌
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func (l *limitProcess) Process(ctx context.Context, message *service.Message) (service.MessageBatch, error ) { l.recordMsg(message.GetError()) err := l.limit.Wait(ctx) if err != nil { l.log.Errorf("limit process wait error, msg %v" , err) } return service.MessageBatch{message}, nil } func (l *limitProcess) recordMsg(err error ) { if l.msgC == nil { return } if err != nil && errors.Is(err, errorx.ErrGptLimit) { l.msgC.falCnt.Add(1 ) } else { l.msgC.sucCnt.Add(1 ) } }
在启动的监测器中, 定时监测状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (l *limitProcess) cpuCheckerLoop() { defer l.wg.Done() } func (l *limitProcess) msgCheckerLoop() { defer l.wg.Done() tc := time.NewTicker(l.msgC.checkInterval) defer tc.Stop() for { select { case <-l.shutdownChan: return case <-tc.C: sucCnt := l.msgC.sucCnt.Swap(0 ) falCnt := l.msgC.falCnt.Swap(0 ) allCnt := sucCnt + falCnt if allCnt == 0 { continue } failRate := float64 (falCnt) / float64 (allCnt) if failRate > l.msgC.failureMax { l.downgrade() } else { l.upgrade() } } } }
最后在Benthos中使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 input: rdb_input: max_in_flight: 1 processors: - bloblang: | root.message = this.string() root.meta.get_time = now() pipeline: threads: 2 processors: - resource: error_process_re - resource: limit_process_re - bloblang: | root = this root.meta.processed = now() output: my_stdout: max_in_flight: 100 processor_resources: - label: limit_process_re limit_process: minEach: 1 each: 2 maxEach: 10 capacity: 1 msgChecker: checkInterval: 5s failureMax: 0.1 - label: error_process_re error_process: {}
最终达成的效果是, 在前期会进行降级, 后期升级, 直到到达最大限制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ... {"@service" :"benthos" ,"label" :"" ,"level" :"error" ,"msg" :"Failed to send message to my_stdout: gpt returns code 429" ,"path" :"root.output" } {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor downgrade, each from 2.000 to 1.000, minEach 1.000" ,"path" :"root.processor_resources" } ... {"@service" :"benthos" ,"label" :"" ,"level" :"error" ,"msg" :"Failed to send message to my_stdout: gpt returns code 429" ,"path" :"root.output" } ... {"message" :"24" ,"meta" :{"get_time" :"2025-04-30T15:28:21.266977435+08:00" ,"processed" :"2025-04-30T15:28:21.267019259+08:00" }} {"message" :"25" ,"meta" :{"get_time" :"2025-04-30T15:28:22.266920475+08:00" ,"processed" :"2025-04-30T15:28:22.266979673+08:00" }} {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor upgrade, each from 1.000 to 2.000, maxEach 10.000" ,"path" :"root.processor_resources" } ... {"message" :"55" ,"meta" :{"get_time" :"2025-04-30T15:28:33.766272232+08:00" ,"processed" :"2025-04-30T15:28:33.766325021+08:00" }} {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor upgrade, each from 4.000 to 8.000, maxEach 10.000" ,"path" :"root.processor_resources" } {"message" :"56" ,"meta" :{"get_time" :"2025-04-30T15:28:34.016153723+08:00" ,"processed" :"2025-04-30T15:28:34.016195951+08:00" }} {"message" :"57" ,"meta" :{"get_time" :"2025-04-30T15:28:34.265191395+08:00" ,"processed" :"2025-04-30T15:28:34.265236555+08:00" }} ... {"message" :"95" ,"meta" :{"get_time" :"2025-04-30T15:28:39.015952884+08:00" ,"processed" :"2025-04-30T15:28:39.016003045+08:00" }} {"@service" :"benthos" ,"label" :"limit_process_re" ,"level" :"info" ,"msg" :"limit processor upgrade, each from 8.000 to 10.000, maxEach 10.000" ,"path" :"root.processor_resources" } {"message" :"96" ,"meta" :{"get_time" :"2025-04-30T15:28:39.141389556+08:00" ,"processed" :"2025-04-30T15:28:39.141453408+08:00" }}
小结 利用Benthos本身背压机制所制作的限速器, 较为灵活, 适用性强, 可以放在处理链路中的任意环节, 通过对其上游施加压力达到限速效果, 但是需要注意以下几点
限速逻辑需要可以支持运行时调整限速值
明确模块的升级/降级触发条件, 可以使资源的占用, 特定的错误, 处理的耗时等
如果需要分布式下的多副本共同限速, 需要使用支持分布式的限速器库或代码
过程中的一些代码: benthos_test.zip