RXJS - ReactiveX for JavaScript - Operators

What is RXJS

Async actions in JavaScript are common and error-prone.
We query the server, set timers, and listen for events, and most of the results reach us asynchronously.
In JavaScript we used to deal with async actions using promises, but since async actions are so common, hard to debug, and easy to get wrong, it helps to handle them with a stronger tool.
While a Promise lets us deal with a single async event, RXJS lets us deal with an async data stream, meaning one or more (possibly infinitely many) async events.
Moreover, RXJS brings us a large toolset that solves common async problems very easily.
In this article we are going to focus on RXJS operators: what they are, how to use them, and which common operators we can use.

What is an RXJS operator

In RXJS an operator either creates a new data stream or, as most operators do, transforms an existing data stream into a different one. An operator usually returns an Observable, which means you can chain operators together to apply multiple transformations.

        
            modifiedObservable = someObservable
                .operator1()
                .operator2()
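
To make the chaining idea concrete, here is a minimal sketch that does not use RXJS at all. It models an observable as a plain function that pushes values into a callback, and an operator as a function that takes one such source and returns another. The names Source, Operator, fromArray, map and filter are hypothetical stand-ins written for this illustration; they are not the library's implementation.

```typescript
// A toy model, NOT the RXJS implementation: a source is a function
// that pushes its values into the callback it is given.
type Source<T> = (next: (value: T) => void) => void;
// An operator takes one source and returns a new one, which is why operators chain.
type Operator<A, B> = (source: Source<A>) => Source<B>;

// Creation helper (hypothetical): emits every array item in order.
const fromArray = <T>(items: T[]): Source<T> => (next) => {
    items.forEach((item) => next(item));
};

// Transforming operators (hypothetical stand-ins for map and filter).
const map = <A, B>(project: (value: A) => B): Operator<A, B> =>
    (source) => (next) => source((value) => next(project(value)));

const filter = <A>(predicate: (value: A) => boolean): Operator<A, A> =>
    (source) => (next) => source((value) => {
        if (predicate(value)) {
            next(value);
        }
    });

// Each step returns a new source, so the result of one operator feeds the next.
const someSource: Source<number> = fromArray([0, 1, 2, 3, 4]);
const onlyEven: Source<number> = filter((n: number) => n % 2 === 0)(someSource);
const modifiedSource: Source<number> = map((n: number) => n * 10)(onlyEven);

const chainOutput: number[] = [];
modifiedSource((value) => chainOutput.push(value));
console.log(chainOutput); // [0, 20, 40]
```

The point of the sketch is only the shape: because every operator hands back something with the same interface as its input, any number of them can be stacked.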
        
    
Common operators

The goal of this article is to go over the most commonly used operators. So let's start experimenting...

combineLatest
  • Gets a list of Observables
  • Returns a new Observable
  • The returned Observable starts emitting once every observable in the list has emitted at least one value
  • Each emission is an array holding the latest value from every observable in the list
        
            import {Subject} from "rxjs/Subject";
            import {combineLatest} from "rxjs/observable/combineLatest";
            import {Observable} from "rxjs/Observable";
        
            const subj1: Subject<string> = new Subject<string>();
            const subj2: Subject<number> = new Subject<number>();
            const combined: Observable<any> = combineLatest(subj1, subj2);
            combined.subscribe((arg) => {
                console.log('combined');
                console.log(arg);
            }, () => {
                console.log('error')
            });
        
            setTimeout(() => {
                console.log('first timer');
                subj1.next('hello');
            }, 1000);
        
            setTimeout(() => {
                console.log('second timer');
                subj2.next(1);
            }, 2000);
        
            setTimeout(() => {
                console.log('3rd timer');
                subj1.next('world');
            }, 3000);
        
            setTimeout(() => {
                console.log('4th timer');
                subj1.error(new Error('stam'));
            }, 3000);
        
    

Here we create two Subjects (Subject inherits from Observable) and combine them with combineLatest.
We set four timers: the first fires after one second, the second after two seconds, and the third and fourth after three seconds (timers with the same delay run in the order they were registered).
The first timer emits a value on the first subject. At this point the combined observable does not emit, since the other subject has not emitted a value yet.
The second timer emits a value on the second subject. Now the combined observable has at least one value from each observable in the list, so it emits the array of latest values: ['hello', 1]
The third timer emits a value on the first subject, and the combined observable logs: ['world', 1]
The fourth timer raises an error on the first subject. If one of the observables errors, the combined observable calls its error callback.

A common use case is sending multiple requests to the server when we want an observable that emits only after every request has received a response.
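
The emission rule can also be replayed without timers. The following is a minimal, dependency-free sketch of that rule, not the RXJS implementation; InputEmission and replayCombineLatest are hypothetical names introduced only for this illustration. It feeds in the same three values as the example above and collects what the combined stream would emit.

```typescript
// A dependency-free model of the combineLatest rule, NOT the RXJS implementation.
// Each entry says which input (by index) produced which value.
interface InputEmission {
    source: number;
    value: unknown;
}

// Replays the emissions in order and returns every array the combined stream would emit.
function replayCombineLatest(sourceCount: number, emissions: InputEmission[]): unknown[][] {
    const latest: unknown[] = [];
    const hasValue: boolean[] = [];
    for (let i = 0; i < sourceCount; i++) {
        latest.push(undefined);
        hasValue.push(false);
    }

    const emitted: unknown[][] = [];
    for (const emission of emissions) {
        latest[emission.source] = emission.value;
        hasValue[emission.source] = true;
        // Emit only once every input has produced at least one value.
        if (hasValue.every((seen) => seen)) {
            emitted.push([...latest]);
        }
    }
    return emitted;
}

// subj1 is input 0 and subj2 is input 1.
const combinedOutput = replayCombineLatest(2, [
    { source: 0, value: 'hello' }, // nothing emitted yet, input 1 is still silent
    { source: 1, value: 1 },       // emits ['hello', 1]
    { source: 0, value: 'world' }, // emits ['world', 1]
]);
console.log(combinedOutput); // [['hello', 1], ['world', 1]]
```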

merge

This operator combines multiple Observables into one. Whenever one of the observables emits a value, the merged observable emits it as well.

A use case is querying the server with multiple requests when we want a single observable that emits as each request is resolved.

        
            import {merge} from "rxjs/observable/merge";
            import {Subject} from "rxjs/Subject";
            import {Observable} from "rxjs/Observable";
    
            const subOdd: Subject<number> = new Subject<number>();
            const subEven: Subject<number> = new Subject<number>();
    
            const mergedObservable: Observable<number> = merge(subEven, subOdd);
            mergedObservable.subscribe((value: number) => {
                console.log(value);
            }, () => {
                console.log('will error out if one of the observables errors')
            });
    
            subOdd.next(1);
            subOdd.next(3);
            subOdd.next(5);
    
            subEven.next(0);
            subEven.next(2);
            subEven.next(4);
    
            subOdd.error(new Error('Yet another error'));
        
    
concat

concat subscribes to the observables one at a time: only when the current observable completes does it start with the next one.

        
            import {concat} from "rxjs/observable/concat";
            import {Subject} from "rxjs/Subject";
            import {Observable} from "rxjs/Observable";

            const subOdd: Subject<number> = new Subject<number>();
            const subEven: Subject<number> = new Subject<number>();

            const mergedObservable: Observable<number> = concat(subEven, subOdd);
            mergedObservable.subscribe((value: number) => {
                console.log(value);
            }, () => {
                console.log('will error out if one of the observables errors')
            });

            subOdd.next(1); // will not be printed: concat has not subscribed to subOdd yet
            subOdd.next(3); // will not be printed: concat has not subscribed to subOdd yet
            subOdd.next(5); // will not be printed: concat has not subscribed to subOdd yet

            subEven.next(0); // will be printed
            subEven.next(2); // will be printed
            subEven.next(4); // will be printed
            subEven.complete();

            subOdd.next(7); // now it will be printed
            subOdd.next(9); // now it will be printed


            subOdd.error(new Error('Yet another error'));
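
The difference between merge and concat is easiest to see by replaying the same sequence of events through both rules. The sketch below is a dependency-free model and not the RXJS implementation; StreamEvent, replayMerge and replayConcat are hypothetical names made up for this illustration. It assumes the inputs behave like the Subjects above, so a value pushed while nobody is subscribed is lost.

```typescript
// A dependency-free model, NOT the RXJS implementation. Inputs are "hot" like
// Subjects: a value emitted while nobody listens is simply lost.
type StreamEvent =
    | { source: number; kind: 'next'; value: number }
    | { source: number; kind: 'complete' };

// merge listens to every input from the start and forwards each value.
function replayMerge(events: StreamEvent[]): number[] {
    const printed: number[] = [];
    for (const event of events) {
        if (event.kind === 'next') {
            printed.push(event.value);
        }
    }
    return printed;
}

// concat listens to one input at a time and moves to the next input
// only after the current one completes.
function replayConcat(events: StreamEvent[]): number[] {
    const printed: number[] = [];
    let active = 0; // index of the input concat is currently subscribed to
    for (const event of events) {
        if (event.source !== active) {
            continue; // not subscribed to this input right now, so the event is lost
        }
        if (event.kind === 'next') {
            printed.push(event.value);
        } else {
            active += 1;
        }
    }
    return printed;
}

// Input 0 plays the role of subEven, input 1 the role of subOdd.
const timeline: StreamEvent[] = [
    { source: 1, kind: 'next', value: 1 },
    { source: 1, kind: 'next', value: 3 },
    { source: 1, kind: 'next', value: 5 },
    { source: 0, kind: 'next', value: 0 },
    { source: 0, kind: 'next', value: 2 },
    { source: 0, kind: 'next', value: 4 },
    { source: 0, kind: 'complete' },
    { source: 1, kind: 'next', value: 7 },
    { source: 1, kind: 'next', value: 9 },
];

console.log(replayConcat(timeline)); // [0, 2, 4, 7, 9]
console.log(replayMerge(timeline));  // [1, 3, 5, 0, 2, 4, 7, 9]
```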
        
    
from

This operator turns an array, a promise, or an iterable into an observable

        
            import {from} from "rxjs/observable/from";
            import fetch from "isomorphic-fetch";
    
            // observable from promise
    
            from(fetch('https://nztodo.herokuapp.com/api/task/?format=json'))
                .subscribe((res) => {
                    console.log('got response from server');
                });
    
            from([1,2,3,4])
                .subscribe((value: number) => {
                    console.log(value);
                })
        
    

In the first example we take an ajax request to the server, which usually returns a promise, and turn it into an Observable.
Transforming a promise into an observable is extremely easy.

of

of takes a list of arguments and emits them in order.

        
            import {Observable} from "rxjs/Observable";
            import "rxjs/add/observable/of";
        
            Observable.of(('first Value') as any, 2, ['third', 'value'])
                .subscribe((value: string | number | string[]) => {
                    console.log(value);
                })
        
    

We cast the first value because otherwise TypeScript's type inference would assume that every value we pass should be a string.
The output will be: 'first Value', 2, ['third', 'value']

throw

Creates an observable that emits no values and terminates immediately with an error

    
        import {_throw} from "rxjs/observable/throw";

        _throw(new Error())
            .subscribe(() => {
                console.log("this won't be called");
            }, (err) => {
                console.log("this will be called");
            }, () => {
                console.log("this won't be called");
            });
    
    
catchError

Catches an error from the source observable and replaces it with a new observable. The function passed to catchError receives the error and must return an observable. In the example below, the returned observable emits the error message through next and then completes.

    
        import {catchError} from "rxjs/operators";
        import {_throw} from "rxjs/observable/throw";
        import {Observable} from "rxjs/Observable";
        import {of} from "rxjs/observable/of";

        const errorObservable: Observable<any> = _throw(new Error('some error message'));

        errorObservable.pipe(
            catchError((error: Error) => of(error.message))
        ).subscribe(
            (message: string) => {
                console.log(message);
            },
            () => console.log('error wont be called'),
            () => console.log('complete will be called')
        )
    
    
filter

Emits a value only if it meets a certain condition

The following example creates an observable that emits the numbers from 0 to 9 (range(0, 10) emits 10 values starting at 0). We use the filter operator to emit only the even numbers.

    
        import {Observable} from "rxjs/Observable";
        import 'rxjs/add/observable/range';
        import {filter} from "rxjs/operators";

        const counterObservable: Observable<number> = Observable.range(0,10);

        counterObservable.pipe(
            filter((num: number) => num % 2 === 0)
        ).subscribe((evenNum: number) => {
                console.log(evenNum);
            });
    
    
debounceTime

Discards an emitted value if a newer value arrives before a given amount of time has passed

This is popular in scenarios where an event happens frequently, like a user typing in a search input,
and you don't want to query the server on every keystroke, only once the user has stopped typing.

In the following example we emit values, wait, and then emit again

    
        import {Subject} from "rxjs/Subject";
        import 'rxjs/add/operator/debounceTime';

        const sub: Subject<number> = new Subject<number>();

        sub.debounceTime(1000).subscribe((value: number) => {
            console.log('subscribe');
            console.log(value);
        });

        sub.next(1);
        sub.next(2);
        sub.next(3);

        setTimeout(() => {
            console.log('firsttimer');
            sub.next(4);
        }, 1100);

        sub.next(5);

        setTimeout(() => {
            console.log('secondtimer');
            sub.next(7);
        }, 2200);
    
    

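A value reaches our subscriber only after a full second passes without a newer value from the subject; a value that is followed too quickly by another one is discarded.
The result will be: 5, 4, 7 (5 is pushed synchronously, before either timer fires, so it replaces 1, 2 and 3)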
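
To see where that result comes from, the timing can be replayed on paper. The sketch below is a dependency-free model of the debounce rule, not the RXJS implementation; TimedValue and replayDebounce are hypothetical names introduced for this illustration, and the timestamps are idealized (real timers fire slightly later than requested).

```typescript
// A dependency-free model of the debounce rule, NOT the RXJS implementation.
// Every entry carries the time (in ms) at which the value was pushed into the subject.
interface TimedValue {
    time: number;
    value: number;
}

// Returns the values that survive debouncing, with the time at which each is emitted.
// Assumes `emissions` is sorted by time.
function replayDebounce(emissions: TimedValue[], quietMs: number): TimedValue[] {
    const emitted: TimedValue[] = [];
    emissions.forEach((current, index) => {
        const following = emissions[index + 1];
        // A value is dropped when a newer one arrives before the quiet period ends.
        const interrupted = following !== undefined && following.time - current.time < quietMs;
        if (!interrupted) {
            emitted.push({ time: current.time + quietMs, value: current.value });
        }
    });
    return emitted;
}

// 1, 2, 3 and 5 are pushed synchronously at time 0 (the timers only run later),
// 4 arrives at 1100 ms and 7 at 2200 ms.
const debounced = replayDebounce([
    { time: 0, value: 1 },
    { time: 0, value: 2 },
    { time: 0, value: 3 },
    { time: 0, value: 5 },
    { time: 1100, value: 4 },
    { time: 2200, value: 7 },
], 1000);

console.log(debounced.map((entry) => entry.value)); // [5, 4, 7]
console.log(debounced.map((entry) => entry.time));  // [1000, 2100, 3200]
```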

distinctUntilChanged

Emits a value only if it is different from the previous one

    
        import {Subject} from "rxjs/Subject";
        import 'rxjs/add/operator/distinctUntilChanged';

        const sub: Subject<number> = new Subject<number>();

        sub.distinctUntilChanged().subscribe((val: number) => {
            console.log(val);
        });

        sub.next(0);
        sub.next(0);
        sub.next(1);
        sub.next(2);
        sub.next(2);
        sub.next(2);
        sub.next(3);

        // will output 0,1,2,3
    
    
map

Transforms each value in the observable into a different value

For example, we can transform an observable of strings into an observable of numbers holding the length of each string

A common use case is querying the server and mapping the response to class instances

    
    import {Subject} from "rxjs/Subject";
    import {map} from 'rxjs/operators'

    const sub: Subject<string> = new Subject<string>();

    sub.pipe(
        map((str: string) => str.length)
    ).subscribe((val: number) => {
        console.log(val);
    });

    sub.next("hello world");
    sub.next("foo bar");
    
    
mergeMap

Given an observable, this operator accepts a function that receives each value of the observable and returns a new observable or a promise. The values of the returned observables are merged into the output stream.

        
            import {mergeMap} from "rxjs/operators";
            import {Observable} from "rxjs/Observable";
            import "rxjs/add/observable/of";
    
            const helloObservable: Observable<string> = Observable.of('hello');
    
            helloObservable.pipe(
                mergeMap((message: string) => Observable.of('world'))
            ).subscribe((message: string) => {
                console.log(message);
            });
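
The flattening step can be sketched without RXJS. In the toy model below, a source is just a function that pushes values into a callback; Source, ofValues and this mergeMap are hypothetical stand-ins written for the illustration, not the library's implementation. Because everything in the model is synchronous, the interleaving of inner streams that gives mergeMap its name does not show up here; the sketch only shows that the values of every inner source end up in one output stream.

```typescript
// A toy model, NOT the RXJS implementation: a source pushes its values into a callback.
type Source<T> = (next: (value: T) => void) => void;

// Hypothetical helper: emits the given values in order.
const ofValues = <T>(...values: T[]): Source<T> => (next) => {
    values.forEach((value) => next(value));
};

// Hypothetical stand-in for mergeMap: every outer value is turned into an
// inner source, and whatever the inner source emits is forwarded downstream.
const mergeMap = <A, B>(project: (value: A) => Source<B>) =>
    (source: Source<A>): Source<B> =>
        (next) => source((outerValue) => project(outerValue)(next));

const greetings: Source<string> = ofValues('hello', 'goodbye');
const sentences: Source<string> = mergeMap(
    (greeting: string) => ofValues(`${greeting} world`, `${greeting} moon`)
)(greetings);

const flattened: string[] = [];
sentences((sentence) => flattened.push(sentence));
console.log(flattened);
// ['hello world', 'hello moon', 'goodbye world', 'goodbye moon']
```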
        
    
pluck

Selects a property of each emitted object and emits its value

    
        import {of} from "rxjs/observable/of";
        import {Observable} from "rxjs/Observable";
        import {pluck} from "rxjs/operators";

        const objObservable: Observable<any> = of({hello: 'world'});
        objObservable.pipe(
            pluck('hello')
        ).subscribe((val: any) => {
            console.log(val);
        });
    
    
tap

Transparently performs actions or side effects when a stream emits a value, errors, or completes

        
            import {Subject} from "rxjs/Subject";
            import {tap} from "rxjs/operators";
        
            const sub: Subject<number> = new Subject<number>();
        
            sub.pipe(
                tap((val: number) => console.log(`saving value: ` + val))
            ).subscribe((originalValue: number) => {
                console.log(`original action: ` + originalValue);
            });
        
            sub.next(1);
            sub.next(2);
            sub.next(3);
        
    
delay

Emits each value after a delay.

A common use case is in tests, when you want to mimic a server response

    
        import {delay} from "rxjs/operators";
        import {Subject} from "rxjs/Subject";

        const sub: Subject<number> = new Subject<number>();

        sub.pipe(
            delay(500)
        ).subscribe((val: number) => {
            console.log(val);
        });

        sub.next(0);
        sub.next(1);
        sub.next(2);

        // each value is emitted 500ms after it was pushed, so all three arrive after about 500ms
    
    
finalize

Its callback is called when the observable terminates, either on complete or on error

    
    import {finalize} from "rxjs/operators";
    import {Subject} from "rxjs/Subject";

    const sub: Subject<number> = new Subject<number>();

    sub.pipe(
        finalize(() => {
            console.log('this will be called when the subject completes')
        })
    ).subscribe();

    sub.next(0);
    sub.next(1);
    sub.next(2);
    sub.next(3);
    sub.complete();
    
    
Summary

Operators give us a great arsenal for dealing with common async actions.
In this article we tried to include examples of the most commonly used operators.
Although the Observable type that RXJS is built around is still only an early-stage proposal for ECMAScript, it may well enter the language at some point. Moreover, RXJS is far more powerful than good old promises (and it's super easy to transform a promise into an observable).
There is no reason not to use RXJS to solve our async problems.