Project Reactor - Adding Delay to Mono and Flux

In Project Reactor, we can add delay for a certain duration to Mono or Flux publisher. This tutorial shows you how to do so.

The important thing you need to aware if you use a Reactor's delay method is some of them switches the execution to a parallel scheduler by default. It implicitly calls .publishOn(Schedulers.parallel()) which means the operators below the delay operator are affected. If you don't already understand about publishOn, you can read about it here.

Let's take a look at the example below

  Mono.just("one")
      .delayElement(Duration.ofSeconds(2))
      .log()
      .block();

If you run the code, you will find that onNext signal is received on a parallel thread. That also happens even if you add publishOn or subscribeOn before the delay operator. Adding publishOn after the delay operator allows you to switch the execution to another scheduler you want. However, most delay operators allow us to pass a second argument to set the scheduler, replacing the default setting which uses parallel scheduler.

  Scheduler scheduler = Schedulers.newElastic("es1");

  Mono.just("one")
      .delayElement(Duration.ofSeconds(2), scheduler)
      .log()
      .block();

Delay on Mono

delay

It is used to delay onNext signal by the given Duration.

Variations:

  (Duration length)
  (Duration length, Scheduler timer)

Example:

  Mono.just("one")
      .delay(Duration.ofSeconds(2))
      .log()
      .subscribe();

delayElement

It is used to delay the Mono element by the given Duration. Delay will not be applied on empty Mono or error signal.

Variations:

  (Duration length)
  (Duration length, Scheduler timer)

Example:

  Mono.just("one")
      .delayElement(Duration.ofSeconds(2))
      .log()
      .subscribe();

delaySubscription

It is used to add subscription delay to the Mono by the given Duration.

Variations:

  (Duration length)
  (Duration length, Scheduler timer)
  (Publisher<U> subscriptionDelay)

Example:

  Mono.just("one")
      .delaySubscription(Duration.ofSeconds(2))
      .log()
      .subscribe();

The usage of the last variation is somewhat different to the other two variations. It requires a Publisher in the parameter. It delays the subscription until the passed Publisher sends a signal or complete.

  Mono.just("one")
      .delaySubscription(Mono.delay(Duration.ofSeconds(5)))
      .log()
      .subscribe();

delayUntil

It is used to delay the Mono by generating a trigger Publisher, then wait until the trigger Publisher terminates. If an error occurs, it will be propagated downstream.

Variations:

  (Function<? super T, ? extends Publisher<?>> triggerProvider)

Example:

  Mono.just("one").delayUntil(a -> {
      Mono.delay(Duration.ofSeconds(5)).block();
  
      return Mono.just("two");
  }).log().subscribe();

Delay on Flux

delayElements

It is used to add delay between element emission by the given Duration. It affects the frequency how many elements are emitted per second. Delay will not be applied on empty Flux or error signal.

Variations:

  (Duration length)
  (Duration length, Scheduler timer)

Example:

  Flux.just("one", "two", "three")
      .delayElements(Duration.ofSeconds(2))
      .log()
      .subscribe();

delaySequence

It shifts the Flux forward by the given Duration. It doesn't affect the gap between elements which means it doesn't affect the emission frequency. Delay will not be applied on empty Flux or error signal.

Variations:

  (Duration length)
  (Duration length, Scheduler timer)

Example:

  Flux.just("one", "two", "three")
      .delaySequence(Duration.ofSeconds(2))
      .log()
      .subscribe();

delaySubscription

It's used to add subscription delay to the Flux by the given Duration.

Variations:

  (Duration length)
  (Duration length, Scheduler timer)
  (Publisher<U> subscriptionDelay)

Example:

  Flux.just("one", "two", "three")
      .delaySubscription(Duration.ofSeconds(2))
      .log()
      .subscribe();

The last variation allows us to pass a Publisher. It delays the subscription until the passed Publisher sends a signal or complete.

  Flux.just("one", "two", "three")
      .delaySubscription(Flux.just(1,2,3).delayElements(Duration.ofSeconds(5)))
      .log()
      .subscribe();

delayUntil

It is used to delay the Flux by generating a trigger Publisher for each element, then wait until all trigger Publishers terminates. If an error occurs, it will be propagated downstream.

Variations:

  (Function<? super T, ? extends Publisher<?>> triggerProvider)

Example:

  Flux.just("one", "two", "three").delayUntil(a -> {
      Mono.delay(Duration.ofSeconds(5)).block();

      return Mono.just("two");
  }).log().subscribe();