kd-schedule

  • 限制并发,使程序可控
  • 错峰处理,使程序平衡性更强,性能占用可控(自动错峰,如果用户不配置错峰处理,则程序自动错峰处理,区间可配置最少为1秒)
  • 定时发送任务,周期型任务(按分钟),单项程序管控,使得程序定时器结束后立刻增加权重,并在结束后恢复其权重重新排序

协程控制

77sPsK.png
77sPsK.png

权重配比说明

参考了CPU及GMP的调度算法,增加了权重的配置,权重可以使程序不断地插队

建议增加任务时按大到小形式增加任务,且权重尽可能不重复,重复权重会依次排序

TODO

轮询任务,定时器监控

增加定时增加权重,轮询任务增加权重,避免一个任务长期被插队无法执行

设计

每个控制器被称作一个worker,每个worker可以控制协程数,由于各个参数不同因此不支持带参数的任务

程序在worker初始化时开启一个协程去监听add操作,并在start后退出该协程,之后所有的任务都以动态加载形式增加

任务使用链表形式存储,为了按权重插队,插队速度更快

每个worker都有一个入口workerentry,用来控制任务链表,每个workerentry都是一个协程

7XcVlF.png
7XcVlF.png

worker在add的时候会再workerentry中按大到小顺序添加任务,但多个workerentry不能排序因此需要用户自行按从大到小增加任务,否则权重可能不生效

循环worker中的workerentry增加任务,到头后停止增加并从第一个workerentry重新添加

workerentry中遇到相同的权重任务时会将后加入的任务权重自动后排

删除任务时只能通过权重删除,因此权重也被视为一个编号,尽可能不要重复

head只作为链表头部,执行从head后面开始执行

7Xcnm9.png
7Xcnm9.png

kd-edr 中的应用

edr-agent作为系统底层硬件监控,需要对其限制性能,性能瓶颈时做出告警行为,这是 kd-schedule 的作用

edr-agent希望 kd-schedule 在初始化结束后启用动态加载的形式给链表重新排队,但当队列出现延时过高时提示错误,并重新分配协程任务

同时在任务中增加定时器概念,使得不使用重新排队也可以让任务继续执行,定时器只能保证单个entry,重新排队需要手动调用或者等待程序自动调用

不同于协程池

协程池是一个worker从池子里取任务,控制协程数不超过一个阈值,但权重排序问题比较麻烦,而且协程池无法控制程序周期

链表结构保证了可以由程序控制下一个执行的方法,支持了动态加载

动态加载(插队)

动态加载时给每个workerentry一个标记,当workerentry队列为空的时候,将动态加载的任务送给这个workerentry

任务自动加载,任务在初始阶段给定一个数值,标记其需要执行的次数,并增加定时,0为不限制次数,默认为1

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"fmt"
"github.com/ai0by/kd-schedule/schedule"
"time"
)

type edr struct {
Num int
}
type edr2 struct {
Num int
}

func main() {
var edrS = &edr{Num:0}
var edr2 = &edr2{Num:0}
wk := schedule.NewWorker(1, 100)
for i := 1; i < 9999; i++ {
_ = wk.Add(uint8(i), 50, edrS)
}
wk.Start()
// 实现接口增加任务
wk.Add(200,50, edr2)
// 闭包带参数任务
wk.AddClosureFunc(199,50, func(args ...interface{}) {
for _,v := range args{
fmt.Println(v.(string))
}
},"~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
wk.AddClosureFunc(199,50, func(args ...interface{}) {
for _,v := range args{
fmt.Println(v.(string))
}
},"---------------------------")
time.Sleep(2 * time.Second)
//wk.Stop()
var cc = make(chan int)
<-cc
}

func (e *edr)TaskFunc(args ...interface{}) {
fmt.Println("--------------",e.Num)
e.Num++
time.Sleep(500 * time.Millisecond)
}

func (e *edr2)TaskFunc(args ...interface{}){
fmt.Println("+++++++++++++++++",e.Num)
e.Num++
time.Sleep(50 * time.Millisecond)
}

后续这个库应该会用在edr项目中,根据edr的需求会再迭代