zum Inhalt

Der lise Blog:
Einsichten, Ansichten, Aussichten

Angular(2+) und der Einstieg in die Welt von RxJS

09. Februar 2018 von Avatar of Alexander W.Alexander W.

Kürzlich haben wir von einem Kunden den Auftrag zur Erweiterung einer bestehenden Anwendung erhalten. Bei dem System handelt es sich um einen Produkt-Konfigurator für einen der vielen Unternehmensbereiche des Kunden. Nun sollte ein Planungsmodul für einen weiteren Unternehmensbereich hinzukommen. Die Anwendung bestand bis dahin aus einer .Net Core MVC6 WebApi, angelehnt an das REST Paradigma für Webschnittstellen sowie einer Single Page Application (SPA) auf Basis von AngularJS (v1.6.x).

Da davon auszugehen ist, dass in Zukunft noch mehr Konfigurator-Module entwickelt werden müssen, haben wir uns entschieden, für die neuen Module nicht mehr das in die Jahre gekommene AngularJs zu verwenden, sondern auf die neuere Version Angular (2+) umzusteigen. Da jedoch eine Migration von AngularJs zur neueren Version nicht ohne weiteres möglich ist, sollten die alte sowie die neue Anwendung parallel laufen. Eine Portierung des vorhandenen Produkt-Konfigurators sollte später ebenfalls auf die neue Basis erfolgen können.

Auf die Unterschiede sowie Vor- und Nachteile von AngularJs zu Angular 2+ möchte ich hier gar nicht genauer eingehen. Dazu gibt es im Internet genügend Informationen. Ein in meinen Augen sehr wichtiger Punkt wird jedoch häufig verschwiegen oder nur am Rande erwähnt: Angular 2+ setzt auf das Framework/Library RxJS und setzt damit in Teilen ein komplett anderes Programmierparadigma ein!

RxJS

RxJS ist der Javascript/Typescript Ableger von ReactiveX. Dies steht für Reactive Extensions, also „Reaktive Erweiterung“. Das beschreibt das Framework auch recht gut. Es handelt sich dabei um den Versuch, funktionale und reaktive Programmierung in etablierte objektorientierte Programmiersprachen einzuführen. ReactiveX steht für verschiedene Plattformen zur Verfügung, darunter Javascript, Java, C#, PHP, Swift, Ruby und vielen mehr. Der Funktionsumfang sowie die Funktionsnamen unterscheiden sich zwischen den einzelnen Ablegern etwas. Die grundsätzliche Idee ist jedoch immer die Gleiche: die Implementierung des Observerpatterns.

RxJS bringt Observables, also zu beobachtende Objekte mit. Diese können von einem Beobachter abonniert werden, um dann auf Ereignisse reagieren zu können. Angular 2+ stellt verschiedene Schnittstellen bereit, welche eben diese Observables zurückliefern, mit denen dann ein Beobachter  auf zum Beispiel die Antwort einer ausgeführten XHR-Anfrage an eine Webschnittstelle, auf Benutzerinteraktion wie Maus- oder Scroll-Events oder auf Veränderungen des DOM reagieren kann.

Observables vs. Promises

Aus AngularJs ist uns der Umgang mit Promises bereits bekannt. Eingeführt mit ECMAScript 6, und inzwischen von den modernen Browsern auch ohne Polyfill unterstützt, bieten sie die Möglichkeit, Programmfunktionen asynchron auszuführen, also ohne, dass die Anwendung auf die Fertigstellung der Funktion warten muss. Grundsätzlich gilt folgendes: Observables können all das, was Promises konnten, nur besser und zusätzlich noch viel mehr!

Hier eine vereinfachte und auf das Wesentliche für einen Vergleich reduzierte Gegenüberstellung:

 

Promise

  • Kennt drei Zustände:
    • in Ausführung
    • erfolgreich beendet (resolved)
    • nicht erfolgreich beendet (rejected)
  • Nimmt in der Then-Funktion zwei Callback-Funktionen entgegen:
    • onResolve (wird ausgeführt, wenn das Promise erfolgreich war)
    • onReject (wird ausgeführt, wenn das Promise nicht erfolgreich war)

Observable

  • Kennt zwei Zustände:
    • aktiv
    • fertig (completed)
  • Nimmt in der Subscribe-Funktion drei Callback-Funktionen entgegen, mit denen ein Beobachter auf Ereignisse reagieren kann:
    • onNext (wird ausgeführt, wenn das Observable einen Wert ausgibt)
    • onError (wird ausgeführt, wenn das Observable einen Error auslöst, der nicht abgefangen wird)
    • onComplete (wird einmalig ausgeführt, wenn das Observable als fertig markiert wird)

Bis hierhin sieht das erstmal recht ähnlich aus. Der zentrale Unterschied ist, dass das Promise nur genau einmal eins der beiden Callbacks aktiviert. Dass Observable hingegen kann mehrfach ein Event auslösen. Bei jeder Auslösung des Events wird dann das onNext Callback aufgerufen. Wenn das Observable fertig ist, und keine neuen Events mehr auslösen wird, kann es auf den Status fertig gesetzt werden. Dann wird anschließend das onComplete Callback ausgeführt, und der Beobachter storniert sein Abonnement.

Hier nun aber eine gute Nachricht: Wer mit Observables nichts weiter zu tun haben möchte, und weiterhin Promises verwenden kann, kann jedes Observable mit der .toPromise() Funktion zu einem Promise casten und erhält dann ein Promise, welchen das letzte Event vor Fertigstellung des Observables zurückliefert.

Beispiel: Im folgendem Code wird ein neues Observable erstellt und unter der Konstante countObservable gespeichert. Das Observable ruft innerhalb einer While-Schleife das next Callback des Beobachters so oft auf, bis i <= 3 ist.

Das onNext Callback schreibt die Nachrichten, die es aus dem Observable bekommt, in die Konsole. Beim Einsatz des Observables ergeben sich daher fünf Konsolenausgaben. Da beim Umwandeln in ein Promise nur der zuletzt ausgegebene next Wert Verwendung findet, ist die einzige Ausgabe nach dem Casten zum Promise die 3.

Durch die Umwandlung zum Promise gehen also nützliche Funktionen der Observables verloren. Aus meiner Sicht sollte dieser Kompromiss daher nur eingegangen werden, wenn eine Library oder ein Angular-Modul verwendet wird, welches explizit ein Promise erwartet. Viele Bibliotheken/Libraries nehmen aber auch wahlweise Observables oder Promises entgegen.

const countObservable = new Observable( observer => {
                                        let i = 0;
                                        while ( i <= 3) {
                                            observer.next(i++);
                                        }
                                        observer.complete();
                                        });

            //Output: 0
            //Output: 1
            //Output: 2
            //Output: 3
            //Output: observable completed
            countObservable.subscribe(result => {
                console.log(result);
            }, (error) => {
                console.error(error);
            }, () => {
                console.log("observable completed");
            });

            //Output: 3
            countObservable.toPromise().then(result => {
                console.log(result);
            }, error => {
                console.log(error);
            });

 

Vorteile von Observables

Observables können Promises vollständig ersetzen. Dies ist jedoch nur ein kleiner Bestandteil des RxJS Frameworks. Die wirklichen Stärken von RxJS kommen erst zum Vorschein, wenn Observables nicht als einmalig ausgeführte Konstrukte, sondern als Stream behandelt werden. Ein Beobachter abonniert ein Observable und kann so über einen längeren Zeitraum auf Ereignisse reagieren.

Beispiel AlertComponent

Als eine der ersten Dateien in unserem neuen Angular5 Projekt habe ich den AlertService erstellt. Dieser beinhaltet ein Subject (Ein Subject ist ein Observable, auf dessen Methoden auch von außen zugegriffen werden kann) und je eine Methode zum Anzeigen einer Erfolgsmeldung sowie einer Fehlermeldung. Eine Angular-Komponente, die AlertComponent, bekommt diesen AlertService über Constructor Injection übergeben und abonniert anschließend das beschriebene Subject und fügt die erhaltenen Alert-Objekte in ein Array des ViewModels ein.

Möchte ich nun innerhalb der Anwendung einen Alert erzeugen, muss ich nur die Methode des AlertService aufrufen und der Alert erscheint auf dem Bildschirm, da die AlertComponent über den Wunsch zur Alert-Erzeugung über das Subject erfährt. Erzeuge ich keinen Alert, tut die Komponente nichts. Sie reagiert also nur dann, wenn tatsächlich etwas getan werden muss. Dies spart Ressourcen.

In AnglarJs hätte man diesen Anwendungsfall möglicherweise unter zu Hilfenahme eines Watchers gelöst, der ständig ein Array oder Objekt auf Veränderung überprüft. Da dies alle paar Millisekunden geschieht, ist dies ein nicht zu verachtender Rechenaufwand, der somit vermieden werden kann.

Die wichtigsten Operatoren im Detail

RxJS stellt zur Erstellung/Bearbeitung/Aggregierung dieser Streams eine Reihe von Operatoren zur Verfügung. Einige der nachfolgenden Operatoren möchte ich im Anschluss kurz vorstellen: 

  • Aggregieren von verschiedenen Observables, zum Beispiel durch folgende Operatoren
    • concat
    • zip
    • combineLatest
  • Manipulieren, der von Observables gelieferten Werte durch die Operatoren
    • map
    • concatMap, mergeMap(flatMap, selectMany), switchMap
    • reduce, scan
  • Realisieren von Intervallen und Timern mit
    • Observable.timer(…)
    • Observable.interval(…)
  • Filtern von Mengen mit
    • first, last, filter
  • Gruppieren, Transformieren und Zerteilen von Mengen mit
    • groupBy, pluck und partition
  • transparentes, nebenläufiges Ausführen von Code mit der .do Funktion

Grundsätzlich gilt: Ein definiertes Observable oder eine Kette von Observables wird erst dann ausgeführt, wenn es einen Beobachter gibt. Dies ist dann der Fall, wenn mindestens an einer Stelle die .subscribe Funktion des Observables ausgeführt wird und der Stream somit geöffnet ist. An dieser Stelle sei gesagt, dass es neben den normalen Observables und Subjects noch andere, leicht abgewandelte Observables, wie zum Beispiel das BehaviorSubject gibt. Auf diese speziellen Typen werde ich in diesem Rahmen jedoch nicht eingehen können. Das Verhalten der nachfolgend vorgestellten Operatoren kann sich daher mit den verschiedenen Variationen von Observables ggf. leicht unterscheiden.

Mithilfe der oben aufgelisteten Operatoren lassen sich die Ausgaben von Observables verändern, zusammenfassen oder auch filtern. Die Liste von Operatoren ist nicht vollständig. Eine vollständige Liste aller RxJS Funktionen kann der offiziellen Dokumentation entnommen werden.

Da RxJS inzwischen bei Version 5.5 angekommen ist, und in der vergangenen Zeit einige Refactorings erfahren hat, haben einige Operatoren mehrere Namen (Aliases). Dies geschieht einerseits, um einen Wiedererkennungswert zwischen den verschiedenen ReactiveX-Versionen zu schaffen, anderseits aber auch, um auf die programmiersprachen-spezifischen Funktionsnamen einzugehen. In der Programmiersprache Scala beispielsweise gibt es die Funktion foldLeft von Haus aus für Arrays. Darum hat RxScala die den Funktionsnamen foldLeft als Alias für reduce adaptiert.

Do

Mit dem Do-Operator können Aktionen ausgeführt werden, ohne dass diese das Observable manipulieren. Im Grunde ist der Do-Operator einfach ein zwischengeschaltetes Observable, welches das vorher definierte abonniert, und selber die Werte mithilfe der next Funktion weiterreicht. Der Do-Operator wird ausgeführt kurz bevor das Event beim Beobachter ankommt. Dieser Operator wird häufig für Log-Ausgaben verwendet.

Map

Mithilfe des Map-Operators können die vom Observable ausgegebenen Ereignissee manipuliert werden, bevor sie beim Beobachter ankommen. Map erwartet eine Funktion, die den Datentyp des Observables zurückgibt. Hier ein Beispiel: Das Quell-Observable gibt „this is a“ aus. Die Map-Funktion manipuliert das Ereignis so, dass es zusätzlich das Wort „string“ anhängt. Beim Beobachter des Observables kommt somit „this is a string“ an.

const stringObservable = Observable.of("this is a")
                            .map(str => {
                                return str + "string";
                            });

//Output: this is a string
stringObservable.subscribe(result => {
    console.log(result);
});

 

flatMap

flatMap sowie dessen Aliases mergeMap und selectMany dient wie map ebenfalls dazu, ein Observable zu manipulieren, akzeptiert jedoch eine Funktion, die ein Observable zurückliefert. FlatMap abonniert dann dieses Observable aus dem flatMap Block (auch inner Observable genannt) und liefert dem Beobachter dessen Ergebnis.

Hier das obige Beispiel noch einmal, nur mit FlatMap anstelle von Map.

Würde anstelle von Observable.of die Anweisung Observable.from([…]) verwendet, also würde ein Observable von einem Array erstellt werden und somit mehrere Ereignisse ausgeben, würde flatMap diese parallel abonnieren und alle ausgegebenen Ereignisse an die Beobachter weiterreichen. Der FlatMap-Operator nimmt als zweites Argument eine Zahl entgegen. Diese bestimmt, wie viele innere Observables zeitgleich abonniert werden sollen. Ist dieser Wert nicht gesetzt, wird der Standardwert von unendlich genommen. Wird der Wert 1 angegeben, ist der Operator identisch zu concatMap. In diesem Fall abonniert flatMap jeweils nur ein inneres Observable zur selben Zeit und nimmt sich erst dann das nächste vor, wenn das vorherige abgeschlossen ist. Dies ist besonders praktisch, wenn zum Beispiel mehrere HTTP-Anfragen gemacht werden müssen, die voneinander abhängen.

switchMap

switchMap funktioniert ähnlich wie flatMap. Es wird ebenfalls eine Funktion erwartet, welche ein Observable zurückgibt. Im Gegensatz zu flatMap leitet switchMap nicht alle Ereignisse der inneren Observables an den Beobachter weiter, sondern lediglich diejenigen, die vom zuletzt ausgegebenen äußeren Observable ausgegeben worden sind. Das folgende Beispiel zeigt dies: Ein IntervalObservable gibt in Abständen von 400 ms seinen Index aus und wird mit switchMap auf je ein Observable gemappt, welches 10 mal einen modifizierten String zurückliefert. Es wird künstlich verzögert um innerIndex * 100 ms. Nach je 400 ms werden die Abonnements der inneren Observables storniert, da das äußere Observable ein neues Ereignis ausgibt. Es werden also nur die inneren Observables beim Beobachter ankommen, die innerhalb der 400-Millisekunden-Grenze ausgegeben werden (außer bei den inneren Observables, die durch das zuletzt ausgegebene Ereignis des äußeren Observables erzeigt wurden).

forkJoin

Der forkJoin Operator ist vergleichbar mit Promise.all. Er akzeptiert als Argument entweder mehrere Observables oder ein Array von Observables. Forkjoin abonniert dann all diese übergebenen, inneren Observables parallel und wartet auf deren Abschluss. ForkJoin gibt ein Observable zurück, welches die Ergebnisse der inneren Observables als Array in der Reihenfolge enthält, wie die inneren Observables an den forkJoin Operator übergeben worden sind. Dieses Array enthält jedoch nur das zuletzt ausgegebene Ereignis der inneren Observables. ForkJoin eignet sich also nicht für Observables, die mehr als einen Wert ausgeben. Zu beachten: Löst ein inneres Observable einen Error aus, ohne dass dieser mittels .catch abgefangen wird, so bricht forkJoin die gesamte Operation ab und leitet den Fehler an die OnError-Funktion des ForkJoin-Beobachters weiter.

combineLatest

combineLatest akzeptiert als Argumente ebenfalls ein Array von Observables oder mehrere Observables. Dieser Operator abonniert alle übergebenen inneren Observables, und wartet darauf, dass jedes einmal einen Wert ausgibt. Ab diesem Moment kombiniert combineLatest die jeweils zuletzt ausgegebenen Werte der inneren Observables in einem Array, welches an den Beobachter von combineLatest ausgegeben wird. Wie das Kugeldiagramm in Abb. 1 zeigt, gibt combineLatest immer dann das kombinierte Array aus, wenn ein inneres Observable einen neuen Wert ausgibt. Mit diesem Operator können beispielsweise sehr praktisch verschiedene DOM Events kombiniert werden wie die x/y Koordinaten der Mausposition in Verbindung mit dem aktuellem pageYOffset.

Abbildung 1: combineLatest-Diagramm
Grafikquelle: reactivex.io/rxjs/class/es6/Observable.js~Observable.html

timer und interval

Observable.timer und Observable.interval sind Observable Wrapper für die nativen Javascript Funktionen setTimeout und setInterval.

Timer ermöglicht das Ausführen einer Aktion nach einem bestimmten Zeitraum oder zu einem definierten Zeitpunkt. Der Timer Operator akzeptiert drei Parameter: die initiale Verzögerung in Millisekunden oder ein Ausführungszeitpunkt als Date-Objekt. Nach Ablauf der Verzögerung oder bei Erreichen des Zeitpunktes gibt das Timer Observable ein Ereignis aus. Argument 2 bestimmt das Ausführungsintervall für alle nachfolgenden Ereignisse. Ist dieser Wert leer, so werden keine weiteren Ereignisse ausgegeben. Für den dritten Operator, der nur selten notwendig sein dürfte, verweise ich auf die offizielle Dokumentation.

Der Interval Operator gibt jeweils ein Ereignis aus nach dem Ablauf der gegebenen Zeit in Millisekunden. Der Interval Operator kann durch den Timer Operator ersetzt werden: Observable.interval(1000) = Observable.timer(1000,1000).

Abschließende Einschätzung

Ich bin Angular dankbar dafür, dass ich gezwungen worden bin, mich mit RxJS auseinanderzusetzen. Die in diesem Blogpost vorgestellten Beispiele zeigen nur die Basics und essentiellen Möglichkeiten, die man mit Observables so anstellen kann. Seine Stärken entfaltet die Library erst dann, wenn die verschiedenen Operatoren kombiniert werden. Solche Code-Anweisungen können dann zwar sehr lang werden, sind aber, wie es für das funktionale Programmierparadigma typisch ist, noch deutlich kürzer als komplett selbst geschriebener Code.

Angular bietet für HTML-Templates die Async-Pipe an. Mit ihr ist es möglich, Promises und vor allem auch Observables auszuwerten und dann die Ergebnisse so zu verwenden, als wären es synchrone Aufrufe aus der Komponente. Grundsätzlich wäre es also möglich, nahezu alle Datenstrukturen, die irgendwo verwendet werden, in Observables zu verpacken. Das ermöglicht nicht nur durchgehend die Nutzung der wirklich praktischen Observable-Operatoren, sondern hätte auch den Vorteil, dass sämtliche Funktionen asynchron ausgeführt werden würden.

Der Einstieg in RxJS ist zwar erst einmal schwer. Nicht zuletzt aufgrund der vielen Operatoren habe ich anfangs lieber weiterhin das altbewährte Promise verwendet. Inzwischen habe ich fast alle Funktionen, die ein Promise zurücklieferten auf Observables umgestellt. Wer einmal das Prinzip und die Vorteile erkannt hat, wird RxJS wohl nie wieder missen wollen.

Rufen Sie uns an: 0221 222 812 - 12
Kommen Sie vorbei!
lise GmbH
Butzweilerhof-Allee 2
50829 Köln