Project Reactor - Using subscribe on Mono and Flux

In Project Reactor, nothing happens after you create a Mono or Flux chain until it is subscribed to. This tutorial shows how to use the subscribe method on Mono and Flux, including the parameters it accepts.

Calling subscribe triggers the data flow through the chain. When the chain contains asynchronous operators, such as delayElements in the example below, the code after the chain can run immediately without waiting for the chain to complete.

  Flux.range(1, 10)
      .delayElements(Duration.ofSeconds(1))
      .subscribe();

  System.out.println("Below chain");

If you run the code, "Below chain" is printed immediately, without waiting for the chain to complete, which takes at least 10 seconds. If you want to wait for the chain to complete, you can use a blocking method instead: block on a Mono, or blockFirst and blockLast on a Flux.
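As a minimal sketch of the blocking alternative, the snippet below waits for a short delayed Flux with blockLast. The class and method names are hypothetical, and the delay is shortened to 100 ms so that the example finishes quickly.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;

public class BlockingVersusSubscribe {

    // Blocks the calling thread until the Flux completes, then returns its last element.
    static Integer waitForLast() {
        return Flux.range(1, 3)
                .delayElements(Duration.ofMillis(100))
                .blockLast();
    }

    public static void main(String[] args) {
        Integer last = waitForLast();
        System.out.println("Last element: " + last);
        // Unlike with subscribe(), this line runs only after the chain has completed.
        System.out.println("Below chain");
    }
}
```

Blocking is mostly useful in tests and in main methods. Inside a reactive pipeline, subscribe with callbacks is usually the better fit.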

Below are the available overloads of subscribe:

  // 1
  Disposable subscribe()

  // 2
  Disposable subscribe(Consumer<? super T> consumer)

  // 3
  Disposable subscribe(@Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer)

  // 4
  Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer)

  // 5
  Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)

  // 6
  abstract void subscribe(CoreSubscriber<? super T> actual)

Those methods are available on both Mono and Flux. This tutorial only gives examples for Flux, as the same calls work on Mono.
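For completeness, here is a small sketch of the same idea on a Mono, using the three-argument overload. The class name and the event strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class MonoSubscribeExample {

    // Subscribes to a Mono and records every signal that reaches the callbacks.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Mono.just("hello")
                .subscribe(
                        value -> events.add("value: " + value),
                        error -> events.add("error: " + error),
                        () -> events.add("done")
                );
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because Mono.just emits synchronously, the list is already filled when subscribe returns. A Mono emits at most one value, so the value callback runs at most once.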

If you only want to trigger the subscription without processing the result, the simplest way is to use the first method, which takes no arguments.

  Flux.range(1, 5)
      .subscribe();
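All the lambda-based overloads return a Disposable, which can be used to cancel the subscription later. The sketch below assumes an endless source (Flux.interval) and a hypothetical helper method. It checks the state of the Disposable before and after calling dispose.

```java
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class DisposeExample {

    // Returns true if the subscription was live before dispose() and disposed after it.
    static boolean cancelAndCheck() {
        Disposable subscription = Flux.interval(Duration.ofMillis(50))
                .subscribe();
        boolean liveBefore = !subscription.isDisposed();
        // Cancels the subscription, so the interval stops emitting to this subscriber.
        subscription.dispose();
        return liveBefore && subscription.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("Cancelled cleanly: " + cancelAndCheck());
    }
}
```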

If you need to process the result, you can pass a Consumer (for example, a lambda) as the first argument. It is invoked for each emitted value.

  Flux.range(1, 5)
      .subscribe(result -> System.out.println(result));

The Consumer above is not invoked when an error signal is received. To handle errors, you can pass a second Consumer that takes a Throwable as its argument.

  Flux.range(1, 5)
      .map(i -> {
          if (i < 5) return i;
          throw new RuntimeException("Error");
      })
      .subscribe(
          result -> System.out.println(result),
          error -> System.err.println("Error: " + error)
      );

When the Flux completes successfully, it emits a completion signal, which can be handled by passing a Runnable as the third argument.

  Flux.range(1, 5)
      .subscribe(
          result -> System.out.println(result),
          error -> System.err.println("Error: " + error),
          () -> System.out.println("Done")
      );
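Error and completion are both terminal signals, and a sequence ends with only one of them. The following sketch, with hypothetical class and event names, records the callbacks invoked when the source fails on its third element. The completion callback is not expected to run in that case.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class TerminalSignalExample {

    // Records which callbacks fire when the chain fails on the last element.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.range(1, 3)
                .map(i -> {
                    if (i < 3) return i;
                    throw new IllegalStateException("boom");
                })
                .subscribe(
                        value -> events.add("value " + value),
                        error -> events.add("error " + error.getMessage()),
                        () -> events.add("done") // skipped when the sequence ends with an error
                );
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```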

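You can also pass a fourth argument, a Consumer that is invoked with the Subscription when the subscribe signal is received. When you pass it, you are responsible for requesting elements through that Subscription.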

  Flux.range(1, 5)
      .subscribe(
          result -> System.out.println(result),
          error -> System.err.println("Error: " + error),
          () -> System.out.println("Done"),
          sub -> sub.request(2) // Up to 2 elements from source
      );

The example above requests only 2 elements from the source. Because the remaining elements are never requested, the completion signal is never generated and the lambda passed as the third argument is never called.
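To make the effect of the demand visible, the sketch below keeps a reference to the Subscription and requests the remaining elements later. The class name, the event strings, and the use of AtomicReference are choices made for this illustration. Newer Reactor releases appear to mark this overload as deprecated, so it may produce a deprecation warning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

public class PartialRequestExample {

    // Requests 2 elements on subscribe, then 3 more afterwards.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        AtomicReference<Subscription> subscriptionRef = new AtomicReference<>();

        Flux.range(1, 5)
                .subscribe(
                        value -> events.add("value " + value),
                        error -> events.add("error " + error),
                        () -> events.add("done"),
                        sub -> {
                            subscriptionRef.set(sub);
                            sub.request(2); // only 2 elements are delivered for now
                        }
                );

        // At this point only two values have arrived and "done" has not been recorded.
        events.add("requesting 3 more");
        subscriptionRef.get().request(3); // the rest is delivered, followed by completion
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The completion callback runs only once the source has been asked for all of its elements.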

Alternatively, you can pass a CoreSubscriber. A convenient way to do that is to extend BaseSubscriber, which implements CoreSubscriber, and override its hook methods to control what happens when each signal is received. You can override hookOnSubscribe, hookOnNext, hookOnComplete, hookOnError, and hookFinally.

  Flux.range(1, 5)
      // The type argument is required so that hookOnNext can accept an Integer
      .subscribe(new BaseSubscriber<Integer>() {
          @Override
          protected void hookOnSubscribe(Subscription subscription) {
              System.out.println("Subscribe");
              request(5); // Request up to 5 elements from the source
          }

          @Override
          protected void hookOnNext(Integer value) {
              System.out.println("Value:" + value);
          }

          @Override
          protected void hookOnComplete() {
              System.out.println("Complete");
          }

          @Override
          protected void hookOnError(Throwable throwable) {
              System.err.println("Error: " + throwable);
          }

          // Invoked after the sequence terminates (complete, error, or cancel)
          @Override
          protected void hookFinally(SignalType signalType) {
              System.out.println("SignalType: " + signalType);
          }
      });
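A common reason to write a BaseSubscriber is to control demand one element at a time. The sketch below is one possible way to do that, with hypothetical class and event names. It requests a single element on subscribe and another one after each value is handled.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

public class OneByOneSubscriber {

    // Pulls elements one at a time and records the signals in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.range(1, 3)
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        events.add("subscribed");
                        request(1); // ask for the first element only
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        events.add("value " + value);
                        request(1); // ask for the next element once this one is handled
                    }

                    @Override
                    protected void hookOnComplete() {
                        events.add("complete");
                    }

                    @Override
                    protected void hookFinally(SignalType type) {
                        events.add("finally " + type.name());
                    }
                });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If hookOnNext stopped calling request, the source would not emit further elements and the completion hook would not run.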