Posted by abelanger 6 days ago
Just over a year ago, we launched Hatchet as a distributed task queue built on top of Postgres with a 100% MIT license (https://news.ycombinator.com/item?id=39643136). The feedback and response we got from the HN community was overwhelming. In the first month after launching, we processed about 20k tasks on the platform — today, we’re processing over 20k tasks per minute (>1 billion per month).
Scaling up this quickly was difficult — every task in Hatchet corresponds to at minimum 5 Postgres transactions and we would see bursts on Hatchet Cloud instances to over 5k tasks/second, which corresponds to roughly 25k transactions/second. As it turns out, a simple Postgres queue utilizing FOR UPDATE SKIP LOCKED doesn’t cut it at this scale. After provisioning the largest instance type that CloudSQL offers, we even discussed potentially moving some load off of Postgres in favor of something trendy like Clickhouse + Kafka.
But we doubled down on Postgres, and spent about 6 months learning how to operate Postgres databases at scale and reading the Postgres manual and several other resources [0] during commutes and at night. We stuck with Postgres for two reasons:
1. We wanted to make Hatchet as portable and easy to administer as possible, and felt that implementing our own storage engine specifically on Hatchet Cloud would be disingenuous at best, and in the worst case, would take our focus away from the open source community.
2. More importantly, Postgres is general-purpose, which is what makes it both great and hard to scale for some types of workloads. This is also what allows us to offer a general-purpose orchestration platform: we heavily utilize Postgres features like transactions, SKIP LOCKED, recursive queries, triggers, COPY FROM, and much more.
Which brings us to today. We’re announcing a full rewrite of the Hatchet engine — still built on Postgres — together with our task orchestration layer which is built on top of our underlying queue. To be more specific, we’re launching:
1. DAG-based workflows that support a much wider array of conditions, including sleep conditions, event-based triggering, and conditional execution based on parent output data [1].
2. Durable execution: a function’s ability to recover from failure by caching intermediate results and automatically replaying them on a retry. We call a function with this ability a durable task. We also support durable sleep and durable events, which you can read more about here [2].
3. Queue features such as key-based concurrency queues (for implementing fair queueing), rate limiting, sticky assignment, and worker affinity.
4. Improved performance across every dimension we’ve tested, which we attribute to six improvements to the Hatchet architecture: range-based partitioning of time series tables, hash-based partitioning of task events (for updating task statuses), separating our monitoring tables from our queue, buffered reads and writes, switching all high-volume tables to use identity columns, and aggressive use of Postgres triggers.
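To make the durable execution idea in item 2 concrete, here is a minimal sketch of "cache intermediate results, replay them on retry". It is not Hatchet's API: `DurableContext`, `run_step`, and the in-memory `history` dict (standing in for durable storage such as a Postgres table) are all hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, List


class DurableContext:
    """Caches step results by name so a retried run can replay them."""

    def __init__(self, history: Dict[str, Any]) -> None:
        # `history` is a stand-in for durable storage (assumption).
        self.history = history

    def run_step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name in self.history:
            # Replay: return the cached result instead of re-executing.
            return self.history[name]
        result = fn()
        self.history[name] = result
        return result


calls = {"charge": 0, "email": 0}


def charge() -> str:
    calls["charge"] += 1
    return "charge-ok"


def send_email() -> str:
    calls["email"] += 1
    if calls["email"] == 1:
        raise RuntimeError("transient failure")  # fails on the first attempt only
    return "email-ok"


def durable_task(ctx: DurableContext) -> List[str]:
    first = ctx.run_step("charge", charge)
    second = ctx.run_step("email", send_email)
    return [first, second]


def run_with_retries(history: Dict[str, Any], attempts: int = 3) -> List[str]:
    for _ in range(attempts):
        try:
            return durable_task(DurableContext(history))
        except RuntimeError:
            continue  # retry; completed steps are replayed from history
    raise RuntimeError("exhausted retries")


history: Dict[str, Any] = {}
result = run_with_retries(history)
```

On the second attempt the `charge` step is served from `history`, so it executes only once even though the task as a whole ran twice.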
We've also removed RabbitMQ as a required dependency for self-hosting.
We'd greatly appreciate any feedback you have and hope you get the chance to try out Hatchet.
[0] https://www.postgresql.org/docs/
- Does it support durable tasks that should essentially run forever and produce an endless "stream" of events, self-healing in case of intermittent failures? Or would those be a better fit for some different kind of orchestrator?
- Where and how are task inputs and outputs stored? Are there any conveniences to make passing "weird" things around easier (that is, anything other than simple and reasonably small JSON-encoded objects), like Dagster's I/O managers, or is it all out of scope for Hatchet?
- Assuming that I can get ballpark estimates for the desirable number of tasks, their average input and output sizes, and my PostgreSQL instance's size and I/O metrics, can I somehow make a reasonable guesstimate on how many tasks per second the whole system can put through safely?
I'm currently in search of the Holy Grail (haha), evaluating all sorts of tools (Temporal, Dagster, Prefect, Faust, now looking at Hatchet) to find the one I like the most. My project is a synchronization+processing system that has a bunch of dynamically-defined workflows that continuously work with external services (stores), look for updates (determine new, updated, or deleted products) and spawn product-level workflows to process those updates (standardize store-specific data into a unified shape, match against the canonical product catalog, etc.). Surely, this kind of pipeline can be built on nearly anything - I'm just trying to get a sense of what each of those systems feels like to work with, what it's actually good at, what the gotchas and limitations are, and which tool would let me write the least boilerplate.
Thanks!
All I ever want is a queue where I submit a message and then it hits an HTTP endpoint with that message as POST. It is such a better system than dedicated long running worker listeners, because then you can just scale your HTTP workers as needed. Pairs extremely well with autoscaling Cloud Functions, but could be anything really.
I also find that DAGs tend to get ugly really fast because they generally involve logic. I'd prefer that logic to not be tied into the queue implementation because it becomes harder to unit test. It's much easier to reason about if you have the HTTP endpoint create a new task, if it needs to.
> It is such a better system than dedicated long running worker listeners, because then you can just scale your HTTP workers as needed.
This depends on the use-case - with long running listeners, you get the benefit of reusing caches, database connections, and disk, and from a pricing perspective, if your task spends a lot of time waiting for i/o operations (or waiting for an event), you don't get billed separately for CPU time. A long-running worker can handle thousands of concurrently running functions on cheap hardware.
> I also find that DAGs tend to get ugly really fast because they generally involve logic. I'd prefer that logic to not be tied into the queue implementation because it becomes harder to unit test. It's much easier to reason about if you have the HTTP endpoint create a new task, if it needs to.
We usually recommend that DAGs which require too much logic (particularly fanout to a dynamic number of workflows) should be implemented as a durable task instead.
I used to work for a company that used long-running listeners. More often than not, they would get into a state where (for example) they needed to upgrade some code while long-running jobs were still in flight (some would go for 24 hours!). Stopping those jobs would screw everything up down the line, because after a restart they took so long to finish that it would impact customer-facing data. Just like DAGs, it sounds good on paper, but it is a terrible design pattern that will eventually bite you in the ass.
The better solution is to divide and conquer. Break things up into smaller units of work and then submit more messages to the queue. This way, you can stop at any point and you won't lose hours' worth of work. The way to force this on developers is to set constraints on how long things can execute for. Make them think about what they are building and build idempotency into things.
The fact that you're building a system that supports all these footguns seems terrifying. "Usually recommend" is not enough; people will always find ways to use things in ways you don't expect. I'd much rather work with a more constrained system than one trying to be all things to all people. Cloud Tasks does a really good job of just doing one thing well.
I’ve been occasionally hacking away at a proof of concept built on riverqueue but have eased off for a while due to performance issues obvious with non-partitioned tables and just general laziness.
https://github.com/jarshwah/dispatchr if curious but it doesn’t actually work yet.
To be clear, I really like the model of riverqueue and will keep going at a leisurely pace since this is a personal time interest at the moment. I’m sick of celery and believe a service is a better model for background tasks than a language-specific tool.
If you guys were to build http ingestion and http targets I’d try and deploy it right away.
> Perceived only at this stage, though the kind of volume we’re looking at is 10s to 100s of millions of jobs per day.
Yeah that's a little over 100 jobs/sec sustained :) Shouldn't be much of an issue on appropriate hardware and with a little tuning, in particular to keep your jobs table from growing to more than a few million rows and to vacuum frequently. Definitely hit us up if you try it and start having any trouble!
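For reference, a quick conversion of the quoted daily volumes into sustained per-second rates. This is just arithmetic; the two inputs are the low end (10 million) and the 100 million mark of the "10s to 100s of millions" range quoted above.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def jobs_per_second(jobs_per_day: float) -> float:
    """Average sustained rate, assuming load is spread evenly over the day."""
    return jobs_per_day / SECONDS_PER_DAY


low = jobs_per_second(10_000_000)    # about 116 jobs/sec
high = jobs_per_second(100_000_000)  # about 1,157 jobs/sec
print(f"{low:.0f} jobs/sec at 10M/day, {high:.0f} jobs/sec at 100M/day")
```

So "a little over 100 jobs/sec" corresponds to the low end of that range; at 100 million jobs per day the sustained rate is roughly ten times higher, and bursts will exceed the average.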
I built my own super simple router abstraction. Message comes in, goes into my router, which sends it to the right handler.
I only test the handler itself, without any need for the higher level tasks. This also means that I'm only thinly tied to GCP Tasks and can migrate to another system by just changing the router.
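A rough sketch of what such a router might look like, assuming messages arrive as dicts with a `type` and a `payload` field. The `Router` class, the message shape, and the `resize_image` handler are all made up for illustration; they are not the commenter's actual code.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Any]


class Router:
    """Maps a message type to the handler function that processes it."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, kind: str, handler: Handler) -> None:
        self._handlers[kind] = handler

    def dispatch(self, message: Dict[str, Any]) -> Any:
        kind = message["type"]
        try:
            handler = self._handlers[kind]
        except KeyError:
            raise ValueError(f"no handler for message type {kind!r}") from None
        return handler(message["payload"])


def resize_image(payload: Dict[str, Any]) -> Dict[str, Any]:
    # Plain function: unit-testable without any queue or HTTP layer.
    return {"resized": payload["path"], "width": payload["width"]}


router = Router()
router.register("resize_image", resize_image)
```

The HTTP endpoint that the queue calls only needs to parse the request body and call `router.dispatch(...)`, so swapping the queue provider means changing that thin entry point and nothing else.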
Alternatively, you can use ngrok (or similar) and a test task queue that calls your service running on localhost, tunneled via ngrok.
[0] https://github.com/oneapplab/lq
P.S. This is far from being an alternative to the Hatchet product.
1. Are you ordering the jobs by any parameter? I don't see an ORDER BY in this clause: https://github.com/oneapplab/lq/blob/8c9f8af577f9e0112767eef...
2. I see you're using a UUID for the primary key on the jobs, I think you'd be better served by an auto-inc primary key (bigserial or identity columns in Postgres) which will be slightly more performant. This won't matter for small datasets.
3. I see you have an index on `queue`, which is good, but no index on the rest of the parameters in the processor query, which might be problematic when you have many reserved jobs.
4. Since this is an in-process queue, it would be awesome to allow the tx to be passed to the `Create` method here: https://github.com/oneapplab/lq/blob/8c9f8af577f9e0112767eef... -- so you can create the job in the same tx when you're performing a data write.
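To illustrate point 4, here is a small sketch of creating a job inside the caller's transaction so the data write and the enqueue commit or roll back together. It uses Python's built-in `sqlite3` rather than Postgres and lq's Go API purely so it runs anywhere; the table layout and the `create_job` / `signup` functions are hypothetical.

```python
import json
import sqlite3


def create_job(conn: sqlite3.Connection, queue: str, payload: dict) -> int:
    """Insert a job on the caller's connection. Does NOT commit."""
    cur = conn.execute(
        "INSERT INTO jobs (queue, payload) VALUES (?, ?)",
        (queue, json.dumps(payload)),
    )
    return cur.lastrowid


def signup(conn: sqlite3.Connection, email: str, fail: bool = False) -> None:
    # `with conn` commits on success and rolls back if an exception escapes.
    with conn:
        conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        create_job(conn, "emails", {"to": email})
        if fail:
            raise RuntimeError("simulated failure after enqueue")


def count(conn: sqlite3.Connection, table: str) -> int:
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, queue TEXT, payload TEXT)")

signup(conn, "a@example.com")
try:
    signup(conn, "b@example.com", fail=True)
except RuntimeError:
    pass  # both the user row and the job row were rolled back

users, jobs = count(conn, "users"), count(conn, "jobs")
```

After the failed second signup there is still exactly one user and one job, which is the guarantee you lose when the job is created on a separate connection.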
But that requires you to keep the job history around, which at scale starts to impact performance.
At one point we considered partitioning on the status of a queue item (basically active | inactive) and aggressively running autovac on the active queue items. Then all indexes for monitoring can be on the inactive partitioned tables.
But there were two reasons we ended up going with separate tables:
1. We started to become concerned about partitioning _both_ by time range and by status, because time range partitioning is incredibly useful for discarding data after a certain amount of time
2. If necessary, we wanted our monitoring tables to be able to run on a completely separate database from our queue tables. So we actually store them as completely independent schemas to allow this to be possible (https://github.com/hatchet-dev/hatchet/blob/main/sql/schema/... vs https://github.com/hatchet-dev/hatchet/blob/main/sql/schema/...)
So to answer the question -- you can query both active queues and a full history of queued tasks up to your retention period, and we've optimized the separate tables for the two different query patterns.
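As a rough illustration of why time-range partitioning makes retention cheap: expired data can be removed by dropping whole partitions instead of deleting rows. The sketch below assumes daily partitions and a `<table>_YYYYMMDD` naming scheme; both are assumptions for the example, not a description of Hatchet's actual schema.

```python
from datetime import date, timedelta
from typing import Iterable, List


def partition_name(table: str, day: date) -> str:
    """Name of the daily partition holding rows for `day` (hypothetical scheme)."""
    return f"{table}_{day:%Y%m%d}"


def expired_partitions(
    table: str, existing_days: Iterable[date], today: date, retention_days: int
) -> List[str]:
    """Partitions whose day is older than the retention window."""
    cutoff = today - timedelta(days=retention_days)
    return [partition_name(table, d) for d in sorted(existing_days) if d < cutoff]


def drop_statements(names: Iterable[str]) -> List[str]:
    # Dropping a partition removes its rows and indexes in one cheap operation.
    return [f"DROP TABLE IF EXISTS {name};" for name in names]


today = date(2025, 4, 10)
days = [date(2025, 4, 1) + timedelta(days=i) for i in range(10)]  # Apr 1 to Apr 10
expired = expired_partitions("tasks", days, today, retention_days=7)
statements = drop_statements(expired)
```

With a 7-day retention window, only the April 1 and April 2 partitions fall before the cutoff, and each is removed without scanning or vacuuming the live tables.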
Or how would you scale this to support thousands of events per second?
To clarify, Hatchet supports both DAGs and workflows as code: see https://docs.hatchet.run/home/child-spawning and https://docs.hatchet.run/home/durable-execution
There's a lot to go into here, but generally speaking, running an orchestrator as a separate service is easier from a Postgres scaling perspective: it's easier to buffer writes to the database, manage connection overhead, export aggregate metrics, and horizontally scale the different components of the orchestrator. Our original v0 engine was architected in a very similar way to an in-process task queue, where each worker polls a tasks table in Postgres. This broke down for us as we increased volume.
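As a small illustration of the write-buffering point, here is a sketch of a size-triggered buffer that turns many single-row writes into a few batched ones. `WriteBuffer` and `flush_fn` are hypothetical names; in a real service `flush_fn` would issue one multi-row INSERT or COPY, and you would typically also flush on a timer so rows don't sit in memory indefinitely.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


class WriteBuffer:
    """Collects rows and hands them to `flush_fn` in batches."""

    def __init__(self, flush_fn: Callable[[List[Row]], None], max_size: int = 100) -> None:
        self._flush_fn = flush_fn
        self._max_size = max_size
        self._rows: List[Row] = []

    def add(self, row: Row) -> None:
        self._rows.append(row)
        if len(self._rows) >= self._max_size:
            self.flush()

    def flush(self) -> None:
        if not self._rows:
            return
        # Swap the list out first so the buffer is empty even if flush_fn raises.
        batch, self._rows = self._rows, []
        self._flush_fn(batch)


batches: List[List[Row]] = []  # stands in for the database
buf = WriteBuffer(batches.append, max_size=3)
for i in range(7):
    buf.add({"task_id": i})
buf.flush()  # flush the remainder on shutdown
```

Seven writes become three round trips here (batches of 3, 3, and 1), which is the kind of reduction that is hard to get when every worker process talks to the database directly.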
Outside of durable execution, we're more of a general-purpose orchestration platform -- lots of our features target use-cases where you either want to run a single task or define your tasks as a DAG (directed acyclic graph) instead of using durable execution. Durable execution has a lot of footguns if used incorrectly, and DAGs are executed in a durable way by default, so for many use-cases it's a better option.
Re DBOS: I understood that part of the value proposition there is bundling transactions into logical units that can all be undone if a critical step in the workflow fails - the example given in their docs being a failed payment flow. Does Hatchet have a solution for those scenarios?
The core idea is that you write the "parent" task as a durable task, and you invoke subtasks which represent logical units of work. If any given subtask fails, you can wrap the call in a `try...catch` and gracefully recover.
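A minimal sketch of that pattern in plain Python (`try...except` here), using the failed-payment scenario from the question. This deliberately does not use the Hatchet SDK: `reserve_stock`, `charge_card`, `release_stock`, and `SubtaskError` are ordinary, made-up functions standing in for subtasks.

```python
from typing import Dict, List


class SubtaskError(Exception):
    """Raised when a subtask cannot complete."""


def reserve_stock(order: Dict[str, int]) -> str:
    return "reserved"


def charge_card(order: Dict[str, int]) -> str:
    if order["amount"] > order["limit"]:
        raise SubtaskError("card declined")
    return "charged"


def release_stock(order: Dict[str, int]) -> str:
    # Compensating step: undoes the effect of reserve_stock.
    return "released"


def parent_task(order: Dict[str, int]) -> List[str]:
    """Runs subtasks in order and compensates if the payment step fails."""
    log = [reserve_stock(order)]
    try:
        log.append(charge_card(order))
    except SubtaskError:
        log.append(release_stock(order))
    return log


ok = parent_task({"amount": 50, "limit": 100})
declined = parent_task({"amount": 500, "limit": 100})
```

The undo logic lives in the parent as explicit compensating subtasks rather than in a single database transaction, so each step can be retried or recorded independently.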
I'm not as familiar with DBOS, but in Hatchet a durable parent task and its child tasks map directly to Temporal workflows and activities. Admittedly this pattern should be documented in the "Durable execution" section of our docs as well.
Re Dagster - Dagster is much more oriented towards data engineering, while Hatchet is oriented more towards application engineers. As a result tools like Dagster/Airflow/Prefect are more focused on data integrations, whereas we focus more on throughput/latency and primitives that work well with your application. Perhaps there's more overlap now that AI applications are more ubiquitous? (with more data pipelines making their way into the application layer)