Project Reactor - publishOn vs subscribeOn Difference

In Project Reactor, we can use publishOn and subscribeOn to control on which scheduler operators in a chain should be executed. This tutorial gives you explanations and some examples for showing difference between publishOn and subscribeOn.

Reactor is concurrency-agnostic. Rather than enforcing a concurrency model, developers can control how the code should be executed in threads. We can create different kinds of schedulers in Reactor.

For this tutorial, to make it easy to know where the executions take place, we are going to use two schedulers with different names:

  Scheduler schedulerA = Schedulers.newParallel("scheduler-a", 4);
  Scheduler schedulerB = Schedulers.newParallel("scheduler-b", 4);

publishOn

Like other operators in general, publishOn is applied in the middle of a chain. It affects subsequent operators after publishOn - they will be executed on a thread picked from publishOn's scheduler.

  Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .publishOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

The output of the code above is:

  First map - (1), Thread: main
  First map - (2), Thread: main
  Second map - (1), Thread: scheduler-a-1
  Second map - (2), Thread: scheduler-a-1

As you can see, only the second map which is placed after publishOn in the chain is executed on the scheduler. pubilshOn doesn't affect any operator before it.

subscribeOn

subscribeOn is applied to the subscription process. If you place a subscribeOn in a chain, it affects the source emission in the entire chain. However, operators after publishOn do not affected as the execution will be switched to one of the threads from publishOn's schedulers. Meanwhile, operators in the chain that's not affected by publishOn will be executed on a thread picked from subscribeOn's scheduler.

  Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .subscribeOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

Here's the output

  First map - (1), Thread: scheduler-a-1
  Second map - (1), Thread: scheduler-a-1
  First map - (2), Thread: scheduler-a-1
  Second map - (2), Thread: scheduler-a-1

More Advanced Examples

Multiple publishOn Operators

If there is more than one publishOn in a chain, how will it behave? You can see on the following example and its output.

  Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .publishOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .publishOn(schedulerB)
      .map(i -> {
          System.out.println(String.format("Third map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

Output:

  First map - (1), Thread: main
  First map - (2), Thread: main
  Second map - (1), Thread: scheduler-a-2
  Second map - (2), Thread: scheduler-a-2
  Third map - (1), Thread: scheduler-b-1
  Third map - (2), Thread: scheduler-b-1

The first publishOn affects the subsequent operators after it, which means it should affect the second and third maps. However, there is another publishOn which affects the third maps. The result shows us that the third map uses thread from scheduler B. We can conclude that if there is more than one preceding publishOn operators, the nearest preceding publishOn will be used.

Multiple subscribeOn Operators

What if we have multiple subscribeOn operators in a chain.

  Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .subscribeOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .subscribeOn(schedulerB)
      .map(i -> {
          System.out.println(String.format("Third map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

The result is:

  First map - (1), Thread: scheduler-a-2
  Second map - (1), Thread: scheduler-a-2
  Third map - (1), Thread: scheduler-a-2
  First map - (2), Thread: scheduler-a-2
  Second map - (2), Thread: scheduler-a-2
  Third map - (2), Thread: scheduler-a-2

As you can see, all maps are executed on a thread picked from scheduler A which is used by the first subscribeOn. If you define multiple subscribeOn operators in a chain, it will use the first one.

Using Both publishOn and subscribeOn

Now, we have a subscribeOn followed by a publishOn.

  Flux.range(1, 2)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .subscribeOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .publishOn(schedulerB)
      .map(i -> {
          System.out.println(String.format("Third map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .blockLast();

Output:

  First map - (1), Thread: scheduler-a-2
  Second map - (1), Thread: scheduler-a-2
  First map - (2), Thread: scheduler-a-2
  Third map - (1), Thread: scheduler-b-1
  Second map - (2), Thread: scheduler-a-2
  Third map - (2), Thread: scheduler-b-1

Initially, subscribeOn schedules all operators on schedulerA. But the presence of publishOn makes the third map which is placed after it to use its scheduler.

If we switch the position of publishOn and subscribeOn in the code above, the second and third maps will be exucuted by publishOn's schedulers.

Nested Chain

Let's take a look at another example. This time, there is a nested chain.

  Flux.range(1, 5)
      .map(i -> {
          System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
          return i;
      })
      .subscribeOn(schedulerA)
      .map(i -> {
          System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
  
          return Flux
                  .range(1, 2)
                  .map(j -> {
                      System.out.println(String.format("First map - (%s.%s), Thread: %s", i, j, Thread.currentThread().getName()));
                      return j;
                  })
                  .subscribeOn(schedulerB)
                  .map(j -> {
                      System.out.println(String.format("Second map - (%s.%s), Thread: %s", i, j, Thread.currentThread().getName()));
                      return "value " + j;
                  }).subscribe();
      })
      .blockLast();

At first, the second outer map runs on the scheduler A. But there is another chain insideit which uses scheduler B. Here's the output

  First map - (1), Thread: scheduler-a-1
  Second map - (1), Thread: scheduler-a-1
  First map - (2), Thread: scheduler-a-1
  Second map - (2), Thread: scheduler-a-1
  First map - (1.1), Thread: scheduler-b-2
  Second map - (1.1), Thread: scheduler-b-2
  First map - (3), Thread: scheduler-a-1
  Second map - (3), Thread: scheduler-a-1
  First map - (2.1), Thread: scheduler-b-3
  First map - (4), Thread: scheduler-a-1
  Second map - (2.1), Thread: scheduler-b-3
  Second map - (4), Thread: scheduler-a-1
  First map - (3.1), Thread: scheduler-b-4
  Second map - (3.1), Thread: scheduler-b-4
  First map - (5), Thread: scheduler-a-1
  First map - (4.1), Thread: scheduler-b-5
  Second map - (5), Thread: scheduler-a-1
  Second map - (4.1), Thread: scheduler-b-5
  First map - (1.2), Thread: scheduler-b-2
  First map - (3.2), Thread: scheduler-b-4
  First map - (2.2), Thread: scheduler-b-3
  Second map - (1.2), Thread: scheduler-b-2
  Second map - (3.2), Thread: scheduler-b-4
  Second map - (2.2), Thread: scheduler-b-3
  First map - (5.1), Thread: scheduler-b-2
  First map - (4.2), Thread: scheduler-b-5
  Second map - (5.1), Thread: scheduler-b-2
  Second map - (4.2), Thread: scheduler-b-5
  First map - (5.2), Thread: scheduler-b-2
  Second map - (5.2), Thread: scheduler-b-2

The inner chain is subscribed on scheduler B. So, if we have nested chain, we can control the inner chain to use different scheduler.

Conclusion

To determine the execution of operators in a chain if you use publishOn or subscribeOn, what you need to do are:

  1. First, find the topmost subscribeOn. If found, set all operators in the chain to use a thread picked from that its scheduler.
  2. Then, find all publishOn operators from top to bottom. For each, set the subsequent operators below it to use a thread picked from its scheduler.

Please be aware that some operators may change the scheduler. For example, adding .delayElements implicitly calls .publishOn(Schedulers.parallel()).