Reactive programming: #6 The Observable Monad
Description:
This is the sixth part of the series on reactive programming.
Before attempting it, you should have completed the previous katas:
- #1 Create function
- #2 Basic observables creation
- #3 Premier operators
- #4 Pipelining operators
- #5 Observable juggling
In this kata we are going to deal with the M word. Yes, observables are also monads. But do not be scared: it is easier than it seems.
Suppose we want to implement an operator that receives a function which takes a value and returns an observable. This is the type of that function:
f: (x: T) => Observable<R>
The operator will also receive an observable and it will apply the function to each value issued by the observable.
Why would we want something like that?
Let's take an example:
Suppose you want to implement the repeat operator. To do this, you could start from scratch, or you could define repeat in terms of other operators.
repeat receives a number and an observable and repeats the observable's emissions, one run after another, as many times as indicated.
function repeat<T>(times: number): Operator<T, T> {
  return (observable: Observable<T>): Observable<T> => {
    // ...
  };
}
At first you might think that you could define repeat like this:
const repeat = times => observable => of(Array(times).fill(null)).pipe(map(_ => concat(observable)));
That is, we generate as many values as the times parameter indicates, replace each value with the observable parameter, and then concatenate the observables.
This looks good, but we have a problem. The map operator preserves the structure (it returns an observable), and the function it receives returns observables too, so we end up with an observable whose values are themselves observables.
We could deal with this by subscribing to the outer observable and then subscribing to each inner observable. Of course, we would also have to handle unsubscription and the next and complete notifications of all the observables. It seems a mess, right?
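To make the nesting concrete, here is a minimal sketch. The MiniObserver and MiniObservable types and the fromArray helper are simplified, synchronous stand-ins invented for this illustration; they are not the kata's actual definitions, and only the onNext callback name is borrowed from the examples in this description.

```typescript
// Hypothetical, simplified types for illustration only.
interface MiniObserver<T> {
  onNext: (value: T) => void;
}
interface MiniObservable<T> {
  subscribe: (observer: MiniObserver<T>) => void;
}

// Assumed helper: emits every element of the array synchronously.
function fromArray<T>(values: T[]): MiniObservable<T> {
  return {
    subscribe: observer => values.forEach(v => observer.onNext(v)),
  };
}

// A plain map: applies f to each value and keeps the outer structure.
function map<T, R>(f: (x: T) => R) {
  return (source: MiniObservable<T>): MiniObservable<R> => ({
    subscribe: observer =>
      source.subscribe({ onNext: v => observer.onNext(f(v)) }),
  });
}

const source = fromArray([1, 2]);

// f returns an observable, so map produces an observable of observables.
const nested: MiniObservable<MiniObservable<number>> = map(
  (_: null) => source
)(fromArray([null, null]));

// To reach the numbers we have to subscribe twice, by hand.
const received: number[] = [];
nested.subscribe({
  onNext: inner => inner.subscribe({ onNext: v => received.push(v) }),
});
console.log(received); // [1, 2, 1, 2]
```

Even in this synchronous toy version the consumer has to know about the nesting. With real asynchronous observables, completion and unsubscription would have to be tracked by hand as well.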
Besides, by implementing repeat this way we lose the ability to compose it with other operators in pipe. This is not going to work:
// the types do not match: v is an observable of numbers, not a simple number
interval(500).pipe(take(3), repeat(4), map(v => v * 2));
This is what we mean when we say that a structure is a monad: a monad is a structure that comes with a function for composing functions that receive a simple value and return a value wrapped in the structure.
In this kata we are going to treat observables as monads.
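The same idea can be sketched on a simpler structure than observables. In the snippet below the structure is a plain array, and bind and compose are hypothetical helper names chosen for this illustration, not part of the kata. The operators of this kata play a role comparable to bind, with timing added on top.

```typescript
// The "structure" here is a plain array, a stand-in for Observable.
// bind applies f to every value and flattens the results by one level.
function bind<T, R>(values: T[], f: (x: T) => R[]): R[] {
  const result: R[] = [];
  for (const v of values) {
    result.push(...f(v));
  }
  return result;
}

// compose chains two functions of the shape "simple value in, structure out".
function compose<A, B, C>(
  f: (a: A) => B[],
  g: (b: B) => C[]
): (a: A) => C[] {
  return a => bind(f(a), g);
}

const twice = (x: number): number[] => [x, x];
const withNext = (x: number): number[] => [x, x + 1];

// Neither function accepts an array, yet they compose through bind.
const composed = compose(twice, withNext);
console.log(composed(1)); // [1, 2, 1, 2]
```

Without bind, applying withNext to the result of twice would produce an array of arrays, which is the same nesting problem that map causes with observables.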
concatMap operator
We can define the concatMap operator like this:
function concatMap<T, R>(f: (x: T) => Observable<R>): Operator<T, R> {
  return (o: Observable<T>) => {
    // ...
  };
}
An example of use could be the repeat function that we proposed before.
function repeat<T>(times: number): Operator<T, T> {
  return (observable: Observable<T>): Observable<T> => {
    return of(Array(times).fill(null)).pipe(concatMap(_ => observable));
  };
}
Now you can compose the repeat operator:
interval(500).pipe(take(3), repeat(4), map(v => v * 2));
Now your job is to implement concatMap.
mergeMap operator
mergeMap has the same signature as concatMap. The behavior differs in that an inner observable does not wait for the previous one to complete before it starts emitting: values are emitted as soon as they are available.
function mergeMap<T, R>(f: (x: T) => Observable<R>): Operator<T, R> {
  return (o: Observable<T>) => {
    // ...
  };
}
To see the difference between concatMap and mergeMap, let's suppose that we implement repeat with mergeMap in this way:
function repeat<T>(times: number): Operator<T, T> {
  return (observable: Observable<T>): Observable<T> => {
    return of(Array(times).fill(null)).pipe(mergeMap(_ => observable));
  };
}
Let's see the output of the following code:
const source = interval(2).pipe(take(3));
const example = repeat(3)(source);
// output: 0,0,0,1,1,1,2,2,2
example.subscribe({
  onNext: val => console.log(val)
});
The implementation with concatMap would have produced a very different result:
// output: 0,1,2,0,1,2,0,1,2
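The two orderings can be checked without real timers. The sketch below is a pure simulation, not an implementation of the operators: Emission, mergedOrder and concatenatedOrder are hypothetical names, and it assumes that interval(2) emits 0, 1, 2 at 2, 4 and 6 milliseconds after subscription.

```typescript
// A timed emission: a value delivered at a simulated time in milliseconds.
interface Emission<T> {
  time: number;
  value: T;
}

// Simulated interval(2).pipe(take(3)): 0, 1, 2 at t = 2, 4, 6 (assumed timing).
const sourceTimeline: Emission<number>[] = [0, 1, 2].map(v => ({
  time: (v + 1) * 2,
  value: v,
}));

// mergeMap-like ordering: every copy is subscribed at t = 0,
// so the emissions of all copies interleave by time.
function mergedOrder<T>(timeline: Emission<T>[], times: number): T[] {
  const all: Emission<T>[] = [];
  for (let i = 0; i < times; i++) {
    all.push(...timeline);
  }
  return all.sort((a, b) => a.time - b.time).map(e => e.value);
}

// concatMap-like ordering: each copy is subscribed only after the
// previous copy has completed, so whole runs follow one another.
function concatenatedOrder<T>(timeline: Emission<T>[], times: number): T[] {
  const result: T[] = [];
  for (let i = 0; i < times; i++) {
    result.push(...timeline.map(e => e.value));
  }
  return result;
}

console.log(mergedOrder(sourceTimeline, 3).join(","));       // 0,0,0,1,1,1,2,2,2
console.log(concatenatedOrder(sourceTimeline, 3).join(",")); // 0,1,2,0,1,2,0,1,2
```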
switchMap operator
And finally we get to the last operator of the kata and the series. switchMap is one of my favorite operators because it simplifies a task that is usually tedious: cancellation.
The canonical example used to explain the cancellation problem is an autocomplete search. How do we ensure that, as you type, the results of earlier searches already sent to the server are not displayed? This is a complex challenge, since we cannot be sure that the server resolves requests in the order they were sent. switchMap and other cancellation operators are perfect for these situations.
This is the switchMap definition:
function switchMap<T, R>(f: (x: T) => Observable<R>): Operator<T, R> {
  return (o: Observable<T>) => {
    // ...
  };
}
With switchMap, every time the outer observable emits a new value, the current inner observable is canceled and a new inner observable is created by applying the function to that value. This new inner observable then starts emitting.
For example:
const outer = interval(50).pipe(take(3));
const inner = (x: number) => {
  const interval$ = interval(20).pipe(
    // annotate the tuple so it is not inferred as number[]
    map((y: number): [number, number] => [x, y]),
    take(4)
  );
  return interval$;
};
const example: Observable<[number, number]> = outer.pipe(
  switchMap(inner)
);
// output: [0, 0] [0, 1] [1, 0] [1, 1] [2, 0] [2, 1] [2, 2] [2, 3]
// Note that [0, 2] and [0, 3] are not emitted, because the inner observable is canceled when the outer observable emits its next value, 1.
// [1, 2] and [1, 3] are canceled by the outer value 2.
// However, [2, 0] [2, 1] [2, 2] [2, 3] are all emitted, because the outer observable emits no more values after 2.
example.subscribe({
  onNext: val => console.log(val),
});
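The expected output can be reproduced with a small timeline simulation. This is only a sketch of the cancellation rule, not an implementation of switchMap: simulateSwitchMap and outerTimes are hypothetical names, and the timings assume that interval(n) emits its k-th value n * (k + 1) milliseconds after subscription.

```typescript
// Simulated outer: interval(50).pipe(take(3)) gives x = 0, 1, 2
// at t = 50, 100, 150 (assumed timing).
const outerTimes = [0, 1, 2].map(x => ({ time: (x + 1) * 50, x }));

function simulateSwitchMap(): [number, number][] {
  const output: [number, number][] = [];
  outerTimes.forEach((outerEvent, i) => {
    // The inner observable is canceled when the next outer value arrives.
    const next = outerTimes[i + 1];
    const cancelAt = next ? next.time : Infinity;
    // Simulated inner: interval(20).pipe(take(4)), started at outerEvent.time.
    for (let y = 0; y < 4; y++) {
      const emitAt = outerEvent.time + (y + 1) * 20;
      if (emitAt < cancelAt) {
        output.push([outerEvent.x, y]);
      }
    }
  });
  return output;
}

console.log(JSON.stringify(simulateSwitchMap()));
// [[0,0],[0,1],[1,0],[1,1],[2,0],[2,1],[2,2],[2,3]]
```

For x = 0 the inner emissions would fall at t = 70, 90, 110 and 130, but the outer value 1 arrives at t = 100, so only the first two survive.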
This concludes the series on reactive programming. I hope you have enjoyed doing these katas.
Stats:
Created | Mar 21, 2019 |
Published | Mar 21, 2019 |