Building a Simple yet Robust Job Queue System Using Postgres
Introduction
In this article, we’ll walk through the rationale behind using Postgres as a queue, explore the schema design, implement concurrency-safe task polling with SELECT … FOR UPDATE SKIP LOCKED, and ensure that only one worker processes a specific task at any given time. By the time we’re done, you’ll have the blueprint for a dependable, transactionally-secure system for background task processing—all running in Postgres.
Table of Contents
- Why Use Postgres as a Job Queue?
- Key Concepts and Definitions
- Database Schema Design
- Locking in Postgres: FOR UPDATE SKIP LOCKED
- Inserting New Tasks
- Polling for Tasks to Process
- Updating Task Status
- Parallel Workers and Concurrency Considerations
- Retries and Error Handling
- Pros and Cons
- Sample Implementation Walkthrough
- Practical Usage Tips
- Conclusion and Reflective Question
In the PATREON article you will find 2 projects:
- JobProducer.dproj - this is the client program which requests some job to be done. In the example it can request two kinds of jobs (“send_email”, “create_report”), but you can expand it to anything you need.
- QueueWorker.dproj - this is the actual worker. You can spawn multiple instances of it and verify that the system assigns each job to exactly one worker instance. If a worker crashes after picking up a job, that job becomes available again for other workers.
1. Why Use Postgres as a Job Queue?
When talking about job queues, developers often reach for tools like Redis, RabbitMQ, or specialized systems like Sidekiq (in the Ruby ecosystem). Each of these is a perfectly valid option. However, there are compelling reasons to consider Postgres for your queue needs:
- Simplicity: If your infrastructure already relies on Postgres, you don’t need to install or configure another service. This can reduce maintenance overhead and the time it takes to get your queue up and running.
- Data Consistency: Postgres is a robust, ACID-compliant database. Managing tasks inside the same database ensures strong data consistency and transactional integrity.
- Reliability: If you already trust Postgres to hold critical application data, you might also trust it for your background job management. You avoid introducing multiple points of failure since everything is in one place.
- Ease of Backup: You can back up your queue along with your application data in a single process. No need to implement or maintain a separate backup strategy for an additional system.
However, one should be aware that Postgres might not be the perfect queue solution for huge-scale scenarios where you have tens of thousands of jobs posted per second. It is quite capable, but you should always measure performance and optimize accordingly, or perhaps consider more specialized systems in extreme cases. For most small-to-medium workloads, Postgres does just fine.
🔔 If you need a job queue with even more features that is also simpler to use, offering a clean and simple REST API, check out DMSContainer with its Event Stream module. It is ready to use, and with DMSContainer you can start using queues, sending email, and generating Excel and PDF reports (and much more) in minutes. Check it out!
2. Key Concepts and Definitions
Before diving into the specifics, let’s lay out some terms we’ll be using:
- Job/Task: We’ll use the terms job and task interchangeably to refer to a unit of work that needs to be processed.
- Producer: An entity (usually part of your application) that creates tasks and inserts them into the queue.
- Consumer/Worker: A background process that retrieves pending tasks from the queue, processes them, and updates their status.
- FOR UPDATE SKIP LOCKED: A Postgres feature that allows you to lock rows as you select them, skipping any rows that are already locked by another transaction. This is key to avoiding conflicts when multiple workers are polling for tasks.
3. Database Schema Design
Designing a table to store tasks is relatively straightforward. Let’s outline a minimal schema you might use:
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
payload JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
Here’s what each column does:
- id: A unique identifier for each task, generated automatically.
- status: Indicates the state of the task, such as ‘pending’, ‘in_progress’, ‘completed’, or ‘failed’.
- payload: Contains the data needed to execute the task. JSONB allows flexibility in storing a variety of data structures.
- created_at: Timestamp marking when the task was created.
- updated_at: Timestamp updated whenever the task’s status changes.
We’ll keep this schema lightweight. In practice, you might add indices on specific JSON fields, or add other columns like a priority level or references to other tables.
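As a minimal sketch of such an extension, here is what a numeric priority could look like (the column name and default value are illustrative, not part of the schema above):

ALTER TABLE tasks ADD COLUMN priority INT NOT NULL DEFAULT 100;

-- Workers would then pick the most urgent task first, e.g.:
-- ORDER BY priority, id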
4. Locking in Postgres: FOR UPDATE SKIP LOCKED
The star of the show is Postgres’s row-level locking, specifically SELECT … FOR UPDATE SKIP LOCKED. Introduced in PostgreSQL 9.5, SKIP LOCKED ensures that if multiple transactions are attempting to lock the same rows, one transaction will lock them while the others will skip those rows that are already locked.
This means you can run the same SELECT statement in parallel across multiple workers without risking duplication. Each worker grabs a row (i.e., a task) and locks it so no one else can. It’s perfect for distributing tasks across multiple workers, as it avoids collisions.
In short:
- FOR UPDATE: Acquires a lock on the row(s) so that no other transaction can modify them until the lock is released.
- SKIP LOCKED: Tells Postgres not to wait for locked rows, but to skip over them and lock only the rows that are not currently locked.
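You can see the behavior for yourself with this sketch, run in two parallel psql sessions (the returned ids assume at least two pending rows exist):

-- Session A:
BEGIN;
SELECT id FROM tasks WHERE status = 'pending' ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED;
-- locks and returns the first pending row, e.g. id = 1

-- Session B, while Session A's transaction is still open:
BEGIN;
SELECT id FROM tasks WHERE status = 'pending' ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED;
-- does not block: it skips the locked row and returns the next one, e.g. id = 2
COMMIT;

-- Session A, when done:
COMMIT;

Without SKIP LOCKED, Session B would simply wait for Session A to commit or roll back.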
5. Inserting New Tasks
When your application creates a new task, it just needs to insert a row into the tasks table. Here’s a simple example:
INSERT INTO tasks (payload)
VALUES ('{"task_type": "send_email", "recipient": "[email protected]"}');
The status defaults to ‘pending’, created_at defaults to now(), and updated_at is also set to now(). This is all standard DML you’re familiar with; no surprises here.
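If the producer needs the generated id back (for instance, to check on the task later), RETURNING works here too. A small sketch, using the create_report job type mentioned earlier (the report_id value is purely illustrative):

INSERT INTO tasks (payload)
VALUES ('{"task_type": "create_report", "report_id": 42}')
RETURNING id;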
6. Polling for Tasks to Process
Now comes the important bit: how do we fetch tasks so that only one worker processes a given task at a time? Let’s suppose we have multiple worker processes (or threads) that each run a SELECT statement to find the next task that’s pending.
Here’s a simplified version:
BEGIN;
WITH cte_task AS (
SELECT id
FROM tasks
WHERE status = 'pending'
ORDER BY id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE tasks
SET status = 'in_progress',
updated_at = now()
FROM cte_task
WHERE tasks.id = cte_task.id
RETURNING tasks.id, tasks.payload;
We do the above in a single transaction:
- WITH cte_task: This common table expression locks exactly one task that’s in a ‘pending’ status. We use ORDER BY id just to give a deterministic ordering, though you could order by created date or priority if desired.
- FOR UPDATE SKIP LOCKED: Locks the row and skips it if another worker has already locked the same row.
- LIMIT 1: Ensures we only lock one task at a time.
- UPDATE: After the CTE finds the row, we update its status to in_progress, effectively assigning the task to this worker.
- RETURNING: We get back the id and payload for the task we plan to work on.
If the above query returns zero rows, it means there are no tasks in a ‘pending’ state. The worker can simply commit and sleep for a while before trying again.
If it returns a row, the worker can proceed to process the task. After finishing the work, the worker can issue another UPDATE statement to mark the task as ‘completed’, or maybe ‘failed’ if the processing encountered an error.
This approach ensures that each task is locked and updated in an atomic step, preventing other workers from grabbing the same task simultaneously.
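If the overhead of one round trip per job becomes noticeable, the same pattern extends to small batches; just keep in mind the transaction-size caveat discussed in the practical tips below. A sketch, claiming up to five tasks at once:

WITH cte_task AS (
    SELECT id
    FROM tasks
    WHERE status = 'pending'
    ORDER BY id
    LIMIT 5
    FOR UPDATE SKIP LOCKED
)
UPDATE tasks
SET status = 'in_progress',
    updated_at = now()
FROM cte_task
WHERE tasks.id = cte_task.id
RETURNING tasks.id, tasks.payload;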
7. Updating Task Status
Once a worker finishes the task, it needs to update the tasks table accordingly:
UPDATE tasks
SET status = 'completed',
updated_at = now()
WHERE id = <task_id>;
Or, if the task processing fails, you could do:
UPDATE tasks
SET status = 'failed',
updated_at = now()
WHERE id = <task_id>;
You might also want to record error messages, the number of retries, or other relevant metadata. Consider adding these fields in your schema if you anticipate that your tasks might fail and need re-processing.
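A minimal sketch of such an extension, assuming you want to keep the last error message with the task (the last_error column name and the error text are illustrative; <task_id> is a placeholder as above):

ALTER TABLE tasks ADD COLUMN last_error TEXT;

UPDATE tasks
SET status = 'failed',
    last_error = 'SMTP connection timed out',
    updated_at = now()
WHERE id = <task_id>;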
8. Parallel Workers and Concurrency Considerations
Using FOR UPDATE SKIP LOCKED means multiple workers can run the same polling query at the same time without conflicting. Each worker ends up locking different rows. If one worker locks a row, other workers skip it. This concurrency approach is ideal for distributing tasks among a pool of workers.
However, you must manage how frequently each worker polls the table. Polling too frequently can cause a lot of overhead on your database, especially if you have many worker processes. There’s a trade-off between picking up tasks quickly and not hammering your database with too many SELECT statements.
9. Retries and Error Handling
Real-world job processing is never 100% error-free. You’ll need a strategy for tasks that fail. Suppose a worker picks up a task, but the job execution fails due to a network error or some other problem. You can:
- Mark the task as ‘failed’ and log the error details. Later, another subsystem or a specialized script can look for ‘failed’ tasks and decide whether to retry them or not.
- Automatically retry a limited number of times. You might track a retry_count column in the tasks table. If it’s below some threshold, reassign the task to ‘pending’ status and increment retry_count.
Here’s an example of marking a task as failed and incrementing a retry_count:
ALTER TABLE tasks ADD COLUMN retry_count INT NOT NULL DEFAULT 0;
BEGIN;
UPDATE tasks
SET status = 'failed',
retry_count = retry_count + 1,
updated_at = now()
WHERE id = <task_id>;
COMMIT;
From there, you can have a separate process or cron job that looks for failed tasks with retry_count < 5 and moves them back to ‘pending’, effectively re-queuing them for another attempt (see the sketch below). In simpler scenarios, the worker itself can increment retry_count and stop processing the task after a specified number of attempts.
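A sketch of that re-queue step (the threshold of five attempts matches the example above):

UPDATE tasks
SET status = 'pending',
    updated_at = now()
WHERE status = 'failed'
  AND retry_count < 5;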
10. Pros and Cons
Even though using Postgres as a job queue can be incredibly convenient, it’s not a universal solution. Below is a list of pros and cons that reflects the general consensus among developers:
Pros
- No Extra Infrastructure: You already have Postgres, so nothing else to install or maintain.
- Transactional Safety: If your code is tightly coupled with the data stored in Postgres, having tasks in the same place simplifies consistency.
- Row-Level Locking: Postgres has robust concurrency mechanisms. FOR UPDATE SKIP LOCKED is well-tested and reliable.
- Backup and Restore: All data, including tasks, can be backed up together.
Cons
- Scalability: If you have extremely high throughput or require advanced queue features (like advanced routing, priority queues, etc.), specialized systems might be better.
- Database Load: Polling for tasks can add load to your primary database. You can mitigate this with thoughtful architecture or a replica, but it’s an extra consideration.
- Feature Limitation: Systems like RabbitMQ or Kafka offer sophisticated message routing, fan-out, and more. Postgres is simpler in that regard, but also less flexible for certain patterns.
11. Sample Implementation Walkthrough
Let’s walk through a hypothetical scenario with a small tech startup: Acme Email Services. They handle transactional emails. They need to queue email sending tasks to ensure they don’t overload their SMTP server.
- Create the tasks table:
CREATE TABLE tasks (
  id SERIAL PRIMARY KEY,
  status VARCHAR(50) NOT NULL DEFAULT 'pending',
  payload JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
- Insert tasks (producer side, e.g., from an API endpoint):
INSERT INTO tasks (payload)
VALUES ('{"task_type": "send_email", "subject": "Welcome!", "recipient": "[email protected]", "body": "Hello and welcome!"}');
- Worker process (pseudo-code in Python, for instance):
import psycopg2
import time

def poll_and_process_tasks():
    while True:
        conn = psycopg2.connect("dbname=acme user=postgres password=postgres")
        conn.autocommit = False
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    WITH cte_task AS (
                        SELECT id, payload
                        FROM tasks
                        WHERE status = 'pending'
                        ORDER BY id
                        LIMIT 1
                        FOR UPDATE SKIP LOCKED
                    )
                    UPDATE tasks
                    SET status = 'in_progress',
                        updated_at = now()
                    FROM cte_task
                    WHERE tasks.id = cte_task.id
                    RETURNING tasks.id, tasks.payload;
                """)
                row = cur.fetchone()
                if row:
                    task_id, task_payload = row
                    # Process the task
                    process_email(task_payload)  # your custom function
                    # Mark as completed
                    cur.execute("""
                        UPDATE tasks
                        SET status = 'completed',
                            updated_at = now()
                        WHERE id = %s
                    """, (task_id,))
                    conn.commit()
                else:
                    conn.rollback()
                    # No tasks, sleep for a bit before checking again
                    time.sleep(5)
        finally:
            conn.close()

def process_email(payload):
    # Pseudo-code for sending the email
    # payload['task_type'] == 'send_email'
    # Use payload['recipient'], payload['subject'], etc.
    print(f"Sending email to {payload['recipient']}...")
    # Actually send the email here
In the above script:
- We connect to the database and start a transaction.
- We attempt to grab a single pending task using the FOR UPDATE SKIP LOCKED technique.
- If we successfully get one, we mark it as in_progress immediately.
- We then process the email (omitting real email-sending code for brevity).
- If successful, we update its status to completed.
- If no tasks are pending, we roll back (so we release any locks or partial updates) and sleep for a bit.
⭐ The complete Delphi version of the job queue system will be released to PATREON supporters in the coming days.
- Handling Errors: If an error occurs while sending the email, you can catch the exception, mark the task as failed, and possibly record the error. Then you can have a re-queue strategy if needed.
12. Practical Usage Tips
- Indexing: If you have a very high volume of tasks, consider creating an index on (status) or (status, id) to speed up lookups for pending tasks (see the sketch after this list).
- Limit Polling Frequency: Implement a short delay for your workers or a backoff strategy to reduce database load.
- Use a Separate Table: If your application has multiple types of tasks or a huge number of tasks, you could separate them into different tables or even different schemas. This helps with organization and performance tuning.
- Monitoring: Keep an eye on the number of tasks in each status. Tools like Grafana or custom metrics can alert you if a backlog starts forming (e.g., if your tasks remain ‘pending’ for too long).
- Transaction Size: Be mindful of transaction boundaries. If you try to lock and process hundreds of tasks in a single transaction, you could hold locks for too long. Usually, you want to process tasks one by one or in small batches.
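Here is a sketch supporting the indexing and monitoring tips above (index names are illustrative, and the partial-index variant assumes most rows eventually leave the ‘pending’ state):

-- An index on (status, id) matches both the WHERE clause and the ordering of the polling query:
CREATE INDEX idx_tasks_status_id ON tasks (status, id);

-- A partial index is a common Postgres variant: it only covers pending rows, so it stays small:
CREATE INDEX idx_tasks_pending ON tasks (id) WHERE status = 'pending';

-- A quick monitoring query: how many tasks sit in each status right now?
SELECT status, count(*) FROM tasks GROUP BY status;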
13. Conclusion and Reflective Question
By now, you’ve seen that using Postgres as a job queue can be both practical and elegant, especially if your environment already uses Postgres for other tasks. You avoid the complexity of additional systems, gain robust transactional guarantees, and leverage Postgres’s concurrency features to distribute tasks among multiple workers without duplication.
Using PostgreSQL you can build a pool of worker processes that reliably handles thousands of tasks per day. Storing all logs and job metadata in Postgres makes it straightforward to query historical data, analyze performance, and manage error recovery. While solutions like DMSContainer/EventStream, Redis, RabbitMQ, or Kafka might be more suitable in certain high-scale or specialized scenarios, the Postgres job queue approach is a strong contender for many small-to-medium-scale needs.
Reflective Question: Given your organization’s workload and infrastructure, do you need the specialized features of a dedicated queueing system, or can Postgres handle your job processing effectively enough to simplify your architecture? Drop us an email for specialized consulting & development.
References
- PostgreSQL Official Documentation: https://www.postgresql.org/docs/current/sql-select.html#SQL-FOR-UPDATE-SHARE
  Contains details on FOR UPDATE SKIP LOCKED usage and syntax.
- PostgreSQL Wiki on Queueing: https://wiki.postgresql.org/wiki/Category:Queueing
  Various community discussions and extensions for queue-like functionality in Postgres.
- ACID Properties: https://en.wikipedia.org/wiki/ACID
  Explanation of the transaction guarantees that Postgres provides out of the box.
And that’s the story. Whether you’re building a side project, upgrading an internal tool, or need a quick but reliable solution for background processing, Postgres can be your job queue hero. By using a carefully designed schema, the magic of FOR UPDATE SKIP LOCKED, and a bit of worker logic, you’ll have a system that keeps your tasks organized, your workers busy, and your infrastructure simplified. Enjoy the simplicity and reliability of your brand-new queue system—powered by good old Postgres!
Need More?
Join to support the project and get access to premium content such as articles, videos, and other insights.
Remember to join the PATREON community to get valuable information, tutorials, and insights, receive priority support, and more. You can also support the project through Buy Me a Coffee and get the same benefits.
Enjoy!
– Daniele Teti