Posted by abelanger 3 days ago
Just over a year ago, we launched Hatchet as a distributed task queue built on top of Postgres with a 100% MIT license (https://news.ycombinator.com/item?id=39643136). The feedback and response we got from the HN community was overwhelming. In the first month after launching, we processed about 20k tasks on the platform — today, we’re processing over 20k tasks per minute (>1 billion per month).
Scaling up this quickly was difficult — every task in Hatchet corresponds to at minimum 5 Postgres transactions and we would see bursts on Hatchet Cloud instances to over 5k tasks/second, which corresponds to roughly 25k transactions/second. As it turns out, a simple Postgres queue utilizing FOR UPDATE SKIP LOCKED doesn’t cut it at this scale. After provisioning the largest instance type that CloudSQL offers, we even discussed potentially moving some load off of Postgres in favor of something trendy like Clickhouse + Kafka.
But we doubled down on Postgres, and spent about 6 months learning how to operate Postgres databases at scale and reading the Postgres manual and several other resources [0] during commutes and at night. We stuck with Postgres for two reasons:
1. We wanted to make Hatchet as portable and easy to administer as possible, and felt that implementing our own storage engine specifically on Hatchet Cloud would be disingenuous at best, and in the worst case, would take our focus away from the open source community.
2. More importantly, Postgres is general-purpose, which is what makes it both great but hard to scale for some types of workloads. This is also what allows us to offer a general-purpose orchestration platform — we heavily utilize Postgres features like transactions, SKIP LOCKED, recursive queries, triggers, COPY FROM, and much more.
Which brings us to today. We’re announcing a full rewrite of the Hatchet engine — still built on Postgres — together with our task orchestration layer which is built on top of our underlying queue. To be more specific, we’re launching:
1. DAG-based workflows that support a much wider array of conditions, including sleep conditions, event-based triggering, and conditional execution based on parent output data [1].
2. Durable execution — durable execution refers to a function’s ability to recover from failure by caching intermediate results and automatically replaying them on a retry. We call a function with this ability a durable task. We also support durable sleep and durable events, which you can read more about here [2]
3. Queue features such as key-based concurrency queues (for implementing fair queueing), rate limiting, sticky assignment, and worker affinity.
4. Improved performance across every dimension we’ve tested, which we attribute to six improvements to the Hatchet architecture: range-based partitioning of time series tables, hash-based partitioning of task events (for updating task statuses), separating our monitoring tables from our queue, buffered reads and writes, switching all high-volume tables to use identity columns, and aggressive use of Postgres triggers.
We've also removed RabbitMQ as a required dependency for self-hosting.
We'd greatly appreciate any feedback you have and hope you get the chance to try out Hatchet.
[0] https://www.postgresql.org/docs/
These look like great projects to get something running quickly, but likely will experience many of the challenges Alexander mentioned under load. They look quite similar to our initial implementation using FOR UPDATE and maintaining direct connections from workers to PostgreSQL instead of a central orchestrator (a separate issue that deserves its own post).
One of the reasons for this decision to performantly support more complex scheduling requirements and durable execution patterns -- things like dynamic concurrency [0] or rate limits [1] which can be quite tricky to implement on a worker-pull model where there will likely be contention on these orchestration tables.
They also appear to be pure queues to run individual tasks in python only. We've been working hard on our py, ts, and go sdks
I'm excited to see how these projects approach these problems over time!
[0] https://docs.hatchet.run/home/concurrency [1] https://docs.hatchet.run/home/rate-limits
I've intentionally chosen simple over performance when the choice is there. Chancy still happily handles millions of jobs and workflows a day with dynamic concurrency and global rate limits, even in low-resource environments. But it would never scale horizontally to the same level you could achieve with RabbitMQ, and it's not meant for massive multi-tenant cloud hosting. It's just not the project's goal.
Chancy's aim is to be the low dependency, low infrastructure option that's "good enough" for the vast majority of projects. It has 1 required package dependency (the postgres driver) and 1 required infrastructure dependency (postgres) while bundling everything inside a single ASGI-embeddable process (no need for separate processes like flower or beat). It's used in many of my self-hosted projects, and in a couple of commercial projects to add ETL workflows, rate limiting, and observability to projects that were previously on Celery. Going from Celery to Chancy is typically just replacing your `delay()/apply_async()` with `push()` and swapping `@shared_task()` with `@job()`.
If you have hundreds of employees and need to run hundreds of millions of jobs a day, it's never going to be the right choice - go with something like Hatchet. Chancy's for teams of one to dozens that need a simple option while still getting things like global rate limits and workflows.
I’m curious: When you say FOR UPDATE SKIP LOCKED does not scale to 25k queries/s, did you observe a threshold at which it became untenable for you?
I’m also curious about the two points of:
- buffered reads and writes
- switching all high-volume tables to use identity columns
What do you mean by these? Were those (part of) the solution to scale FOR UPDATE SKIP LOCKED up to your needs?
Those other points are mostly unrelated to the core queue, and more related to helper tables for monitoring, tracking task statuses, etc. But it was important to optimize these tables because unrelated spikes on other tables in the database could start getting us into a deteriorated state as well.
To be more specific about the solutions here:
> buffered reads and writes
To run a task through the system, we need to write the task itself, write the instance of that retry of the count to the queue, write an event that the task has been queued, started, completed | failed, etc. Generally one task will correspond to many writes along the way, not all of which need to be extremely latency sensitive. So we started buffering items coming from our internal queues and flushing them once every 10ms, which helped considerably.
> switching all high-volume tables to use identity columns
We originally had combined some of our workflow tables with our monitoring tables -- this table was called `WorkflowRun` and it was used for both concurrency queues and queried when serving the API. This table used a UUID as the primary key, because we wanted UUIDs over the API instead of auto-incrementing IDs. The UUIDs caused some headaches down the line when trying to delete batches of data and prevent index bloat.
From the above link:[1]
> I found that performing extremely frequent vacuum analyze (every 30 minutes) helps a small amount but this is not that helpful so problems are still very apparent.
> The queue table itself fits in RAM (with 2M hugepages) and during the wait, all the performance counters drop to almost 0 - no disk read or write (semi-expected due to the table fitting in memory) with 100% buffer hit rate in pg_top and row read around 100/s which is much smaller than expected.
Bullet points 2 and 3 from here [2] are what first came to mind, due to the 100% buffer hit rate.
Note that vacuuming every 30min provided "minor improvements" but the worst case of:
25000 tps * 60sec *30min * 250rows == 11,250,000,000 ID's (assuming worst case every client locking conflicting rows)
Even:
25000tps 60sec 30minIs only two orders of magnitude away from blowing through the 32bit transaction ID's.
45,000,000
4,294,967,296
But XID exhaustion is not as hidden as the MXID exhaustion and will block all writes, while the harder to see MXID exhaustion will only block some writes.IMHO, if I was writing this, and knowing that you are writing an orchestration platform, getting rid of the long term transactions with just a status column would be better, row level locks are writing to the row anyways, actually twice.
tuple lock -> write row lock to xmax column -> release tuple lock.
Long lived transactions are always problematic for scaling, and that status column would allow for more recovery options etc...But to be honest, popping off the left of a red black tree like the linux scheduler does is probably so much better than fighting this IMHO.
This opinion is assuming I am reading this right from the linked to issue [1]
> SELECT FOR UPDATE SKIP LOCKED executes and the select processes wait for multiple minutes (10-20 minutes) before completing
There is a undocumented command pg_get_multixact_members() [3] that can help troubleshoot as many people are using hosted Postgres, the tools too look into the above problems can be limited.
It does appear that Amazon documents a bit about the above here [4].
[1] https://postgrespro.com/list/thread-id/2505440 [2] https://www.postgresql.org/docs/current/routine-vacuuming.ht... [3] https://doxygen.postgresql.org/multixact_8c.html#adf3c97f22b... [4] https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide...
Imo that's the killer feature of database-based queues, because it dramatically simplifies reasoning about retries, i.e. "did my endpoint logic commit _and_ my background operation enqueue both atomically commit, or atomically fail"?
Same thing for performing jobs, if my worker's business logic commits, but the job later retries (b/c marking the job as committed is a separate transaction), then oof, that's annoying.
And I might as well be using SQS at that point.
On your point of using transactions for idempotency: you’re right that it’s a great advantage of a db-based queue, but I’d be wary about taking it as a holy grail for a few reasons:
- it locks you into using a db-based queue. If for any reason you don’t want to anymore (eg you’re reaching scalability issues) it’ll be very difficult to switch to another queue system as you’re relying on transactions for idempotency.
- you only get transactional idempotency for db operations. Any other side effect won’t be automatically idempotent: external API calls, sending messages to other queues, writing files…
- if you decide to move some of your domain to another service, you lose transactional idempotency (it’s now two databases)
- relying on transactionality means you’re not resilient to having duplicate tasks in the queue (duplicate publishing). That can easily happen: bug of the publisher, two users triggering an action concurrently… it’s quite often a very normal thing to trigger the same action multiple times
So I’d avoid having my tasks rely on transactionality for idempotency, your system is much more resilient if you don’t
Amazing what you can do when you read the manual, eh?
Seriously though, that’s awesome, and I’m very happy to see someone leaning hard into RDBMS features like triggers instead of shying away.
Only fix we could find was using unlogged tables and a full vacuum on a schedule. We aren’t big Postgres experts but since you are I was wondering if you have fixed this issue/this framework works well for large payloads.
Nice work on the lite mode, open source, logging, dx interface.
You may want to replace Hello world examples with real world scenarios.
The workflows that involve multiple steps tasks, dag in your terminology - the code simply isn't intuitive.
You now have to get into the hatchets mindset, patterns, terminology. Eg: the random number example is riddled with too many. How many of the logos on your homepage did you have to write code for? Be honest.
Knowing to program should be 90% enough. Eg for js:
// send("hi", user => user.signed_up_today)
// .waitFor("7d")
// .send("upgrade", user => !user.upgraded)
Just made this up, but something like this is more readable. (PS:would love to be proved wrong by an implementation of exactly the above example here in the comments). The whole point of being smart is for your team at hatchet to absorb difficulty at the benefit of an easy interface that looks simple and magic. Your 5 line examples has types to learn, functions to learn, arguments to know, 5-10 kinds of things to learn. It showed little effort to make it easy for customers.An engineering post on what's under the hood makes sense. But customers really don't care about your cloud infra flexes in a post introducing your company pitching the product. It's just koolaid.
Same with complete rewrite so early. I'm glad you are open to change. But the workflow market today with so many options, i don't belive this is the last rewrite or pivot to come.
The DAGs itself aren't very readable. You are better off switching to something like react flow that lets you nocode edit as well.
Focus on automation journeys that are common. Like cookbooks. And allow folks to just import them or change some configurations. like drip marketing, renewals, expired cards, forgot password handlers, shortlink creators, maybe pdf merging, turning a bunch of saved links to a daily blog post, etc
How does a workflow replace a saas they are paying $99 for. That's powerful.
Tough to serialize a worflow to json . Or atleast didn't see it. this makes it easy to have workflows as code, create nocode editors in your own roadmap. You want people to hop from 1 company to another taking their hatchet workflows with them
Good luck, and sorry for coming off as rude. It's just a space I am very passionate about.
The open source support and QuickStart are excellent. The engineering work put into the system is very noticeable!