Files
echoboard/packages/api/src/cron/index.ts

200 lines
6.3 KiB
TypeScript

import cron from "node-cron";
import { unlink } from "node:fs/promises";
import { resolve, sep } from "node:path";
import prisma from "../lib/prisma.js";
import { config } from "../config.js";
import { cleanExpiredChallenges } from "../routes/passkey.js";
import { cleanupExpiredTokens } from "../lib/token-blocklist.js";
import { getPluginCronJobs } from "../plugins/loader.js";
import { cleanupViews } from "../lib/view-tracker.js";
import {
  takeVoteSnapshots,
  scanVoteVelocity,
  scanPostVelocity,
  scanIdentityClusters,
  scanInflectionPoints,
  scanCohortArrivals,
  scanReferrerConcentration,
  scanPostSimilarity,
  recalculateBoardBaselines,
  scanVoterOverlap,
  scanOutboundLinks,
  scanCommentVoteRatio,
  scanVoteDistribution,
  compareSeasonalBaseline,
  pruneOldSnapshots,
  pruneOldAnomalyEvents,
  scanOffHoursActivity,
  buildVoterGraph,
} from "../services/detection-engine.js";
/**
 * Log a failure from a scheduled job or job step, so broken jobs show up in
 * the server logs instead of being silently discarded.
 */
function logCronFailure(job: string, err: unknown): void {
  console.error(`Cron job "${job}" failed:`, err);
}

/**
 * Register `task` with node-cron on `expression`. Any error the task throws
 * (synchronously or via a rejected promise) is caught and logged with the job
 * name rather than being left for the scheduler to deal with.
 */
function scheduleWithLogging(job: string, expression: string, task: () => unknown): void {
  cron.schedule(expression, async () => {
    try {
      await task();
    } catch (err) {
      logCronFailure(job, err);
    }
  });
}

/**
 * Run one step of a multi-step job. A failing step is logged and swallowed so
 * the remaining steps scheduled in the same tick still run.
 */
async function runCronStep(step: string, fn: () => unknown): Promise<void> {
  try {
    await fn();
  } catch (err) {
    logCronFailure(step, err);
  }
}

/**
 * Remove attachments that were uploaded more than 24h ago but never linked to
 * a post or comment: first the database rows, then their files under
 * `<cwd>/uploads`.
 */
async function cleanOrphanedAttachments(): Promise<void> {
  const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000);
  const orphans = await prisma.attachment.findMany({
    where: {
      postId: null,
      commentId: null,
      createdAt: { lt: cutoff },
    },
    select: { id: true, path: true },
  });
  if (orphans.length === 0) return;

  const ids = orphans.map((a) => a.id);
  // Re-check the orphan condition at delete time: an attachment may have been
  // linked to a post/comment since the findMany above.
  const result = await prisma.attachment.deleteMany({
    where: { id: { in: ids }, postId: null, commentId: null },
  });
  if (result.count > 0) {
    console.log(`Cleaned ${result.count} orphaned attachments`);
  }

  // Rows that survived the delete were linked in the meantime; their files are
  // still referenced and must not be removed from disk.
  const survivors = await prisma.attachment.findMany({
    where: { id: { in: ids } },
    select: { id: true },
  });
  const stillReferenced = new Set(survivors.map((a) => a.id));

  const uploadsDir = resolve(process.cwd(), "uploads");
  for (const att of orphans) {
    if (stillReferenced.has(att.id)) continue;
    const filePath = resolve(uploadsDir, att.path);
    // Never unlink anything outside the uploads directory, whatever the stored
    // path contains (e.g. "..", an absolute path, or an empty string).
    if (!filePath.startsWith(uploadsDir + sep)) {
      console.error(`Refusing to delete attachment file outside uploads dir: ${att.path}`);
      continue;
    }
    try {
      await unlink(filePath);
    } catch (err) {
      // A file that is already gone is fine; anything else is worth reporting.
      if ((err as { code?: unknown } | null)?.code !== "ENOENT") {
        console.error(`Failed to delete attachment file ${filePath}:`, err);
      }
    }
  }
}

/**
 * Register all built-in maintenance and anti-brigading cron jobs, plus any
 * cron jobs provided by plugins. Every call schedules a fresh set of jobs, so
 * this is expected to be called once at start-up.
 */
export function startCronJobs() {
  // prune old activity events - daily at 3am
  scheduleWithLogging("prune-activity-events", "0 3 * * *", async () => {
    const cutoff = new Date();
    cutoff.setDate(cutoff.getDate() - config.DATA_RETENTION_ACTIVITY_DAYS);
    const result = await prisma.activityEvent.deleteMany({
      where: { createdAt: { lt: cutoff } },
    });
    if (result.count > 0) {
      console.log(`Pruned ${result.count} old activity events`);
    }
  });

  // prune orphaned anonymous (cookie-auth) users with no posts/comments/votes - daily at 4am
  scheduleWithLogging("prune-orphan-users", "0 4 * * *", async () => {
    const cutoff = new Date();
    cutoff.setDate(cutoff.getDate() - config.DATA_RETENTION_ORPHAN_USER_DAYS);
    const result = await prisma.user.deleteMany({
      where: {
        authMethod: "COOKIE",
        createdAt: { lt: cutoff },
        posts: { none: {} },
        comments: { none: {} },
        votes: { none: {} },
      },
    });
    if (result.count > 0) {
      console.log(`Pruned ${result.count} orphaned users`);
    }
  });

  // clean webauthn challenges - every minute
  scheduleWithLogging("clean-webauthn-challenges", "* * * * *", () => cleanExpiredChallenges());

  // clean expired recovery codes - daily at 3:30am
  scheduleWithLogging("clean-recovery-codes", "30 3 * * *", async () => {
    const result = await prisma.recoveryCode.deleteMany({
      where: { expiresAt: { lt: new Date() } },
    });
    if (result.count > 0) {
      console.log(`Cleaned ${result.count} expired recovery codes`);
    }
  });

  // clean expired blocked tokens - every 30 minutes
  scheduleWithLogging("clean-blocked-tokens", "*/30 * * * *", async () => {
    const count = await cleanupExpiredTokens();
    if (count > 0) {
      console.log(`Cleaned ${count} expired blocked tokens`);
    }
  });

  // remove push subscriptions with too many failures - daily at 5am
  scheduleWithLogging("clean-push-subscriptions", "0 5 * * *", async () => {
    const result = await prisma.pushSubscription.deleteMany({
      where: { failureCount: { gte: 3 } },
    });
    if (result.count > 0) {
      console.log(`Cleaned ${result.count} failed push subscriptions`);
    }
  });

  // clean orphaned attachments (uploaded but never linked) - hourly at :30
  scheduleWithLogging("clean-orphan-attachments", "30 * * * *", cleanOrphanedAttachments);

  // clean expired view-tracker entries - every 5 minutes
  scheduleWithLogging("clean-view-tracker", "*/5 * * * *", () => cleanupViews());

  // --- anti-brigading detection jobs ---
  // Each scan is an independent step: a failing scan is logged and the
  // remaining scans in the same job still run.

  // every 5 min: snapshots + velocity scans
  scheduleWithLogging("detection-5m", "*/5 * * * *", async () => {
    await runCronStep("takeVoteSnapshots", takeVoteSnapshots);
    await runCronStep("scanVoteVelocity", scanVoteVelocity);
    await runCronStep("scanPostVelocity", scanPostVelocity);
  });

  // every 10 min: identity clustering
  scheduleWithLogging("detection-10m", "*/10 * * * *", async () => {
    await runCronStep("scanIdentityClusters", scanIdentityClusters);
  });

  // every 15 min: inflection point detection
  scheduleWithLogging("detection-15m", "*/15 * * * *", async () => {
    await runCronStep("scanInflectionPoints", scanInflectionPoints);
  });

  // every 30 min: cohort arrivals, referrer analysis, post similarity
  scheduleWithLogging("detection-30m", "*/30 * * * *", async () => {
    await runCronStep("scanCohortArrivals", scanCohortArrivals);
    await runCronStep("scanReferrerConcentration", scanReferrerConcentration);
    await runCronStep("scanPostSimilarity", scanPostSimilarity);
  });

  // hourly: baselines, overlap, links, comment ratio, off-hours, voter graph
  scheduleWithLogging("detection-hourly", "0 * * * *", async () => {
    await runCronStep("recalculateBoardBaselines", recalculateBoardBaselines);
    await runCronStep("scanVoterOverlap", scanVoterOverlap);
    await runCronStep("scanOutboundLinks", scanOutboundLinks);
    await runCronStep("scanCommentVoteRatio", scanCommentVoteRatio);
    await runCronStep("scanOffHoursActivity", scanOffHoursActivity);

    // The voter graph is built per board; if listing boards fails, skip it.
    let boards: { id: string }[] = [];
    try {
      boards = await prisma.board.findMany({ select: { id: true } });
    } catch (err) {
      logCronFailure("buildVoterGraph: list boards", err);
    }
    for (const b of boards) {
      await runCronStep(`buildVoterGraph(${b.id})`, () => buildVoterGraph(b.id));
    }
  });

  // daily at 3am: distribution, seasonal, pruning
  scheduleWithLogging("detection-daily", "0 3 * * *", async () => {
    await runCronStep("scanVoteDistribution", scanVoteDistribution);
    await runCronStep("compareSeasonalBaseline", compareSeasonalBaseline);
    await runCronStep("pruneOldSnapshots", pruneOldSnapshots);
    await runCronStep("pruneOldAnomalyEvents", pruneOldAnomalyEvents);
  });

  // register plugin-provided cron jobs (min interval: every minute, reject sub-minute)
  for (const job of getPluginCronJobs()) {
    if (!cron.validate(job.schedule)) {
      console.error(`Plugin cron "${job.name}" has invalid schedule: ${job.schedule}, skipping`);
      continue;
    }
    // reject schedules with 6 fields (seconds) to prevent sub-minute execution
    const fields = job.schedule.trim().split(/\s+/);
    if (fields.length > 5) {
      console.error(`Plugin cron "${job.name}" uses sub-minute schedule, skipping`);
      continue;
    }
    cron.schedule(job.schedule, async () => {
      try {
        await job.handler();
      } catch (err) {
        console.error(`Plugin cron job "${job.name}" failed:`, err);
      }
    });
    console.log(`Registered plugin cron: ${job.name} (${job.schedule})`);
  }
}