Yes, I solved this problem with the Bull library:
import {getQueue} from "../../utils/bull/bull";
import axios from "axios";
import {logInfo} from "../../utils/logger";
import moment from "moment";
/** Payload stored on every job placed on the cron queue. */
interface CronQueueJob {
// Serialized schedule object (addCronJob stores `job.toJSON()` here); the
// queue processor reads `jobName` and `params` from it.
json: any
}
// Queue holding all scheduled cloud jobs.
// NOTE(review): the meaning of `singleEnv` is defined by the project's getQueue helper — confirm there.
const jobQueue = getQueue<CronQueueJob>("cron-queue", {singleEnv: true});

// Handler registered under the name '*' with a second argument of 1
// (presumably "all job names" and "concurrency 1" — confirm against the queue wrapper).
// It triggers the cloud job over HTTP and reports the response status and body as the job's result.
jobQueue.process('*', 1, async (job) => {
    const payload = job.data.json;
    const response = await doJob(payload.jobName, payload.params);
    const outcome = {
        statusCode: response.status,
        result: response.data
    };
    job.returnvalue = outcome;
    return outcome;
});
/**
 * Registers every stored job schedule on the queue.
 *
 * Fetches up to 1000 `_JobSchedule` objects with the master key, passes each
 * one to `addCronJob` sequentially, then logs how many schedules were read.
 * A rejection from the query or from any `addCronJob` call propagates to the caller.
 */
export async function reAddAllJob() {
    const scheduleQuery = new Parse.Query("_JobSchedule");
    scheduleQuery.limit(1000);
    const schedules = await scheduleQuery.find({useMasterKey: true});
    for (const schedule of schedules) {
        await addCronJob(schedule);
    }
    logInfo("已添加" + schedules.length + "个定时任务")
}
/**
 * Cloud.enqueue implementation: puts the job described by a schedule object on the queue.
 *
 * - Without `repeatMinutes` the job is added once. It is delayed until
 *   `startAfter` when that moment is still in the future; otherwise it is
 *   added with no delay.
 * - With `repeatMinutes` a six-field cron expression is derived from
 *   `timeOfDay` and `repeatMinutes`, and the job is added with repeat options
 *   starting at `startAfter`.
 *
 * In both cases the schedule's `jobName` is used as the job name and job id,
 * the payload is `{json: job.toJSON()}`, and `removeOnComplete` is set.
 * This function only adds; it never removes a previously added job.
 *
 * @param job Schedule object; `startAfter`, `repeatMinutes`, `jobName` and
 *            `timeOfDay` are read from it.
 * @returns The value `jobQueue.add` resolves to.
 */
export async function addCronJob(job: Parse.Object) {
    const startDate = new Date(job.get('startAfter'));
    const repeatMinutes = job.get('repeatMinutes');
    const jobName = job.get('jobName');
    const data = {json: job.toJSON()};

    if (!repeatMinutes) {
        // One-off job: only pass a delay when the start time is still ahead
        // (a missing/invalid date yields NaN, which also takes the no-delay path).
        const delay = moment(startDate).diff(moment());
        const options: {jobId: string; removeOnComplete: boolean; delay?: number} = {
            jobId: jobName,
            removeOnComplete: true
        };
        if (delay > 0) {
            options.delay = delay;
        }
        return await jobQueue.add(jobName, data, options);
    }

    // Recurring job.
    // NOTE(review): timeOfDay is converted to UTC here but no `tz` is passed in
    // the repeat options — confirm the queue evaluates cron expressions in UTC.
    const timeOfDay = moment(job.get('timeOfDay'), 'HH:mm:ss.Z').utc();
    const minutes = repeatMinutes % 60;
    const hours = Math.floor(repeatMinutes / 60);

    // Minutes: step from timeOfDay's minute when the interval has a minute part;
    // for whole-hour intervals the field is fixed to 0.
    const minuteField = minutes ? `${timeOfDay.minutes()}-59/${minutes}` : '0';
    // Hours: from timeOfDay's hour to 23, stepping by the interval's hour part if any.
    const hourRange = `${timeOfDay.hours()}-23`;
    const hourField = hours ? `${hourRange}/${hours}` : hourRange;
    // Fields: seconds, minutes, hours, day of month, month, day of week (fixed to '?').
    const cron = ['0', minuteField, hourField, '*', '*', '?'].join(' ');
    console.log(`${jobName}: ${cron}`)

    // Built as one literal so no type suppression is needed; `jobId` is kept
    // inside the repeat options because removeCronJob matches repeatable jobs by that id.
    const repeatOptions = {cron, startDate, jobId: jobName};
    return await jobQueue.add(jobName, data, {repeat: repeatOptions, removeOnComplete: true});
}
/**
 * Removes whatever is queued under `jobName`.
 *
 * Every repeatable job whose id equals `jobName` is removed by its key. Only
 * when no repeatable job matched is the plain job with id `jobName` looked up
 * and, if present, removed.
 *
 * @param jobName Name that was used as the job id when the job was added.
 */
export async function removeCronJob(jobName: string): Promise<void> {
    let removedRepeatable = false;
    for (const repeatable of await jobQueue.getRepeatableJobs()) {
        if (repeatable.id !== jobName) {
            continue;
        }
        await jobQueue.removeRepeatableByKey(repeatable.key);
        removedRepeatable = true;
    }
    if (removedRepeatable) {
        return;
    }

    // No repeatable job matched: fall back to a one-off job stored under this id.
    const job = await jobQueue.getJob(jobName);
    if (job) {
        await job.remove();
    }
}
/**
 * Triggers a cloud job by POSTing to `<Parse.serverURL>/jobs/<jobName>`,
 * authenticated with the application id and master key.
 *
 * @param jobName Name of the job; URL-encoded so names containing spaces or
 *                reserved characters still form a valid request path.
 * @param params  Request body sent to the job endpoint.
 * @returns The axios response; rejects if the request fails.
 */
async function doJob(jobName: string, params: unknown) {
    const url = Parse.serverURL + '/jobs/' + encodeURIComponent(jobName);
    return await axios.post(url, params, {
        headers: {
            'X-Parse-Application-Id': Parse.applicationId,
            'X-Parse-Master-Key': Parse.masterKey
        },
    });
}
Usage:
import {addCronJob, reAddAllJob, removeCronJob} from "./bull/bull_cron_queue";
// Recreates the schedule when a job schedule has been created or changed.
// addCronJob only adds to the queue, so the previous registration is removed
// first; otherwise an edited schedule would leave its old entry behind.
// NOTE(review): relies on Bull skipping an add whose jobId already exists and
// treating changed repeat options as a new repeatable job — see Bull's reference.
Parse.Cloud.afterSave('_JobSchedule', async (request) => {
    const jobName = request.object.get("jobName");
    // On updates, `original` holds the object as stored before this save
    // (presumably undefined on create — confirm against the Parse Cloud Code docs).
    const previous = request.original;
    const previousName = previous ? previous.get("jobName") : undefined;
    if (previousName && previousName !== jobName) {
        // The schedule was pointed at a different job: drop the old job's entry too.
        await removeCronJob(previousName);
    }
    await removeCronJob(jobName);
    await addCronJob(request.object);
})
// Destroy schedule for removed job
Parse.Cloud.afterDelete('_JobSchedule', async (request) => {
    await removeCronJob(request.object.get("jobName"))
})
// Register all stored schedules at startup; report a failure instead of
// leaving the promise rejection unhandled.
reAddAllJob().catch((error: unknown) => {
    console.error("Failed to re-add scheduled jobs on startup", error);
});