Extending Background
Background is designed with extensibility in mind, allowing you to create custom implementations tailored to your specific requirements.
Core Design Principles
Background follows object-oriented design principles. When extending its functionality, it's recommended to maintain these principles in your custom implementations.
Key OOP Concepts
To effectively extend Background, familiarity with these concepts is essential:
- Inheritance: Creating new classes based on existing ones
- Polymorphism: Implementing interfaces with various concrete implementations
- Encapsulation: Controlling access to class components through access modifiers
The JobQueue Class
The JobQueue
class serves as the foundation for all queue implementations in Background. When creating custom queues, you'll extend this class and work with its:
- Protected properties
- Protected methods
- Storage implementation patterns
The class interacts with storage implementations that follow the JobStorage
interface.
JobStorage Interface
export interface JobStorage {
saveJob(job: Job): Promise<void>;
getJob(id: string): Promise<Job | null>;
getJobsByStatus(status: JobStatus): Promise<Job[]>;
updateJob(job: Job): Promise<void>;
acquireNextJob(): Promise<Job | null>;
completeJob(jobId: string, result: any): Promise<void>;
failJob(jobId: string, error: string): Promise<void>;
}
Extending the Storage Interface
You can extend the base JobStorage
interface to add specialized functionality:
export interface RedisStorage extends JobStorage {
// Atomic job acquisition with optional time-to-live
acquireNextJob(ttl?: number): Promise<Job | null>;
// Retrieve jobs by priority level
getJobsByPriority(priority: number): Promise<Job[]>;
// Get scheduled jobs within a specified time range
getScheduledJobs(startTime: Date, endTime?: Date): Promise<Job[]>;
// Remove all jobs from storage
clear(): Promise<void>;
}
Creating Custom Queues
Implementation Steps
- Create a class that extends
JobQueue
- Initialize storage and configuration in the constructor
- Call the parent constructor using
super()
- Override key methods to customize behavior
Required Overrides
At minimum, you'll typically need to override these methods:
protected async processNextBatch(): Promise<void>
protected async processJob(job: Job): Promise<void>
Implement additional private methods as needed for your specific implementation.
Job Interface Reference
Here's the complete Job
interface for reference:
// Job status types
export type JobStatus = "pending" | "processing" | "completed" | "failed";
// Job interface
export interface Job<T = any> {
id: string;
name: string;
data: T;
status: JobStatus;
createdAt: Date;
scheduledAt?: Date;
startedAt?: Date;
completedAt?: Date;
error?: string;
priority?: number;
result?: any;
retryCount?: number;
repeat?: {
every?: number;
unit?: "seconds" | "minutes" | "hours" | "days" | "weeks" | "months";
startDate?: Date;
endDate?: Date;
limit?: number;
};
}
// Job handler type
export type JobHandler<T = any, R = any> = (data: T) => Promise<R>;
Best Practices
- Review the source code before implementing custom extensions
- Maintain consistent error handling patterns
- Follow the existing architectural patterns where possible
- Document custom implementations thoroughly
Source Code Reference
For a deeper understanding of Background internals, refer to the GitHub Repository.
Example MSSQL Implementation(Not Tested)
This implementation is not tested so be careful.
// mssql-storage.ts
import { Job, JobStatus, JobStorage } from "@backgroundjs/core";
import { ConnectionPool} from "mssql";
export class MSSQLStorage implements JobStorage {
private readonly tableName: string;
private readonly pool: ConnectionPool;
private initialized: boolean = false;
private logging: boolean = true;
private readonly staleJobTimeout: number = 1000 * 60 * 5; // 5 minutes
constructor(tableName: string, pool: ConnectionPool, options: {
staleJobTimeout?: number;
logging?: boolean;
} = {}) {
this.tableName = tableName;
this.pool = pool;
this.logging = options.logging ?? true;
this.staleJobTimeout = options.staleJobTimeout ?? 1000 * 60 * 5; // 5 minutes
}
async initialize(): Promise<void> {
if(!this.pool.connected){
return;
}
const client = this.pool.request();
try {
// CREATE TABLE
await client.query(`
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='${this.tableName}' AND xtype='U')
EXEC('CREATE TABLE ${this.tableName} (
id NVARCHAR(255) PRIMARY KEY,
name NVARCHAR(255) NOT NULL,
data NVARCHAR(MAX) NOT NULL,
status NVARCHAR(255) NOT NULL,
created_at DATETIME NOT NULL,
scheduled_at DATETIME,
started_at DATETIME,
completed_at DATETIME,
error NVARCHAR(MAX),
priority INT DEFAULT 0,
result NVARCHAR(MAX),
repeat NVARCHAR(MAX),
retry_count INT DEFAULT 0,
timeout INT DEFAULT 1000
)')
`);
// CREATE INDEX 1
await client.query(`
IF NOT EXISTS (
SELECT * FROM sys.indexes WHERE name = '${this.tableName}_status_idx'
)
EXEC('CREATE INDEX ${this.tableName}_status_idx ON ${this.tableName} (status)')
`);
// CREATE INDEX 2
await client.query(`
IF NOT EXISTS (
SELECT * FROM sys.indexes WHERE name = '${this.tableName}_scheduled_at_idx'
)
EXEC('CREATE INDEX ${this.tableName}_scheduled_at_idx ON ${this.tableName} (scheduled_at)')
`);
this.initialized = true;
} finally {
}
}
async saveJob(job: Job): Promise<void> {
try {
if (!this.initialized) {
await this.initialize();
}
if(!this.pool.connected){
return;
}
const request = this.pool.request();
request.input('p1', job.id);
request.input('p2', job.name);
request.input('p3', JSON.stringify(job.data));
request.input('p4', job.status);
request.input('p5', job.createdAt);
request.input('p6', job.scheduledAt || null);
request.input('p7', job.retryCount || 0);
request.input('p8', job.priority || 0);
request.input('p9', JSON.stringify(job.repeat));
request.input('p10', job.timeout || 1000);
await request.query(
`INSERT INTO ${this.tableName} (
id, name, data, status, created_at, scheduled_at, retry_count, priority, repeat, timeout
) VALUES (@p1, @p2, @p3, @p4, @p5, @p6, @p7, @p8, @p9, @p10)`
);
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error saving job:`, error);
}
}
}
async getJob(id: string): Promise<Job | null> {
try {
if (!this.initialized) {
await this.initialize();
}
if(!this.pool.connected){
return null;
}
const request = this.pool.request();
request.input('id', id);
const result = await request.query(
`SELECT * FROM ${this.tableName} WHERE id = @id`
);
if (result.recordset.length === 0) {
return null;
}
return this.mapRowToJob(result.recordset[0]);
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error getting job:`, error);
}
return null;
}
}
async getJobsByStatus(status: JobStatus): Promise<Job[]> {
try {
if (!this.initialized) {
await this.initialize();
}
if(!this.pool.connected){
return [];
}
const request = this.pool.request();
request.input('status', status);
const result = await request.query(
`SELECT * FROM ${this.tableName} WHERE status = @status`
);
return result.recordset.map((row) => this.mapRowToJob(row));
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error getting jobs by status:`, error);
}
return [];
}
}
async updateJob(job: Job): Promise<void> {
try {
if (!this.initialized) {
await this.initialize();
}
if(!this.pool.connected){
return;
}
const request = this.pool.request();
request.input('id', job.id);
request.input('name', job.name);
request.input('data', JSON.stringify(job.data));
request.input('status', job.status);
request.input('scheduledAt', job.scheduledAt || null);
request.input('startedAt', job.startedAt || null);
request.input('completedAt', job.completedAt || null);
request.input('error', job.error || null);
request.input('result', job.result ? JSON.stringify(job.result) : null);
request.input('repeat', job.repeat ? JSON.stringify(job.repeat) : null);
request.input('retryCount', job.retryCount || 0);
request.input('priority', job.priority || 0);
request.input('timeout', job.timeout || 1000);
const result = await request.query(
`UPDATE ${this.tableName} SET
name = @name,
data = @data,
status = @status,
scheduled_at = @scheduledAt,
started_at = @startedAt,
completed_at = @completedAt,
error = @error,
result = @result,
repeat = @repeat,
retry_count = @retryCount,
priority = @priority,
timeout = @timeout
WHERE id = @id`
);
if (result.rowsAffected[0] === 0) {
throw new Error(`Job with ID ${job.id} not found`);
}
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error updating job:`, error);
}
}
}
async acquireNextJob(): Promise<Job | null> {
if (!this.initialized) {
await this.initialize();
}
if(!this.pool.connected){
return null;
}
const transaction = this.pool.transaction();
try {
await transaction.begin();
const request = transaction.request();
const now = new Date();
request.input('now', now);
request.input('staleTime', new Date(Date.now() - this.staleJobTimeout));
// Select the next pending job with locking
const query = await request.query(
`SELECT TOP 1 * FROM ${this.tableName} WITH (UPDLOCK, READPAST)
WHERE (
status = 'pending'
AND (scheduled_at IS NULL OR scheduled_at <= @now)
)
OR (
status = 'processing'
AND started_at IS NOT NULL
AND completed_at IS NULL
AND started_at < @staleTime
)
ORDER BY priority ASC, created_at ASC`,
);
if (query.recordset.length === 0) {
await transaction.rollback();
return null;
}
let job = query.recordset[0];
// Update the job status to 'processing'
request.input('id', job.id);
await request.query(
`UPDATE ${this.tableName} SET status = 'processing', started_at = @now WHERE id = @id`);
job = this.mapRowToJob(job) as Job;
if (job) {
job.status = "processing";
job.startedAt = now;
}
await transaction.commit();
return job;
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error acquiring next job:`, error);
}
try { await transaction.rollback(); } catch {}
return null;
}
}
async completeJob(jobId: string, result: any): Promise<void> {
try {
if (!this.initialized) {
await this.initialize();
}
const request = this.pool.request();
request.input('id', jobId);
request.input('result', JSON.stringify(result));
await request.query(
`UPDATE ${this.tableName} SET status = 'completed', completed_at = GETDATE(), result = @result WHERE id = @id`
);
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error completing job:`, error);
}
}
}
async failJob(jobId: string, error: string): Promise<void> {
try {
if (!this.initialized) {
await this.initialize();
}
const request = this.pool.request();
request.input('id', jobId);
request.input('error', error);
await request.query(
`UPDATE ${this.tableName} SET status = 'failed', completed_at = GETDATE(), error = @error WHERE id = @id`
);
} catch (error) {
if (this.logging) {
console.error(`[MSSQLJobStorage] Error failing job:`, error);
}
}
}
private mapRowToJob(row: any): Job {
return {
id: row.id,
name: row.name,
data: typeof row.data === "string" ? JSON.parse(row.data) : row.data,
status: row.status as JobStatus,
createdAt: new Date(row.created_at),
scheduledAt: row.scheduled_at ? new Date(row.scheduled_at) : undefined,
startedAt: row.started_at ? new Date(row.started_at) : undefined,
completedAt: row.completed_at ? new Date(row.completed_at) : undefined,
error: row.error || undefined,
result: row.result
? typeof row.result === "string"
? JSON.parse(row.result)
: row.result
: undefined,
repeat: row.repeat
? typeof row.repeat === "string"
? JSON.parse(row.repeat)
: row.repeat
: undefined,
retryCount: row.retry_count || 0,
};
}
}
// mssql-queue.ts
import { Job, JobQueue, JobStatus } from "@backgroundjs/core";
import { MSSQLStorage } from "./msssql-storage";
export class MSSQLJobQueue extends JobQueue {
private readonly mssqlStorage :MSSQLStorage;
constructor(
mssqlStorage: MSSQLStorage,
options:{
concurrency?: number;
maxRetries?: number;
name?: string;
processingInterval?: number;
logging?: boolean;
intelligentPolling?: boolean;
minInterval?: number;
maxInterval?: number;
maxEmptyPolls?: number;
loadFactor?: number;
standAlone?: boolean;
} = {}
){
super(mssqlStorage, options);
this.mssqlStorage = mssqlStorage as MSSQLStorage;
this.concurrency = options.concurrency || 1;
this.logging = options.logging || false;
this.standAlone = options.standAlone ?? true;
}
protected async processNextBatch(): Promise<void> {
try {
if (this.isStopping && this.logging) {
console.log(`[${this.name}] Stopping job queue ... skipping`);
}
if (this.activeJobs.size >= this.concurrency || this.isStopping) {
return;
}
const availableSlots = this.concurrency - this.activeJobs.size;
let jobsProcessed = 0;
for (let i = 0; i < availableSlots; i++) {
const job = await this.mssqlStorage.acquireNextJob();
if (!job) {
break;
}
if (this.logging) {
console.log(`[${this.name}] Processing job:`, job);
console.log(
`[${this.name}] Available handlers:`,
Array.from(this.handlers.keys()),
);
console.log(
`[${this.name}] Has handler for ${job.name}:`,
this.handlers.has(job.name),
);
}
this.activeJobs.add(job.id);
this.processJob(job).finally(() => {
this.activeJobs.delete(job.id);
});
jobsProcessed++;
}
this.updatePollingInterval(jobsProcessed > 0);
} catch (error) {
if (this.logging) {
console.error(`[${this.name}] Error in processNextBatch:`, error);
}
}
}
protected async processJob(job: Job): Promise<void> {
try {
if (this.logging) {
console.log(
`[${this.name}] Starting to process job ${job.id} (${job.name})`,
);
}
await super.processJob(job);
if (this.logging && job.repeat) {
console.log(`[${this.name}] Completed repeatable job ${job.id}`);
}
} catch (error) {
const retryCount = job.retryCount || 0;
if (retryCount < this.maxRetries) {
const updatedJob: Job = {
...job,
status: "pending" as JobStatus,
retryCount: retryCount + 1,
error: `${error instanceof Error ? error.message : String(error)} (Retry ${retryCount + 1}/${this.maxRetries})`,
};
await this.mssqlStorage.updateJob(updatedJob);
} else {
job.status = "failed";
job.completedAt = new Date();
job.error = `Failed after ${this.maxRetries} retries. Last error: ${error instanceof Error ? error.message : String(error)}`;
await this.mssqlStorage.updateJob(job);
}
}
}
}
// server.ts
import { Hono } from "hono";
import { serve } from "@hono/node-server";
import pool from "mssql";
import { MSSQLStorage } from "./msssql-storage";
import { MSSQLJobQueue } from "./mssql-queue";
const mssqlPool = new pool.ConnectionPool({
server: 'localhost',
database: 'backgroundjs',
user: 'appuser',
password: '12345',
options: {
encrypt: false,
trustServerCertificate: true,
enableArithAbort: true
}
});
mssqlPool.connect()
.then(() => {
console.log("Connected to MSSQL");
})
.catch((err) => {
console.error("Error connecting to MSSQL", err);
});
const app = new Hono();
const storage = new MSSQLStorage("jobs", mssqlPool);
const queue = new MSSQLJobQueue(storage, { processingInterval: 200, logging: true });
queue.register("test-job", async (data: any) => {
await new Promise(resolve => setTimeout(resolve, 1000));
console.log("test-job", data);
});
queue.start();
app.post("/job", async (c) => {
const { data } = await c.req.json();
await queue.add("test-job", data);
return c.json({ message: "Job added" });
});
["SIGINT", "SIGTERM"].forEach((signal) => {
process.on(signal, () => {
queue.stop();
mssqlPool.close();
process.exit(0);
});
});
console.log("Starting server on port 3000");
serve({
fetch: app.fetch,
port: 3000,
});