Hacker Newsnew | past | comments | ask | show | jobs | submitlogin
Building a fair multi-tenant queuing system (inngest.com)
176 points by tonyhb on Jan 22, 2024 | hide | past | favorite | 57 comments


I think you want controls around:

0) Load - I'm too loaded right now, I'm not processing your message. Maybe another worker will. This might be enqueued back into the low latency queue.

1) Role Limit - You've sent too many messages in too short of a time, I'm dequeueing this into the high latency queue.

2) Cost Limit - You've sent too many expensive messages, I'm dequeing this into the high latency queue.

3). Deadletter- Your messages are failing to be processed. We're going to dequeue into deadletter and maybe we're going stop processing them for for now.

So you have the normal low latency queue, the high latency backlog queue, and the deadletter queue. Ideally you have SLOs around end to end queue times and monitoring/alerting to let you and your senders know their expectations may be not being met (i.e 99% of messages are processed within n MS).


As we've moved up in scaling the number of tenants we found tiered queues to be an anti-pattern. Inevitably some worker or upstream will get broken and you end up the high latency or dead letter queues with much more traffic than they're really designed to handle. Clearing it out then becomes a scramble since just scaling the high latency workers will often cause cascading issues. It makes sense if you frame it as "we're having trouble managing our 1 million queues" and the solution is to "create up to 1 million more queues".

Instead we use a priority queue and adaptive concurrency. The priority queue lets you reschedule failed messages with a capped exponential backoff. That way it doesn't significantly impair the good items in the queue while still giving plenty of time to potentially fix the failing items. Adaptive concurrency increases & decreases the concurrency for that queue based on success & failure of workers on the queues items up to some upper & lower bounds. That way a single queue can't waste time scheduling work that's just going to most likely fail.

We still have a dead letter queue, but it only ends up containing bugs since expiration is considered a normal message lifecycle event. So dead letter items just contain messages that were too broken to either send or expire correctly.

The key take away is that as the number of queues increase it's better to just operate on the concurrency & ordering of the queue than to just keep creating more queues for the same work stream.


I found Amazon's Builders Library to have a very insightful list of how to handle building multi tenant queue systems https://aws.amazon.com/builders-library/avoiding-insurmounta...

I think one of the big tools is Shuffle Sharding. The article talks about standard sharding by itself as not being enough to provide robustness in multitenant queues. But Shuffle Sharding I.E. assigning users to virtual groups of underlying queues and enqueueing to the queue with the smallest size gets you pretty far. It can limit throughput for individual users but implementing some simple work stealing logic on the consumer helps make sure you keep your throughput up.


David mentioned it but SQS `MessageGroupId` can get you really far on fair-ish work from the queue. Its effectively a virtual partition key for the queue. Set your customer id, resource id, etc as the MessageGroupId and use that to do the work allocation.

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQS...


That extra layer of indirection, virtual queues, is key.


There are plenty of articles about writing simple job queues in Postgres using SKIP LOCKED, and I personally like that version quite a bit for cases where you don't want to add another separate component just for the queue. But I haven't seen anyone addressing fairness in such a queue, e.g. for the multi-tenant case.

Is there a simple way to get a very rough kind of fairness between tenants in this case? Doesn't have to be actually fair, just fair enough that no tenant gets fully stuck behind a large block of jobs from another tenant.

The idea behind SKIP LOCKED is that you just get the first free job (potentially ordered by e.g. a priority column). Doing that in a way that switches between tenants doesn't seem straightforward. You could execute the query once per tenant, but that would be rather inefficient for many tenants. Is there a reasonably straightforward way to achieve this?


I found a nice article on skip locked last week: https://www.crunchydata.com/blog/message-queuing-using-nativ...

We went ahead and implemented our own little queue in postgres based on that to get rid of a wonky thing we had that built on top of redis. What I like about this is that it's simple, robust, and transactional. So, you get some nice guarantees out of that.

For fairness, you might do stuff with partitioning on the priority column. Simply set up your workers to only hit a certain partition. So you have separate workers for higher and lower priority queue items. If you then dynamically lower the priority of tenants sending you lots of traffic, the system becomes more fair for the others. You could work with some simple moving average here. Just an idea, but something like this might work.


> Is there a simple way to get a very rough kind of fairness between tenants in this case?

Yes, since the SKIP LOCKED query is just a special sort of SELECT that skips over locked rows, you can use all the normal SQL tools such as ORDER BY to accomplish what you want.

One way is to make the top row fair by randomizing over the tenant ID, for example

  SELECT itemid
  FROM queue
  ORDER BY
    md5(tenant_id || current_timestamp::TEXT),
    item_id
  FOR UPDATE SKIP LOCKED
  LIMIT 1;
Here we append the ID to the timestamp and then hash it, so each time the SELECT is run there is effectively a different ordering between tenant IDs, hence fairness.

Note that this will eventually have performance implications for a deep queue since it forces a scan on the whole table. Adding an index on tenant_id may mitigate that, but as always do your own profiling.


Having a query like this that gets slower as the queue gets bigger is a really bad idea. It'll work just fine in the steady state when your queue is short. And then when you get overloaded and start queueing, you will be adding more load into the equation just to drain the queue. It's a recipe for a metastable system that causes a major outage. Adding an index on tenant_id will do nothing because you are always appending the current timestamp before hashing.


The query running in O(queue size) is very likely avoidable if you add an index on the tenant_id.

Ideally the query planner figures this out (see the sibling comment), but if not, the query can likely be written to "encourage" the planner, for example by first selecting the top few values of md5(tenant_id, current_timestamp) within a subquery before lateral joining into the SKIP LOCKED table.

If the concern is only that the queue falls over once it gets large enough, you can also LIMIT to a fixed number of rows before doing the outer SELECT. This gives a weaker fairness bound (tenants that have fewer than O(queue size / selected items) rows are at risk of being ignored) but you get a better guarantee of progress under contention.


Out of curiosity, is Postgres usually able to use a b-tree index's representation of the columnar data for its (set of) columns to calculate a derived value for those column(s), then scan through that in-memory derived data? Certainly not as fast as the index lookup itself, but avoids needing to have the entire table in memory, I would hope?

The documentation under https://www.postgresql.org/docs/16/indexes-types.html implies that it can be used to access the sorted values - but does that feed through the query planner?


I'm not addressing your point on fairness but I've been working on a similar program but instead of taking locks out on the queue table I maintain a consumers position for locked and acknowldeged messages on the consumers table. It makes lots of queries simpler. Here's a simplified schema using SQL Server:

  CREATE TABLE [dbo].[Nodes] (
    [Id]             UNIQUEIDENTIFIER NOT NULL,
    [NetworkId]      UNIQUEIDENTIFIER NOT NULL,    
    [Locked]         INT              NOT NULL,
    [Acknowledged]   INT              NOT NULL,
  );

  CREATE TABLE [dbo].[Messages] (
    [Id]            UNIQUEIDENTIFIER NOT NULL,
    [NetworkId]     UNIQUEIDENTIFIER NOT NULL,
    [NodeId]        UNIQUEIDENTIFIER NOT NULL,    
    [Payload]       NVARCHAR (MAX)   NULL,
    [InsertedOrder] INT DEFAULT (NEXT VALUE FOR dbo.MessageSequence) NOT NULL,
    [TimestampUtc]  DATETIME2 (7)    DEFAULT (sysutcdatetime()) NOT NULL,
  );

  Select * 
  From Nodes With (Rowlock, Holdlock)
  Where NetworkId = @networkId
  And Id = @nodeId
Multi-tenancy is per database.

(edit: simplified sql)


Skip locked seems to be the way that new queueing packages are going, though they don’t solve fairness or multi-tenancy.

It’s nice to keep things simple, but how does that work at scale? If I have 1,000 users I don’t think that these standard queueing systems can prevent one tenant from impacting another. It doesn’t look like that’s possible in SQS, either.

On the other hand, is that even necessary? At what scale is this important?


I'm thinking about relatively simple systems and not "real" fairness. The case where I've seen this being a bit problematic is with such a queue in an application where jobs are sometimes inserted in large bursts. It doesn't actually matter much if they are truly fair, but in the cases where a large number of jobs is inserted at once the result is that for some tenants the queue doesn't seem to move at all.

It's not a huge problem in that case because the regular priority system still puts the urgent jobs at the front of the queue. But it does sometimes cause confusion for someone looking at this from a single-tenant perspective because the queue looks stuck. There are other ways to fix the confusion, but it would be nice to have a very rudimentary fairness in this simple queue that would ensure that the queue iterates a bit over the tenants.


Seems to me a solution is to have a "generic" queue handler that will take any message, and then have tenant specific queue handlers.

Now, mind, I'm not thinking of a dedicated handler for each tenant, rather one that can round robin, or rotate through the tenants to pull items specific to that tenant off the queue. This can be the "fairness" arbiter.

With the Postgres SKIP LOCKED technique, you can be selective as to which messages you pull (you're not limited to the head of the queue, but can dig deep if you want).

So, you have one general purpose "head of the line" handler, and then one (or several) that acts like the fellow at the Post Office "Anyone here just picking up mail?", and pull them out of the queue.

Of course the generic version can just switch modes (every 1m, or 100 messages, go into "fair mode" for a short time), vs having different handlers.


> On the other hand, is that even necessary? At what scale is this important?

I can see it being helpful in front of a shared resource that doesn't have any way to control priority or fairness. My use case would be Elasticsearch. I don't want to block any searches from running if there's capacity, but also I don't want misbehaving clients to monopolise resources, and I want to make sure that all customers/users get to run their searches eventually, though I may prioritise some customers/users over others. And I may need my own internal jobs to run at a lower priority than a user search, but still within a guaranteed window (to avoid timeouts on the client side).

Honestly for my use case this could be useful with just a handful of users - less than 10, even.


> On the other hand, is that even necessary? At what scale is this important?

That's the critical question. To that I had that there might be a myriad of articles on how someone used postgres to build a message queue instead of using a dedicated message broker, but there isn't a single article showing any form of benchmarking, performance tests, load test results, and more importantly report the correlation between load and performance degradation and eventual overload and resulting failure modes. This information is of critical importance to be able to design systems and pick which tool is suited for the job.


This depends largely on your requirements.

* How well can you estimate job cost? How should overestimates and/or underestimates be treated?

* Where, when and how often does resource contention occur between jobs? Is it from a single resource, or from multiple different resources?

* What constraints or behaviors does the contended resource(s) possess (e.g. rate limits, pipelining, etc.) that would materially impact how you determine which job to run next?

* What guarantees do you wish to make (e.g. execution latency, resource availability, etc.)?

* How is resource usage quota determined? Are quotas updated frequently? Are there any circumstances under which you would like to allow users to exceed their quota (e.g. temporary burst workloads, idle periods of time)?


The solution is to maintain two queues.

One queu is your normal queue.

A second queue that is your tenant queue (one record for each tenant that has queued items).

You select from your tenant queue, then select from your normal queue. All the while keeping your tenant invariant (one record for each tenant that has queued items).

---

It's certainly more complex but no one said that fair was easy.


You have the full power of PostgreSQL at your disposal, so there are many ways you can effectively tackle this issue.

- Decrement priority of all tenants jobs by 1 each time their job is executed, or increment other tenants' priorities (more ops but better behavior)

- Maintain a separate tenant priority table and join it + use for ordering when fetching the next job

And so on


I can think of plenty of inefficient ways to do this. The nice thing about the SKIP LOCKED queue is that it is very simple and pretty fast. Postgres just has to use an index to look at the jobs in some defined order and take the first one that isn't locked.

The first option here would create an enormous amount of writes for each job fetched and likely slow it down to a crawl if enough jobs are in the queue.


Many scheduling systems have a "time in queue increases priority" behavior; it is not an exotic proposition and could be implement efficiently in PostgreSQL.

Having too many jobs in queue is a problem on its own that should be addressed. Each tenant should be rate-limited or have a reasonable cap on number of waiting jobs.


As long as you don't need a strict priority and a worker can just grab the next waiting job:

  SELECT * FROM jobs ORDER BY RANDOM() LIMIT 1 SKIP LOCKED
should do the trick


This will give more resources to tenants that schedule more jobs.

If tenant A schedules 99 jobs and tenant B schedules 1 job, a "fair" algorithm would pick B's job either first or second, RANDOM() will not.


First, query total time spent today for each tenant. Then, pick a job from the most unfortunate tenant's queue.


There is probably a better way (I no longer use Postgres for this) but I solved this in the past by evaluating priority after popping an item off the queue. If the item was low priority and there was higher priority work to do, or if a customer was over their quota, I would reschedule the work item.

In that scenario I employed an earliest-deadline-first scheduler table so rescheduling was just re-inserting or updating with a new deadline.


> In that scenario I employed an earliest-deadline-first scheduler...

I did something that sounds similar with the skip locked technique for a request queueing system supporting conveyor routing in a DC.

Priority alone wouldn't work because requests in different areas of the conveyor had different lead times until the response was required, sometimes 5 feet and sometimes 300 feet. If I made the shorter lead times all higher priority then they could all supersede an older request for longer sections that is now nearing the divert.

So the requests get submitted with a deadline datetime, and that is the index used to select the next request (top 1, order by deadline), worked pretty well.


>> with Postgres and SKIP LOCKED always cropping up

Note that SKIP LOCKED is available in Microsoft SQL Server and MySQL and I believe possibly also in Oracle and IBM DB/2.

To the article, I suspect there will be alot of detail challenged here.

There's talk in here about "creating queues" to handle the fairness problem. I might be misunderstanding what the hard bit is, but with a database backed queue you don't have to create a queue - queues are virtual depending on what messages are in there and what other information is available such as username or tag. if you want to be "fair" around username then round robin on that, or fair around some other bit of data then tag the message when it goes into the queue and when you are processing, round robin on that.

What is the hard bit here, what am I missing?


This problem can be extended to systems like project management, where your workers are not computers but people.

For instance: Suppose you have a team of six people, and you're being asked to tackle two projects for two clients, each of equal value to the business. One project will take two weeks to complete; another will take four months to complete.

How do you determine a "fair" way to divide up those projects? Do you do one before the other? Do you divide up your team's capacity equally and do them in parallel?

This is the same sort of queuing problem described in the post. So a solution to the computer queuing problem might have interesting implications for the broader problem.


Is "step function" a term of art I'm just ignorant of in computer science? AWS Step Functions, sure. How it's used here? No idea.

> It enables developers to write declarative step functions...

> With thousands of customers running thousands of step functions containing parallel steps....

> If you're familiar with step functions, you probably already see what's happening here...


"Step function" is sort of a standard term, yeah. Each step is some piece of code that runs at some point in the near future and persists its result for the next step to ingest. It could randomly fail (crashes, preemption, etc) before persisting, in which case the step is retried soon, so it better not have consequential side effects. Most likely you're interacting with external systems (like credit card processors) if you're using step functions in the first place, so idempotency is key.

I've done basically this sorta manually at a smaller scale. Each step was a Postgres table, like "user order initiated," "order confirmed," "order fulfilled," with automated steps (sometimes cronjobs) in between. Sometimes I wanted to expose that status to users. I could see using Inngest at a larger scale.


I don't think that's exactly the meaning on this article in particular, but I've seen people using "step function" as synonym to a step in a Finite-State Machine.

SWE technical terms are used very loosely, especially on blog posts like this.


It also means something unrelated in mathematics


Fair queuing in multi-tenant scenarios is also what describes what is going on in some financial trading systems: these queuing mechanisms are often opened up to regulators to demonstrate the fairness.


Great job abstracting away so much complexity!

> each step is a code-level transaction backed by its own job in the queue. If the step fails, it retries automatically. Any data returned from the step is automatically captured into the function run's state and injected on each step call.

This is one thing I've seen so many companies spending tons of time implementing themselves, and happens _everywhere_ -- no code apps, finance software, hospitals, anything that deals with ordering system...the list goes on.

Glad I no longer need to write this from scratch!


Another place to look for inspiration is the HPC world. Fairshare job queues have been active on multi tenant HPC clusters for decades. Each job typically has a priority value assigned to it based on the submission time, expected resources required, and how much of the cluster the account/user has used recently. Priorities can be continuously updated until the job is released for processing.


For a small side project, instead of queuing all the tasks or writing a custom fair multi-tenant queuing system, I added all the "tasks" to the db, then with a periodic job I query with SKIP LOCKED for pending tasks and enqueue them on my queue system (sidekiq, Goodjob, etc). I only enqueue them if there are less than some amount of jobs pending, it is hardcoded, but I could do it dynamically.

The query logic take into consideration the tenants so, a big tenant (or misbehaving) can't block small ones.

The periodic job is executed every few second, but it's also enqueued when added new tasks to the db (unless it's already enqueued) or when the query logic detect that there is more jobs available, so the lag is minimal.

At the moment I am doing this with only one type of jobs, but I want to expand it to most of the jobs, and let the "query logic" balance the queues.


This is cool but pretty light on the details. Judging by how it reads, they create queues for each function, then create queues of queues for workers. It sounds similar to the QuiCK paper from Apple: https://www.foundationdb.org/files/QuiCK.pdf


A "nuclear option" I've seen in the wild is Queue-It. Puts your entire flow behind a queue that's user-visible, with a waiting list page. Then presumably your own site's logic doesn't do any queuing.


How do they handle multi-tenancy for their users? Any post on that?


I don't know, but the requirements are different. They manage a single huge queue for your website (or part of it), and your own backend is expected to be able to handle some trivially small number of concurrent users. The most common use case is highly contentious online ticket sales, where users might even wait an hour in line. Latency isn't much of a concern.


Sounds a lot like Azure Durable functions. Which we started using recently. Curious to know if inngest could also fit our use case.


NATS has multi tenancy built in.


This honestly sounds like a job for the BEAM (Erlang/Elixir).

The preemptive scheduler would cover almost every need mentioned in the article.


Not sure if this is what you're getting at, but RabbitMQ is indeed pretty great at having thousands of queues, even on modest hardware, and can be used for per-tenant queues in this way.

It does have some drawbacks though: if you have thousands of mirrored or quorum queues, managing the cluster becomes more cumbersome - it can take a while to replace nodes (rebalancing thousands of queues around). Also, the management tools (the web UI especially) don't perform that well if you have 10,000s of queues and 100,000s of consumers


Which part of BEAM are you talking about? I know some cases can be solved by it already, but not almost every need.

The fairness of the BEAM scheduler is not the same as multi-tenant fairness. I'm aware of lcnt in Erlang that helps with contention, but that will have a hit in throughput like any other locks.

Unless I'm missing something?


The way the BEAM scheduler works you get max runtime per process before it switches over to another, which lets you run potentially millions of concurrent processes without having to worry about 1 taking it over because it was heavier. Your big stuff takes longer, but your response times across the board will remain very consistent.

If you wanted to make it truly fair you could spin up a GenServer per tenant and you would have a max concurrency per tenant of 1, but still all executing equally in parallel without 1 tenant stealing CPU time from the others. Contention is a non-issue. Fairness is built in.

You get the fault tolerance, isolation and observability as a by product too.

That's just my first reaction on reading it.


i thought RH had solved that. just write a systemd unit file with the cpu quotas. bam. done. /s?


btw "fair" is an entirely subjective and unproductive way to describe a system. There's simply properties and priorities. Many of us have learned fair to mean a great variance of things. From per capita equal load, to you get what you pay for, to anything goes just don't hurt anyone. In families we might see this as each child gets the same number of cookies. In capitalism fair is what you negotiate and agree to. The point isn't what is or isn't fair, but that there is no universal "fair".

Should a user be able to jump the queue? What if they're your number one driver of revenue? Or they have a time-sensitive function w/ a deadline? Or if they've produced no load on the system in the past $TIME_UNIT. All of these are not fair, just properties to be designed into a product.


Fair queueing is a very old (40 years?) CS / networking paradigm. It's what makes your internet connection feel snappy. It is distinct from order and priority, and is not subjective; there are many forms of it and each has a specific meaning.

There are many other forms of queueing other than fair, so if you're looking for a more specific use case, there may already be an algorithm for it.

https://en.wikipedia.org/wiki/Fair_queuing https://en.wikipedia.org/wiki/Weighted_fair_queueing https://intronetworks.cs.luc.edu/1/html/queuing.html


even in your linked examples Fair queuing boils down to a fairness measure which themselves are defined for certain properties.

https://en.wikipedia.org/wiki/Fairness_measure


Yeah I didn't understand what the author even meant in the article. How is first come first serve not fair?


The author defined fair in a pretty industry standard way:

> One user should not be able to block another's work.

A multi-tenant architecture implies over committing resources to achieve better economics. Once upon a time, decades ago, when computers were so expense that companies payed for batch processing instead of owning their own, FIFO was acceptable. (And for a tiny part of the market it still is.) But ever since the rise of time-sharing operating systems, users have come to expect short delays for small tasks. If you let one customer monopolize your capacity because they submitted a mountain of work before a second customer submits a handful, you probably won't have the second customer much longer.


Fair means that we should be giving users roughly the same capacity, or at least roughly the same chances to be worked on.

In the case of this queue, where each letter represents a user:

[A, Bx10000, C, D, E]

We're being unfair to C, D, and E. Realistically, while working on B's jobs we should have some mechanism to know that latency for C, D, and E are increasing and that we can start assigning them to workers.

Without that, latency for any step function you run is highly variable and impacted by other users. With multi-tenant fairness, latency is purely driven by how well we auto-scale and your own concurrency capacity.

The post here is about multi-tenant fairness in particular, so the intent is that fairness is viewed from a multi-tenant / PaaS lense.


> or at least roughly the same chances to be worked on.

Says who? What if B pays 10000x what ACDE do combined? What if B pays for that capacity?

What if B was silent for the 10000 seconds prior? And is now bursting their load just like ACDE do at other times?

There is no objective fair.

Perhaps the only objectively unfair I could think of is if you paid for a service and it's not rendered, at all, ever. That's probably unfair. But everything else is a grey spectrum in between and depends on the agreed upon properties.


> What if B pays 10000x what ACDE do combined? What if B pays for that capacity?

If B's load is consistently that high compared to the others then presumably the system has enough total capacity that those requests don't impact the latency for everyone else. If B pays for dedicated capacity they should ideally get their own queue.

> What if B was silent for the 10000 seconds prior?

Then B should smooth out their load better.

In this extreme an example, not much is lost for B by prioritizing C, D, E first sometimes anyways.

There might not be one single definition of fairness, but preventing one noisy user from impacting other users is a pretty common way to approach it.


All those shoulds are value statements which are not universal law. They're opinions about what constitutes fair.




Guidelines | FAQ | Lists | API | Security | Legal | Apply to YC | Contact

Search: