基于cron的秒级计划任务实现:
main入口文件
package main
import (
"fmt"
"github.com/miya0315/cron/v3"
"gocrongtab/function"
"log"
"os"
"sync"
)
func main() {
// 初始化计划任务限制实例:
// 通过channel控制并发数量,
// CronList监控当前执行的计划任务(这里需要用sync.map实现读写安全;如果直接用map当并发大的时候容易出现读写异常)
// CronJob 实例化秒级别的任务
queueLimit := function.QueueLimit{
Ch: make(chan model.CronTabQueue, 15),
CronList: &sync.Map{},
CronJob: newWithSeconds(),
}
// 将需要执行的队列任务通过go协程进行循环添加,同时不影响下面代码处理
go function.QueueTypeRun(&queueLimit)
// 循环读取队列channel里需要执行的任务
for queue := range queueLimit.Ch {
go function.QueueRequest(queue, &queueLimit)
}
}
// 初始化秒级别的任务
func newWithSeconds() *cron.Cron {
fmt.Println("newWithSeconds")
// 自定义任务时间解析器(秒级别)
secondParser := cron.NewParser(cron.Second | cron.Minute |
cron.Hour | cron.Dom | cron.Month | cron.DowOptional | cron.Descriptor)
// 记录日志
// logger := cron.PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
// 记录详细日志
// logger := cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
// return cron.New(cron.WithParser(secondParser), cron.WithChain(),cron.WithLogger(logger))
return cron.New(cron.WithParser(secondParser), cron.WithChain())
}
funciton 任务处理包
package function
import (
"fmt"
"github.com/miya0315/cron/v3"
"sync"
"time"
)
type QueueLimit struct {
Ch chan CronTabQueue //url 地址的通道
CronList *sync.Map
CronJob *cron.Cron
}
// 计划任务结构体
type CronTabQueue struct {
ID int // 任务标题
Title string // 任务标题
Url string // 任务地址
RunRole string // 任务执行规则、策略:*/1 * * * * ?
TimeOut int8 // 任务执行过期时间
Active int8 // 任务状态
NextTime time.Time // 下次实现时间
}
// 全局参数初始化
func QueueTypeRun(queueLimit *QueueLimit) {
QUEUE:
go processQueue(queueLimit)
time.Sleep(500 * time.Millisecond)
goto QUEUE
}
// 模拟创建队列任务
func processQueue(limit *QueueLimit) {
queues := make(map[int]CronTabQueue)
for i := 1; i < 10; i++ {
queues[i] = CronTabQueue{
ID: i,
Title: fmt.Sprintf("测试任务00%d", i),
Url: "http://localhost:8080",
RunRole: fmt.Sprintf("*/%d * * * * ? ", i),
TimeOut: 5,
Active: 1,
}
}
for _, queue := range queues {
limit.Ch <- queue
}
}
// 执行通道里面的计划任务
func QueueRequest(queue CronTabQueue, limit *QueueLimit) {
// 根据当前的需要执行的任务ID,判断该任务是否已经在执行队列中
val, ok := limit.CronList.Load(queue.ID)
// 如果能获取到该任务的EntryId,则在看下他的实时状态
if ok {
// 类型格转化
eId := val.(cron.EntryID)
if queue.Active == 0 {
limit.CronJob.Remove(eId) // 移除任务
limit.CronList.Delete(queue.ID) // 移除map列表
} else {
// 获取任务的下一次执行时间
//nextTime := limit.CronJob.Entry(eId).Next
//fmt.Println(queue.ID, queue.Title, "任务将在:[", nextTime, "]再次执行; 任务当前时间是[", queue.NextTime, "]", nextTime.After(queue.NextTime))
}
} else { //未在map列表中
// 另起协程执行每个请求发起和后续操作
go func(queue CronTabQueue, limit *QueueLimit) {
if queue.Active == 1 {
entryId, err := limit.CronJob.AddFunc(queue.RunRole, func() {
fmt.Println(queue.ID, queue.Title, "任务在[", time.Now().Format("2006-01-02 15:04:05"), "]执行;",queue.RunRole)
})
if err == nil {
limit.CronList.Store(queue.ID, entryId) // 将任务添加到map列表中
}
}
}(queue, limit)
}
limit.CronJob.Start()
}