Controlling multiple HTTP requests with RxJS
There are several situations where a UI component depends on data from multiple sources. Maybe it's necessary to load the first chunk of data before we can even decide which other data is required. In those cases it can become confusing and complex to control the order and start of execution of all those requests. In this article we will cover multiple options to better orchestrate a multitude of requests using the reactive programming library RxJS.
TL;DR
- switchMap if you need to wait for the result of a first request to start the second
- combineLatest if you need to run requests in parallel
- concat + toArray if you want your requests to run in serial
- from(list).pipe(mergeMap(() => ..., N)) if you need to limit the number of concurrent requests
- Stackblitz link to a demonstration of the concepts of this article: Stackblitz
Setting the stage
Let's imagine we want to render a page that displays a list of users. For this we send a request to /users to fetch the required data.
interface UserSummary {
  id: number;
  name: string;
}

function loadUsers(): Promise<UserSummary[]> {
  return fetch("/users").then((response) => response.json());
}
We wrote this data loading function with the fetch API, but will wrap the returned promise into an observable in the next step in order to leverage the RxJS operators that will help us control multiple data dependencies. If you are working in an Angular project, you can of course load the data directly with the built-in HttpClient.
Now, we can get an observable of the users by using the from function from RxJS to turn our promise into an observable that emits the loaded data and completes.
import { from, Observable } from "rxjs";

const users$: Observable<UserSummary[]> = from(loadUsers());
users$.subscribe((users) => console.log(users));
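Note: Observables are usually lazy, but a promise is not. Calling loadUsers() starts the request right away, and from only wraps the promise that is already running, so every subscriber to users$ receives the result of that single request. If you want no request to be issued until someone subscribes, and a separate request for each subscriber, create the promise lazily, for example with defer(() => loadUsers()) from RxJS. Observables returned by Angular's HttpClient are lazy in this sense.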
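To see when a request actually starts, it helps to count calls. The following is a minimal sketch, not part of the original setup: fakeLoadUsers is a hypothetical stand-in for loadUsers that only counts invocations, and the comparison assumes the from and defer functions exported by RxJS.

```typescript
import { defer, from } from "rxjs";

let requestCount = 0;

// Hypothetical stand-in for loadUsers(): counts how often a "request" starts.
function fakeLoadUsers(): Promise<string[]> {
  requestCount++;
  return Promise.resolve(["Ada", "Grace"]);
}

// Eager: fakeLoadUsers() runs right here, before anyone subscribes.
const eagerUsers$ = from(fakeLoadUsers());
const countAfterEagerCreation = requestCount; // 1

// Lazy: the factory only runs on subscribe, once per subscriber.
const lazyUsers$ = defer(() => fakeLoadUsers());
const countAfterLazyCreation = requestCount; // still 1

eagerUsers$.subscribe();
eagerUsers$.subscribe();
const countAfterEagerSubscriptions = requestCount; // still 1, shared promise

lazyUsers$.subscribe();
lazyUsers$.subscribe();
const countAfterLazySubscriptions = requestCount; // 3, one call per subscriber
```

Which variant fits depends on the use case: sharing one response is often fine for a page load, while the lazy variant is what operators such as concat need in order to control when a request starts.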
As you can see above, we only get back a list of user summaries from our endpoint. Sadly, our product manager wants to display the user's avatar next to the user name. If only our project used GraphQL, then we could just add the avatarUri field to the query to get all data with a single request... However, in our case, we need to wait for the list of users to return and then issue one request per user to retrieve the required information, using a new function loadUserDetails:
interface UserDetails {
  id: number;
  name: string;
  avatarUri: string;
  // ...
}

function loadUserDetails(userId: number): Promise<UserDetails> {
  return fetch(`/users/${userId}`).then((response) => response.json());
}
Chaining explicit requests with switchMap
This will be our first hurdle: we want to wait until we have received the response to our first request before we start the second one. We could, of course, subscribe to the first observable and call the loadUserDetails function in the subscriber. That would force us to manage yet another subscription (never leave a subscription unsubscribed), so we prefer a single RxJS pipeline. To accomplish this, we can use the switchMap operator to transform each value in our users$ observable (a list of UserSummary objects) into a new observable of a list of UserDetails objects.
const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    const userDetailObservables = userSummaries.map((s) =>
      from(loadUserDetails(s.id))
    );
    // ...
  })
);
By using switchMap, we can transform each value of the outer observable into a new inner observable. switchMap guarantees that whenever the outer observable emits a new value, the previous inner observable is unsubscribed and discarded before the next inner observable is created. This means that we ignore the results of old user detail requests as soon as we get a new list of user summaries.
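The switching behavior is easiest to see with an outer observable that emits twice before any inner result arrives. This is only an illustrative sketch: fakeLoadDetails and collect are hypothetical helpers, and the operator is imported from "rxjs/operators" (recent RxJS 7 releases also export it from "rxjs").

```typescript
import { Observable, of } from "rxjs";
import { switchMap } from "rxjs/operators";

// Hypothetical lookup that resolves asynchronously, like an HTTP request would.
function fakeLoadDetails(query: string): Promise<string> {
  return Promise.resolve(`details for ${query}`);
}

// The outer observable emits twice in a row before any promise has settled.
const details$: Observable<string> = of("first", "second").pipe(
  switchMap((query) => fakeLoadDetails(query))
);

// Hypothetical helper: collects every emitted value until completion.
function collect<T>(source$: Observable<T>): Promise<T[]> {
  return new Promise((resolve, reject) => {
    const values: T[] = [];
    source$.subscribe({
      next: (value) => values.push(value),
      error: reject,
      complete: () => resolve(values),
    });
  });
}

// Logs ["details for second"]: the result for "first" is ignored.
collect(details$).then((values) => console.log(values));
```

Note that the promise for "first" still settles; switchMap merely stops listening to it. With plain fetch promises, the underlying request is therefore not aborted.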
So far, our userDetails$ code is incomplete. In the first step of the switchMap we transform each UserSummary into an Observable<UserDetails>. We now have a list of observables that will each emit a separate UserDetails object. Our task is to combine this list of observables into one single observable of the underlying list of UserDetails objects. To do this, we have multiple options:
Running requests in parallel with combineLatest
The first scenario is that we want to fire all possible requests at the same time. The browser will then decide how many requests are really sent at the same time, but we don't have to worry about that:
const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    const userDetailObservables = userSummaries.map((s) =>
      from(loadUserDetails(s.id))
    );
    const userDetails$ = combineLatest(userDetailObservables);
    return userDetails$;
  })
);
combineLatest takes an array of observables, subscribes to each of them right away, and waits until every one of them has emitted at least once. From then on, it emits an array of the latest value from each observable whenever any of them emits, until all input observables complete. Since we are only using HTTP requests as sources for our observables, each observable completes after its first emitted value, so combineLatest emits exactly once. This means we could also use forkJoin, which waits until all sources complete and then emits an array of the last value from each.
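The difference between the two functions only shows up when a source emits more than once. The sketch below uses of with synchronous values as an assumed stand-in for real sources, so the emissions can be inspected directly.

```typescript
import { combineLatest, forkJoin, of } from "rxjs";

const combinedEmissions: unknown[][] = [];
const joinedEmissions: unknown[][] = [];

// Sources with more than one value make the difference visible.
combineLatest([of(1, 2), of("a", "b")]).subscribe((values) =>
  combinedEmissions.push(values)
);
forkJoin([of(1, 2), of("a", "b")]).subscribe((values) =>
  joinedEmissions.push(values)
);
// combinedEmissions: [[2, "a"], [2, "b"]]
// joinedEmissions:   [[2, "b"]]

// Request-like sources emit once and complete, so both behave the same.
const singleCombined: unknown[][] = [];
const singleJoined: unknown[][] = [];
combineLatest([of(1), of("a")]).subscribe((values) =>
  singleCombined.push(values)
);
forkJoin([of(1), of("a")]).subscribe((values) => singleJoined.push(values));
// singleCombined: [[1, "a"]]
// singleJoined:   [[1, "a"]]
```

For one-shot HTTP sources the choice is mostly a matter of intent: forkJoin states more directly that exactly one combined result is expected.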
Since both combineLatest and forkJoin subscribe to all source observables immediately (once we subscribe to the combined observable), all requests are triggered (more or less) at the same time. This can cause problems: browsers limit how many requests can run in parallel (over HTTP/1.1, commonly about six connections per host, depending on the browser). When we now fill the request queue with 100 requests (one for each user), the browser may hold back any other request to the same host until the first 100 are done, which might block other, more critical requests on our page. To solve this problem, we could opt to run all of our requests in serial:
Running requests in serial with concat + toArray
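const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    // defer delays each loadUserDetails call until concat subscribes.
    // from(loadUserDetails(s.id)) would start every request right here.
    const userDetailObservables = userSummaries.map((s) =>
      defer(() => loadUserDetails(s.id))
    );
    const userDetails$ = concat(...userDetailObservables).pipe(toArray());
    return userDetails$;
  })
);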
Here we use the concat function from RxJS to chain all observables in a sequence: concat takes multiple observables, subscribes to the first one, and emits all its values until it completes. Once it has completed, concat subscribes to the next one and again waits until it completes. This goes on until the last request is done. concat produces a new observable that emits all values one after the other, so we have to gather them into an array with the toArray operator, which collects all values emitted by an observable and emits them as one array once the source completes.
As with forkJoin earlier, this only works when our source observables complete: if one of them emits an endless sequence of values, we get stuck at that observable.
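One detail matters for serial execution: concat can only postpone work that begins on subscription, and a promise begins its work as soon as it is created. The sketch below measures this with a hypothetical request stub (createTracker and measure are illustrative names, not part of the article's code) that records how many calls are in flight at once.

```typescript
import { concat, defer, from } from "rxjs";
import { toArray } from "rxjs/operators";

// Hypothetical request stub that records the highest number of parallel calls.
function createTracker() {
  let inFlight = 0;
  let maxInFlight = 0;
  const request = (id: number): Promise<number> => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return new Promise((resolve) =>
      setTimeout(() => {
        inFlight--;
        resolve(id);
      }, 5)
    );
  };
  return { request, getMax: () => maxInFlight };
}

// Runs three requests through concat + toArray, created lazily or eagerly.
function measure(lazy: boolean): Promise<{ results: number[]; max: number }> {
  const tracker = createTracker();
  const sources = [1, 2, 3].map((id) =>
    lazy ? defer(() => tracker.request(id)) : from(tracker.request(id))
  );
  return new Promise((resolve, reject) => {
    concat(...sources)
      .pipe(toArray())
      .subscribe({
        next: (results) => resolve({ results, max: tracker.getMax() }),
        error: reject,
      });
  });
}

measure(false).then((eager) => console.log("from:", eager.max)); // from: 3
measure(true).then((lazy) => console.log("defer:", lazy.max)); // defer: 1
```

Both variants deliver the results in order, but only the deferred one keeps the requests from overlapping.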
Now, we are able to gently request one resource after the other from our server. Of course, this will drastically increase the loading time of the page, since we are not doing any requests in parallel. Ideally we would combine both approaches so that we can specify a maximum number of parallel requests:
Running requests with a maximum concurrency by combining serial and parallel
With RxJS, this quite complex task becomes manageable: all we have to do is split the userDetailObservables into n chunks, where n is our concurrency parameter. For this task we will use chunk, one of Lodash's helper methods. Of course, you could also reimplement it yourself if you want to skip the dependency.
const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    // defer keeps each request from starting before its chunk reaches it;
    // from(loadUserDetails(s.id)) would start all requests immediately.
    const userDetailObservables = userSummaries.map((s) =>
      defer(() => loadUserDetails(s.id))
    );
    const chunkSize = Math.ceil(userDetailObservables.length / 4);
    const chunkedUserDetailObservables = _.chunk(
      userDetailObservables,
      chunkSize
    );
    // ...
  })
);
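If you would rather not pull in Lodash for a single helper, a small replacement could look like the following. This is a sketch of one possible implementation, not Lodash's own code; the name chunk and the choice to return an empty list for sizes below 1 are assumptions made for this example.

```typescript
// Splits items into consecutive arrays of at most `size` elements.
// Returns an empty list when size is not a positive integer.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    return [];
  }
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }
  return chunks;
}

// Same chunk size calculation as above, for 10 items and a concurrency of 4.
const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const chunkSize = Math.ceil(ids.length / 4); // 3
const chunkedIds = chunk(ids, chunkSize);
// chunkedIds: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
```

Because the chunk size is rounded up, the number of chunks is at most 4 but can be lower: 9 items yield only 3 chunks of 3, so the concurrency parameter acts as an upper bound.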
All of the observables within one chunk will be run in serial (concat + toArray), so chunks.map(chunk => ...) turns each chunk into one resulting observable. Additionally, the chunks themselves will be run in parallel (combineLatest or forkJoin) and their results combined (chunkResults.flat(1)) when all data is ready:
const userDetails$ = users$.pipe(
  switchMap((userSummaries) => {
    // defer keeps each request from starting before concat subscribes to it;
    // from(loadUserDetails(s.id)) would start all requests immediately.
    const userDetailObservables = userSummaries.map((s) =>
      defer(() => loadUserDetails(s.id))
    );
    const chunkSize = Math.ceil(userDetailObservables.length / 4);
    const chunkedUserDetailObservables = _.chunk(
      userDetailObservables,
      chunkSize
    );
    const userDetails$ = combineLatest(
      chunkedUserDetailObservables.map((chunk) =>
        concat(...chunk).pipe(toArray())
      )
    ).pipe(map((chunkResults) => chunkResults.flat(1)));
    return userDetails$;
  })
);
While still a bit complex, this solution shows how RxJS lets us combine our logic for running observables serially and in parallel, so that we can specify the number of concurrent requests depending on our environment.
Running multiple requests with a max concurrency with mergeMap
After I finished the above solution, I decided to research this problem space a bit more and, lo and behold, RxJS already has a built-in solution to this problem: mergeMap. At first I was a bit stumped. I had always understood mergeMap like this: transform every value of an observable into a new observable. When the input observable emits again, keep the output observables of the previously emitted inputs alive while directly subscribing to the next one. This means everything runs in parallel! This is why I ruled mergeMap out for serializing requests and used concat. However, mergeMap takes an optional second parameter called concurrent. With this, you can specify how many output observables may be active at the same time. When this maximum is reached, a new output observable is only subscribed to when one of the active ones completes. This is essentially what we hand-rolled earlier! This means that we can rewrite our code like this:
const userDetails$ = users$.pipe(
  switchMap((userSummaries) =>
    from(userSummaries).pipe(
      mergeMap((summary) => from(loadUserDetails(summary.id)), 4),
      toArray()
    )
  )
);
The pipeline follows these steps: whenever users$ emits a new list of UserSummary objects, we unsubscribe from the previous inner observable and create a new one (switchMap). This new observable emits the UserSummary objects one by one (by using from) and transforms each of them into an observable of UserDetails (mergeMap). By specifying the concurrent parameter, we define that at most 4 user detail requests run at the same time. In the end, we gather all loaded detail objects into a final array (toArray). Note that mergeMap emits results in the order in which the requests complete, so the array is not guaranteed to match the order of the summaries.
The nice thing about this solution is that we can freely decide if we need more or less concurrency: By setting the parameter to 1, all requests will run in serial, while setting it to Infinity will result in parallel requests.
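The effect of the concurrent parameter can be checked with a small measurement. The sketch below is illustrative only: createFakeApi and loadWithLimit are hypothetical names, and the stubbed request resolves after a timer instead of calling a server. The first id is deliberately slow so that later requests can overtake it.

```typescript
import { from } from "rxjs";
import { mergeMap, toArray } from "rxjs/operators";

// Hypothetical request stub: resolves with the id after delayMs and
// records how many calls were in flight at the same time.
function createFakeApi() {
  let inFlight = 0;
  let maxInFlight = 0;
  const load = (id: number, delayMs: number): Promise<number> => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return new Promise((resolve) =>
      setTimeout(() => {
        inFlight--;
        resolve(id);
      }, delayMs)
    );
  };
  return { load, getMaxInFlight: () => maxInFlight };
}

// Loads all ids with at most `concurrent` requests running at once.
function loadWithLimit(
  ids: number[],
  concurrent: number
): Promise<{ results: number[]; maxInFlight: number }> {
  const api = createFakeApi();
  return new Promise((resolve, reject) => {
    from(ids)
      .pipe(
        // The request for id 1 is slow, so later ones can finish before it.
        mergeMap((id) => api.load(id, id === 1 ? 40 : 5), concurrent),
        toArray()
      )
      .subscribe({
        next: (results) =>
          resolve({ results, maxInFlight: api.getMaxInFlight() }),
        error: reject,
      });
  });
}

loadWithLimit([1, 2, 3, 4, 5], 2).then(({ results, maxInFlight }) => {
  // maxInFlight is 2; results arrive in completion order, not input order.
  console.log(results, maxInFlight);
});
```

If the order of the results matters, one option is to carry the original index through the pipeline and sort the final array; with a concurrency of 1 the order is preserved anyway.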
What have I learned?
In the end, the most important lesson from this journey was that you can always do a bit more research before diving into the implementation of a solution. However, by trying to rebuild a complex operation like the maximum concurrent requests, you can definitely improve your understanding of a tool and the problems that it is able to solve elegantly.