This tutorial shows you how to combine multiple Observables in RxDart.
Running several Observables at the same time lets you perform tasks concurrently. When you need to combine their results, RxDart provides several methods, and the right choice depends on the behavior you expect. This page explains each of these methods with examples.
For this tutorial, we are going to use the following Observables, which are created using the fromIterable factory method. You can also create Observables in other ways.
Observable<String> o1 = Observable.fromIterable(['a1', 'b1', 'c1', 'd1']);
Observable<String> o2 = Observable.fromIterable(['a2', 'b2', 'c2']);
Observable<String> o3 = Observable.fromIterable(['a3', 'b3', 'c3', 'd3']);
Using concat, concatWith
To concatenate multiple streams, you can use the concat factory method. It subscribes to the streams one at a time, waiting for each stream to emit all of its items and complete before subscribing to the next one. Pass the streams to be combined (Iterable<Stream<T>> streams) as the only argument.
Observable.concat([o1, o2, o3])
.listen(print);
Output:
a1
b1
c1
d1
a2
b2
c2
a3
b3
c3
d3
You can also concatenate an Observable instance with other streams. Below is an example of the instance method concatWith.
o1.concatWith([o2, o3])
.listen(print);
The output is the same as when using the factory method.
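To illustrate the "finish one stream, then subscribe to the next" rule, here is a minimal sketch written with plain Dart streams. The helper name concatSketch is hypothetical and is not part of RxDart; it only mimics the ordering described above and is not how the library implements concat.

```dart
// Hypothetical helper (not an RxDart API): forwards every item of each stream
// in turn. `yield*` only moves on once the current stream has completed,
// so the next stream is not listened to before that.
Stream<T> concatSketch<T>(Iterable<Stream<T>> streams) async* {
  for (final stream in streams) {
    yield* stream;
  }
}

Future<void> main() async {
  final s1 = Stream.fromIterable(['a1', 'b1', 'c1', 'd1']);
  final s2 = Stream.fromIterable(['a2', 'b2', 'c2']);
  final s3 = Stream.fromIterable(['a3', 'b3', 'c3', 'd3']);

  // Prints all items of s1, then all items of s2, then all items of s3.
  await concatSketch([s1, s2, s3]).forEach(print);
}
```

Because each stream is drained completely before the next one starts, the relative timing of the sources has no effect on the order of the result.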
Using merge, mergeWith
The factory method merge flattens the items from multiple streams into one Observable. It accepts an Iterable<Stream<T>> as its parameter. Unlike concat, it doesn't wait for one stream to finish before subscribing to the next one. It subscribes to all of the streams at the same time, and the order of the items depends on when each one is emitted.
To make this easier to see, the streams in the example below add a delay between emissions. Example:
Observable<String> o1 = Observable.fromIterable(['a1', 'b1', 'c1', 'd1'])
.concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 50)));
Observable<String> o2 = Observable.fromIterable(['a2', 'b2', 'c2'])
.concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 80)));
Observable<String> o3 = Observable.fromIterable(['a3', 'b3', 'c3', 'd3'])
.concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 190)));
Observable.merge([o1, o2, o3])
.listen(print);
Output:
a1
a2
b1
b2
c1
a3
d1
c2
b3
c3
d3
At 50ms, a1 is emitted. Before b1 is emitted at 100ms, a2 is emitted at 80ms. Therefore, a2 is printed before b1.
The method is also available as the instance method mergeWith, whose parameter is an Iterable<Stream<T>> as well.
o1.mergeWith([o2, o3])
.listen(print);
Using zip, zipWith
zip merges multiple streams into one by applying a zipper function each time every stream has produced an element at a given index. In other words, for the zipper function to produce the element at index i, all of the streams must have produced their element at index i. As a result, the number of emitted elements equals the number of elements produced by the stream with the fewest elements.
To use zip, pass Iterable<Stream<T>> streams as the first argument and a zipper function as the second argument. The zipper function takes one parameter (List<T> values), which is a List containing the values from all streams at a given index.
Observable.zip(
[
o1,
o2,
o3,
],
(values) => values
.map((value) => value.toString())
.reduce((acc, value) => acc + value),
)
.listen(print);
Output:
a1a2a3
b1b2b3
c1c2c3
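The index-by-index rule can be sketched with plain Dart streams and StreamIterator from dart:async. The helper name zipSketch is hypothetical and is not an RxDart API; it is a simplified illustration under the assumption that pulling one item from each stream in turn is acceptable, not the library's implementation.

```dart
import 'dart:async';

// Hypothetical helper (not an RxDart API): pulls one item from every stream,
// passes the collected list to the zipper, and stops as soon as any stream
// has no item left at the current index.
Stream<R> zipSketch<T, R>(
  List<Stream<T>> streams,
  R Function(List<T> values) zipper,
) async* {
  final iterators = [for (final s in streams) StreamIterator<T>(s)];
  if (iterators.isEmpty) return; // nothing to zip
  try {
    while (true) {
      final values = <T>[];
      for (final iterator in iterators) {
        final hasNext = await iterator.moveNext();
        if (!hasNext) return; // the shortest stream decides the length
        values.add(iterator.current);
      }
      yield zipper(values);
    }
  } finally {
    // Release the subscriptions of streams that still have items.
    for (final iterator in iterators) {
      await iterator.cancel();
    }
  }
}

Future<void> main() async {
  final s1 = Stream.fromIterable(['a1', 'b1', 'c1', 'd1']);
  final s2 = Stream.fromIterable(['a2', 'b2', 'c2']);
  final s3 = Stream.fromIterable(['a3', 'b3', 'c3', 'd3']);

  // Prints a1a2a3, b1b2b3, c1c2c3. d1 and d3 are dropped because s2 ends first.
  await zipSketch([s1, s2, s3], (List<String> values) => values.join())
      .forEach(print);
}
```

The early return when any iterator runs out is what limits the result to three items here, since the second stream only has three elements.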
You can also use the instance method zipWith, which accepts a single stream (Stream<S>) as the first argument. Its zipper function is a bit different, as it takes two parameters. The first is the value from the Observable on which zipWith is called, while the second is the value from the stream passed as the first argument of zipWith.
o1.zipWith(o2, (x, y) => x + y)
.listen(print);
Output:
a1a2
b1b2
c1c2
Other variants of zip have a number suffix: zip2, zip3, and so on, up to zip9. The number is the number of streams to be combined: zip2 combines two streams, zip3 combines three streams, and so on. Each variant takes a different number of parameters, and its zipper function takes one parameter per stream, as you can see in the examples below.
Observable.zip2(o1, o2, (v1, v2) => v1 + v2)
.listen(print);
Output:
a1a2
b1b2
c1c2
Observable.zip3(o1, o2, o3, (v1, v2, v3) => v1 + v2 + v3)
.listen(print);
Output:
a1a2a3
b1b2b3
c1c2c3
Using combineLatest
This is a factory method that combines multiple streams into one Observable, emitting a new item every time one of the streams emits an item. The first parameter is the streams (Iterable<Stream<T>> streams). The second parameter is a combiner function that takes one argument (List<T> values) holding the latest value from each stream. To make it easier to understand, see the example below.
Observable<String> o1 = Observable.fromIterable(['a1', 'b1', 'c1', 'd1'])
.concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 50)));
Observable<String> o2 = Observable.fromIterable(['a2', 'b2', 'c2'])
.concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 80)));
Observable<String> o3 = Observable.fromIterable(['a3', 'b3', 'c3', 'd3'])
.concatMap((i) => Observable.just(i).delay(Duration(milliseconds: 190)));
Observable.combineLatest([o1, o2, o3], (values) => values.join())
.listen(print);
Output:
c1b2a3
d1b2a3
d1c2a3
d1c2b3
d1c2c3
d1c2d3
To emit the first combination, combineLatest has to wait until every stream has emitted at least one element. In the above case, that happens at 190ms. At that time, the latest elements emitted by o1, o2, and o3 are c1, b2, and a3 respectively. At 200ms, o1 emits d1. At 240ms, o2 emits c2, and so on.
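The timeline above can be replayed without any timers. The following sketch is a plain Dart simulation, not RxDart code: the Emission class and the schedule and combineLatestTimeline functions are hypothetical names introduced for illustration. It assumes the nominal schedule, where item i of a stream arrives exactly (i + 1) times its delay after subscription; real timers may drift slightly.

```dart
// Hypothetical record for this sketch: which source emitted what, and when.
class Emission {
  final int timeMs;
  final int source; // index of the emitting stream
  final String value;
  const Emission(this.timeMs, this.source, this.value);
}

// Nominal schedule for one source: item i is emitted at (i + 1) * delayMs.
List<Emission> schedule(int source, List<String> items, int delayMs) => [
      for (var i = 0; i < items.length; i++)
        Emission((i + 1) * delayMs, source, items[i]),
    ];

// Replays the emissions in time order. Each emission updates the latest value
// of its source, and a combination is produced only once every source has
// emitted at least one value.
List<String> combineLatestTimeline(List<Emission> emissions, int sourceCount) {
  final sorted = [...emissions]..sort((a, b) => a.timeMs.compareTo(b.timeMs));
  final latest = List<String?>.filled(sourceCount, null);
  final output = <String>[];
  for (final emission in sorted) {
    latest[emission.source] = emission.value;
    if (!latest.contains(null)) {
      output.add(latest.join());
    }
  }
  return output;
}

void main() {
  final emissions = [
    ...schedule(0, ['a1', 'b1', 'c1', 'd1'], 50),
    ...schedule(1, ['a2', 'b2', 'c2'], 80),
    ...schedule(2, ['a3', 'b3', 'c3', 'd3'], 190),
  ];
  // Prints c1b2a3, d1b2a3, d1c2a3, d1c2b3, d1c2c3, d1c2d3.
  combineLatestTimeline(emissions, 3).forEach(print);
}
```

Nothing is produced for the first five emissions because the third source has not emitted yet. From 190ms onward, every single emission yields one new combination.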
Using startWith, startWithMany
startWith prepends a value to the beginning of the source, while startWithMany prepends multiple values. Both are instance methods that accept one parameter: a value (T startValue) or a list of values (List<T> startValues) respectively.
o1.startWith('start1')
.listen(print);
Output:
start1
a1
b1
c1
d1
o1.startWithMany(['start1-1', 'start1-2'])
.listen(print);
Output:
start1-1
start1-2
a1
b1
c1
d1
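Conceptually, prepending values amounts to emitting them first and then forwarding the source unchanged. The sketch below shows that idea with plain Dart streams. The helper name startWithManySketch is hypothetical and is not an RxDart API.

```dart
// Hypothetical helper (not an RxDart API): emits the given start values first,
// then forwards every item of the source stream.
Stream<T> startWithManySketch<T>(Stream<T> source, List<T> startValues) async* {
  for (final value in startValues) {
    yield value;
  }
  yield* source;
}

Future<void> main() async {
  final source = Stream.fromIterable(['a1', 'b1', 'c1', 'd1']);

  // Prints start1-1, start1-2, a1, b1, c1, d1.
  await startWithManySketch(source, ['start1-1', 'start1-2']).forEach(print);
}
```

Passing a single-element list gives the behavior described for startWith.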
That's how to combine Observables in RxDart.