If you have an Observable emitting multiple items and you want to map each item by using a function, there are some operators in RxDart that allows you to do so, which includes map
, asyncMap
, flatMap
, flatMapIterable
, concatMap
, switch-map
, and exhaustMap
. If you don't know which one you should use, this tutorial explains the differences between those operators and what cases are suitable for each operator.
For this tutorial, we are going to use the following Observable.
Observable<int> o1 = Observable.fromIterable([1, 2, 3, 4]);
Then, use the above operators on the Observable to understand the behavior of each operator. The function passed on the operators does the same thing: multiple each value by 2. Below are the explanations of each operator along with the examples.
Using map
map
is used for mapping a list of items using a function that returns a new value for each item. While most operators (other than map
and asyncMap
) require the function to return a new Observable (Stream), the function passed on map
should return the mapped value instead of an Observable.
o1.map((value) {
return value * 2;
})
Output:
2
4
6
8
map
is suitable for cases where each item can be processed synchronously. Therefore, it's suitable if you only need to do quick non-asynchronous things such as converting the item type or modifying the value without the need of processes that may take a 'long' time to complete, such as API call or database access, which should be done asynchronously.
Using asyncMap
asyncMap
maps items into a new Stream like map
, but the mapper function can return a Future. In that case, it waits until the Future completes before continuing to the next item. Since it processes items one by one and it waits until the Stream completes, the order of items emitted by the mapper function should be the same as the order from the initial source Stream.
o1.asyncMap((i) => new Future.delayed(new Duration(seconds: 1), () => i * 2))
.listen(print);
Output:
2
4
6
8
asyncMap
is useful if the mapper function needs to return a Future. However, it can't process the items concurrently as it has to wait for each Future to complete before continuing to the next item.
Using flatMap
flatMap
can process multiple items concurrently. Therefore, the order is not guaranteed to be the same as the order in the initial Stream.
o1.flatMap((i) => new Observable.timer(i * 2, new Duration(seconds: 1)))
.listen(print);
Output:
2
4
6
8
Try to run the code above and you'll find out that all items are emitted at almost the same time because flatMap
processes the items in parallel.
If you have something that's need to be performed asynchronously on each item and the order is not important, this is the right operator to use.
Using flatMapIterable
It's similar to flatMap
, but the mapper function returns an Iterable.
o1.flatMapIterable((i) => new Observable.timer([i, i * 2], new Duration(seconds: 1)))
.listen(print);
Output:
1
2
2
4
3
6
4
8
Using concatMap
concatMap
subscribes to a Stream one by one, waiting for a Stream emits all its values before subscribing to the next one. Therefore, unlike flatMap
, it ensures that the order of the emitted items, similar to Dart Stream's asyncExpand
.
o1.concatMap((i) => new Observable.timer(i * 2, new Duration(seconds: 1)))
.listen(print);
Output:
2
4
6
8
If you run the code above and wait for the result, you will get one-second delay between each item. That's because concatMap
can only subscribe to a Stream at a time and it has to wait for the Stream to complete before switching to the next Stream.
If the order of emitted items is important, it's appropriate to use concatMap
. However, it may cause the whole process takes longer time to finish.
Using switchMap
Like the others, switchMap
maps items from a Stream into a new Stream by using a mapper function. The difference with the other operator is switchMap
only listens to the latest created Stream, while the previously created Streams will stop emitting.
o1.switchMap((i) => new Observable.timer(i * 2, new Duration(seconds: 1)))
.listen(print);
Output:
8
As you can see, it only prints the result from the latest Stream whose initial value is 4 and mapped value is 8.
If you only need to get the latest state, this is can be a suitable operator to use.
Using exhaustMap
exhaustMap can be used to control the emission frequency. If an item is being processed, it will ignore other items emitted by the source until the current item completely processed.
To make it more understandable, a 500ms delay is added between element emission from the source and a 750ms delay is added in the mapper.
o1.interval(new Duration(milliseconds: 500))
.exhaustMap((i) {
print(i);
return new Observable.timer(i, new Duration(milliseconds: 750));
})
.listen((e) { print('${DateTime.now()} $e'); });
Output:
1
2019-12-14 16:43:45.367071 1
3
2019-12-14 16:43:46.369698 3
In the above example, the second element is emitted when the map function is still processing the first element and thus the second element is ignored. It's useful for cases where you need to only process a source once in a certain time period.
That's how to map items from a source Stream into a new Stream
in RxDart. Choosing the right operator to use is important to get the expected result.