Async control: Promise Queues and "leaky-buckets" πŸ’§πŸ’¦

Advanced techniques on how / when to queue up functions and Promises (Totally worth mastering!)

Β·

6 min read

Featured on Hashnode

When you want to make multiple HTTP network calls in an application, sometimes you need to know when all the calls are complete. Other times, you need to be extra careful that you don't DOS your own system's back-end API (or someone else's 😐).

In this article I'd like to discuss various scenarios in JavaScript where you want to track the status of multiple network requests.

The first scenario is perhaps the easiest: make all the requests at once, and wait for them all to complete. (We're going to assume a happy-path 200 status response, unless otherwise stated.) To do this, we simply store all the API network call responses in an array, and await for them to resolve:

// represents all the future API calls we want to make
const calls = [
  `https://myapi.com/v1/user?${( new URLSearchParams( { name: "Steven1" }) )}`,
  `https://myapi.com/v1/user?${( new URLSearchParams( { name: "Sam2" }) )}`,
  `https://myapi.com/v1/user?${( new URLSearchParams( { name: "Sylvia3" }) )}`
];
const resultPromises = calls.map(url => fetch(url));
const results = await Promise.all(resultPromises);

By calling .map(), we store each call's resulting Promise in an array, and use Promise.all() to wait until they are all resolved as results. Simple. Also somewhat dangerous as mentioned above!

At the other end of the spectrum, we can make one API call at a time, and wait for it to complete before calling another one.

for (const url of calls) {
   const result = await fetch(url);
  // do something with result
}
const result = await calls.reduce(async (lastPromise, url) => {
    const result = await lastPromise;
    // do something with result
    return fetch(url);
}, null);
// do something with last result

The second approach is just a fancy way of doing it via functional programming (FP). Both loop styles are equally effective.

What if we want something in between? Let's explore a couple of ways of sending more-than-one AND less-than-all calls (i.e. 1 > ??? < all).

First the naΓ―ve approach is where we'll send a call "batch" of size X (e.g. 2), wait for all those requests to complete, then send another "batch".

const resultPromises = [];
const batchSize = 2; // any size we want!

for (const url of calls) {
  const result = fetch(url);

  resultPromises.push(result);
  if (resultPromises.length === batchSize) {
    const results = await Promise.all(resultPromises); // wait here
    resultPromises.length = 0; // reset the array
  }
}
await resultPromises; // ... you could reset the array, too :)

Here we've introduced a little more state in order to track a "batch" of result Promises. When we reach the batch size we want, we await all those Promises before continuing. Note at the end we have to again await the results after the loop, in order to wait on the last batch of calls (which may not have evenly divided between our batchSize and calls length).

This approach works fine. But there are a couple of major drawbacks:

  1. The "weakest link" problem: If any ONE of these requests in the batch takes long, then the ENTIRE batch takes long, which will "delay/block" the remainder calls from getting sent out as quickly. Maybe there's a better way to unblock?

  2. The "mixed concerns" problem: There is a lot of boiler-plate code around this throttling feature, maintaining the number of live network requests. Maybe there's a better way to organize our code?

The solution to #1 is to implement a "leaky bucket", where we can fill the bucket with a buffer of water (i.e. all the API calls we want to make), and it will throttle the amount of drip (i.e. the number of live requests sent).

The solution to #2 is to write a helper class or function, which encapsulates the state and responsibility of tracking live requests away from sending them (via fetch). We'll use a simple Higher Order Function design pattern to encapsulate this behaviour.

First let's think about the ideal API of our helper lib. What we want is something like this made-up createBucket lib:

import createBucket from "leaky-bucket";

const batchSize = 2; // any size we want!
const bucket = createBucket(batchSize);

const resultPromises = calls.map(url => bucket(fetch, url));
const results = await Promise.all(resultPromises);

This is the magic part: bucket(fetch, url). With this API, we're giving to bucket a pointer to the function we want called ASAP (fetch), along with the arguments it should be passed (url). An alternative API could look like this: bucket(() => fetch(url)), where we pass bucket a single callback function called a "thunk". Either API will work just as well (so we'll implement the latter, for simplicity).

From here, it is bucket's job to make sure only 2 live requests happen at a time. How do we do that? With arrays! Let's take a look.

export default function createBucket(maxCurr = 10) {
  let curr = 0; // currently live calls
  const queue = []; // calls to be made

  async function scheduleWork() {
    // snip!
  }

  return (cb) => {
    return new Promise((resolve, reject) => {
      // snip!
    });
  };
}

Here we've defined createBucket factory with two main features: A) a private scheduleWork() function, and B) a public function for adding our callback into the bucket. Note that curr tracks the number of executing functions, and queue is the bucket that contains the callbacks to be called.

Let's start by implementing the B) snip! to register callbacks, which looks like this:

  return (cb) => {
    return new Promise((resolve, reject) => {
      queue.push({ cb, resolve, reject }); // add callback onto queue
      scheduleWork(); // schedule next cb exec
    });
  };

To support async functions, we're returning a Promise to the caller. Our factory function first adds the callback along with its related resolvers onto the queue (for scheduling use), and then it schedules work. So now... let's implement A) snip! to do that work:

async function scheduleWork() {
    if (curr >= maxCurr || queue.length === 0) return;

    curr++;
    const { cb, resolve, reject } = queue.shift();

    try {
      const result = await cb(); // execute callback
      resolve(result);
    } catch (err) {
      reject(err);
    }

    curr--;
    scheduleWork(); // schedule next cb exec
  }

As you can see, the job of scheduleWork is to make sure it has some water in the "bucket" to leak out, plus if there is any curr (i.e. currently live) capacity left. If so, it will increment the curr counter, execute the callback, and resolve its value for the Promise that was originally passed back to the caller. It also handles the error case (πŸŽ‰ bonus! πŸŽ‰). Finally, it recursively calls itself to allow rescheduling any remainder callbacks.

That's it! If you want to see the full versions of the lib, look at this gist. There are some variants in there as well, including how to expose the queue size and active count, plus some different APIs. So check it out!

I hope you had fun learning more about how to handle async functions and Promises with queues. Understanding these advanced techniques and concepts will give you additional tools for many other async situations, so they're πŸ’― worth mastering!

You can follow me on Twitter and YouTube. Let’s continue interesting conversations about Sr JS development together!

Did you find this article valuable?

Support Steven Olsen by becoming a sponsor. Any amount is appreciated!

Β