Javier Casas

A random walk through computer science

Managing concurrency in JavaScript

We want to run some concurrent operation (usually disk or network operation). But we also want some control over it. The classic use case is using an API with some rate limiting, thus forcing us to do one (or a limited amount) request at a time. We want to be able to use language primitives as much as possible, while having control over how many requests are being run in parallel.

This is JavaScript, so the main way to do this is via Promises, as we want to have code that looks kinda linear and try to hide the callbacks as much as possible behind async syntax.

The baseline

The most basic approach is just map async and Promise.all (or Promise.allSettled).

const employeeIds = [33, 47, 112]
await Promise.all(
  employeeIds.map(async (employeeId) => {
    await api.doSomethingWithEmployee(employeeId);
  })
)

The code is quite simple: we just Promise.all a bunch of promises that are the result of requesting multiple operations. If you do this, the system will try to execute all the async operations at the same time and a request will be started immediately for each entry in employeeIds. There are some exceptions, for example the browser will usually run around 4 concurrent requests. But, in any case, it's external things rate limiting us, so it's not our code in control.

Naive approach

We have Array.prototype.forEach that kinda exists for this purpose:

const employeeIds = [33, 47, 112]
employeeIds.forEach(async (employeeId) => {
  await api.doSomethingWithEmployee(employeeId)
})

In this case, we kinda stated a bit better what we want to do in the code, but everything went wrong:

  • We no longer await for the tasks to complete before trying to do other stuff. We "fire and forget" the tasks. I hope you didn't need to do anything with the return values of the tasks.
  • There is no way to wait for a task to finish before starting the next.

This means, effectively, that forEach only works as expected for non-async tasks.

Controlling the execution

The initial problem is that we launch all the promises immediately (in the .map), whereas we may want to delay some requests. So let's start with converting the promises into tasks to be triggered under our control:

const employeeIds = [33, 47, 112]
const tasks = employeeIds.map((employeeId) => async () => {
  await api.doSomethingWithEmployee(employeeId)
})

// Run the tasks one at a time, waiting for it to finish before starting the next
for(let task of tasks) {
  await task()
}

This kinda starts to improve, but we are starting to run into friction. We have to construct a for loop, it's no longer part of a map chain, and our only control is "one at a time".

Trying to put the map chain back

That for loop is ugly. I also dislike that Promise.all. I would like to have something like:

const employeeIds = [33, 47, 112]
// This is not real Javascript
await employeeIds
  .map((employeeId) => api.doSomethingWithEmployee(employeeId))
  .awaitAll(oneAtATime)

That kinda follows what's going on in the system. We moved the Promise.all to the end of the map chain because we expect to await for all the promises after we have started them. But this conflicts with the reality of JavaScript, as, again, we would start all the promises at the same time, then the awaiting is no longer in control of what runs when. But it serves as food for thought, and allows us to try to get somewhere. Instead of the non-existing .awaitAll, let's try to .reduce:

const employeeIds = [33, 47, 112]

// Function to .reduce by constructing a chain of promises
async function oneAtATime(accumulatedPromises, newPromise, index) {
  return async () => {
    // We check for index here because
    // for the first iteration accumulatedPromises don't contain an accumulatedPromise,
    // but just a promise of the first element of the array
    const rest = (index === 1 ? [await accumulatedPromises()] : await accumulatedPromises();
    const current = await newPromise()
    return [...rest, current]
  }
}

const results = await employeeIds
  .map((employeeId) => () => api.doSomethingWithEmployee(employeeId))
  // We still have to invoke the result at the end because oneAtATime returns a function
  .reduce(oneAtATime)()

This still has to do the function trick to delay execution, but we are starting to get somewhere. We still have the .map chaining, something at the end to mean "collect all the results", and execution one at a time. It still has some ugly bits though:

  • We have to construct functions that construct functions so that we can invoke them later. That empty set of parentheses in the .map function are ugly. So are the empty set of parentheses at the .reduce.
  • What if employeeIds is an empty array? Uncaught TypeError: Reduce of empty array with no initial value. That's bad. We should get here a promise that resolves immediately to an empty array.
  • This construction is simple and works, but can only deal with one promise at a time. It will require lots of black magic to do things like two promises at a time.

One style of black magic: monkey-patching

Let's try to fix the Uncaught TypeError: Reduce of empty array with no initial value that happens because of .reduce.

Array.prototype.awaitAll = function() {
  const that = this
  return async () => {
    const result = []
    for(let i of that) {
      result.push(await i())
    }
    return result
  }
}

Now we can do:

const employeeIds = [33, 47, 112]
// This is not real Javascript
const results = await employeeIds
  .map((employeeId) => () => api.doSomethingWithEmployee(employeeId))
  .awaitAll()()

Which is nicer, but creating new methods in Array.prototype is something that crosses a few too many red lines. And we still have to do the extra parentheses all over the place. The good part is that .awaitAll could receive a parameter that specifies how the whole thing is going to be processed, and it has access to the whole array, and it returns a single promise, and it doesn't have to incrementally chain promises. So it should be easy to implement your favorite concurrency algorithms there.

Can we get rid of monkey-patching?

We are not fans of Array.prototype.awaitAll = .... Can we do something about it?

class Concurrent {
  constructor() {
    this.tasks = []
  }
  add(task) {
    return new Promise((resolve, reject) => {
      this.tasks.push({task, resolve, reject})
    })
  }
  async start() {
    for(let {task, resolve, reject} of this.tasks) {
      try {
        resolve(await task())
      } catch(e) {
        reject(e)
      }
    }
  }
}

const concurrent = new Concurrent();
const employeeIds = [33, 47, 112]
const operations = employeeIds
  .map((employeeId) => concurrent.add(() => api.doSomethingWithEmployee(employeeId)))
concurrent.start()
const results = await Promise.all(operations)

That is not very nice. We got rid of the monkey-patching, but now we have to deal with adding, starting and then awaiting. Code got significantly uglier.

To improve this we need a couple of realizations:

  • We can start executing promises as soon as we add the first one. We have to be careful with concurrency to prevent multiple threads of executors.
  • .map() receives a function as a parameter. That function can be constructed on-site.
  • Classes can return different objects via the constructor returning something different. Something like a function. With these realizations, we can do some dark incantations and get somewhere interesting:
class OneAtATime {
  constructor() {
    this.tasks = []
    const that = this
    const adder = async (task) => {
      return new Promise((resolve, reject) => {
        that.tasks.push({task, resolve, reject})
        that.run()
      })
    }
    // `new OneAtATime()` will return the `adder` function
    return adder
  }

  async run() {
    // Runner with lock to prevent multiple threads at the same time
    if(this.running) {
      return
    }
    this.running = true
    while(this.tasks.length > 0) {
      const {task, resolve, reject} = this.tasks.shift()
      try {
        const result = await task()
        resolve(result)
      } catch(e) {
        reject(e)
      }
    }
    this.running = false
  }
}

const employeeIds = [33, 47, 112]
const results = await Promise.all(
  employeeIds
    .map((employeeId) => () => api.doSomethingWithEmployee(employeeId))
    .map(new OneAtATime())
)

That's some interesting trickery there. new OneAtATime() constructs a single concurrency manager while returning a function to add tasks to the concurrency manager. But honestly, there may be a bit too much magic there.

Turning it around

Thinking it in a more broad way, the problem is that if we do .map((employeeId) => api.doSomethingWithEmployee(employeeId)) the api.doSomethingWithEmployee can't stop itself from running immediately. And all the stuff we have constructed is trickery to prevent it from running immediately. What if we make api.doSomethingWithEmployee be a bit more patient? Maybe we can wrap api.doSomethingWithEmployee in something and then we don't have to do other trickery:

class OneAtATime {
  constructor() {
    this.tasks = []
  }

  wrap(f) {
    // Wrap the promise-returning function so that it runs in the OneAtATime context
    const that = this
    return (...params) => {
      return new Promise((resolve, reject) => {
        const task = () => f(...params)
        that.tasks.push({task, resolve, reject})
        that.run()
      })
    }
  }

  async run() {
    // Runner with lock to prevent multiple threads at the same time
    if(this.running) {
      return
    }
    this.running = true
    while(this.tasks.length > 0) {
      const {task, resolve, reject} = this.tasks.shift()
      try {
        const result = await task()
        resolve(result)
      } catch(e) {
        reject(e)
      }
    }
    this.running = false
  }
}

const concurrencyController = new OneAtATime()

const controlledDoSomethingWithEmployee = concurrencyController.wrap(api.doSomethingWithEmployee(employeeId))

// This is the same code that we used as a start
const employeeIds = [33, 47, 112]
const results = await Promise.all(
  employeeIds
    .map(controlledDoSomethingWithEmployee)
)

It requires some significant set-up, but yields code easy to understand that effectively hides away the magics done to control concurrency. We do plain old Promise.all with .map to promise, yet it runs in a controlled way.

Back to index
Copyright © 2018-2023 Javier Casas - All rights reserved.