Project Reactor - Combining Multiple Flux and Mono Publishers

When using Project Reactor, you may need to combine the results of more than one publisher, either Mono or Flux. In this tutorial, I'm going to show you some built-in Reactor methods that can be used for this, including the differences between those methods and an example for each method.

Combining Flux Publishers

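For this tutorial, we are going to create these Flux publishers:

  import java.time.Duration;
  import java.util.ArrayList;
  import java.util.Arrays;
  import reactor.core.publisher.Flux;
  import reactor.core.publisher.Mono;

  // Emits 1, 2, 3
  Flux<Integer> numbers1 = Flux
      .range(1, 3);
  
  // Emits 4, 5 (range takes a start value and a count)
  Flux<Integer> numbers2 = Flux
      .range(4, 2);

  Flux<String> strings = Flux.fromIterable(
      Arrays.asList("Woolha", "dot", "com")
  );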

Using concat

concat forwards the elements emitted by the sources downstream. It subscribes to the sources sequentially, waiting for one source to complete before subscribing to the next. The parameter is a varargs, so you can pass as many sources as you want, as long as the calling method stays within Java's 64 KB method size limit.

  Flux.concat(numbers1, numbers2)
      .subscribe(System.out::println);

Output:

  1
  2
  3
  4
  5

Using concatWith

concatWith works like concat, but it is an instance method and only accepts one argument.

  numbers1.concatWith(numbers2)
      .subscribe(System.out::println);

Output:

  1
  2
  3
  4
  5

Using merge

merge subscribes to all sources eagerly and combines their elements into an interleaved sequence, emitting each element as soon as it arrives. To make it easy to see the difference between concat and merge, add a delay between element emissions.

  Flux.merge(
      numbers1.delayElements(Duration.ofMillis(150L)),
      numbers2.delayElements(Duration.ofMillis(100L))
  )
      .subscribe(System.out::println);

Output:

  4
  1
  5
  2
  3
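One thing to keep in mind when trying the delayed examples: delayElements emits on one of Reactor's scheduler threads by default, so a short program may exit before anything is printed. The following is a minimal sketch that blocks until each sequence completes and compares concat with merge on the same delayed sources. The class and method names are only illustrative, and the exact interleaving produced by merge depends on timing.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class ConcatVsMerge {

    // Same values as numbers1, with a delay before each element.
    static Flux<Integer> slowNumbers1() {
        return Flux.range(1, 3).delayElements(Duration.ofMillis(150L));
    }

    // Same values as numbers2, with a shorter delay.
    static Flux<Integer> slowNumbers2() {
        return Flux.range(4, 2).delayElements(Duration.ofMillis(100L));
    }

    static List<Integer> concatenated() {
        // concat subscribes to the second source only after the first completes.
        return Flux.concat(slowNumbers1(), slowNumbers2()).collectList().block();
    }

    static List<Integer> merged() {
        // merge subscribes to both sources right away.
        return Flux.merge(slowNumbers1(), slowNumbers2()).collectList().block();
    }

    public static void main(String[] args) {
        // block() keeps the main thread waiting until the sequence completes.
        System.out.println("concat: " + concatenated());
        System.out.println("merge:  " + merged());
    }
}
```

With concat the list is always in source order. With merge, only the relative order of elements coming from the same source is preserved.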

Using mergeSequential

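Like concat, mergeSequential emits the elements in the order of the sources. However, like merge, it subscribes to all sources eagerly, so elements from later sources are held until the earlier sources complete.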

  Flux.mergeSequential(
      numbers1.delayElements(Duration.ofMillis(150L)),
      numbers2.delayElements(Duration.ofMillis(100L))
  )
      .subscribe(System.out::println);

Output:

  1
  2
  3
  4
  5

Using mergeDelayError

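mergeDelayError is a variation of merge. If a source fails, the error is held back until all the other sources have been processed, instead of terminating the merged sequence immediately. The first argument is the prefetch, the number of elements requested from each source at a time.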

  Flux.mergeDelayError(1,
      numbers1.delayElements(Duration.ofMillis(150L)),
      numbers2.delayElements(Duration.ofMillis(100L))
  )
      .subscribe(System.out::println);

Output:

  4
  1
  5
  2
  3
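The example above has no failing source, so its output is the same as with merge. To see the difference, here is a small sketch with a hypothetical source that emits one element and then fails. The class name, the failing() helper and the "boom" message are made up for illustration. The sources emit synchronously, so the recorded events are deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeDelayErrorExample {

    // Hypothetical failing source: emits 1, then signals an error.
    static Flux<Integer> failing() {
        return Flux.just(1).concatWith(Flux.error(new IllegalStateException("boom")));
    }

    // Records every signal received from plain merge.
    static List<String> withMerge() {
        List<String> events = new ArrayList<>();
        Flux.merge(failing(), Flux.range(4, 2))
            .subscribe(v -> events.add("next:" + v),
                       e -> events.add("error:" + e.getMessage()));
        return events;
    }

    // Records every signal received from mergeDelayError (prefetch of 1).
    static List<String> withMergeDelayError() {
        List<String> events = new ArrayList<>();
        Flux.mergeDelayError(1, failing(), Flux.range(4, 2))
            .subscribe(v -> events.add("next:" + v),
                       e -> events.add("error:" + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("merge:           " + withMerge());
        System.out.println("mergeDelayError: " + withMergeDelayError());
    }
}
```

With merge, the error ends the sequence before the second source contributes anything. With mergeDelayError, the elements 4 and 5 are still delivered and the error arrives last.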

Using mergeWith

Similar to merge, but this is an instance method and only accepts one argument.

  numbers1.delayElements(Duration.ofMillis(150L))
      .mergeWith(numbers2.delayElements(Duration.ofMillis(100L)))
      .subscribe(System.out::println);

Output:

  4
  1
  5
  2
  3

Using mergeOrderedWith

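Similar to mergeWith, but the two sources are merged in order: at each step it emits the smallest of the elements currently available from the sources, as defined by the comparator you pass. It doesn't sort the whole sequence, so the result is fully ordered only if each source is already ordered.

  // Assumption: natural ordering; any Comparator<Integer> can be used
  Comparator<Integer> comparator = Comparator.naturalOrder();

  numbers1.delayElements(Duration.ofMillis(150L)).mergeOrderedWith(
      numbers2.delayElements(Duration.ofMillis(100L)),
      comparator
  )
      .subscribe(System.out::println);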

Using zip

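zip combines multiple sources element by element. It waits for every source to emit one element, then produces a combination of those elements. Each combination must contain an element from every source, so the result ends as soon as any source completes. Without a combinator, two sources are combined into a Tuple2.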

  Flux.zip(numbers1, numbers2).subscribe(System.out::println);

Output:

  [1,4]
  [2,5]

For combining two sources, you can pass a BiFunction as the third argument.

  Flux.zip(
      numbers1,
      numbers2,
      (a, b) -> a + b
  )
      .subscribe(System.out::println);

Output:

  5
  7

The code above sums the elements of the first and second sources. The first source has three elements, while the second has only two. Therefore, it produces only two combinations: the third element of the first source has no counterpart in the second source to complete a combination.

If you need to combine more than two sources, look at the example below. The result is a Tuple (Tuple3 up to Tuple8, depending on the number of publishers). You can use tuple.getTn() (replace n with the index, which starts from 1) to get the result of the n-th publisher.

  Flux.zip(
      numbers1,
      numbers2,
      strings
  )
      .flatMap(tuple -> {
            System.out.println(tuple.getT1());
            System.out.println(tuple.getT2());
            System.out.println(tuple.getT3());
            System.out.println("-----------");
  
            return Mono.empty();
      })
      .subscribe();

Output:

  1
  4
  Woolha
  -----------
  2
  5
  dot
  -----------

The approach above supports up to eight sources. To combine more than eight sources, you can pass an Iterable instead.

  ArrayList<Flux<Integer>> publishers = new ArrayList<>();
  publishers.add(numbers1);
  publishers.add(numbers2);
  publishers.add(numbers1);
  
  Flux.zip(
          publishers,
          arr -> {
  
              System.out.println(arr[0]);
              System.out.println(arr[1]);
              System.out.println(arr[2]);
              System.out.println("-----------");
  
              return Mono.empty();
          }
  ).subscribe();

Output:

  1
  4
  1
  -----------
  2
  5
  2
  -----------
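The combinator passed along with an Iterable doesn't have to print and return an empty Mono. It can also return the combined value directly. Below is a minimal sketch that sums the elements of each combination, using the same three sources as above. The class and method names are only illustrative.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

class ZipIterableExample {

    static List<Integer> zippedSums() {
        Flux<Integer> numbers1 = Flux.range(1, 3); // 1, 2, 3
        Flux<Integer> numbers2 = Flux.range(4, 2); // 4, 5
        List<Flux<Integer>> publishers = Arrays.asList(numbers1, numbers2, numbers1);

        // The combinator receives one element per source as an Object[],
        // so each value has to be cast back to its real type.
        Flux<Integer> sums = Flux.zip(
            publishers,
            arr -> Arrays.stream(arr).mapToInt(o -> (Integer) o).sum()
        );

        // The sources emit synchronously, so block() returns right away.
        return sums.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(zippedSums()); // [6, 9]
    }
}
```

The two sums correspond to the two combinations printed above: 1 + 4 + 1 and 2 + 5 + 2.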

Using zipWith

Similar to zip, but this is an instance method and supports only one other source to be zipped with.

  numbers1.zipWith(numbers2, (a, b) -> a * b)
      .subscribe(System.out::println);

Output:

  4
  10

Using combineLatest

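combineLatest combines the latest element of each source. Once every source has emitted at least one element, it produces a new combination whenever any of the sources emits. If there are two sources, you can use a BiFunction like below.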

  Flux.combineLatest(
      numbers2,
      numbers1,
      (a, b) -> a + b
  )
      .subscribe(System.out::println);

Output:

  6
  7
  8

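Both sources in this example emit synchronously, so the first source (numbers2) emits all of its values as soon as it is subscribed, before the second source emits anything. The last element of the first source, 5, is then combined with each element of the second source.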
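With sources that emit over time, the result depends on the order in which the elements arrive. The following sketch makes that order explicit by pushing elements manually through sinks. It assumes Reactor 3.4 or later, where the Sinks API is available. The class, method and variable names are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class CombineLatestExample {

    static List<String> run() {
        // Two manually driven sources (assumption: Reactor 3.4+ Sinks API).
        Sinks.Many<Integer> left = Sinks.many().unicast().onBackpressureBuffer();
        Sinks.Many<String> right = Sinks.many().unicast().onBackpressureBuffer();

        List<String> out = new ArrayList<>();
        Flux.combineLatest(left.asFlux(), right.asFlux(), (n, s) -> n + s)
            .subscribe(out::add);

        left.tryEmitNext(1);    // nothing yet: right has not emitted
        right.tryEmitNext("a"); // combines the latest values: 1 and "a"
        left.tryEmitNext(2);    // combines 2 with the latest right value, "a"
        right.tryEmitNext("b"); // combines the latest left value, 2, with "b"

        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1a, 2a, 2b]
    }
}
```

Each emission after the first complete combination produces exactly one new result, built from the new element and the most recent element of the other source.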

If there are more than two sources, you need to pass a Function that accepts one parameter of type Object[], which holds the latest value of each source.

  Flux.combineLatest(
      numbers1,
      numbers2,
      numbers1,
      (arr) -> (int) arr[0] + (int) arr[1] + (int) arr[2]
  )
      .subscribe(System.out::println);

Output:

  9
  10
  11

It takes the latest values from the first and second sources (3 and 5), then combines them with each value from the third source.

The approach above supports up to six sources. If you need more than that, you can pass an Iterable instead.

  ArrayList<Flux<Integer>> publishers = new ArrayList<>();
  publishers.add(numbers1);
  publishers.add(numbers2);
  
  Flux.combineLatest(
      publishers,
      (arr) -> (int) arr[0] + (int) arr[1]
  )
      .subscribe(System.out::println);

Output:

  7
  8

 

Combining Mono Publishers

For this part of the tutorial, we are going to create these Mono publishers:

  Mono<String> firstMono = Mono.just("one");
  Mono<String> secondMono = Mono.just("two");
  Mono<Integer> thirdMono = Mono.just(3);

Using concatWith

You can concatenate a Mono with another Mono using the concatWith instance method. The result is a Flux.

  firstMono.concatWith(secondMono)
      .subscribe(System.out::println);

Output:

  one
  two

Using mergeWith

This instance method merges two Mono publishers. Their elements are emitted as they arrive, so the order is not guaranteed. The result is a Flux.

  firstMono.mergeWith(secondMono)
      .subscribe(System.out::println);

Output:

  one
  two

To see the difference between mergeWith and concatWith, you can add a delay to the first Mono.

  firstMono.delayElement(Duration.ofSeconds(5)).mergeWith(secondMono)
      .subscribe(System.out::println);

Output:

  two
  one

Using zip

It aggregates the sources by waiting for all the Mono publishers to emit their results. If you pass only two sources, you can use a BiFunction to combine the results.

  Mono.zip(firstMono, secondMono, (a, b) -> a + b)
      .subscribe(System.out::println);

Output:

  onetwo

Below is an example of how to combine more than two sources. The result is a Tuple (Tuple3 up to Tuple8, depending on the number of publishers). You can use tuple.getTn() (replace n with the index, which starts from 1) to get the result of the n-th publisher.

  Mono.zip(
      firstMono,
      secondMono,
      thirdMono
  )
      .flatMap(tuple -> {
  
          System.out.println(tuple.getT1());
          System.out.println(tuple.getT2());
          System.out.println(tuple.getT3());
  
          return Mono.empty();
      })
      .subscribe();

Output:

  one
  two
  3

You can have up to eight sources with the approach above. If you need to combine more sources, you can pass an Iterable instead.

  ArrayList<Mono<String>> publishers = new ArrayList<>();
  publishers.add(firstMono);
  publishers.add(secondMono);
  
  Mono.zip(
      publishers,
      arr -> {
          System.out.println(arr[0]);
          System.out.println(arr[1]);
  
          return Mono.empty();
      }
  ).subscribe();

Output:

  one
  two

Using zipDelayError

Similar to zip, but if a source fails, the error is delayed until all the sources have been processed.

The usage is also similar, except that it doesn't support combining two sources with a BiFunction as the third parameter (as in the first Mono.zip example).
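Since there is no BiFunction variant for two sources, one way to get a combined value is to zip into a Tuple2 and transform it afterwards with map. The following is a minimal sketch of that idea for the successful case. The class and method names are only illustrative.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

class ZipDelayErrorExample {

    static String combined() {
        Mono<String> firstMono = Mono.just("one");
        Mono<Integer> thirdMono = Mono.just(3);

        // With two sources and no combinator, the result is a Tuple2.
        Mono<Tuple2<String, Integer>> zipped = Mono.zipDelayError(firstMono, thirdMono);

        // Combine the tuple afterwards with map instead of a BiFunction.
        return zipped.map(tuple -> tuple.getT1() + tuple.getT2()).block();
    }

    public static void main(String[] args) {
        System.out.println(combined()); // one3
    }
}
```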