Project Reactor - Processing Flux in Parallel

This tutorial shows you how to process data from a Flux in parallel and how to control the concurrency, using ParallelFlux and flatMap.

Flux is a reactive data type in Reactor that contains 0..N values. Sometimes we need to process those values in parallel. By default, a reactive flow is sequential. To make it parallel, you can convert the Flux to a ParallelFlux. Alternatively, you can use flatMap. This tutorial gives you examples of how to process elements emitted by a Flux in parallel. If you have a Mono<List<T>>, you need to convert it to a Flux<T> first.
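
For instance, flatMapMany combined with Flux.fromIterable does that conversion (a minimal sketch; listMono is just a placeholder variable):

  Mono<List<Integer>> listMono = Mono.just(List.of(1, 2, 3));

  // Unwrap the Mono and emit each list element on the resulting Flux
  Flux<Integer> numbers = listMono.flatMapMany(Flux::fromIterable);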

Using ParallelFlux

By converting a Flux into a ParallelFlux, the data is divided into multiple 'rails' using a round-robin algorithm. To perform parallel execution using ParallelFlux, first convert a Flux<T> into a ParallelFlux<T>. After that, you have to call runOn(Scheduler) to specify on which Scheduler the elements should be processed. Without runOn, the rails are not actually processed in parallel.

To convert a Flux<T> into a ParallelFlux<T>, call the .parallel() method on the Flux. That method has several overloads:

  .parallel()

  .parallel(int parallelism)

  .parallel(int parallelism, int prefetch)

The parallelism parameter sets the degree of parallelism - how many tasks can be executed simultaneously (the number of 'rails'). By default, if you don't pass a parallelism argument (using .parallel() without parameters), it uses the number of available processors reported by the JVM. Please be aware that modern processors can have multiple hardware threads per core, and each of those counts as a processor here.
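
If you want to see what that default resolves to on your machine, you can print it (a quick sketch; Schedulers.DEFAULT_POOL_SIZE is the constant .parallel() falls back to when no parallelism is given):

  // Logical processors visible to the JVM
  System.out.println(Runtime.getRuntime().availableProcessors());

  // Reactor's default parallelism, normally derived from the value above
  System.out.println(Schedulers.DEFAULT_POOL_SIZE);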

Another parameter you can pass is prefetch, which sets the initial request amount - how many elements are requested from the upstream source up front. If you don't pass that argument (i.e. when using the first or second overload), Queues.SMALL_BUFFER_SIZE is used. That value is calculated as below:

  Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

For the examples below, we assume there is a blocking method doSomething that takes some time to finish.
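
The exact body of doSomething does not matter for the discussion; a minimal blocking stand-in could simply sleep for a second (a sketch, not part of the original examples):

  // A blocking stand-in: keeps the calling thread busy for one second
  static void doSomething() {
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
      }
  }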

Here is a complete example of how to use ParallelFlux. This example uses a parallelism of 5 and the elastic scheduler passed to runOn. If you run the code, you will see that each element is processed on a thread of the scheduler defined in runOn. First, it processes 5 elements simultaneously. The next element can only be processed after one of the first 5 elements has finished, which frees a slot for it.

  Flux.range(1, 10)
      .parallel(5)
      .runOn(Schedulers.elastic())
      .doOnNext(i -> {
          System.out.println(String.format("Executing %s on thread %s", i, Thread.currentThread().getName()));
  
          doSomething();
  
          System.out.println(String.format("Finish executing %s", i));
      })
      .subscribe();

The second example below uses a parallel scheduler. Even if the scheduler's parallelism is greater than 5, only 5 elements can be processed at the same time, because the ParallelFlux only has 5 rails.

  Flux.range(1, 10)
      .parallel(5, 1)
      .runOn(Schedulers.newParallel("parallel", 10))
      .flatMap(
          i -> {
              System.out.println(String.format("Start executing %s on thread %s", i, Thread.currentThread().getName()));
  
              doSomething();
  
              System.out.println(String.format("Finish executing %s", i));
  
              return Mono.just(i);
          }
      ).subscribe();

How about changing the scheduler's parallelism to 1 (using .runOn(Schedulers.newParallel("parallel", 1)))? Even though the parallelism of the ParallelFlux is set to 5, the scheduler becomes the bottleneck: doSomething blocks the single thread until it finishes, so only one element is processed at a time.
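
In a simplified form, the only meaningful change is the scheduler passed to runOn (a sketch of the variation described above, using doOnNext instead of the flatMap from the previous example):

  Flux.range(1, 10)
      .parallel(5)
      // a single worker thread: the blocking doSomething serializes all rails
      .runOn(Schedulers.newParallel("parallel", 1))
      .doOnNext(i -> doSomething())
      .subscribe();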

Using flatMap

The second way is to use flatMap. Compared to ParallelFlux, using flatMap is more about orchestration. Some variants of Flux's flatMap method have a concurrency parameter which sets the maximum number of in-flight inner sequences.

Below are the two method signatures of flatMap that support the maximum-concurrency option.

  flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency)
  flatMap(Function<? super T, ? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)

The first argument of flatMap is the mapper. To define on which scheduler the mapping should run, you can wrap the work in a deferred provider using Mono.defer, then apply subscribeOn with the scheduler you want to use.
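
The general shape of that wrapping looks like this (a sketch; flux, scheduler, and the value-returning process method are placeholders, not part of the original examples):

  flux.flatMap(
      i -> Mono
              .defer(() -> Mono.just(process(i)))  // the work is deferred until subscription
              .subscribeOn(scheduler),             // and that subscription happens on the given scheduler
      5                                            // at most 5 inner sequences in flight
  );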

The concurrency and prefetch arguments set the maximum concurrency and the initial request amount respectively, similar to the parallelism and prefetch parameters explained in the ParallelFlux section.

The example below sets the concurrency to 5; each mapping runs on a thread from the elastic scheduler and contains a blocking call. As there can only be 5 in-flight elements, the next element must wait for one of them to finish.

  Scheduler scheduler = Schedulers.elastic();
  
  Flux.range(1, 10)
      .flatMap(
          i -> Mono.defer(() -> {
              System.out.println(String.format("Executing %s on thread %s", i, Thread.currentThread().getName()));
  
              doSomething();
  
              System.out.println(String.format("Finish executing %s", i));
  
              return Mono.just(i);
          }).subscribeOn(scheduler),
          5
      )
      .log()
      .subscribe(System.out::println);

Below is another example that uses a parallel scheduler. The concurrency is set to 5 as well, but this time nothing blocks the thread. If you change the scheduler's parallelism to a value lower than 5, there will still be 5 elements processed simultaneously. This means a single thread from the scheduler can handle multiple in-flight elements, because the work is non-blocking.

  Scheduler scheduler = Schedulers.newParallel("parallel", 2);
  
  Flux.range(1, 10)
      .flatMap(
          i -> Mono.defer(() -> {
              System.out.println(String.format("Executing %s on thread %s", i, Thread.currentThread().getName()));
  
              return Mono.delay(Duration.ofSeconds(i))
                      .flatMap(x -> {
                          System.out.println(String.format("Finish executing %s", i));
  
                          return Mono.just(i);
                      });
          }).subscribeOn(scheduler),
          5
      )
      .log()
      .subscribe(System.out::println);

In the previous example, there is no process blocking the thread. What if we add a blocking process?

  Scheduler scheduler = Schedulers.newParallel("parallel", 2);
  
  Flux.range(1, 10)
      .flatMap(
          i -> Mono.defer(() -> {
              System.out.println(String.format("Executing %s on thread %s", i, Thread.currentThread().getName()));

              doSomething();

              System.out.println(String.format("Finish executing %s", i));

              // the deferred supplier must return a Mono
              return Mono.just(i);
          }).subscribeOn(scheduler),
          5
      )
      .log()
      .subscribe(System.out::println);

As doSomething blocks its thread, that thread cannot be used to process the next element until the call finishes. So the number of threads in the parallel scheduler (2) becomes the limit: only 2 elements are processed at a time, even though the concurrency is set to 5.

That's all about how to process Project Reactor's Flux in parallel.