Some Questions About Running Background Tasks in k3s (or k8s)

I use this to schedule background jobs:

// Load the scheduler class (the package's default export) and create one instance.
const { default: Scheduler } = require('parse-server-jobs-scheduler');
const scheduler = new Scheduler();

// On server launch, recreate the cron entries for all jobs.
scheduler.recreateScheduleForAllJobs();

// When a _JobSchedule record is saved, recreate the schedule for that record.
Parse.Cloud.afterSave('_JobSchedule', async ({ object }) => {
  scheduler.recreateSchedule(object.id);
});

// When a _JobSchedule record is deleted, destroy its schedule.
Parse.Cloud.afterDelete('_JobSchedule', async ({ object }) => {
  scheduler.destroySchedule(object.id);
});

About “Recreates all crons when the server is launched”:

I use k3s (which is similar to k8s) to run 2 replicas. Each pod creates the same batch of scheduled tasks when it starts, which results in every task running twice.

Is there any solution?

You can use a queue; RabbitMQ or Kafka can resolve this issue. Assign each task to a queue and make sure it is consumed only once.

Yes. I solved this problem with the bull library:

import {getQueue} from "../../utils/bull/bull";
import axios from "axios";
import {logInfo} from "../../utils/logger";
import moment from "moment";

/**
 * Payload carried by each job placed on the cron queue.
 */
interface CronQueueJob {
    // JSON form of the schedule record: producers in this file pass `job.toJSON()`,
    // and the queue processor reads `jobName` and `params` from it.
    json: any
}

// Queue that holds the scheduled jobs.
const jobQueue = getQueue<CronQueueJob>("cron-queue", {singleEnv: true});

// Processor registered for every job name ('*'). The second argument (1) is presumably the
// concurrency, as in Bull's `process(name, concurrency, handler)` — confirm against getQueue.
// It triggers the job through doJob and stores the response status and body as the return value.
jobQueue.process('*', 1, async (job) => {
    const payload = job.data.json;
    const response = await doJob(payload.jobName, payload.params);

    const outcome = {
        statusCode: response.status,
        result: response.data
    };
    job.returnvalue = outcome;

    return outcome;
});

/**
 * Queries `_JobSchedule` with the master key (at most 1000 records, per the query limit)
 * and registers each record through `addCronJob`, one after another. Afterwards it logs
 * how many records were processed.
 */
export async function reAddAllJob() {
    const scheduleQuery = new Parse.Query("_JobSchedule");
    scheduleQuery.limit(1000);
    const schedules = await scheduleQuery.find({useMasterKey: true});

    for (const schedule of schedules) {
        await addCronJob(schedule);
    }

    // Log message reads: "Added <count> scheduled tasks".
    logInfo("已添加" + schedules.length + "个定时任务")
}

// Implementation of Cloud.enqueue
/**
 * Adds a `_JobSchedule` record to the cron queue.
 *
 * - Without `repeatMinutes` the record is a one-off job: it is added with a delay equal to
 *   the time remaining until `startAfter`, or with no delay when that moment is not in the future.
 * - With `repeatMinutes` it is added as a repeating job whose cron expression is derived from
 *   `timeOfDay` (converted to UTC) and `repeatMinutes`.
 *
 * In both cases `jobName` is used as the job id and the job is removed once it completes.
 *
 * @param job The `_JobSchedule` object; its JSON form becomes the queue payload.
 * @returns Whatever `jobQueue.add` resolves to.
 */
export async function addCronJob(job: Parse.Object) {
    const startDate = new Date(job.get('startAfter'));
    const repeatMinutes = job.get('repeatMinutes');
    const jobName = job.get('jobName');

    if (!repeatMinutes) {
        // One-off job: only pass `delay` when the start time is still ahead of now.
        const delay = moment(startDate).diff(moment());
        const oneOffOptions = delay > 0
            ? {delay, jobId: jobName, removeOnComplete: true}
            : {jobId: jobName, removeOnComplete: true};
        return await jobQueue.add(jobName, {json: job.toJSON()}, oneOffOptions);
    }

    const timeOfDay = moment(job.get('timeOfDay'), 'HH:mm:ss.Z').utc();
    const cron = buildRepeatCron(timeOfDay.hours(), timeOfDay.minutes(), repeatMinutes);

    console.log(`${jobName}: ${cron}`);

    // Repeating job. `jobId` is placed inside the repeat options because removeCronJob finds
    // repeatable jobs by comparing their `id` with the job name.
    const repeatOptions = {cron, startDate, jobId: jobName};

    // @ts-ignore -- NOTE(review): kept from the original; the queue's option typings are not
    // visible here and may not accept `jobId` inside `repeat`.
    return await jobQueue.add(jobName, {json: job.toJSON()}, {repeat: repeatOptions, removeOnComplete: true});
}

/**
 * Builds the six-field cron expression for a repeating job (the leading `0` is presumably
 * the seconds field of the cron dialect used by the queue).
 *
 * - Minute field: `<startMinute>-59/<repeatMinutes % 60>`, or `0` when that remainder is zero.
 * - Hour field: `<startHour>-23`, followed by `/<whole hours in repeatMinutes>` when non-zero.
 * - Day of month and month are `*`; day of week is `?`.
 *
 * @param startHour Hour of the job's time of day.
 * @param startMinute Minute of the job's time of day.
 * @param repeatMinutes Repeat interval in minutes.
 */
function buildRepeatCron(startHour: number, startMinute: number, repeatMinutes: number): string {
    const minuteStep = repeatMinutes % 60;
    const hourStep = Math.floor(repeatMinutes / 60);

    const minuteField = minuteStep ? `${startMinute}-59/${minuteStep}` : '0';
    const hourField = hourStep ? `${startHour}-23/${hourStep}` : `${startHour}-23`;

    return ['0', minuteField, hourField, '*', '*', '?'].join(' ');
}

/**
 * Removes the queue entries registered under `jobName`.
 *
 * Every repeatable job whose `id` equals `jobName` is removed by its key. Only when no
 * repeatable job matched is `jobName` looked up as a plain job id; that job is removed
 * if it exists.
 *
 * @param jobName Name the job was added under.
 */
export async function removeCronJob(jobName: string) {
    const repeatableJobs = await jobQueue.getRepeatableJobs();
    const matches = repeatableJobs.filter((repeatable) => repeatable.id === jobName);

    for (const repeatable of matches) {
        await jobQueue.removeRepeatableByKey(repeatable.key);
    }
    if (matches.length > 0) {
        return;
    }

    // Nothing repeatable matched, so fall back to a job stored directly under this id.
    const job = await jobQueue.getJob(jobName);
    if (job) {
        await job.remove();
    }
}

/**
 * Triggers a job by POSTing to `<Parse.serverURL>/jobs/<jobName>` with the application id
 * and master key sent as headers.
 *
 * @param jobName Name of the job; URL-encoded so that reserved characters (spaces, `/`, `?`, `#`)
 *                cannot alter the request path.
 * @param params  Value sent as the request body.
 * @returns The axios response of the POST request.
 */
async function doJob(jobName: string, params: unknown) {
    const url = Parse.serverURL + '/jobs/' + encodeURIComponent(jobName);
    return await axios.post(url, params, {
        headers: {
            'X-Parse-Application-Id': Parse.applicationId,
            'X-Parse-Master-Key': Parse.masterKey
        },
    });
}

Usage:

import {addCronJob, reAddAllJob, removeCronJob} from "./bull/bull_cron_queue";
// Recreates the schedule when a job schedule has changed.
// The previous queue entry is removed first: Bull identifies a repeatable job by its repeat
// options and does not add a job whose id already exists, so re-adding alone would leave the
// old cron running next to the new one and would ignore changes to a one-off job.
Parse.Cloud.afterSave('_JobSchedule', async (request) => {
    const currentName = request.object.get("jobName");
    // `request.original` holds the stored object on updates; use it to catch a renamed job.
    const previousName = request.original ? request.original.get("jobName") : undefined;

    if (previousName && previousName !== currentName) {
        await removeCronJob(previousName);
    }
    await removeCronJob(currentName);
    await addCronJob(request.object);
})

// Destroys the schedule for a removed job.
Parse.Cloud.afterDelete('_JobSchedule', async (request) => {
    await removeCronJob(request.object.get("jobName"))
})

// Re-register every stored schedule at startup; report a failure instead of leaving an
// unhandled promise rejection.
reAddAllJob().catch((error) => {
    console.error("Failed to re-add scheduled jobs", error);
});
1 Like

very nice… :+1: But there’s not much information on the web about using timed tasks in k3s or k8s.

k8s has its own job scheduler, but we can also use RabbitMQ, Kafka, or bull to solve these problems at the software layer.