Hono
Implementing Pulse with hono is simple.
Here is the structured approach to implement queues in hono.
Example Usage
typescript
// types.ts
import { JobQueue } from "@pulsejs/core";
declare module "hono" {
interface ContextVariableMap {
queues?: JobQueue[];
}
}
// honoJobs.middleware.ts
import { Hono } from "hono";
import { JobQueue } from "@pulsejs/core;
export const honoJobs = (app: Hono, queues: JobQueue[]) => {
queues.forEach((queue) => {
queue.start();
});
app.use(async (c, next) => {
c.set("queues", queues);
await next();
});
// Stop all queues when the process is terminated
["SIGTERM", "SIGINT", "SIGKILL"].forEach((signal) => {
process.on(signal, () => {
queues.forEach((queue) => {
queue.stop();
});
});
});
};
// server.ts
import { Hono } from "hono";
import {
PostgreSQLJobQueue,
PostgreSQLJobStorage,
QueueEvent,
} from "@pulsejs/core";
import { serve } from "@hono/node-server";
import dotenv from "dotenv";
import { honoJobs } from "./honoJobs.middleware";
import { Pool } from "pg";
const app = new Hono();
const pool = new Pool({
connectionString: process.env.TEST_POSTGRESQL_URL!,
});
const storage = new PostgreSQLJobStorage(pool, {
tableName: "jobs",
});
const queue = new PostgreSQLJobQueue(storage, {
concurrency: 2,
maxRetries: 3,
name: "test-queue",
processingInterval: 1000,
});
queue.register("test-job", async (data) => {
await new Promise((resolve) => setTimeout(resolve, 3000));
return data;
});
queue.addEventListener("completed", async (event: QueueEvent) => {
console.log(event.data);
});
honoJobs(app, [queue]);
app.get("/job/:id", async (c) => {
const jobId = c.req.param("id");
const job = await storage.getJob(jobId);
return c.json(job);
});
app.post("/job", async (c) => {
const data = await c.req.json();
const queues = c.get("queues");
const testQueue = queues?.find((q) => q.getName() === "test-queue");
if (!testQueue) {
return c.json({ error: "Queue not found" }, 404);
}
const job = await testQueue.add("test-job", data);
return c.json(job);
});
console.log("Starting server on port 3000");
serve({
fetch: app.fetch,
port: 3000,
});