Koordination von mehreren HTTP-Abfragen mit RxJS
Es gibt mehrere Situationen, in denen eine UI-Komponente von Daten von mehreren Quellen abhängt. Vielleicht ist es notwendig, den ersten Datenblock zu laden, bevor wir überhaupt entscheiden können, welche anderen Daten benötigt werden. In solchen Fällen kann es verwirrend und komplex werden, die Reihenfolge und den Beginn all dieser Anfragen zu kontrollieren. In diesem Artikel werden wir mehrere Optionen zur besseren Orchestrierung von Abfragen mithilfe der Bibliothek für reaktive Programmierung RxJS behandeln.
ZL;NG
-
switchMap
wenn du auf das Ergebnis einer Abfrage warten musst, bevor du eine zweite starten kannst -
combineLatest
wenn du mehrere Abfragen parallel starten möchtest -
concat
+toArray
wenn du Abfragen hintereinander starten möchtest -
from(list).pipe(mergeMap(() => ..., N))
wenn du die Anzahl von parallelen Abfragen limitieren möchtest - Stackblitz-Link zur Demonstration der Konzepte dieses Artikels: Stackblitz
Laden von Nutzerdaten
Stellen wir uns vor, wir wollen eine Seite rendern, die eine Liste von Benutzern anzeigt. Dazu wird eine Anfrage an /users
gesendet, um die benötigten Daten zu laden.
interface UserSummary {
id: number;
name: string;
}
function loadUsers(): Promise<UserSummary[]> {
return fetch("/users").then((response) => response.json());
}
Wir haben unsere API-Funktion mit der Fetch-API geschrieben, werden aber im nächsten Schritt das zurückgegebene Promise in ein Observable umwandeln, um RxJS-Operatoren nutzen zu können, mit denen wir dann mehrere Requests miteinander koordinieren. Wenn du an einem Angular-Projekt arbeitest, kannst du die Daten natürlich auch direkt mit dem eingebauten HTTPClient laden.
Jetzt können wir ein Observable der Benutzer erzeugen, indem wir die from
-Funktion von RxJS verwenden, um unser Promise in ein Observable umzuwandeln, das die geladenen Daten emitiert und anschließend geschlossen wird.
import { from } from "rxjs";
const users$: Observable<UserSummary[]> = from(loadUsers());
users$.subscribe((users) => console.log(users));
Beachte: Observables sind "lazy"! Wenn niemand auf das Observable users$
subscribed, wird keine Anfrage abgeschickt. Außerdem löst jeder neue Subscriber eine separate Anfrage an das Backend aus.
Wie du hier sehen kannst, erhalten wir von unserem Endpunkt nur eine Liste von UserSummary
-Objekten zurück. Leider möchte unser Produktmanager zusätzlich zum Namen noch den Avatar des Benutzers anzeigen. Wenn unser Projekt jetzt GraphQL verwenden würde, dann könnten wir einfach das Feld avatarUri
zur Abfrage hinzufügen, um alle Daten mit einer einzigen Anfrage zu erhalten... Leider ist das in unserem imaginären Szenario nicht der Fall. Hier müssen wir warten, bis die Liste der Benutzer zurückkommt, um anschließend eine neue Anfrage pro Benutzer zu stellen, um die benötigten Informationen mit einer neuen API loadUserDetails
abzurufen:
interface UserDetails {
id: number;
name: string;
avatarUri: string;
// ...
}
function loadUserDetails(userId: number): Promise<UserDetails> {
return fetch(`/users/${userId}`).then((response) => response.json());
}
Verkettung von mehreren Requests mit switchMap
:
Unsere erste Herausforderung ist jetzt die Folgende: Wir wollen warten, bis wir die Antwort auf unsere erste Anfrage erhalten haben, bevor wir die zweite starten. Wir könnten natürlich auf das erste Observable subscriben und die Funktion loadUserDetails
im Subscriber aufrufen. Das zwingt uns dazu, noch eine weitere Subscription zu behandeln (Niemals das unsubscribe vergessen!), weswegen ich die Modellierung in nur einer kombinierten RxJS-Pipeline bevorzuge. Um das zu erreichen, können wir den switchMap
-Operator verwenden, um jeden Wert in unserem users$
-Observable (eine Liste von UserSummary
-Objekten) in eine neues Observable einer Liste von UserDetail
-Objekten zu transformieren.
const userDetails$ = users$.pipe(
switchMap((userSummaries) => {
const userDetailObservables = userSummaries.map((s) =>
from(loadUserDetails(s.id))
);
// ...
})
);
Durch die Verwendung von switchMap
können wir jeden Wert des äußeren Observables in ein neues inneres Observable umwandeln. switchMap
garantiert uns dabei, dass jedes Mal, wenn ein neuer Wert vom äußeren Observable ausgegeben wird, das innere Observable geschlossen und weggeworfen wird, bevor das nächste innere Observable erstellt wird. Das bedeutet, dass wir alte userDetail
Requests verwerfen, sobald wir eine neue Liste von userSummary
-Objekten erhalten.
Bis jetzt ist unser userDetails
Code unvollständig. Im ersten Schritt im switchMap
verwandeln wir jede UserSummary
in ein Observable<UserDetails>
. Wir haben nun eine Liste von Observables, die jeweils ein separates UserDetail
-Objekt ausgeben werden. Unsere Aufgabe ist es jetzt, diese Liste von Observables irgendwie zu einem einzigen Observable der zugrundeliegenden Liste von UserDetails
-Objekten zu kombinieren. Um das zu erreichen, haben wir mehrere Möglichkeiten:
HTTP-Abfragen mit combineLatest
parallel laufen lassen:
Das erste Szenario ist, dass wir alle möglichen Anfragen gleichzeitig abfeuern wollen. Der Browser entscheidet dann, wie viele Anfragen wirklich gleichzeitig gesendet werden, aber darum müssen wir uns zunächst nicht kümmern:
const userDetails$ = users$.pipe(
switchMap((userSummaries) => {
const userDetailObservables = userSummaries.map((s) =>
from(loadUserDetails(s.id))
);
const userDetails$ = combineLatest(userDetailObservables);
return userDetails$;
})
);
combineLatest
nimmt ein Array von Observables, subscribed auf jedes direkt und wartet, bis alle diese Observables einmal emittieren. Sobald jedes emittiert hat, emittiert combineLatest
ein Array der zuletzt ausgegebenen Werte von jedem Observable, bis alle Eingabe-Observables abgeschlossen sind. Da wir nur HTTP-Anfragen als Quellen für unsere Observables verwenden, wird jedes Observable nach dem ersten emittierten Wert abgeschlossen. Das bedeutet, dass wir auch den forkJoin
-Operator verwenden könnten, der wartet, bis alle Quellen abgeschlossen sind, bevor er das Array der gesendeten Werte herausgibt.
Da sowohl combineLatest
als auch forkJoin
sofort auf alle Quell-Observables subscriben (sobald wir auf das resultierende Observable subscriben), werden alle Anfragen (mehr oder weniger) zur gleichen Zeit ausgelöst. Dies kann zu Problemen führen: Browser haben ein Limit, wie viele Requests parallel laufen können (irgendwo zwischen 3 und 15, je nach Browser). Wenn wir nun die Request-Pipeline mit 100 Anfragen füllen (eine für jeden Benutzer), kann es passieren, dass der Browser weitere eingehende Anfrage zurückstellt, bis die ersten 100 erledigt sind, was andere, kritischere Anfragen auf unserer Seite blockieren könnte. Um dieses Problem zu lösen, könnten wir uns dafür entscheiden, alle unsere Requests in Serie auszuführen:
Serielle Ausführung von Abfragen mit concat
+ toArray
const userDetails$ = users$.pipe(
switchMap(userSummaries => {
const userDetailObservables = userSummaries.map(s =>
from(loadUserDetails(s.id))
);
const userDetails$ = concat(...userDetailObservables).pipe(toArray());
return userDetails$;
})
);
Hier verwenden wir die Funktion concat
von RxJS, um alle Observables in einer Sequenz zu verketten: concat
nimmt mehrere Observables, subscribed auf das erste und gibt alle seine Werte aus, bis es abgeschlossen ist. Sobald es abgeschlossen ist, abonniert concat
das nächste und wartet wiederum, bis es abgeschlossen ist. Das geht immer so weiter, bis die letzte Anfrage abgeschlossen ist. concat
erzeugt schließlich ein neues Observable, welches alle Werte in einer Sequenz ausgibt, so dass wir diese in einem Array sammeln müssen, indem wir den Operator toArray" verwenden: Dieser sammelt alle emittierten Werte eines Observable und emittiert alle diese Werte als ein Array, sobald die Quelle abgeschlossen ist.
Wie bei dem forkJoin
-Operator von vorhin funktioniert das nur, wenn unsere Quell-Observables endlich sind, da wir bei einem Observable stecken bleiben würden, wenn es eine endlose Folge von Werten ausgibt.
Jetzt sind wir in der Lage, höflich eine Ressource nach der anderen von unserem Server anzufordern. Natürlich wird dies die Ladezeit der Seite drastisch erhöhen, da wir keine Anfragen parallel durchführen. Idealerweise würden wir beide Ansätze kombinieren, so dass wir eine maximale Anzahl von parallelen Anfragen festlegen können:
Maximale Anzahl paralleler Abfragen durch serielle und parallele Konzepte limitieren
Mit RxJS wird diese recht komplexe Aufgabe relativ einfach zu bewältigen: Alles, was wir tun müssen, ist n Blöcke von Observables zu erstellen, indem wir die userDetailObservables
in n Listen aufteilen, wobei n unseren "Parallelitätsfaktor" definiert. Für diese Aufgabe werden wir eine von Lodashs Hilfsmethoden chunk
verwenden. Natürlich kannst du das auch selbst in 5 Zeilen implementieren, um dir eine weitere Dependency zu sparen.
const userDetails$ = users$.pipe(
switchMap(userSummaries => {
const userDetailObservables = userSummaries.map(s =>
from(loadUserDetails(s.id))
);
const chunkSize = Math.ceil(userDetailObservables.length / 4);
const chunkedUserDetailObservables = _.chunk(
userDetailObservables,
chunkSize
);
// ...
})
);
Alle Observables innerhalb eines Blocks werden seriell ausgeführt (concat
+ toArray
) und mit chunks.map(chunk => ...)
zu einem resultierenden Observable kombiniert. Zusätzlich werden alle Blöcke selbst parallel zueinander ausgeführt (combineLatest
oder forkJoin
) und kombiniert (chunkResults.flat(1)
), wenn alle Daten geladen sind:
const userDetails$ = users$.pipe(
switchMap(userSummaries => {
const userDetailObservables = userSummaries.map(s =>
from(loadUserDetails(s.id))
);
const chunkSize = Math.ceil(userDetailObservables.length / 4);
const chunkedUserDetailObservables = _.chunk(
userDetailObservables,
chunkSize
);
const userDetails$ = combineLatest(
chunkedUserDetailObservables.map(chunk =>
concat(...chunk).pipe(toArray())
)
).pipe(map(chunkResults => chunkResults.flat(1)));
return userDetails$;
})
);
Auch wenn das Resultat immer noch etwas kompliziert aussieht, ist es uns mit RxJS möglich, unsere Logik für die serielle und parallele Ausführung von Observables einfach zu einer Lösung kombinieren, bei der wir die Anzahl der gleichzeitigen Anfragen selbst festlegen können.
Eingebaute Limitierung der Parallelität mit mergeMap
Nachdem ich mit der obigen Lösung fertig war, habe ich schlossen, ein wenig mehr über diesen Problembereich zu recherchieren und siehe da, RxJS hat bereits eine eingebaute Lösung: mergeMap
. Zuerst war ich ein wenig überrascht. Ich habe mergeMap
immer so verstanden: Man transformiert jeden Wert eines Observables in ein neues Observable. Wenn das Input-Observable emittiert, behalte ich die Output-Observables der vorher emittierten Inputs und erzeuge parallel dazu ein neues Output-Observable. Das bedeutet: parallel laufen! Aus diesem Grund habe ich mergeMap
für die Serialisierung von Anfragen ausgeschlossen und concat verwendet. Allerdings nimmt mergeMap
einen optionalen zweiten Parameter namens concurrent
. Mit diesem können wir einfach direkt angeben, wie viele Output-Observables gleichzeitig aktiv sein sollen. Wenn diese maximale Anzahl erreicht ist, wird ein neues Output-Observable erst dann subscribed, wenn ein vorheriges Output-Observable abgeschlossen ist. Das ist genau das, was wir vorher mit Hand gemacht haben! Das bedeutet, dass wir unseren Code wie folgt vereinfachen können:
const userDetails$ = users$.pipe(
switchMap((userSummaries) =>
from(userSummaries).pipe(
mergeMap((summary) => from(loadUserDetails(summary.id)), 4),
toArray()
)
)
);
Diese Pipeline folgt diesen Schritten: Immer wenn users$
eine neue Liste von UserSummary
-Objekten ausgibt, beenden wir die vorherigen inneren Observables und erstellen ein neues (switchMap
). Dieses neue Observable gibt alle einzelnen UserSummary
-Objekte aus (durch Verwendung von from
) und wandelt jede userSummary
in ein Observable von userDetails
um (mergeMap
). Durch die Angabe des concurrent
-Parameters definieren wir, dass nur 4 userDetails
-Anfragen zur gleichen Zeit laufen sollen. Am Ende sammeln wir alle fertigen Detailobjekte in einem finalen Array.
Das Schöne an dieser Lösung ist, dass wir frei entscheiden können, ob wir mehr oder weniger Parallelität benötigen: Wenn wir den Parameter auf 1 setzen, werden alle Anfragen seriell ausgeführt, während das Setzen auf Infinity zu parallelen Anfragen führt.
Was haben wir gelernt?
Am Ende war die wichtigste Lehre aus dieser Reise, dass man immer ein bisschen mehr Recherche betreiben kann, bevor man sich in die Implementierung einer Lösung stürzt. Wenn man jedoch versucht, eine komplexe Operation wie die Limitierung der gleichzeitig laufenden Abfragen nachzubauen, kann man definitiv sein Verständnis für ein Werkzeug und die Probleme, die es elegant lösen kann, verbessern.