We want to run some concurrent operations (usually disk or network operations), but we also want some control over them. The classic use case is using an API with rate limiting, which forces us to do one request (or a limited number of requests) at a time. We want to use language primitives as much as possible, while keeping control over how many requests run in parallel.
This is JavaScript, so the main way to do this is via Promises, as we want code that looks kinda linear and hides the callbacks as much as possible behind `async`/`await` syntax.
The baseline
The most basic approach is just `map` with an `async` function, plus `Promise.all` (or `Promise.allSettled`).
```javascript
const employeeIds = [33, 47, 112]

await Promise.all(
  employeeIds.map(async (employeeId) => {
    await api.doSomethingWithEmployee(employeeId)
  })
)
```
The code is quite simple: we just `Promise.all` a bunch of promises that are the result of requesting multiple operations. If you do this, the system will try to execute all the async operations at the same time, and a request will be started immediately for each entry in `employeeIds`.
There are some exceptions; for example, browsers usually cap the number of concurrent requests to the same host (around six for HTTP/1.1). But, in any case, it's external things rate limiting us, so it's not our code that is in control.
Naive approach
We have `Array.prototype.forEach`, which kinda exists for this purpose:
```javascript
const employeeIds = [33, 47, 112]

employeeIds.forEach(async (employeeId) => {
  await api.doSomethingWithEmployee(employeeId)
})
```
In this case, we kinda stated a bit better what we want to do in the code, but everything went wrong:
- We no longer await for the tasks to complete before trying to do other stuff. We "fire and forget" the tasks. I hope you didn't need to do anything with the return values of the tasks.
- There is no way to wait for a task to finish before starting the next.
This means, effectively, that `forEach` only works as expected for non-async tasks.
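To make the fire-and-forget behavior concrete, here is a small sketch (the `sleep` helper and the ids are made up for illustration): the line placed after the `forEach` runs before any of the async callbacks finish.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

const order = []

async function demo() {
  const ids = [1, 2, 3]
  ids.forEach(async (id) => {
    await sleep(10)
    order.push(`task ${id}`)
  })
  // This line runs immediately: forEach did not wait for the tasks
  order.push('after forEach')
}
```

Running `demo()` records `'after forEach'` first; the three task entries only show up later, once their timers fire.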
Controlling the execution
The initial problem is that we launch all the promises immediately (in the `.map`), whereas we may want to delay some requests. So let's start by converting the promises into tasks to be triggered under our control:
```javascript
const employeeIds = [33, 47, 112]

const tasks = employeeIds.map((employeeId) => async () => {
  await api.doSomethingWithEmployee(employeeId)
})

// Run the tasks one at a time, waiting for each to finish before starting the next
for (const task of tasks) {
  await task()
}
```
This kinda starts to improve things, but we are starting to run into friction. We have to construct a `for` loop, it's no longer part of a `map` chain, and our only control is "one at a time".
Trying to put the map chain back
That `for` loop is ugly. I also dislike that `Promise.all`. I would like to have something like:
```javascript
const employeeIds = [33, 47, 112]

// This is not real JavaScript
await employeeIds
  .map((employeeId) => api.doSomethingWithEmployee(employeeId))
  .awaitAll(oneAtATime)
```
That kinda follows what's going on in the system. We moved the `Promise.all` to the end of the `map` chain because we expect to `await` all the promises after we have started them. But this conflicts with the reality of JavaScript: again, we would start all the promises at the same time, and then the awaiting is no longer in control of what runs when.
But it serves as food for thought, and allows us to try to get somewhere. Instead of the non-existent `.awaitAll`, let's try to `.reduce`:
```javascript
const employeeIds = [33, 47, 112]

// Function to .reduce by constructing a chain of promises
function oneAtATime(accumulatedPromises, newPromise, index) {
  return async () => {
    // We check for index here because, on the first iteration,
    // accumulatedPromises is not an accumulator yet,
    // but just the task for the first element of the array
    const rest = index === 1 ? [await accumulatedPromises()] : await accumulatedPromises()
    const current = await newPromise()
    return [...rest, current]
  }
}

const results = await employeeIds
  .map((employeeId) => () => api.doSomethingWithEmployee(employeeId))
  // We still have to invoke the result at the end because oneAtATime returns a function
  .reduce(oneAtATime)()
```
This still has to do the function trick to delay execution, but we are starting to get somewhere. We still have the `.map` chaining, something at the end that means "collect all the results", and execution one at a time. It still has some ugly bits, though:
- We have to construct functions that construct functions so that we can invoke them later. That empty set of parentheses in the `.map` function is ugly. So is the empty set of parentheses at the `.reduce`.
- What if `employeeIds` is an empty array? `Uncaught TypeError: Reduce of empty array with no initial value`. That's bad. We should get a promise that resolves immediately to an empty array.
- This construction is simple and works, but can only deal with one promise at a time. It will require lots of black magic to do things like two promises at a time.
One style of black magic: monkey-patching
Let's try to fix the `Uncaught TypeError: Reduce of empty array with no initial value` that happens because of `.reduce`.
```javascript
Array.prototype.awaitAll = function () {
  const that = this
  return async () => {
    const results = []
    for (const task of that) {
      results.push(await task())
    }
    return results
  }
}
```
Now we can do:
```javascript
const employeeIds = [33, 47, 112]

const results = await employeeIds
  .map((employeeId) => () => api.doSomethingWithEmployee(employeeId))
  .awaitAll()()
```
Which is nicer, but creating new methods in `Array.prototype` is something that crosses a few too many red lines. And we still have to do the extra parentheses all over the place.
The good part is that `.awaitAll` could receive a parameter that specifies how the whole thing is going to be processed. It has access to the whole array, it returns a single promise, and it doesn't have to incrementally chain promises. So it should be easy to implement your favorite concurrency algorithms there.
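For instance, here is a sketch of `.awaitAll` taking a concurrency limit (my own variant; the same monkey-patching caveats apply). It starts up to `concurrency` workers, and each worker pulls the next unstarted task until none remain:

```javascript
Array.prototype.awaitAll = function (concurrency = 1) {
  const tasks = this
  return async () => {
    const results = new Array(tasks.length)
    let next = 0
    // Each worker runs one task at a time, pulling the next unstarted one
    const worker = async () => {
      while (next < tasks.length) {
        const i = next++
        results[i] = await tasks[i]()
      }
    }
    await Promise.all(Array.from({ length: Math.min(concurrency, tasks.length) }, worker))
    return results
  }
}
```

As a bonus, an empty array spawns zero workers, so it resolves immediately to `[]` instead of crashing like the `.reduce` version.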
Can we get rid of monkey-patching?
We are not fans of `Array.prototype.awaitAll = ...`. Can we do something about it?
```javascript
class Concurrent {
  constructor() {
    this.tasks = []
  }

  add(task) {
    return new Promise((resolve, reject) => {
      this.tasks.push({ task, resolve, reject })
    })
  }

  async start() {
    for (const { task, resolve, reject } of this.tasks) {
      try {
        resolve(await task())
      } catch (e) {
        reject(e)
      }
    }
  }
}

const concurrent = new Concurrent()
const employeeIds = [33, 47, 112]

const operations = employeeIds
  .map((employeeId) => concurrent.add(() => api.doSomethingWithEmployee(employeeId)))

concurrent.start()
const results = await Promise.all(operations)
```
That is not very nice. We got rid of the monkey-patching, but now we have to deal with adding, starting and then awaiting. Code got significantly uglier.
To improve this we need a couple of realizations:
- We can start executing tasks as soon as we add the first one. We have to be careful to prevent multiple runner loops executing at the same time.
- `.map()` receives a function as a parameter. That function can be constructed on-site.
- Classes can return different objects via the `constructor` returning something different. Something like a function.

With these realizations, we can do some dark incantations and get somewhere interesting:
```javascript
class OneAtATime {
  constructor() {
    this.tasks = []
    const that = this
    const adder = (task) => {
      return new Promise((resolve, reject) => {
        that.tasks.push({ task, resolve, reject })
        that.run()
      })
    }
    // `new OneAtATime()` will return the `adder` function
    return adder
  }

  async run() {
    // Runner with a lock to prevent multiple runner loops at the same time
    if (this.running) {
      return
    }
    this.running = true
    while (this.tasks.length > 0) {
      const { task, resolve, reject } = this.tasks.shift()
      try {
        const result = await task()
        resolve(result)
      } catch (e) {
        reject(e)
      }
    }
    this.running = false
  }
}
```
```javascript
const employeeIds = [33, 47, 112]

const results = await Promise.all(
  employeeIds
    .map((employeeId) => () => api.doSomethingWithEmployee(employeeId))
    .map(new OneAtATime())
)
```
That's some interesting trickery there. `new OneAtATime()` constructs a single concurrency manager while returning a function that adds tasks to that manager. But honestly, there may be a bit too much magic there.
Turning it around
Thinking about it more broadly, the problem is that if we do `.map((employeeId) => api.doSomethingWithEmployee(employeeId))`, then `api.doSomethingWithEmployee` can't stop itself from running immediately. And all the stuff we have constructed is trickery to prevent it from running immediately. What if we make `api.doSomethingWithEmployee` be a bit more patient? Maybe we can wrap `api.doSomethingWithEmployee` in something, and then we don't need the other trickery:
```javascript
class OneAtATime {
  constructor() {
    this.tasks = []
  }

  wrap(f) {
    // Wrap the promise-returning function so that it runs in the OneAtATime context
    const that = this
    return (...params) => {
      return new Promise((resolve, reject) => {
        const task = () => f(...params)
        that.tasks.push({ task, resolve, reject })
        that.run()
      })
    }
  }

  async run() {
    // Runner with a lock to prevent multiple runner loops at the same time
    if (this.running) {
      return
    }
    this.running = true
    while (this.tasks.length > 0) {
      const { task, resolve, reject } = this.tasks.shift()
      try {
        const result = await task()
        resolve(result)
      } catch (e) {
        reject(e)
      }
    }
    this.running = false
  }
}

const concurrencyController = new OneAtATime()
const controlledDoSomethingWithEmployee =
  concurrencyController.wrap((employeeId) => api.doSomethingWithEmployee(employeeId))

// This is the same code that we used as a start
const employeeIds = [33, 47, 112]

const results = await Promise.all(
  employeeIds.map(controlledDoSomethingWithEmployee)
)
```
It requires some significant set-up, but it yields code that is easy to understand and that effectively hides away the magic done to control concurrency. We do a plain old `Promise.all` with a `.map` to promises, yet everything runs in a controlled way.
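As a closing sketch (hypothetical name, not from the article), the same wrap-plus-runner idea generalizes from "one at a time" to any limit: instead of a boolean lock, we count how many runner loops are active and spawn new ones only while we are under the limit.

```javascript
// Sketch: like OneAtATime, but with a configurable number of parallel tasks
class NAtATime {
  constructor(limit = 1) {
    this.limit = limit
    this.running = 0
    this.tasks = []
  }

  wrap(f) {
    return (...params) =>
      new Promise((resolve, reject) => {
        this.tasks.push({ task: () => f(...params), resolve, reject })
        this.run()
      })
  }

  async run() {
    // Spawn a runner loop only while we are under the limit
    if (this.running >= this.limit) {
      return
    }
    this.running++
    while (this.tasks.length > 0) {
      const { task, resolve, reject } = this.tasks.shift()
      try {
        resolve(await task())
      } catch (e) {
        reject(e)
      }
    }
    this.running--
  }
}
```

With `new NAtATime(1)` this behaves like `OneAtATime`; with a higher limit, up to that many wrapped calls run concurrently while the rest wait in the queue.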