最近需求中涉及到定时任务的执行,具体场景是这样的
- 设置规则 rule1,比如: 某个广告过去两天消耗<1000
- 规则 rule1 每 N 天比如:7 天检查一次
- 如果满足规则 rule1 则可以进行下面操作:关停广告 或者 调整报价
我们首先想到的是用系统自带的 cron 服务或者公司的任务调度系统来实现,但它其实并不单纯是一个固定的定时任务(在某个固定时间执行某个任务就可以了)。这个场景要稍微复杂一些,它可能还会发生下列行为操作
- rule1 可以随时关停
- rule1 检查频次调整为 3 天检查一次
- 可以随时新建 rule2
- 定时任务执行时间不能完全根据服务开启的时间来设置。比如 rule1 创建时间是 5.1 号,频次是 7 天 1 次,正常来说下次任务执行时间应该是 5.8 号;如果我们 5.3 号开启服务,设置每隔 7 天来执行任务的话,下次执行时间就到了 5.10 号,跟我们预期不太一致
就需要动态调整定时任务的频次、新增或者删除定时任务了
系统自带 cron 不能完全满足我们需求的话,就要看看有没有第三方库来支持。github 上找到了 robfig/cron 能够很灵活地进行配置,并且格式什么的和系统自带 cron 也差不多
系统 cron
Linux 中使用内置 cron 计划任务服务,按照约定的时间定时执行特定的任务。cron 服务启动后会读取配置文件/etc/crontab,cron 服务根据命令和执行时间按时来调用工作任务。
cron 服务操作命令
# 启动crontab
$ service crond start
# 关闭crontab
$ service crond stop
# 重启crontab
$ service crond restart
# 重载crontab
$ service crond reload
cron 服务提供了 crontab 命令来设置计划任务
$ crontab [-u user] [-e] [-l] [-r]
参数 | 描述 |
---|---|
crontab -u user | 用于设定用户的定时服务,Linux 中每个用户对应着一份 crontab 任务清单。 |
crontab -e | 编辑用户的 crontab 文件内容来设置定时任务 |
crontab -l | 显示用户的 crontab 文件内容 |
crontab -r | 从/var/spool/cron 目录中删除用户的 crontab 文件 |
crontab -i | 在删除用户 crontab 时给出确认提示 |
crontab 命令可固定的间隔时间执行指定的系统指令或 shell script 脚本,时间间隔的单位是分钟、小时、日、月、周即以上的任意组合。
当使用 crontab 命令编辑完用户的计划任务后,cron 服务自动会在/var/spool/cron 文件夹下生成一个与此用户同名的文件,该文件记录对应用户的计划任务信息,此文件是禁止直接编辑的,只能通过 crontab -e 命令来编辑。cron 服务启动后每过 1 分钟会读取用户配置的计划任务文件,检查是否需要执行里面的命令,因此修改后无需重启 cron 服务。
cron 表达式基础语法
minute hour day month week command
时间 | 名称 | 取值范围 | 描述 |
---|---|---|---|
minute | 分钟 | 0~59 | 整数 |
hour | 小时 | 0~23 | 整数 |
day | 日期 | 1~31 | 一个月中的某一天 |
month | 月份 | 1~12 | 整数 |
week | 星期几 | 0~7 | 0 或 7 表示星期日 |
command | 命令 | - | - |
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * user-name command to be executed
每个域都可以使用数字,但也可以使用下列特殊字符。
特殊字符 | 名称 | 描述 |
---|---|---|
* | 星号 | 星号 |
, | 逗号 | 枚举值,使用逗号分隔指定的列表任务。 |
- | 中杠 | 范围,整数之间使用中杠表示一个整数范围。 |
/ | 正斜线 | 增长间隔,指定时间的间隔频率 |
? | 问号 | 仅用于日和星期,可代替*星号。 |
例如:
表达式 | 分钟 | 小时 | 日期 | 月份 | 星期 | 描述 |
---|---|---|---|---|---|---|
0 6 * * * cmd | 0 | 6 | * | * | * | 每天早上 6:00 点执行一次任务 |
0 _/2 _ * * cmd | 0 | */2 | * | * | * | 每间隔 2 小时执行一次任务 |
_/5 _ * * * cmd | */5 | * | * | * | * | 每隔 5 分钟执行一次任务 |
⚠️ 注意: centos 下配置 crontab 和 linux 下略有不同,如果直接编辑 /etc/crontab,需要指定用户
vim /etc/crontab
加上用户名 root
*/30 * * * * root date && source /root/.bash_profile && /usr/local/bin/ruby /data/programs/appstore_connect/main.rb >> /data/programs/appstore_connect/login.log
55 17 * * * root /bin/bash /data/programs/appstore_connect/get_appstoreconnect_data.sh >> /data/programs/appstore_connect/get_appstoreconnect_data.log
robfig/cron
robfig/cron 包是 Go 的定时任务框架,实现了 cron 计划任务规范的解析器和任务运行器。不同之处在于 robfig/cron 不仅兼容了 Linux 标准的 Crontab 格式,而且扩展到秒级。
调用 demo
下载安装包
$ go get -u -v github.com/robfig/cron/v3
demo
func TestCronDemo(t *testing.T) {
c := cron.New()
// 通过AddFunc注册
c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") })
c.AddFunc("@every 5m", func() { fmt.Println("every 5m, start 5m fron now") })
// 通过AddJob注册
// var cJob cronJobDemo
// c.AddJob("@every 5s", cJob)
// 启动
c.Start()
// 停止
c.Stop()
}
type cronJobDemo int
func (c cronJobDemo) Run() {
fmt.Println("5s func trigger")
return
}
主要逻辑
- 先用 cron.New()初始化一个实例
- 然后调用 AddFunc(spec string, cmd func()) 注册你希望调用的 func
- 最后通过 cron.Start()方法启动。
New()
cron.go 中的 New() 方法用来创建并返回一个 Corn 对象指针
这个函数接收一组可变的 Option 类型的参数,该类型实际上是一类函数:
type Option func(*Cron)
Corn 内置了一些 Option 类型的函数,都在 option.go 中,以 With 开头,用来改变 Cron 的默认行为,在 New() 中创建完 Cron 之后,会依次执行这些函数。
比如
Cron = cron.New(cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)))
AddFunc()
AddFunc() 用于向 Corn 中添加一个作业:
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
// 包装
return c.AddJob(spec, FuncJob(cmd))
}
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
AddFunc() 相较于 AddJob() 帮用户省去了包装成 Job 类型的一步,在 AddJob() 中,调用了 standardParser.Parse() 将 cron 表达式解释成了 schedule 类型,最终,他们调用了 Schedule() 方法:
这个方法负责创建 Entry 结构体,并把它追加到 Cron 的 entries 列表中,如果 Cron 已经处于运行状态,会将这个创建好的 entry 发送到 Cron 的 add chan 中,在 run() 中会处理这种情况。
AddJob 后发生了什么? (主要的数据结构)
对于 Cron 的整体逻辑,最关键的两个数据结构就是 struct Entry 和 Cron。
每当你用 AddJob 注册一个定时调用策略,就会为这个策略生成一个唯一的 Entry,不难想象,Entry 里会存储被执行的时间、需要被调度执行的实体 Job。
生成 entry 后,再将 entry 放到 struct Cron 的 entry 列表里,Cron 的结构里,主要是一些用来和外部交互的 channel,比如通过 channel 添加、删除 entry 等。详见下面的代码。
// Entry 数据结构,每一个被调度实体一个
type Entry struct {
// 唯一id,用于查询和删除
ID EntryID
// 本Entry的调度时间,不是绝对时间,在生成entry时会计算出来
Schedule Schedule
// 本entry下次需要执行的绝对时间,会一直被更新
// 被封装的含义是Job可以多层嵌套,可以实现基于需要执行Job的额外处理
// 比如抓取Job异常、如果Job没有返回下一个时间点的Job是还是继续执行还是delay
Next time.Time
// 上一次被执行时间,主要用来查询
Prev time.Time
// WrappedJob 是真实执行的Job实体
WrappedJob Job
// Job 主要给用户查询
Job Job
}
// Cron 数据结构,为robfig/cron的运行实体使用的s数据结构
type Cron struct {
entries []*Entry // 调度执行实体列表
// chain 用来定义entry里的warppedJob使用什么逻辑(e.g. skipIfLastRunning)
// 即一个cron里所有entry只有一个封装逻辑
chain Chain
stop chan struct{} // 停止整个cron的channel
add chan *Entry // 增加一个entry的channel
remove chan EntryID // 移除一个entry的channel
snapshot chan chan []Entry // 获取entry整体快照的channel
running bool // 代表是否已经在执行,是cron为使用者提供的动态修改entry的接口准备的
logger Logger // 封装golang的log包
runningMu sync.Mutex // 用来修改运行中的cron数据,比如增加entry,移除entry
location *time.Location // 地理位置
parser ScheduleParser // 对时间格式的解析,为interface, 可以定制自己的时间规则。
nextID EntryID // entry的全局ID,新增一个entry就加1
jobWaiter sync.WaitGroup // run job时会进行add(1), job 结束会done(),stop整个cron,以此保证所有job都能退出
}
需要注意的是,WrappedJob 和 chain 这两个成员,这是 Cron 实现的 Job 封装逻辑,目前是解决实际调度 Job 的异常处理。比如你希望自己的上一个时间点的 JobA 没有结束,下一个时间点的 JobA 就不执行,这个“不执行”的逻辑实现就定义在 chain,初始化时通过 chain 将 JobA 进行封装写入 WrappedJob,那么每次 JobA 调用前会先执行封装逻辑,进行判断。
Start()
Start() 用于开始执行 Cron:
func (c *Cron) Start() {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
return
}
c.running = true
go c.run()
}
这个函数干了三件事:
- 获取锁
- 将 c.running 置为 true 表示 cron 已经在运行中了
- 开启一个 goroutine 执行 c.run(), run 中会一直轮循 c.entries 中的 entry, 如果一个 entry 允许执行了,就会开启单独的 goroutine 去执行这个作业
run 是整个 cron 的一个核心,它负责处理 cron 开始执行后的大部分事情,包括添加作业,删除作业,执行作业等,其结构如下:
func (c *Cron) run() {
c.logger.Info("start")
// 第一部分
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
// 第二部分
for {
// 2.1
sort.Sort(byTime(c.entries))
// 2.2
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
timer = time.NewTimer(100000 * time.Hour)
} else {
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
// 2.3
for {
select {}
break
}
}
}
大概包含下面这几部分:
-
第一部分:遍历了 c.entries 列表,通过 schedule.Next() 计算出这个作业下一次执行的时间,并赋值给了 entry.Next 字段。
-
第二部分是一个死循环,这一部分又可以分为三个部分:
-
2.1:调用了 sort 的快排,其实是对 entries 中的元素按 Next 字段的时间线后顺序排序。
-
2.2:这一部分是对定时器的一个初始化操作:如果没有可以执行的作业,定时器被设置为十万小时后触发(其实就是休眠),否则定时器会在第一个作业允许被执行时触发,定时器触发后, 2.3 部分会去做剩下的事。
-
2.3:这又是整个 run 的核心,其主体是一个死循环(其实它会退出,不算是死循环),这个循环里面的核心又是一个 select 多路复用,这个多路复用里监听了五种信号,这五种信号是怎样发出的我们在上面其实已经说过了,他们分别是定时器触发信号 timer.C, 运行过程中添加作业的信号 c.add, 快照信号 c.snapshot, cron 停止的信号 c.stop, 移除作业的信号 c.remove。
-
Run()
和Start类似的还有的Run函数,不同的是start会开启一个goroutine来运行,而run是在当前goroutine执行的
c.runningMu.Lock()
if c.running {
c.runningMu.Unlock()
return
}
c.running = true
c.runningMu.Unlock()
c.run()
我的实现
包括crontab.go、run.go 两个文件,通过crontab.go来启动
crontab.go
func main() {
//启动
go func() {
defer func() {
if r := recover(); r != nil {
logger.Log.Errorf("start panic:%v stack:%s", r, string(debug.Stack()))
}
}()
start()
}()
go func() {
//定时检查是否有规则调整
defer func() {
if r := recover(); r != nil {
logger.Log.Errorf("checkRules panic:%v stack:%s", r, string(debug.Stack()))
}
}()
checkRules()
}()
for {
select {}
}
}
func start() {
Cron = cron.New(cron.WithParser(cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)))
//获取有效的规则
ruleList, err := rule.GetRuleList("", []int{rule.StatusOpen})
if err != nil {
logger.Log.Errorf("get rule list err:%s", err.Error())
} else {
for _, rule := range ruleList {
addTask(&rule)
}
}
//开启任务
Cron.Run()
}
func addTask(data *rule.RulesList) {
if data == nil {
return
}
task, ok := taskMap[data.RuleId]
if data.Status != rule.StatusOpen {
//删除或者暂停规则
if ok {
//如果已经在任务队列中,则移除
Cron.Remove(task.CronId)
delete(taskMap, data.RuleId)
}
return
}
//下面的都是开启规则的逻辑
//如果该规则已经存在于任务中
if ok {
//上次开启任务后,规则没有改变
if data.LastUpdateTime.Equal(task.RuleInfo.LastUpdateTime) {
return
}
//有改变,则先删除旧的任务
Cron.Remove(task.CronId)
delete(taskMap, data.RuleId)
}
checkFrequency := &controllerRule.Frequency{}
err := json.Unmarshal([]byte(data.CheckFrequency), checkFrequency)
if err != nil {
return
}
//根据检查频次来设置任务
//for example "30 */3 * * * ?"
//【second】:0-59、【minute】:0-59、【hour】:0-23、【day of month】:1-31 【month】:1-12 or JAN-DEC 【day of week】:0-6 orSUN-SAT
specs := []string{"*", "*", "*", "*", "*", "?"}
var nextTime time.Time
if checkFrequency.Type == rule.FrequencyTypeDay {
//正常
specs[0] = strconv.Itoa(data.LastUpdateTime.Second())
specs[1] = strconv.Itoa(data.LastUpdateTime.Minute())
specs[2] = strconv.Itoa(data.LastUpdateTime.Hour())
specs[3] = "*/" + strconv.Itoa(checkFrequency.Period)
} else {
//主要为了测试用
specs[0] = strconv.Itoa(data.LastUpdateTime.Second())
specs[1] = "*/" + strconv.Itoa(checkFrequency.Period)
}
newTask := &Task{
RuleInfo: data,
}
spec := strings.Join(specs, " ")
entryId, err := Cron.AddJob(spec, newTask)
if err != nil {
return
}
timeNow := time.Now()
if data.LastUpdateTime.Before(timeNow) {
subDuration := time.Now().Sub(data.LastUpdateTime)
if checkFrequency.Type == rule.FrequencyTypeDay {
//正常
subMinute := subDuration.Hours() / 24
tmp := int(math.Ceil(subMinute / float64(checkFrequency.Period)))
nextTime = data.LastUpdateTime.Add(time.Duration(tmp*tmp*checkFrequency.Period) * 24 * time.Hour)
} else {
//主要为了测试
subMinute := subDuration.Minutes()
tmp := int(math.Ceil(subMinute / float64(checkFrequency.Period)))
nextTime = data.LastUpdateTime.Add(time.Duration(tmp*checkFrequency.Period) * time.Minute)
}
////测试用
//nextTime = timeNow.Add(2 * time.Second)
Cron.SetNextTime(entryId, nextTime)
}
//添加到任务映射
newTask.CronId = entryId
taskMap[data.RuleId] = newTask
return
}
run.go
type Task struct {
RuleInfo *rule.RulesList
CronId cron.EntryID
}
func (t Task) Run() {
defer func() {
if r := recover(); r != nil {
dingding.SendDdMessage("", fmt.Sprintf("run panic:%s", r))
}
}()
logger.Log.Infof("cronId:%d ***** run start *****:%s", t.CronId, time.Now().Format("2006-01-02 15:04:05"))
err := run(t.RuleInfo)
if err != nil {
dingding.SendDdMessage("", fmt.Sprintf("ruleId:%d run fail", t.RuleInfo.RuleId))
}
logger.Log.Infof("cronId:%d ***** run end *****:%s", t.CronId, time.Now().Format("2006-01-02 15:04:05"))
}
由于robfig计算下一次运行时间时默认时从当前时间算起的,而我们是需要从规则的创建时间来计算的,所以对其代码进行了微调
主要是在 robfig/cron/cron.go 中增加了修改nextTime的方法,主要是在start之前进行修改
func (c *Cron) SetNextTime(id EntryID, nextTime time.Time) {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
return
}
if nextTime.Before(time.Now()) {
return
}
for _, e := range c.entries {
if e.ID != id {
continue
}
e.Next = nextTime
}
}