Project Reactor - Using block on Mono and Flux

In Project Reactor, we can build a chain of operators. If we call subscribe on the chain, the code after the chain runs without waiting for the chain's operations to finish. If you want to wait for the chain to finish first, use the block method instead of subscribe. This tutorial shows how to use block and its variations on both Mono and Flux, including how to set a timeout.

In Reactor, data flows through a chain only after something subscribes to it. You can call subscribe to do that. Besides subscribe, you can also use block, which subscribes as well but blocks the calling thread, so the code after the chain waits until the chain finishes.

To make this concrete, look at the example below:

  Mono.delay(Duration.ofSeconds(2))
      .then(Mono.just("text"))
      .block();

  System.out.println("After chain");

If you run the code, "After chain" is printed only after the chain completes, which is about 2 seconds later. Below are some examples of how to use block on Mono and Flux.
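Before moving on, here is a minimal sketch that contrasts the two ways of subscribing on the same chain. The class name SubscribeVsBlock, the run helper, and the 200 ms delay are illustrative assumptions, not part of the original example. The CountDownLatch is only there so the demo does not finish before the subscribe callback fires.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import reactor.core.publisher.Mono;

public class SubscribeVsBlock {

    // Hypothetical helper: records events in the order they happen.
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        // subscribe() returns immediately; the value arrives later on another thread.
        Mono.delay(Duration.ofMillis(200))
            .then(Mono.just("text"))
            .subscribe(value -> {
                events.add("subscribe received " + value);
                done.countDown();
            });
        events.add("after subscribe");

        // Wait for the callback so both variants can be compared in one run.
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // block() subscribes too, but the calling thread waits for the value.
        String value = Mono.delay(Duration.ofMillis(200))
            .then(Mono.just("text"))
            .block();
        events.add("block returned " + value);
        events.add("after block");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With subscribe, "after subscribe" is recorded before the value arrives. With block, the value is already available when the next statement runs.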

Using block on Mono

Using .block()

This method subscribes to the Mono and waits until the next signal is received. It returns the emitted value, or null if the Mono completes empty.

  String result = Mono.just("text")
      .block(); // text

  // The <String> type witness lets block() return String instead of Object
  String resultEmpty = Mono.<String>empty()
      .block(); // null

You can also pass a Duration to limit how long it waits. If no signal arrives within that time, it throws IllegalStateException.

  String result = Mono.delay(Duration.ofSeconds(2))
      .then(Mono.just("text"))
      .block(Duration.ofSeconds(1));

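The code above throws an exception like the following (the time unit shown in the message can differ between Reactor versions):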

  java.lang.IllegalStateException: Timeout on blocking read for 1000 MILLISECONDS
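If a timeout should not abort the caller, the exception can be caught and replaced with a default. The following is a minimal sketch under that assumption; the class BlockTimeoutFallback and the helper blockOrDefault are hypothetical names introduced for illustration.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

public class BlockTimeoutFallback {

    // Hypothetical helper: waits up to maxWait, returns fallback on timeout.
    static String blockOrDefault(Mono<String> source, Duration maxWait, String fallback) {
        try {
            return source.block(maxWait);
        } catch (IllegalStateException e) {
            // block(Duration) signals a timeout with IllegalStateException.
            // Caveat: a source that itself fails with IllegalStateException
            // ends up here as well, so this sketch does not tell them apart.
            return fallback;
        }
    }

    public static void main(String[] args) {
        Mono<String> slow = Mono.delay(Duration.ofMillis(500)).then(Mono.just("text"));

        System.out.println(blockOrDefault(slow, Duration.ofMillis(100), "fallback")); // fallback
        System.out.println(blockOrDefault(slow, Duration.ofSeconds(2), "fallback"));  // text
    }
}
```

Each call to blockOrDefault subscribes again, so the second call starts a fresh 500 ms delay and completes well within its 2 second limit.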

Using .blockOptional()

It's similar to the previous method, but it wraps the value in a java.util.Optional. An empty Mono yields an empty Optional rather than null, so use .isPresent() to check whether a value exists and .get() to read it.

  Optional<String> result = Mono.just("text")
      .blockOptional();

  System.out.println(result.isPresent()); // true
  System.out.println(result.get()); // text

  Optional<String> emptyResult = Mono.<String>empty()
      .blockOptional();

  System.out.println(emptyResult.isPresent()); // false
  System.out.println(emptyResult.get()); // throws NoSuchElementException

Setting a duration is also possible for blockOptional. It will also throw java.lang.IllegalStateException if the maximum waiting time elapses before any signal is received.
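Calling .get() on an empty Optional throws, so it is often safer to supply a default or to act only when a value is present. A minimal sketch follows; the class BlockOptionalDemo and the helper valueOrDefault are hypothetical names used only for this illustration.

```java
import java.util.Optional;

import reactor.core.publisher.Mono;

public class BlockOptionalDemo {

    // Hypothetical helper: returns the Mono's value, or fallback if it completes empty.
    static String valueOrDefault(Mono<String> source, String fallback) {
        Optional<String> result = source.blockOptional();
        return result.orElse(fallback);
    }

    public static void main(String[] args) {
        System.out.println(valueOrDefault(Mono.just("text"), "default")); // text
        System.out.println(valueOrDefault(Mono.empty(), "default"));      // default

        // Transform and consume the value only if one was emitted.
        Mono.just("text")
            .blockOptional()
            .map(String::toUpperCase)
            .ifPresent(System.out::println); // TEXT
    }
}
```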

  Mono.delay(Duration.ofSeconds(2))
      .blockOptional(Duration.ofSeconds(1));

Using block on Flux

Using .blockFirst()

This method subscribes to the Flux and waits until the first value from upstream is received. It returns that value, or null if the Flux completes empty.

  Flux.just("one", "two", "three")
      .delayElements(Duration.ofSeconds(2))
      .blockFirst();

  System.out.println("After chain");

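As soon as the first element is emitted, about 2 seconds in, "After chain" is printed.

Optionally, you can pass a maximum wait duration; if it elapses first, blockFirst throws IllegalStateException. In the example below the first element needs 2 seconds but the limit is 1 second, so the exception is thrown and "After chain" is not printed.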
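The return value of blockFirst can be captured as well. The sketch below, with the hypothetical class BlockFirstDemo and a shortened 200 ms delay chosen only to keep the demo quick, shows the value for a non-empty Flux and for an empty one.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

public class BlockFirstDemo {

    // Hypothetical helper: first element of a delayed three-element Flux.
    static String firstDelayed() {
        return Flux.just("one", "two", "three")
            .delayElements(Duration.ofMillis(200))
            .blockFirst();
    }

    // Hypothetical helper: blockFirst on a Flux that completes without elements.
    static String firstOfEmpty() {
        return Flux.<String>empty().blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstDelayed()); // one
        System.out.println(firstOfEmpty()); // null
    }
}
```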

  Flux.just("one", "two", "three")
      .delayElements(Duration.ofSeconds(2))
      .blockFirst(Duration.ofSeconds(1));

  System.out.println("After chain");

Using .blockLast()

This method subscribes to the Flux and waits until the last value from upstream is received, which means waiting until the Flux completes.

  Flux.just("one", "two", "three")
      .delayElements(Duration.ofSeconds(2))
      .blockLast();

  System.out.println("After chain");

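You have to wait about 6 seconds (3 x 2 seconds) to see "After chain" printed.

Like blockFirst, blockLast also accepts a maximum wait duration. The 3 second limit below is shorter than the roughly 6 seconds the Flux needs, so the call throws IllegalStateException.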
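To check the arithmetic, the wait can be measured. This is a rough sketch with hypothetical names (BlockLastDemo, lastDelayed, elapsedMillis); it uses a 100 ms delay per element instead of 2 seconds so that it runs quickly, and the measured time is approximate.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

public class BlockLastDemo {

    // Hypothetical helper: last element of a delayed three-element Flux.
    static String lastDelayed(Duration perElementDelay) {
        return Flux.just("one", "two", "three")
            .delayElements(perElementDelay)
            .blockLast();
    }

    // Hypothetical helper: how long blockLast kept the caller waiting, in milliseconds.
    static long elapsedMillis(Duration perElementDelay) {
        long start = System.nanoTime();
        lastDelayed(perElementDelay);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMillis(100);
        System.out.println(lastDelayed(delay)); // three
        // Expect roughly three times the per-element delay.
        System.out.println("waited about " + elapsedMillis(delay) + " ms");
    }
}
```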

  Flux.just("one", "two", "three")
      .delayElements(Duration.ofSeconds(2))
      .blockLast(Duration.ofSeconds(3));
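The limit passed to blockLast has to cover the whole sequence, not just one element. The sketch below illustrates this with hypothetical names (BlockLastTimeoutDemo, lastWithin) and millisecond delays chosen for a quick run; returning the string "timed out" is just a way to make the outcome visible.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

public class BlockLastTimeoutDemo {

    // Hypothetical helper: last element if it arrives within maxWait, else a marker string.
    static String lastWithin(Duration perElementDelay, Duration maxWait) {
        try {
            return Flux.just("one", "two", "three")
                .delayElements(perElementDelay)
                .blockLast(maxWait);
        } catch (IllegalStateException e) {
            // Thrown when maxWait elapses before the Flux completes.
            return "timed out";
        }
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofMillis(100);

        // 150 ms covers the first element but not all three (about 300 ms).
        System.out.println(lastWithin(delay, Duration.ofMillis(150))); // timed out

        // 2 seconds is comfortably longer than the whole sequence.
        System.out.println(lastWithin(delay, Duration.ofSeconds(2))); // three
    }
}
```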