登录
原创

基于cron的秒级计划任务实现思路和演示

专栏golang大杂烩
发布于 2021-04-30 阅读 38
  • 后端
  • Go
原创

基于cron的秒级计划任务实现

main入口文件

package main

import (
    "fmt"
    "github.com/miya0315/cron/v3"
    "gocrongtab/function"
    "log"
    "os"
    "sync"
)

func main() {

    // 初始化计划任务限制实例:
    // 通过channel控制并发数量,
    // CronList监控当前执行的计划任务(这里需要用sync.map实现读写安全;如果直接用map当并发大的时候容易出现读写异常)
    // CronJob 实例化秒级别的任务
    
    queueLimit := function.QueueLimit{
        Ch:       make(chan model.CronTabQueue, 15),
        CronList: &sync.Map{},
        CronJob:  newWithSeconds(),
    }

    // 将需要执行的队列任务通过go协程进行循环添加,同时不影响下面代码处理
    go function.QueueTypeRun(&queueLimit)

    // 循环读取队列channel里需要执行的任务
    for queue := range queueLimit.Ch {
        go function.QueueRequest(queue, &queueLimit)
    }
}

// 初始化秒级别的任务
func newWithSeconds() *cron.Cron {
    fmt.Println("newWithSeconds")
    // 自定义任务时间解析器(秒级别)
    secondParser := cron.NewParser(cron.Second | cron.Minute |
        cron.Hour | cron.Dom | cron.Month | cron.DowOptional | cron.Descriptor)
    // 记录日志
    // logger := cron.PrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
    
    // 记录详细日志
    // logger := cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))
    // return cron.New(cron.WithParser(secondParser), cron.WithChain(),cron.WithLogger(logger))


    return cron.New(cron.WithParser(secondParser), cron.WithChain())
}

funciton 任务处理包

package function

import (
    "fmt"
    "github.com/miya0315/cron/v3"
    "sync"
    "time"
)
type QueueLimit struct {
    Ch       chan CronTabQueue //url 地址的通道
    CronList *sync.Map
    CronJob  *cron.Cron
}


// 计划任务结构体
type CronTabQueue struct {
    ID       int       // 任务标题
    Title    string    // 任务标题
    Url      string    // 任务地址
    RunRole  string    // 任务执行规则、策略:*/1 * * * * ?
    TimeOut  int8      // 任务执行过期时间
    Active   int8      // 任务状态
    NextTime time.Time // 下次实现时间
}

// 全局参数初始化
func QueueTypeRun(queueLimit *QueueLimit) {

QUEUE:
    go processQueue(queueLimit)
    time.Sleep(500 * time.Millisecond)

    goto QUEUE
}


// 模拟创建队列任务
func processQueue(limit *QueueLimit) {
    queues := make(map[int]CronTabQueue)
    for i := 1; i < 10; i++ {
        queues[i] = CronTabQueue{
            ID:      i,
            Title:   fmt.Sprintf("测试任务00%d", i),
            Url:     "http://localhost:8080",
            RunRole: fmt.Sprintf("*/%d * * * * ? ", i),
            TimeOut: 5,
            Active:  1,
        }
    }
    for _, queue := range queues {
        limit.Ch <- queue
    }
}

// 执行通道里面的计划任务

func QueueRequest(queue CronTabQueue, limit *QueueLimit) {

    // 根据当前的需要执行的任务ID,判断该任务是否已经在执行队列中
    val, ok := limit.CronList.Load(queue.ID)

    // 如果能获取到该任务的EntryId,则在看下他的实时状态
    if ok {
        // 类型格转化
        eId := val.(cron.EntryID)

        if queue.Active == 0 {
            limit.CronJob.Remove(eId)       // 移除任务
            limit.CronList.Delete(queue.ID) // 移除map列表
        } else {
            // 获取任务的下一次执行时间
            //nextTime := limit.CronJob.Entry(eId).Next

            //fmt.Println(queue.ID, queue.Title, "任务将在:[", nextTime, "]再次执行; 任务当前时间是[", queue.NextTime, "]", nextTime.After(queue.NextTime))

        }
    } else { //未在map列表中
        // 另起协程执行每个请求发起和后续操作
        go func(queue CronTabQueue, limit *QueueLimit) {
            if queue.Active == 1 {
                entryId, err := limit.CronJob.AddFunc(queue.RunRole, func() {
                    fmt.Println(queue.ID, queue.Title, "任务在[", time.Now().Format("2006-01-02 15:04:05"), "]执行;",queue.RunRole)
                })
                if err == nil {
                    limit.CronList.Store(queue.ID, entryId) // 将任务添加到map列表中
                }
            }

        }(queue, limit)
    }

    limit.CronJob.Start()
}


评论区

励志做一条安静的咸鱼,从此走上人生巅峰。

0

0

0

举报