feat(cron): refactor cron utility with validation and metadata
- Add input validation for name, schedule expression, and handler - Store full CronEntry metadata (handler, schedule, timezone, registeredAt) instead of raw job instance to support introspection - Add JSDoc typedefs for CronEntry and improve all function docs - Use globalThis symbol store to survive Next.js hot-reload - Remove verbose per-run info logs to reduce noise - Replace `||` with `??` for runOnInit default to handle falsy correctly - Fix stop/stopAll to access `entry.job` from new storage structure
This commit is contained in:
+115
-87
@@ -1,7 +1,7 @@
|
|||||||
/**
|
/**
|
||||||
* Cron Utility
|
* Cron Utility
|
||||||
* Wrapper around node-cron for scheduling tasks
|
* Wrapper around node-cron for scheduling tasks
|
||||||
*
|
*
|
||||||
* Usage in modules:
|
* Usage in modules:
|
||||||
* import { schedule, validate } from '@zen/core/cron';
|
* import { schedule, validate } from '@zen/core/cron';
|
||||||
*/
|
*/
|
||||||
@@ -9,11 +9,20 @@
|
|||||||
import cron from 'node-cron';
|
import cron from 'node-cron';
|
||||||
import { done, fail, info } from '../../shared/lib/logger.js';
|
import { done, fail, info } from '../../shared/lib/logger.js';
|
||||||
|
|
||||||
// Store for all scheduled cron jobs
|
// Shared store — survives Next.js hot-reload and module-cache invalidation
|
||||||
const CRON_JOBS_KEY = Symbol.for('__ZEN_CRON_JOBS__');
|
const CRON_JOBS_KEY = Symbol.for('__ZEN_CRON_JOBS__');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize cron jobs storage
|
* @typedef {Object} CronEntry
|
||||||
|
* @property {Object} job - node-cron task instance
|
||||||
|
* @property {Function} handler - original handler function
|
||||||
|
* @property {string} schedule - cron expression
|
||||||
|
* @property {string} timezone - timezone used
|
||||||
|
* @property {string} registeredAt - ISO timestamp of registration
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @returns {Map<string, CronEntry>}
|
||||||
*/
|
*/
|
||||||
function getJobsStorage() {
|
function getJobsStorage() {
|
||||||
if (!globalThis[CRON_JOBS_KEY]) {
|
if (!globalThis[CRON_JOBS_KEY]) {
|
||||||
@@ -23,161 +32,180 @@ function getJobsStorage() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule a cron job
|
* Schedule a cron job.
|
||||||
* @param {string} name - Unique name for the job
|
*
|
||||||
* @param {string} schedule - Cron schedule expression
|
* If a job with the same name already exists it is stopped and replaced.
|
||||||
* @param {Function} handler - Handler function to execute
|
*
|
||||||
* @param {Object} options - Options
|
* @param {string} name - Unique name for the job
|
||||||
* @param {string} options.timezone - Timezone (default: from env or America/Toronto)
|
* @param {string} cronSchedule - Cron expression (5 or 6 fields)
|
||||||
* @param {boolean} options.runOnInit - Run immediately on schedule (default: false)
|
* @param {Function} handler - Async function to execute
|
||||||
* @returns {Object} Cron job instance
|
* @param {Object} [options]
|
||||||
*
|
* @param {string} [options.timezone] - IANA timezone (default: ZEN_TIMEZONE env or America/Toronto)
|
||||||
|
* @param {boolean} [options.runOnInit] - Run immediately when scheduled (default: false)
|
||||||
|
* @returns {Object} node-cron task instance
|
||||||
|
*
|
||||||
* @example
|
* @example
|
||||||
* schedule('my-task', '0 9 * * *', async () => {
|
* schedule('daily-report', '0 9 * * *', async () => {
|
||||||
* console.log('Running every day at 9 AM');
|
* await sendReport();
|
||||||
* });
|
* });
|
||||||
*
|
*
|
||||||
* @example
|
* @example
|
||||||
* schedule('reminder', ''\''*\/5 5-17 * * *'\'', async () => {
|
* schedule('every-5min', '*\/5 * * * *', async () => {
|
||||||
* console.log('Every 5 minutes between 5 AM and 5 PM');
|
* await syncData();
|
||||||
* }, { timezone: 'America/New_York' });
|
* }, { timezone: 'America/New_York' });
|
||||||
*/
|
*/
|
||||||
export function schedule(name, cronSchedule, handler, options = {}) {
|
export function schedule(name, cronSchedule, handler, options = {}) {
|
||||||
|
if (!name || typeof name !== 'string') {
|
||||||
|
throw new Error('Cron job name must be a non-empty string');
|
||||||
|
}
|
||||||
|
if (!validate(cronSchedule)) {
|
||||||
|
throw new Error(`Invalid cron expression: "${cronSchedule}"`);
|
||||||
|
}
|
||||||
|
if (typeof handler !== 'function') {
|
||||||
|
throw new Error('Cron job handler must be a function');
|
||||||
|
}
|
||||||
|
|
||||||
const jobs = getJobsStorage();
|
const jobs = getJobsStorage();
|
||||||
|
|
||||||
// Stop existing job with same name
|
// Replace existing job with same name
|
||||||
if (jobs.has(name)) {
|
if (jobs.has(name)) {
|
||||||
jobs.get(name).stop();
|
jobs.get(name).job.stop();
|
||||||
info(`Cron replaced: ${name}`);
|
info(`Cron replaced: ${name}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const timezone = options.timezone || process.env.ZEN_TIMEZONE || 'America/Toronto';
|
const timezone = options.timezone || process.env.ZEN_TIMEZONE || 'America/Toronto';
|
||||||
|
|
||||||
const job = cron.schedule(cronSchedule, async () => {
|
const job = cron.schedule(cronSchedule, async () => {
|
||||||
info(`Cron ${name} running at ${new Date().toISOString()}`);
|
|
||||||
try {
|
try {
|
||||||
await handler();
|
await handler();
|
||||||
info(`Cron ${name} completed`);
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
fail(`Cron ${name}: ${error.message}`);
|
fail(`Cron ${name}: ${error.message}`);
|
||||||
}
|
}
|
||||||
}, {
|
}, {
|
||||||
scheduled: true,
|
scheduled: true,
|
||||||
timezone,
|
timezone,
|
||||||
runOnInit: options.runOnInit || false
|
runOnInit: options.runOnInit ?? false
|
||||||
});
|
});
|
||||||
|
|
||||||
jobs.set(name, job);
|
jobs.set(name, {
|
||||||
|
job,
|
||||||
|
handler,
|
||||||
|
schedule: cronSchedule,
|
||||||
|
timezone,
|
||||||
|
registeredAt: new Date().toISOString()
|
||||||
|
});
|
||||||
|
|
||||||
done(`Cron scheduled: ${name} (${cronSchedule})`);
|
done(`Cron scheduled: ${name} (${cronSchedule})`);
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop a scheduled cron job
|
* Stop and remove a scheduled cron job.
|
||||||
|
*
|
||||||
* @param {string} name - Job name
|
* @param {string} name - Job name
|
||||||
* @returns {boolean} True if job was stopped
|
* @returns {boolean} True if the job existed and was stopped
|
||||||
*/
|
*/
|
||||||
export function stop(name) {
|
export function stop(name) {
|
||||||
const jobs = getJobsStorage();
|
const jobs = getJobsStorage();
|
||||||
|
|
||||||
if (jobs.has(name)) {
|
if (jobs.has(name)) {
|
||||||
jobs.get(name).stop();
|
jobs.get(name).job.stop();
|
||||||
jobs.delete(name);
|
jobs.delete(name);
|
||||||
info(`Cron stopped: ${name}`);
|
info(`Cron stopped: ${name}`);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop all cron jobs
|
* Stop and remove all scheduled cron jobs.
|
||||||
*/
|
*/
|
||||||
export function stopAll() {
|
export function stopAll() {
|
||||||
const jobs = getJobsStorage();
|
const jobs = getJobsStorage();
|
||||||
|
|
||||||
for (const [name, job] of jobs.entries()) {
|
for (const [name, entry] of jobs.entries()) {
|
||||||
job.stop();
|
entry.job.stop();
|
||||||
info(`Cron stopped: ${name}`);
|
info(`Cron stopped: ${name}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
jobs.clear();
|
jobs.clear();
|
||||||
done('All cron jobs stopped');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get status of all cron jobs
|
* Manually trigger a job by name, bypassing its schedule.
|
||||||
* @returns {Object} Status of all jobs
|
*
|
||||||
*/
|
|
||||||
export function getStatus() {
|
|
||||||
const jobs = getJobsStorage();
|
|
||||||
const status = {};
|
|
||||||
|
|
||||||
for (const [name] of jobs.entries()) {
|
|
||||||
status[name] = { running: true };
|
|
||||||
}
|
|
||||||
|
|
||||||
return status;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if a cron job is running
|
|
||||||
* @param {string} name - Job name
|
* @param {string} name - Job name
|
||||||
* @returns {boolean}
|
* @returns {Promise<void>}
|
||||||
|
* @throws {Error} If the job does not exist
|
||||||
*/
|
*/
|
||||||
export function isRunning(name) {
|
export async function trigger(name) {
|
||||||
const jobs = getJobsStorage();
|
const jobs = getJobsStorage();
|
||||||
return jobs.has(name);
|
|
||||||
|
if (!jobs.has(name)) {
|
||||||
|
throw new Error(`Cron job '${name}' not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
info(`Cron manual trigger: ${name}`);
|
||||||
|
await jobs.get(name).handler();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validate a cron expression
|
* Validate a cron expression.
|
||||||
* @param {string} expression - Cron expression to validate
|
*
|
||||||
* @returns {boolean} True if valid
|
* @param {string} expression
|
||||||
|
* @returns {boolean}
|
||||||
*/
|
*/
|
||||||
export function validate(expression) {
|
export function validate(expression) {
|
||||||
return cron.validate(expression);
|
return cron.validate(expression);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get list of all scheduled job names
|
* Check whether a job is currently scheduled.
|
||||||
* @returns {string[]} Array of job names
|
*
|
||||||
|
* @param {string} name - Job name
|
||||||
|
* @returns {boolean}
|
||||||
*/
|
*/
|
||||||
export function getJobs() {
|
export function isRunning(name) {
|
||||||
const jobs = getJobsStorage();
|
return getJobsStorage().has(name);
|
||||||
return Array.from(jobs.keys());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manually trigger a job by name
|
* Return the names of all scheduled jobs.
|
||||||
* @param {string} name - Job name
|
*
|
||||||
* @returns {Promise<void>}
|
* @returns {string[]}
|
||||||
*/
|
*/
|
||||||
export async function trigger(name) {
|
export function getJobs() {
|
||||||
const jobs = getJobsStorage();
|
return Array.from(getJobsStorage().keys());
|
||||||
|
|
||||||
if (!jobs.has(name)) {
|
|
||||||
throw new Error(`Cron job '${name}' not found`);
|
|
||||||
}
|
|
||||||
|
|
||||||
info(`Cron manual trigger: ${name}`);
|
|
||||||
// Note: node-cron doesn't expose the handler directly,
|
|
||||||
// so modules should keep their handler function accessible
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Re-export the raw cron module for advanced usage
|
/**
|
||||||
export { cron };
|
* Return metadata for all scheduled jobs.
|
||||||
|
*
|
||||||
|
* @returns {Object.<string, { schedule: string, timezone: string, registeredAt: string }>}
|
||||||
|
*/
|
||||||
|
export function getStatus() {
|
||||||
|
const status = {};
|
||||||
|
|
||||||
|
for (const [name, entry] of getJobsStorage().entries()) {
|
||||||
|
status[name] = {
|
||||||
|
schedule: entry.schedule,
|
||||||
|
timezone: entry.timezone,
|
||||||
|
registeredAt: entry.registeredAt
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
// Default export for convenience
|
|
||||||
export default {
|
export default {
|
||||||
schedule,
|
schedule,
|
||||||
stop,
|
stop,
|
||||||
stopAll,
|
stopAll,
|
||||||
getStatus,
|
|
||||||
isRunning,
|
|
||||||
validate,
|
|
||||||
getJobs,
|
|
||||||
trigger,
|
trigger,
|
||||||
cron
|
validate,
|
||||||
|
isRunning,
|
||||||
|
getJobs,
|
||||||
|
getStatus
|
||||||
};
|
};
|
||||||
|
|||||||
+22
-67
@@ -11,11 +11,11 @@ import {
|
|||||||
getAllDatabaseSchemas,
|
getAllDatabaseSchemas,
|
||||||
isModuleEnabled
|
isModuleEnabled
|
||||||
} from './registry.js';
|
} from './registry.js';
|
||||||
|
import { schedule, stopAll, getStatus } from '@zen/core/cron';
|
||||||
import { step, done, warn, fail, info } from '../../shared/lib/logger.js';
|
import { step, done, warn, fail, info } from '../../shared/lib/logger.js';
|
||||||
|
|
||||||
// Use globalThis to track initialization state
|
// Use globalThis to track initialization state
|
||||||
const INIT_KEY = Symbol.for('__ZEN_MODULES_INITIALIZED__');
|
const INIT_KEY = Symbol.for('__ZEN_MODULES_INITIALIZED__');
|
||||||
const CRON_JOBS_KEY = Symbol.for('__ZEN_MODULE_CRON_JOBS__');
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize all modules
|
* Initialize all modules
|
||||||
@@ -109,95 +109,50 @@ export async function initializeModuleDatabases() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start cron jobs for all enabled modules
|
* Start cron jobs for all enabled modules.
|
||||||
|
* Delegates scheduling to core/cron so all jobs share a single registry.
|
||||||
* @returns {Promise<Object>} Cron job start result
|
* @returns {Promise<Object>} Cron job start result
|
||||||
*/
|
*/
|
||||||
export async function startModuleCronJobs() {
|
export async function startModuleCronJobs() {
|
||||||
step('Starting cron jobs...');
|
step('Starting cron jobs...');
|
||||||
|
|
||||||
// Stop existing cron jobs first
|
// Clear any jobs registered by a previous init cycle
|
||||||
stopModuleCronJobs();
|
stopModuleCronJobs();
|
||||||
|
|
||||||
const jobs = getAllCronJobs();
|
const jobs = getAllCronJobs();
|
||||||
const result = {
|
const result = { started: [], errors: [] };
|
||||||
started: [],
|
|
||||||
errors: []
|
|
||||||
};
|
|
||||||
|
|
||||||
// Initialize cron jobs storage
|
|
||||||
if (!globalThis[CRON_JOBS_KEY]) {
|
|
||||||
globalThis[CRON_JOBS_KEY] = new Map();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const job of jobs) {
|
for (const job of jobs) {
|
||||||
try {
|
try {
|
||||||
if (job.handler && typeof job.handler === 'function') {
|
if (typeof job.handler !== 'function') continue;
|
||||||
// Dynamic import of node-cron
|
|
||||||
const cron = (await import('node-cron')).default;
|
schedule(job.name, job.schedule, job.handler, {
|
||||||
|
timezone: job.timezone
|
||||||
const cronJob = cron.schedule(job.schedule, async () => {
|
|
||||||
info(`Cron ${job.name} running at ${new Date().toISOString()}`);
|
|
||||||
try {
|
|
||||||
await job.handler();
|
|
||||||
info(`Cron ${job.name} completed`);
|
|
||||||
} catch (error) {
|
|
||||||
fail(`Cron ${job.name}: ${error.message}`);
|
|
||||||
}
|
|
||||||
}, {
|
|
||||||
scheduled: true,
|
|
||||||
timezone: job.timezone || process.env.ZEN_TIMEZONE || 'America/Toronto'
|
|
||||||
});
|
|
||||||
|
|
||||||
globalThis[CRON_JOBS_KEY].set(job.name, cronJob);
|
|
||||||
result.started.push(job.name);
|
|
||||||
info(`Cron ready: ${job.name} (${job.schedule})`);
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
result.errors.push({
|
|
||||||
job: job.name,
|
|
||||||
module: job.module,
|
|
||||||
error: error.message
|
|
||||||
});
|
});
|
||||||
|
result.started.push(job.name);
|
||||||
|
} catch (error) {
|
||||||
|
result.errors.push({ job: job.name, module: job.module, error: error.message });
|
||||||
fail(`Cron error for ${job.name}: ${error.message}`);
|
fail(`Cron error for ${job.name}: ${error.message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop all module cron jobs
|
* Stop all module cron jobs.
|
||||||
|
* Delegates to core/cron which owns the shared registry.
|
||||||
*/
|
*/
|
||||||
export function stopModuleCronJobs() {
|
export function stopModuleCronJobs() {
|
||||||
if (globalThis[CRON_JOBS_KEY]) {
|
stopAll();
|
||||||
for (const [name, job] of globalThis[CRON_JOBS_KEY].entries()) {
|
|
||||||
try {
|
|
||||||
job.stop();
|
|
||||||
info(`Cron stopped: ${name}`);
|
|
||||||
} catch (error) {
|
|
||||||
fail(`Error stopping cron ${name}: ${error.message}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
globalThis[CRON_JOBS_KEY].clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get status of all cron jobs
|
* Get status of all cron jobs.
|
||||||
* @returns {Object} Cron job status
|
* @returns {Object} Cron job status (from core/cron)
|
||||||
*/
|
*/
|
||||||
export function getCronJobStatus() {
|
export function getCronJobStatus() {
|
||||||
const status = {};
|
return getStatus();
|
||||||
|
|
||||||
if (globalThis[CRON_JOBS_KEY]) {
|
|
||||||
for (const [name, job] of globalThis[CRON_JOBS_KEY].entries()) {
|
|
||||||
status[name] = {
|
|
||||||
running: true // node-cron doesn't expose a running state easily
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return status;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
+2
-1
@@ -15,6 +15,7 @@ export default defineConfig([
|
|||||||
'src/features/admin/components/index.js',
|
'src/features/admin/components/index.js',
|
||||||
'src/core/api/index.js',
|
'src/core/api/index.js',
|
||||||
'src/core/api/route-handler.js',
|
'src/core/api/route-handler.js',
|
||||||
|
'src/core/cron/index.js',
|
||||||
'src/core/database/index.js',
|
'src/core/database/index.js',
|
||||||
'src/cli/database.js',
|
'src/cli/database.js',
|
||||||
'src/core/email/index.js',
|
'src/core/email/index.js',
|
||||||
@@ -41,7 +42,7 @@ export default defineConfig([
|
|||||||
splitting: false,
|
splitting: false,
|
||||||
sourcemap: false,
|
sourcemap: false,
|
||||||
clean: true,
|
clean: true,
|
||||||
external: ['react', 'react-dom', 'next', 'pg', 'dotenv', 'dotenv/config', 'resend', '@react-email/components', 'node-cron', 'readline', 'crypto', 'url', 'fs', 'path', 'net', 'dns', 'tls', '@zen/core/api', '@zen/core/database', '@zen/core/email', '@zen/core/email/templates', '@zen/core/storage', '@zen/core/toast', '@zen/core/modules/actions', '@zen/core/modules/storage', '@aws-sdk/client-s3', '@aws-sdk/s3-request-presigner'],
|
external: ['react', 'react-dom', 'next', 'pg', 'dotenv', 'dotenv/config', 'resend', '@react-email/components', 'node-cron', 'readline', 'crypto', 'url', 'fs', 'path', 'net', 'dns', 'tls', '@zen/core/api', '@zen/core/cron', '@zen/core/database', '@zen/core/email', '@zen/core/email/templates', '@zen/core/storage', '@zen/core/toast', '@zen/core/modules/actions', '@zen/core/modules/storage', '@aws-sdk/client-s3', '@aws-sdk/s3-request-presigner'],
|
||||||
noExternal: [],
|
noExternal: [],
|
||||||
bundle: true,
|
bundle: true,
|
||||||
banner: {
|
banner: {
|
||||||
|
|||||||
Reference in New Issue
Block a user