Skip to content
Contents

Extending Background

Background is designed with extensibility in mind, allowing you to create custom implementations tailored to your specific requirements.

Core Design Principles

Background follows object-oriented design principles. When extending its functionality, it's recommended to maintain these principles in your custom implementations.

Key OOP Concepts

To effectively extend Background, familiarity with these concepts is essential:

  • Inheritance: Creating new classes based on existing ones
  • Polymorphism: Implementing interfaces with various concrete implementations
  • Encapsulation: Controlling access to class components through access modifiers

The JobQueue Class

The JobQueue class serves as the foundation for all queue implementations in Background. When creating custom queues, you'll extend this class and work with its:

  • Protected properties
  • Protected methods
  • Storage implementation patterns

The class interacts with storage implementations that follow the JobStorage interface.

JobStorage Interface

typescript
export interface JobStorage {
  saveJob(job: Job): Promise<void>;
  getJob(id: string): Promise<Job | null>;
  getJobsByStatus(status: JobStatus): Promise<Job[]>;
  updateJob(job: Job): Promise<void>;
  acquireNextJob(): Promise<Job | null>;
  completeJob(jobId: string, result: any): Promise<void>;
  failJob(jobId: string, error: string): Promise<void>;
}

Extending the Storage Interface

You can extend the base JobStorage interface to add specialized functionality:

typescript
export interface RedisStorage extends JobStorage {
  // Atomic job acquisition with optional time-to-live
  acquireNextJob(ttl?: number): Promise<Job | null>;
  
  // Retrieve jobs by priority level
  getJobsByPriority(priority: number): Promise<Job[]>;
  
  // Get scheduled jobs within a specified time range
  getScheduledJobs(startTime: Date, endTime?: Date): Promise<Job[]>;
  
  // Remove all jobs from storage
  clear(): Promise<void>;
}

Creating Custom Queues

Implementation Steps

  1. Create a class that extends JobQueue
  2. Initialize storage and configuration in the constructor
  3. Call the parent constructor using super()
  4. Override key methods to customize behavior

Required Overrides

At minimum, you'll typically need to override these methods:

typescript
protected async processNextBatch(): Promise<void>
protected async processJob(job: Job): Promise<void>

Implement additional private methods as needed for your specific implementation.

Job Interface Reference

Here's the complete Job interface for reference:

typescript
// Job status types
export type JobStatus = "pending" | "processing" | "completed" | "failed";

// Job interface
export interface Job<T = any> {
  id: string;
  name: string;
  data: T;
  status: JobStatus;
  createdAt: Date;
  scheduledAt?: Date;
  startedAt?: Date;
  completedAt?: Date;
  error?: string;
  priority?: number;
  result?: any;
  retryCount?: number;
  repeat?: {
    every?: number;
    unit?: "seconds" | "minutes" | "hours" | "days" | "weeks" | "months";
    startDate?: Date;
    endDate?: Date;
    limit?: number;
  };
}

// Job handler type
export type JobHandler<T = any, R = any> = (data: T) => Promise<R>;

Best Practices

  • Review the source code before implementing custom extensions
  • Maintain consistent error handling patterns
  • Follow the existing architectural patterns where possible
  • Document custom implementations thoroughly

Source Code Reference

For a deeper understanding of Background internals, refer to the GitHub Repository.

Example MSSQL Implementation(Not Tested)

This implementation is not tested so be careful.

typescript
// mssql-storage.ts

import { Job, JobStatus, JobStorage } from "@backgroundjs/core";
import { ConnectionPool} from "mssql";

export class MSSQLStorage implements JobStorage {
    private readonly tableName: string;
    private readonly pool: ConnectionPool;
    private initialized: boolean = false;
    private logging: boolean = true;
    private readonly staleJobTimeout: number = 1000 * 60 * 5; // 5 minutes

    constructor(tableName: string, pool: ConnectionPool, options: {
        staleJobTimeout?: number;
        logging?: boolean;
    } = {}) {
        this.tableName = tableName;
        this.pool = pool;
        this.logging = options.logging ?? true;
        this.staleJobTimeout = options.staleJobTimeout ?? 1000 * 60 * 5; // 5 minutes
    }

    async initialize(): Promise<void> {
        if(!this.pool.connected){
            return;
        }
        const client = this.pool.request();
        try {
            // CREATE TABLE
            await client.query(`
                IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='${this.tableName}' AND xtype='U')
                EXEC('CREATE TABLE ${this.tableName} (
                    id NVARCHAR(255) PRIMARY KEY,
                    name NVARCHAR(255) NOT NULL,
                    data NVARCHAR(MAX) NOT NULL,
                    status NVARCHAR(255) NOT NULL,
                    created_at DATETIME NOT NULL,
                    scheduled_at DATETIME,
                    started_at DATETIME,
                    completed_at DATETIME,
                    error NVARCHAR(MAX),
                    priority INT DEFAULT 0,
                    result NVARCHAR(MAX),
                    repeat NVARCHAR(MAX),
                    retry_count INT DEFAULT 0,
                    timeout INT DEFAULT 1000
                )')
            `);

            // CREATE INDEX 1
            await client.query(`
                IF NOT EXISTS (
                    SELECT * FROM sys.indexes WHERE name = '${this.tableName}_status_idx'
                )
                EXEC('CREATE INDEX ${this.tableName}_status_idx ON ${this.tableName} (status)')
            `);

            // CREATE INDEX 2
            await client.query(`
                IF NOT EXISTS (
                    SELECT * FROM sys.indexes WHERE name = '${this.tableName}_scheduled_at_idx'
                )
                EXEC('CREATE INDEX ${this.tableName}_scheduled_at_idx ON ${this.tableName} (scheduled_at)')
            `);

            this.initialized = true;
        } finally {
            
        }
    }

    async saveJob(job: Job): Promise<void> {
        try {
            if (!this.initialized) {
                await this.initialize();
            }
            if(!this.pool.connected){
                return;
            }
            const request = this.pool.request();
            request.input('p1', job.id);
            request.input('p2', job.name);
            request.input('p3', JSON.stringify(job.data));
            request.input('p4', job.status);
            request.input('p5', job.createdAt);
            request.input('p6', job.scheduledAt || null);
            request.input('p7', job.retryCount || 0);
            request.input('p8', job.priority || 0);
            request.input('p9', JSON.stringify(job.repeat));
            request.input('p10', job.timeout || 1000);

            await request.query(
                `INSERT INTO ${this.tableName} (
                    id, name, data, status, created_at, scheduled_at, retry_count, priority, repeat, timeout
                ) VALUES (@p1, @p2, @p3, @p4, @p5, @p6, @p7, @p8, @p9, @p10)`
            );
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error saving job:`, error);
            }
        }
    }

    async getJob(id: string): Promise<Job | null> {
        try {
            if (!this.initialized) {
                await this.initialize();
            }
            if(!this.pool.connected){
                return null;
            }
            const request = this.pool.request();
            request.input('id', id);
            const result = await request.query(
                `SELECT * FROM ${this.tableName} WHERE id = @id`
            );

            if (result.recordset.length === 0) {
                return null;
            }

            return this.mapRowToJob(result.recordset[0]);
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error getting job:`, error);
            }
            return null;
        }
    }

    async getJobsByStatus(status: JobStatus): Promise<Job[]> {
        try {
            if (!this.initialized) {
                await this.initialize();
            }
            if(!this.pool.connected){
                return [];
            }
            const request = this.pool.request();
            request.input('status', status);
            const result = await request.query(
                `SELECT * FROM ${this.tableName} WHERE status = @status`
            );

            return result.recordset.map((row) => this.mapRowToJob(row));
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error getting jobs by status:`, error);
            }
            return [];
        }
    }

    async updateJob(job: Job): Promise<void> {
        try {
            if (!this.initialized) {
                await this.initialize();
            }
            if(!this.pool.connected){
                return;
            }
            const request = this.pool.request();
            request.input('id', job.id);
            request.input('name', job.name);
            request.input('data', JSON.stringify(job.data));
            request.input('status', job.status);
            request.input('scheduledAt', job.scheduledAt || null);
            request.input('startedAt', job.startedAt || null);
            request.input('completedAt', job.completedAt || null);
            request.input('error', job.error || null);
            request.input('result', job.result ? JSON.stringify(job.result) : null);
            request.input('repeat', job.repeat ? JSON.stringify(job.repeat) : null);
            request.input('retryCount', job.retryCount || 0);
            request.input('priority', job.priority || 0);
            request.input('timeout', job.timeout || 1000);

            const result = await request.query(
                `UPDATE ${this.tableName} SET
                    name = @name,
                    data = @data,
                    status = @status,
                    scheduled_at = @scheduledAt,
                    started_at = @startedAt,
                    completed_at = @completedAt,
                    error = @error,
                    result = @result,
                    repeat = @repeat,
                    retry_count = @retryCount,
                    priority = @priority,
                    timeout = @timeout
                WHERE id = @id`
            );

            if (result.rowsAffected[0] === 0) {
                throw new Error(`Job with ID ${job.id} not found`);
            }
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error updating job:`, error);
            }
        }
    }

    async acquireNextJob(): Promise<Job | null> {
        if (!this.initialized) {
            await this.initialize();
        }
        if(!this.pool.connected){
            return null;
        }
        const transaction = this.pool.transaction();
        try {
            await transaction.begin();

            const request = transaction.request();
            const now = new Date();
            request.input('now', now);
            request.input('staleTime', new Date(Date.now() - this.staleJobTimeout));

            // Select the next pending job with locking
            const query = await request.query(
                `SELECT TOP 1 * FROM ${this.tableName} WITH (UPDLOCK, READPAST)
                 WHERE (
                    status = 'pending' 
                    AND (scheduled_at IS NULL OR scheduled_at <= @now)
                 )
                 OR (
                    status = 'processing' 
                    AND started_at IS NOT NULL 
                    AND completed_at IS NULL 
                    AND started_at < @staleTime
                 )
                 ORDER BY priority ASC, created_at ASC`,
            );

            if (query.recordset.length === 0) {
                await transaction.rollback();
                return null;
            }

            let job = query.recordset[0];

            // Update the job status to 'processing'
            request.input('id', job.id);
            await request.query(
                `UPDATE ${this.tableName} SET status = 'processing', started_at = @now WHERE id = @id`);

            job = this.mapRowToJob(job) as Job;
            if (job) {
                job.status = "processing";
                job.startedAt = now;
            }
            await transaction.commit();
            return job;
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error acquiring next job:`, error);
            }
            try { await transaction.rollback(); } catch {}
            return null;
        }
    }

    async completeJob(jobId: string, result: any): Promise<void> {
        try {
            if (!this.initialized) {
                await this.initialize();
            }
            const request = this.pool.request();
            request.input('id', jobId);
            request.input('result', JSON.stringify(result));
            await request.query(
                `UPDATE ${this.tableName} SET status = 'completed', completed_at = GETDATE(), result = @result WHERE id = @id`
            );
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error completing job:`, error);
            }
        }
    }

    async failJob(jobId: string, error: string): Promise<void> {
        try {
            if (!this.initialized) {
                await this.initialize();
            }
            const request = this.pool.request();
            request.input('id', jobId);
            request.input('error', error);
            await request.query(
                `UPDATE ${this.tableName} SET status = 'failed', completed_at = GETDATE(), error = @error WHERE id = @id`
            );
        } catch (error) {
            if (this.logging) {
                console.error(`[MSSQLJobStorage] Error failing job:`, error);
            }
        }
    }

    private mapRowToJob(row: any): Job {
        return {
            id: row.id,
            name: row.name,
            data: typeof row.data === "string" ? JSON.parse(row.data) : row.data,
            status: row.status as JobStatus,
            createdAt: new Date(row.created_at),
            scheduledAt: row.scheduled_at ? new Date(row.scheduled_at) : undefined,
            startedAt: row.started_at ? new Date(row.started_at) : undefined,
            completedAt: row.completed_at ? new Date(row.completed_at) : undefined,
            error: row.error || undefined,
            result: row.result
                ? typeof row.result === "string"
                    ? JSON.parse(row.result)
                    : row.result
                : undefined,
            repeat: row.repeat
                ? typeof row.repeat === "string"
                    ? JSON.parse(row.repeat)
                    : row.repeat
                : undefined,
            retryCount: row.retry_count || 0,
        };
    }
}

// mssql-queue.ts

import { Job, JobQueue, JobStatus } from "@backgroundjs/core";
import { MSSQLStorage } from "./msssql-storage";

export class MSSQLJobQueue extends JobQueue {
    private readonly mssqlStorage :MSSQLStorage;
    constructor(
        mssqlStorage: MSSQLStorage,
        options:{
            concurrency?: number;
            maxRetries?: number;
            name?: string;
            processingInterval?: number;
            logging?: boolean;
            intelligentPolling?: boolean;
            minInterval?: number;
            maxInterval?: number;
            maxEmptyPolls?: number;
            loadFactor?: number;
            standAlone?: boolean;
        } = {}
    ){
        super(mssqlStorage, options);
        this.mssqlStorage = mssqlStorage as MSSQLStorage;
        this.concurrency = options.concurrency || 1;
        this.logging = options.logging || false;
        this.standAlone = options.standAlone ?? true;
    }

    protected async processNextBatch(): Promise<void> {
        try {
          if (this.isStopping && this.logging) {
            console.log(`[${this.name}] Stopping job queue ... skipping`);
          }
          if (this.activeJobs.size >= this.concurrency || this.isStopping) {
            return;
          }
          const availableSlots = this.concurrency - this.activeJobs.size;
          let jobsProcessed = 0;
          for (let i = 0; i < availableSlots; i++) {
            const job = await this.mssqlStorage.acquireNextJob();
            if (!job) {
              break;
            }
            if (this.logging) {
              console.log(`[${this.name}] Processing job:`, job);
              console.log(
                `[${this.name}] Available handlers:`,
                Array.from(this.handlers.keys()),
              );
              console.log(
                `[${this.name}] Has handler for ${job.name}:`,
                this.handlers.has(job.name),
              );
            }
            this.activeJobs.add(job.id);
            this.processJob(job).finally(() => {
              this.activeJobs.delete(job.id);
            });
            jobsProcessed++;
          }
          this.updatePollingInterval(jobsProcessed > 0);
        } catch (error) {
          if (this.logging) {
            console.error(`[${this.name}] Error in processNextBatch:`, error);
          }
        }
    }

    protected async processJob(job: Job): Promise<void> {
        try {
          if (this.logging) {
            console.log(
              `[${this.name}] Starting to process job ${job.id} (${job.name})`,
            );
          }
    
          await super.processJob(job);
          if (this.logging && job.repeat) {
            console.log(`[${this.name}] Completed repeatable job ${job.id}`);
          }
        } catch (error) {
          const retryCount = job.retryCount || 0;
          if (retryCount < this.maxRetries) {
            const updatedJob: Job = {
              ...job,
              status: "pending" as JobStatus,
              retryCount: retryCount + 1,
              error: `${error instanceof Error ? error.message : String(error)} (Retry ${retryCount + 1}/${this.maxRetries})`,
            };
            await this.mssqlStorage.updateJob(updatedJob);
          } else {
            job.status = "failed";
            job.completedAt = new Date();
            job.error = `Failed after ${this.maxRetries} retries. Last error: ${error instanceof Error ? error.message : String(error)}`;
            await this.mssqlStorage.updateJob(job);
          }
        }
    }
}

// server.ts

import { Hono } from "hono";
import { serve } from "@hono/node-server";
import pool from "mssql";
import { MSSQLStorage } from "./msssql-storage";
import { MSSQLJobQueue } from "./mssql-queue";

const mssqlPool = new pool.ConnectionPool({
    server: 'localhost',
    database: 'backgroundjs',
    user: 'appuser',
    password: '12345',
    options: {
        encrypt: false,
        trustServerCertificate: true,
        enableArithAbort: true
    }
});

mssqlPool.connect()
.then(() => {
    console.log("Connected to MSSQL");
})
.catch((err) => {
    console.error("Error connecting to MSSQL", err);
});


const app = new Hono();
const storage = new MSSQLStorage("jobs", mssqlPool);
const queue = new MSSQLJobQueue(storage, { processingInterval: 200, logging: true });

queue.register("test-job", async (data: any) => {
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log("test-job", data);
});

queue.start();

app.post("/job", async (c) => {
    const { data } = await c.req.json();
    await queue.add("test-job", data);
    return c.json({ message: "Job added" });
});

["SIGINT", "SIGTERM"].forEach((signal) => {
    process.on(signal, () => {
        queue.stop();
        mssqlPool.close();
        process.exit(0);
    });
});

console.log("Starting server on port 3000");
serve({
    fetch: app.fetch,
    port: 3000,
});