Saturday, May 9, 2020

RxJS a deep dive with Angular 8


What and Why RxJS?

Reactive programming was first implemented by Microsoft engineer Eric Meijer while working in C#, LINQ. LINQ in the .NET framework provides the first kind of reactive experience to developers. Later in 2012, it was open-sourced and now used by all big companies including github, Netflix, etc.

RxJS implements reactive extension for TypeScript and JavaScript. ReactiveX is a combination of the observer pattern, iterator pattern, and functional programming. check reactive.io for more details.

Below attached image to view RxJS world!

  • Observable pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically. In Angular, there are many scenarios where we have to deal with events, streams of events.
  • Iterator Pattern: Provide a way to access the elements of an aggregate object sequentially without exposing its underlying representation.
  • Functional Programming: Functional programming is a programming paradigm where we use a pure function, avoiding the shared state, and mutable data. Currying in JavaScript is one of the great examples of functional programming.

RxJS is an event-based and asynchronous program that uses the observable sequence. Observables can be subscribed by a method, changed by a set of methods, and can return an asynchronous set of streams which further changed by a set of methods.

You can see the word subscribe here, wait it looks like you are talking something like Promises?

Observing Universe depends upon time and space

Observable vs Promise:

In general, promises handle single events that fail or succeed. Though now there are many libraries which can make the promise more powerful.

Observable is a set of the stream that can handle no to many events. Observables are preferred over promises because they can be aborted, retry in case if it fails, and much more!

Example:

In the below code, we are creating one service ObservableDemoService which is implementing HttpClient. This service getting initial page info to load data. Method getinfo$ getting array stream of DemoInterface[] and in pipe handling error. In the component constructor injecting dependency of ObservableDemoService. Also creating an error subject errorMessageSubject to hold or show error in view. Now calling getinfo$ method of ObservableDemoService with pipe. Pipe takes a bunch of other methods. Pipe is very useful when we are performing some data messaging on the response, here we are initializing the data grid component.

Here we are creating promiseinAction which returns a resolve and reject promise when supplied true and false respectively. We are implementing ngOnInit method to call getPromise(true || false) method. We can see clearly here we have only two response success or failure.

Good News! Promises can be converted to Observables:

The good part is a promise can be converted to Observables when required!

RxJS operators that accept Observables can also accept a promise as well. You just need to change promise to observable. The best part is you don’t need to change all the code in the Angular app!

OR


Observable vs Subject

Subject is a special type of Observable. Observable is one of the base classes in RxJS and Subject is one of its derived types. Subjects implement observer design pattern which is required to set some value i.e. this.Subject.next(message). Observable always need a subscription and implement the observable pattern. Observable is always unidirectional meaning it flows from source to subscriber. Subject is bidirectional, information flow from the source to subscriber, and vice versa.

Types of Subject:

  1. BehaviourSubject: A BehaviourSubject stores the latest value and immediately sends it to all subscribers. i.e. I have created a message service that can be consumed from two Angular components and its bidirectional.

Component1:

Component2:

2. ReplaySubject: A replay subject stores n number of values and immediately send it to the subscriber. i.e. in the above data service if we change messageSource as ReplaySubject and pass 3 then it will cache the last three values on subscription.

3. AsyncSubject: Async subject waits until it completely and then sends the final value to the subscriber. The keyword here is the final value. If we add two values in the below example “Hello 1” and “Hello Again”, When we do asyncsub.complete() then only last value will be printed. The last value is the final value in our case.

4. ConnectableObservable: A wrapper that transforms observable behave like a Subject. It provides a .connect() method to transform. Very less likely you will be using it in Angular application.

observable.connect();


Managing Stream Greater Power !!

RxJS operators provide managing streams, combining the stream into a single stream. It also helps in calling APIs (set of observables) sequential and parallel. But the main problem is there are so many operators. Combining operators is always a challenge. RxJs provides a set of the wizard on their website for combining these operators. If the wrong operator is combined it will lead to race conditions. Also, all the subscriptions need to unsubscribe when components are out of scope, failing this will led to memory leaks. Memory leaks are really really scary especially when the application is running in production environments.

Combining Observables into one Stream:

  • mergeMap: Creates new observables for any given source. All previous streams/observable keep alive. There is no order in the returned observables, moreover, the order is not preserved. The best use case for mergeMap is when combining a click event with API calls.
  • concatMap: Similar to mergeMap but the order of observables is well preserved. Preserve the order and emits all observable value, works synchronously. Execute slowly because it works synchronously, waits for first observable to complete then only start new observables stream. The best use case is when you are calling an API which gives you id and that id is used in another API.
  • switchMap: Immediately creates new observables and completes the old observables. The best use case for switchMap is search auto-complete. Whenever the user starts typing a new keyword for search, a new observable is created and the old one is completed. Check combineLatest example below with mergeMap and switchMap.
  • flatMap: Immediately creates observables and previous observables are kept alive. fatmap is an alias of mergemap, mergeMap accepts an optional parameter concurrency, which defines how many Observables can be subscribed at the same time.
  • exhaustMap: Creates observable and waits for it until it complete. All other observable is ignored while waiting for the observable to complete. Best use case is to use login in the Angular app. Once the user clicks on login then wait until authentication is done! exhaustMap is just the opposite of switchMap. switchMap immediately creates a new observable and completes the old ones however exhaustMap first complete the initial observable and ignores the new ones.

Joining Observables into Array:

  • forkJoin: Calls all observable parallel. Returns all observables as an array once all call is completed. The best use case is when you want to call APIs that are not depends on each other. Multiple uploads are one of the scenarios where we can use forkJoin. Also, we can combine insert and update APIs in forkJoin based on conditions.
  • combineLatest: Begins when all observables fired at least once. Afterwards it fires when any of the event changes.

The best use case in an Angular application is combining a data table and dropdown events action. Dropdown represents the product with category and category ids and once selected we have to show product details based on category id. Now check the below code:

Once merged with combined with the latest whenever the user selects a product category dropdown data table data automatically filtered. No need to write any event in Angular component. This is sometimes called a reactive style of development.

Now we can use mergeMap while filtering the selectedCategory. If the user is clicks on the drop-down again and the previous request is not completed, then switchMap will cancel the previous request and start a new Observables.


With great power comes great responsibility! Memory Leaks are real!

I know you already heard this phase. But in the RxJS world, it’s very important to always unsubscribe to open observables and subjects. In Angular, implement ngOnDestroy and unsubscribe.

However many times it is difficult to remember which teardown method to unsubscribe.

How about if the subscription is automatically unsubscribed rather than doing it manually?

It is possible in frameworks like Angular if we are using async pipes. Async pipes automatically unsubscribe open observables and subject once a component is out of scope. Now we already created products$ in the above example which holds the product list coming from getting service. Using product$ | async pipe we can show the product details. Below is the view code snippet.

Level Up Coding

Coding tutorials and news.

Angular 8 Tutorial: Observable and RXJS Examples


Angular 8 Tutorial:  Observable and RXJS Examples

Learning Angular 8 Observable and RXJS with examples that might be useful for building Angular application

In this Angular 8 tutorial, we will show you how to learn or understand Angular 8 Observable and RxJS by a few examples. Maybe you already know that Observable and RxJS use with HttpClient. This time, we will show you the example of Angular Observable and RxJS with HttpClient, data exchange between components, Async pipe, Router, and Reactive Forms.

Table of Contents:

Angular Observable use as an interface to handle a variety of common asynchronous operations such as send observable data from child to parent component by defining custom events, handle AJAX or HTTP requests and responses, listen and respond user input in Angular Router and Forms. RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables that makes it easier to compose asynchronous or callback-based code. The following tools, frameworks, and libraries or modules are required for this tutorial.

  1. Node.js
  2. Angular 8
  3. Terminal (Linux/Mac) or Node Command Line (Windows)
  4. Text Editor or IDE (We use VSCode)


Preparation

To make all examples runnable, we need to prepare an Angular 8 application to implements all Angular Observable and RxJS. We are creating an Angular 8 app using Angular CLI which needs Node.js and NPM to install or update it. Make sure you have to install Node.js and NPM before installing or updating Angular CLI. To install or update an Angular CLI type this command.

sudo npm install -g @angular/cli

Next, create a new Angular 8 app using Angular CLI by type this command.

ng new angular-observable-rxjs

That command will create a new Angular 8 app with the name `angular-observable-rxjs` and pass all questions as default then the Angular CLI will automatically install the required NPM modules. After finished, go to the newly created Angular 8 folder then run the Angular 8 app for the first time.

cd ./angular-observable-rxjs
ng serve --open

Using that "--open" parameters will automatically open the Angular 8 in your default web browser. Here's the Angular 8 default page look like.

Angular 8 Tutorial:  Observable and RXJS Examples - Home Page


Basic Angular Observable and RxJS

In Angular Observable, there are a publisher and subscriber. The publisher can create an Observable instance that defines a subscriber function. The subscriber is receiving notification by executing the observable using subscribe() method and stop receiving the notification using the unsubscribe() method. To practice a Basic Angular Observable and RxJS we will use existing `src/app/app.component.ts`. Open and edit that file to implement these basic examples.

Observe Timer Example

This example demonstrates the basic usage model by showing how the RxJS timer works by subscribing it and stopped after unsubscribing it. Add this import of RxJS timer.

import { timer } from 'rxjs';

Add a constant variable after the imports that declare the RxJS timer.

const source = timer(1000, 2000);

Subscribe the RxJS timer then print out to Javascript console.

const subscribe = source.subscribe(val => console.log(val));

Add a function that will unsubscribe the RxJS timer.

setTimeout(() => { subscribe.unsubscribe(); }, 10000);

Now, you will see the timer print the increased steps and stopped after 10 seconds.

Angular 8 Tutorial:  Observable and RXJS Examples - Output Console

Basic Subscribing using Observer

This example shows an example of subscribing using Observer. Add this RxJS `of` operator.

import { of } from 'rxjs';

RxJS `of` operator used to emit a variable amount of values in a sequence and then emits a complete notification. Next, declare a constant variable before the Angular @Component that contains the sequence of observable string that will emit using RxJS `of` operator.

const myObservable = of('apple', 'orange', 'grappe');

Add a constant observer object variable after the above constant.

const myObserver = {
  next: (x: string) => console.log('Observer got a next value: ' + x),
  error: (err: string) => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

Inside the class, the body adds a constructor that will execute the observable by subscribing to the above observer object.

constructor() {
  myObservable.subscribe(myObserver);
}

That observable subscribing example will show output like this.

Observer got a next value: apple
app.component.ts:11 Observer got a next value: orange
app.component.ts:11 Observer got a next value: grappe
app.component.ts:13 Observer got a complete notification
core.js:38780 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
client:52 [WDS] Live Reloading enabled.

Observable with Constructor Example

We can use the observable constructor to create an observable stream of any type. The observable’s subscribe() executes with the constructor argument. The Observer object received by a subscriber function then publishes values using the observer's next() method. Add or modify the RxJS import to add Observer function.

import { Observable } from 'rxjs';

Add a function that runs the sequence of string synchronously then unsubscribe after values completed delivered.

function sequenceSubscriber(observer) {
  observer.next('Apple');
  observer.next('Orange');
  observer.next('Grappe');
  observer.complete();

  return {unsubscribe() {}};
}

Instantiate Observable that will deliver the above sequence of string.

const sequence = new Observable(sequenceSubscriber);

Execute the observable by print out the string sequence to the Javascript console.

sequence.subscribe({
  next(msg) { console.log(msg); },
  complete() { console.log('Finished sequence'); }
});

The Observable that Publishes Events Example

This example is taken from the official Angular docs with a little modification to work in `src/app/app.component.ts`. First, add an HTML input element to the existing `src/app/app.component.html`.

<div>
  <input type="text" id="yourname" placeholder="Type your name here"/>
</div>

Back to `src/app/app.component.ts`, modify this import to add Angular AfterViewInit lifecycle interface.

import { Component, AfterViewInit } from '@angular/core';

Implement that AfterViewInit in the main class.

export class AppComponent implements AfterViewInit {
...
}

Add a function that executes the Observable of the event from the target element.

fromEvent(target: HTMLInputElement, eventName: string) {
  return new Observable((observer) => {
    const handler = (e: unknown) => observer.next(e);

    target.addEventListener(eventName, handler);

    return () => {
      target.removeEventListener(eventName, handler);
    };
  });
}

Add ngAfterViewInit() function call the above function to create an observable that publishes key-down events.

ngAfterViewInit() {
  const ESC_KEY = 27;
  const nameInput = document.getElementById('yourname') as HTMLInputElement;
  this.fromEvent(nameInput, 'keydown')
  .subscribe((e: KeyboardEvent) => {
    if (e.keyCode === ESC_KEY) {
      nameInput.value = '';
    }
  });
}

Now, every ESC key push in the HTML input element, it will clear the input value.


Observable and RxJS with HttpClient

Almost our Angular tutorial involves REST API requests using Angular HttpClient, Observable, and RxJS. So, this example is a combination of these Angular features. This time we will put all in `src/app/app.component.html`, in the real world, you can separate the REST API access using their own services. First, open and edit `src/app/app.module.ts` then add this import of HttpClientModule that is part of @angular/common/http.

import { HttpClientModule } from '@angular/common/http';

Add it to @NgModule import array after the BrowserModule.

imports: [
  BrowserModule,
  HttpClientModule
],

Back to `src/app/app.component.ts` then add these imports of the required Angular HttpClient, RxJS, and Observable.

import { Observable, of, throwError } from 'rxjs';
import { HttpClient, HttpHeaders, HttpErrorResponse } from '@angular/common/http';
import { catchError, tap, map } from 'rxjs/operators';

Add these constants before the `@Component`.

const httpOptions = {
  headers: new HttpHeaders({'Content-Type': 'application/json'})
};
const apiUrl = 'http://localhost:3000/api/v1/products';

Inject `HttpClient` module to the constructor.

constructor(private http: HttpClient) { }

Add the error handler function that returns as an Observable.

private handleError<T> (operation = 'operation', result?: T) {
  return (error: any): Observable<T> => {

    // TODO: send the error to remote logging infrastructure
    console.error(error); // log to console instead

    // Let the app keep running by returning an empty result.
    return of(result as T);
  };
}

Add the functions to get data from the REST API and return the response as Observable. It also, extract or read data using the Pipe method and tap operator and catch the error by calling the error handler. RxJS pipe method is an Observable method to make data reading easier. RxJS Tap operator uses to perform a side effect for every emission on the source Observable but returns an Observable that is identical to the source.

getProducts(): Observable<any[]> {
  return this.http.get<any[]>(apiUrl)
    .pipe(
      tap(product => console.log('fetched products')),
      catchError(this.handleError('getProducts', []))
    );
}

getProduct(id: number): Observable<any> {
  const url = `${apiUrl}/${id}`;
  return this.http.get<any>(url).pipe(
    tap(_ => console.log(`fetched product id=${id}`)),
    catchError(this.handleError<any>(`getProduct id=${id}`))
  );
}

To run that function simply call the subscribe method and put the result to the variable that declared before the constructor.

data: any[] = [];

constructor(private http: HttpClient) {
  this.getProducts()
  .subscribe((res: any) => {
    this.data = res;
    console.log(this.data);
  }, err => {
    console.log(err);
  });
}


Data Exchange between Components

This example shows data exchange between components using service. Data emitted by service that subscribes by components. Component able to change data and data changes received by another component. For that, add a service and two components.

ng g service Shared
ng g component Acomp
ng g component Bcomp

Open and edit `src/app/shared.service.ts` then add this import of RxJS BehaviorSubject. BehaviorSubject is a Subject that requires an initial value and emits its current value to new subscribers.

import { BehaviorSubject } from 'rxjs';

Declare a variable before the constructor that instantiates BehaviorSubject with object data. Also, a variable that converts BehaviorSubject as Observable.

private dataSource = new BehaviorSubject({name: 'Maradona'});
currentData = this.dataSource.asObservable();

Add a function to change the data of BehaviorSubject.

changeData(data: any) {
  this.dataSource.next(data);
}

Next, open and edit `src/app/acomp/acomp.component.ts` then add these imports of Router and SharedService.

import { Router } from '@angular/router';
import { SharedService } from '../shared.service';

Inject that module to the constructor.

constructor(private router: Router, private sharedData: SharedService) { }

Declare a variable before the constructor to hold the received data.

data: any;

Subscribe the observable data from the service inside the NgOnInit function.

ngOnInit() {
  this.sharedData.currentData.subscribe(data => this.data = data);
}

Add a function to change the shared data then emit to its subscriber then navigate to the other component to see the data changes.

changeData() {
  this.sharedData.changeData({name: 'Eric Cantona'});
  this.router.navigate(['/bcomp']);
}

Next, open the `src/app/acomp/acomp.component.html` then replace all HTML tags with these tags.

<p>acomp works!</p>
<p>{{data.name}}</p>
<p>
  <button (click)="changeData()">Send Data</button>
</p>

Do a similar way for the other component, open and edit `src/app/bcomp/bcomp.component.ts` then replace all codes with these.

import { Component, OnInit, Output, EventEmitter } from '@angular/core';
import { Router } from '@angular/router';
import { SharedService } from '../shared.service';

@Component({
  selector: 'app-bcomp',
  templateUrl: './bcomp.component.html',
  styleUrls: ['./bcomp.component.scss']
})
export class BcompComponent implements OnInit {

  data: any;

  constructor(private router: Router, private sharedData: SharedService) { }

  ngOnInit() {
    this.sharedData.currentData.subscribe(data => this.data = data);
  }

  changeData() {
    this.sharedData.changeData({name: 'Romario Varia'});
    this.router.navigate(['/acomp']);
  }

}

Next, open and edit `src/app/bcomp/bcomp.component.html` then replace all HTML tags with these.

<p>acomp works!</p>
<p>{{data.name}}</p>
<p>
  <button (click)="changeData()">Send Data</button>
</p>


Observable and RxJS in Router

In this example, we will show you how to convert Router events as Observable. RxJS filter() use to look for events of interest and subscribe to them in order to make decisions based on the sequence of events in the navigation process. Still using the previously created components, open and edit `src/app/acomp/acomp.component.ts` then add or modify these imports of @angular/router Router, NavigationStart, RxJS filter, and Observable.

import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';
import { Observable } from 'rxjs';

Inject the constructor with Router and add to the constructor body the Router events that converted as Observable.

constructor(private router: Router) {
  this.navStart = router.events.pipe(
    filter(evt => evt instanceof NavigationStart)
  ) as Observable<NavigationStart>;
}

Add NavigationStart subscriber to NgOnInit function.

ngOnInit() {
  this.navStart.subscribe(evt => console.log('Navigation Started!'));
}

Next, the example of ActivateRoute that injected router service that makes use of observables to get information about a route path and parameters. Open and edit `src/app/bcomp/bcomp.component.ts` then add/modify these imports of @angular/router ActivatedRoute.

import { ActivatedRoute } from '@angular/router';

Inject above ActivatedRoute to the constructor.

constructor(private activatedRoute: ActivatedRoute) { }

Add ActivatedRoute URL subscribing to the NgOnInit function.

ngOnInit() {
  this.activatedRoute.url
    .subscribe(url => console.log('The URL changed to: ' + url));
}


Observable and RxJS in Reactive Forms

This example will show you how Observable detect the FormControl value changes. The Observable is used by the property in ReactiveForms. For that, we will register the ReactiveForm by open and edit `src/app/app.module.ts` then add/modify this import of ReactiveFormsModule, and FormModule.

import { FormsModule, ReactiveFormsModule } from '@angular/forms';

Register the above modules to `@NgModule` imports.

imports: [
  BrowserModule,
  HttpClientModule,
  FormsModule,
  ReactiveFormsModule,
  AppRoutingModule
],

Next, open and edit `src/app/app.component.ts` then add this import of @angular/forms FormGroup and FormControl.

import { FormGroup, FormControl } from '@angular/forms';

Add these variables of FormGroup and log for every FormControl value change.

inputChangeLog: string[] = [];
inputForm: FormGroup;

Add a function that detects the value changes of FormControl then saves to log array variable.

logInputChange() {
  const nameControl = this.inputForm.get('name');
  nameControl.valueChanges.forEach(
    (value: string) => {
      this.inputChangeLog.push(value);
      console.log(this.inputChangeLog);
    }
  );
}

Initialize the FormGroup and call log input changes function inside the NgOnInit function (don't forget to add the OnInit module to this component).

ngOnInit() {
  this.inputForm = new FormGroup({
    name: new FormControl()
  });
  this.logInputChange();
}

Next, open and edit `src/app/app.component.html` then add the FormGroup and InputControl to the main <div>.

<div class="content" role="main">

  <form [formGroup]="inputForm">
    <input type="text" placeholder="Your Name" formControlName="name" />
  </form>

</div>

Now, every FormControl changes will be displayed in the browser console.

That it's for now, it just a few examples of Angular Observable and RxJS. You can get the full source codes from our GitHub.

If you don’t want to waste your time design your own front-end or your budget to spend by hiring a web designer then Angular Templates is the best place to go. So, speed up your front-end web development with premium Angular templates. Choose your template for your front-end project here.