Understanding Top Core Essentials in RXJS
In this article, we are going to talk about the basics of Reactive programming. RxJS is a library for composing asynchronous and event-based programs by using observable sequences.
Core Essentials in RXJS
- Observables: represents the idea of an invokable collection of future values or events. Observables are pretty useful and are used to handle the asynchronous operations in RxJS.
- Subjects: is the equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
- Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.
- Schedulers: are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
We will be taking a deep look into this core essentials in RXjs.
Observables:
Observables represent the idea of an invokable collection of future values or events. Observables are pretty useful and are used to handle the asynchronous operations in RxJS.Observables have 4 lifecycles which are:
- Creation
- Subscription
- Execution
- Destruction
Creation
For creating an Observable, first import Observable from RxJS.
import { Observable } from "rxjs";
let observable = Observable.create((observer:any) => {
observer.next('Intro To RXjs')
})
Subscription
To call the observable to begin execution, use the subscription method:
observable.subscribe(logMessage(message:any) => {
console.log(message);
})
The subscribe function will log Intro To RXjs in the console.
Execution
Observers are responsible for executing instructions in Observable so that each subscriber who subscribes can provide 3 values to Observable.This 3 value are:
- Next value: Observers can pass a value that can be an integer, a string, or an object. There may be more than one next message on a particular observable.
- Error value: The observer sends a JavaScript exception. If there is an error in the observable, nothing else can be delivered to the observable.
- Complete value: Here the observer sends no value. This usually signals that the subscriptions for the observable have been completed. If the complete value is sent, nothing else can be delivered to the observable.
Example:
//index.ts file
//create the observable
import { Observable, fromEvent } from 'rxjs';
let observable = new Observable((observer: any) => {
observer.next('My');
observer.next('My name ');
observer.next('My name is ');
observer.next('My name is Wisdom');
setInterval(() => {
observer.next('Please keep on calling me');
}, 1000);
});
let observer = observable.subscribe((x: string) => console.log(x));
Here the observer in the setInterval function will be called every 1 second.
Destruction
To stop the process of the observable we can use the unsubscribe method.
setTimeout(() => {
observer.unsubscribe();
}, 1001);
Subjects
An RxJS Subject is a special type of Observable that allows multicasting to multiple Observers.Basically a Subject is a type of observable.
Creating Subjects
A Subject is in double nature. Every Subject is an Observable and also, every Subject is an Observer. Just like the Observer, Subjects have next(value), error(err), and complete(). The only difference is the value next(value) is multicasted. You can pass the Subject as an argument to observable$
.
import { Subject } from "rxjs";
const subject = new Subject<Number>();
subject.subscribe({
next: value => {
console.log("First Subject: " + value);
}
});
subject.subscribe({
next: value => {
console.log("Second Subject: " + value);
}
});
subject.next(0.44);
subject.next(Math.random());
BehaviorSubject in RxJS
BehaviorSubject
stores the latest value emitted to subscribers. And whenever a new Observer subscribes, it immediately receives the stored last value from the BehaviorSubject.There represents a value that changes over time. Note that you have to set an initial value while creating a BehaviorSubject.
import { BehaviorSubject } from 'rxjs';
// Initialized with initial value 0
const subject = new BehaviorSubject<Number>(0);
subject.subscribe((data) => {
console.log('ObserverA: ' + data);
});
subject.next(1);
subject.next(2);
subject.subscribe((data) => {
console.log('ObserverB: ' + data);
});
subject.next(3);
console.log('subject.getValue(): ' + subject.getValue());
Schedulers
Rxjs schedulers provide an abstraction that allows work to be scheduled to run, possibly in the future, without the calling code needing to be aware of the mechanism used to schedule the work. Whenever an Rx method needs to generate a notification, it schedules the work on a scheduler. By supplying a scheduler to the Rx method instead of using the default, you can subtly control how those notifications are sent out.
Types of Schedulers
- immediateScheduler – Runs the work synchronously and immediately. Sort of like not using a scheduler at all. Work scheduled thus is guaranteed to run synchronously.
Example:
console.log('Before subscription');
RX.Observable
.range(1, 5)
.do((a: any) => {
console.log('Processing value of', a);
})
.map((value: any) => {
return value * value;
})
.subscribe((value: any) => {
console.log('Emitted', value);
});
console.log('After subscription');
The immediate Scheduler is very well suited for Observables that execute predictable and not-very-expensive operations in each notification
- currentThreadScheduler – Similar to
immediateScheduler
in that, the work is run immediately. However, it does not run work recursively. So, if the work is running and schedules more work, then that additional work is put into a queue to be run after the current work finishes. Thus work sometimes runs synchronously and sometimes asynchronously. This scheduler is useful to avoid stack overflows or infinite recursion.
var scheduler = Rx.Scheduler.currentThread;
Rx.Observable.return(10, scheduler).repeat().take(1).subscribe((value: any) => {
console.log(value);
});
The currentThread Scheduler is useful for operations that involve recursive operators like repeat, and in general for iterations that contain nested operators
- The default Scheduler runs actions asynchronously. You can think of it as a rough equivalent of setTimeout with zero milliseconds delay that keeps the order in the sequence. It uses the most efficient asynchronous implementation available on the platform it runs (for example, process.nextTick in Node.js or setTimeout in the browser).
console.log('Before subscription');
Rx.Observable
.range(1, 5)
.do((value: any) => {
console.log('Processing value', value);
})
.observeOn(Rx.Scheduler.default)
.map((value: any) => {
return value * value;
})
.subscribe((value: any) => {
console.log('Emitted', value);
});
console.log('After subscription');
The default Scheduler never blocks the event loop, so it’s ideal for operations that involve time, like asynchronous requests.
Operators
Operators are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc. We will see some examples of some operators.
Map() Operator
- This is used to manipulate the data from observable return values in observable fashion.
import { map } from 'rxjs/operators';
const data = of('Wisdom Ekpot');
data.pipe(map((x) => x.toUpperCase())).subscribe((d) => {
console.log(d);
});
This will iterate over the string and Convert all the letters to uppercase.
Switch Map Operator:
This cancels one observable and switches to another one.
const reqPosts = this.function1();
const reqUsers = this.function2();
const reqPostsUser = reqPosts.pipe(
switchMap(posts => {
return reqUsers.pipe(tap(users => {
console.log('Posts List ', posts);
console.log('User List ', users);
}));
})
First() operator
No matter how many times an action is called,it will serve the first request only.
const eventSource = fromEvent(document, 'click');
eventSource.pipe(first()).subscribe(() => {
console.log('clicked ', this.count);
this.count++;
});
Takewhile() operator:
This Handles observations in a conditional manner.
const eventSource = fromEvent(document, 'click');
eventSource.pipe(takeWhile(() => this.count < 3)).subscribe(() => {
console.log('clicked ', this.count);
this.count++;
});
Conclusion
Observables provide a bunch of operators for creating, transforming, filtering, and multicasting asynchronous events which have made reactive programming very easy unlike trying to use vanilla javascript to achieve it.