Project Reactor - Using ReplayProcessor Examples

This tutorial gives you examples of how to use Project Reactor's ReplayProcessor.

A Reactor FluxProcessor can emit multiple elements. If some elements have already been emitted, subscribers that arrive late would normally miss them. ReplayProcessor solves this by caching emitted elements and replaying them to late subscribers. This tutorial shows the different ways to create a ReplayProcessor and how each one behaves.

Using cacheLast

The static method cacheLast creates a ReplayProcessor that keeps only the last element, which is replayed to late subscribers.

  public static <T> ReplayProcessor<T> cacheLast()

Example:

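  import reactor.core.publisher.Flux;
  import reactor.core.publisher.ReplayProcessor;

  // Keeps only the most recent element for late subscribers
  ReplayProcessor<Integer> processor = ReplayProcessor.cacheLast();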
  
  Flux.range(1, 5)
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .subscribe();

Output:

  A: 5
  B: 5
  C: 5

 

Using cacheLastOrDefault

Like cacheLast, but it replays a default value if the publisher hasn't emitted any element yet.

  public static <T> ReplayProcessor<T> cacheLastOrDefault(@Nullable T value)

Example:

  ReplayProcessor<Integer> processor = ReplayProcessor.cacheLastOrDefault(-1);
  
  processor.subscribe(x -> System.out.println("A: " + x));
  processor.subscribe(x -> System.out.println("B: " + x));
  processor.subscribe(x -> System.out.println("C: " + x));

Output:

  A: -1
  B: -1
  C: -1

 

Using create

There are several static methods named create with different signatures. The one without parameters creates a ReplayProcessor that replays an unbounded number of elements. The SMALL_BUFFER_SIZE value is used only to size the internal buffer, not to limit how many elements are replayed.

  public static <E> ReplayProcessor<E> create()

Example:

  ReplayProcessor<Integer> processor = ReplayProcessor.create();
  
  Flux.range(1, 5)
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .subscribe();

Output:

  A: 1
  A: 2
  A: 3
  A: 4
  A: 5
  B: 1
  B: 2
  B: 3
  B: 4
  B: 5
  C: 1
  C: 2
  C: 3
  C: 4
  C: 5

 

The second variant takes a parameter named historySize. It creates a processor that replays at most the last historySize elements.

  public static <E> ReplayProcessor<E> create(int historySize)

Example:

  ReplayProcessor<Integer> processor = ReplayProcessor.create(2);
  
  Flux.range(1, 5)
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .subscribe();

Output:

  A: 4
  A: 5
  B: 4
  B: 5
  C: 4
  C: 5
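
The retention rule behind historySize can be modelled in plain Java without Reactor. The sketch below is only an illustration: the BoundedHistory class and its replayOfRange helper are hypothetical names, and this is not how Reactor implements its buffer internally. It keeps the newest historySize elements and drops the oldest one whenever the history is full.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a size-bounded replay history; not Reactor's implementation.
class BoundedHistory<T> {
    private final int historySize;
    private final Deque<T> history = new ArrayDeque<>();

    BoundedHistory(int historySize) {
        if (historySize <= 0) {
            throw new IllegalArgumentException("historySize must be positive");
        }
        this.historySize = historySize;
    }

    // Record a new element, evicting the oldest one when the history is full.
    void onNext(T value) {
        if (history.size() == historySize) {
            history.removeFirst();
        }
        history.addLast(value);
    }

    // The elements a late subscriber would be replayed, oldest first.
    List<T> replay() {
        return new ArrayList<>(history);
    }

    // Feed 1..count into a history of the given size and return what would be replayed.
    static List<Integer> replayOfRange(int historySize, int count) {
        BoundedHistory<Integer> h = new BoundedHistory<>(historySize);
        for (int i = 1; i <= count; i++) {
            h.onNext(i);
        }
        return h.replay();
    }

    public static void main(String[] args) {
        System.out.println(replayOfRange(2, 5)); // prints [4, 5]
    }
}
```

With a history size of 1 the same model yields only the last element, which corresponds to the cacheLast behavior shown earlier.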

 

The last variant takes two parameters, historySize and unbounded. If unbounded is false, the processor replays at most the last historySize elements. If unbounded is true, it stores and replays an unbounded number of elements, and historySize only sizes the internal buffer.

  public static <E> ReplayProcessor<E> create(int historySize, boolean unbounded)

Example with unbounded set to false:

  ReplayProcessor<Integer> processor = ReplayProcessor.create(2, false);
  
  Flux.range(1, 5)
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .subscribe();

Output:

  A: 4
  A: 5
  B: 4
  B: 5
  C: 4
  C: 5

 

Example with unbounded set to true:

  ReplayProcessor<Integer> processor = ReplayProcessor.create(2, true);
  
  Flux.range(1, 5)
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .subscribe();

Output:

  A: 1
  A: 2
  A: 3
  A: 4
  A: 5
  B: 1
  B: 2
  B: 3
  B: 4
  B: 5
  C: 1
  C: 2
  C: 3
  C: 4
  C: 5
  

Using createTimeout

createTimeout creates a processor that keeps received elements only for the given duration. Each element is tagged with a timestamp provided by Schedulers.parallel. Once an element is older than the given maxAge, it is evicted and no longer replayed.

  public static <T> ReplayProcessor<T> createTimeout(Duration maxAge)

To make the effect visible, the example below adds a one-second delay between emitted elements:

  import java.time.Duration;

  // Elements older than 3 seconds are no longer replayed
  ReplayProcessor<Integer> processor = ReplayProcessor.createTimeout(Duration.ofSeconds(3));
  
  Flux.range(1, 5)
          .delayElements(Duration.ofSeconds(1))
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .blockLast();

Output:

  A: 3
  A: 4
  A: 5
  B: 3
  B: 4
  B: 5
  C: 3
  C: 4
  C: 5
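
To see why elements 1 and 2 are missing, the age check can be modelled in plain Java with a fake clock. This is a rough sketch under stated assumptions, not Reactor's implementation: TimedHistory, its demo method, and the choice to evict an element once its age reaches maxAge exactly are all hypothetical and chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical model of age-based retention; not Reactor's implementation.
class TimedHistory<T> {
    // An element together with the time at which it was received.
    private static final class Stamped<V> {
        final long time;
        final V value;

        Stamped(long time, V value) {
            this.time = time;
            this.value = value;
        }
    }

    private final long maxAgeMillis;
    private final LongSupplier clock; // injected so the demo can control time
    private final Deque<Stamped<T>> history = new ArrayDeque<>();

    TimedHistory(long maxAgeMillis, LongSupplier clock) {
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    // Tag the element with the current time and drop anything that has expired.
    void onNext(T value) {
        history.addLast(new Stamped<>(clock.getAsLong(), value));
        evictExpired();
    }

    // The elements a subscriber arriving now would be replayed, oldest first.
    List<T> replay() {
        evictExpired();
        List<T> out = new ArrayList<>();
        for (Stamped<T> s : history) {
            out.add(s.value);
        }
        return out;
    }

    // Assumption: an element is evicted once its age is greater than or equal to maxAge.
    private void evictExpired() {
        long limit = clock.getAsLong() - maxAgeMillis;
        while (!history.isEmpty() && history.peekFirst().time <= limit) {
            history.removeFirst();
        }
    }

    // Emit 1..5 one simulated second apart, then replay at the time of the last emission.
    static List<Integer> demo() {
        long[] now = {0L};
        TimedHistory<Integer> h = new TimedHistory<>(3000L, () -> now[0]);
        for (int i = 1; i <= 5; i++) {
            now[0] = i * 1000L;
            h.onNext(i);
        }
        return h.replay();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 4, 5]
    }
}
```

Injecting the clock keeps the sketch deterministic, which loosely mirrors the role the scheduler plays as the timestamp source in createTimeout.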

There is another variant of createTimeout that allows you to pass a specific scheduler (instead of using Schedulers.parallel).

  public static <T> ReplayProcessor<T> createTimeout(Duration maxAge, Scheduler scheduler)

Here's an example of how to pass a specific scheduler. The output should be the same.

  ReplayProcessor<Integer> processor = ReplayProcessor.createTimeout(Duration.ofSeconds(3), Schedulers.newElastic("myScheduler"));

Using createSizeAndTimeout

createSizeAndTimeout applies two rules, a maximum number of elements and a maximum age, to determine which elements are replayed to late subscribers. The processor keeps at most size elements, and each of them must be younger than maxAge. Like createTimeout, it uses timestamps from Schedulers.parallel.

  public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge)

In the example below, there is a one-second delay between elements. Although maxAge is set to 3 seconds, the subscribers receive only two elements because size is set to 2.

  ReplayProcessor<Integer> processor = ReplayProcessor.createSizeAndTimeout(2, Duration.ofSeconds(3));
  
  Flux.range(1, 5)
          .delayElements(Duration.ofSeconds(1))
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .blockLast();

Output:

  A: 4
  A: 5
  B: 4
  B: 5
  C: 4
  C: 5

 

In the example below, the subscribers receive only the last three elements although size is set to 4, because maxAge is only 3 seconds.

  ReplayProcessor<Integer> processor = ReplayProcessor.createSizeAndTimeout(4, Duration.ofSeconds(3));
  
  Flux.range(1, 5)
          .delayElements(Duration.ofSeconds(1))
          .doOnNext(integer -> {
              processor.onNext(integer);
          })
          .doOnComplete(() -> {
              processor.subscribe(x -> System.out.println("A: " + x));
              processor.subscribe(x -> System.out.println("B: " + x));
              processor.subscribe(x -> System.out.println("C: " + x));
          })
          .blockLast();

Output:

  A: 3
  A: 4
  A: 5
  B: 3
  B: 4
  B: 5
  C: 3
  C: 4
  C: 5
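
The two outputs above follow from applying both limits together: an element is replayed only if it is among the last size elements and is also younger than maxAge. The plain-Java sketch below works through that arithmetic for elements emitted one second apart. SizeAndAgeRule and replayed are hypothetical names, and the model assumes the subscriber arrives at the moment of the last emission, so it only approximates real timing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical arithmetic model of the combined size and age rule; not Reactor's implementation.
class SizeAndAgeRule {
    // Elements 1..count are emitted at second 1, 2, ..., count.
    // Assumption: the subscriber arrives at the time of the last emission.
    static List<Integer> replayed(int size, long maxAgeSeconds, int count) {
        long now = count;
        List<Integer> kept = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            long age = now - i;                      // element i was emitted at second i
            boolean youngEnough = age < maxAgeSeconds;
            boolean withinSize = i > count - size;   // one of the last `size` elements
            if (youngEnough && withinSize) {
                kept.add(i);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(replayed(2, 3, 5)); // prints [4, 5]
        System.out.println(replayed(4, 3, 5)); // prints [3, 4, 5]
    }
}
```

Whichever limit is stricter decides the result: the size limit in the first call and the age limit in the second.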

 

It also has a variant that uses a specific scheduler for the timestamp.

  public static <T> ReplayProcessor<T> createSizeAndTimeout(int size, Duration maxAge, Scheduler scheduler)

Here's an example of how to use the variant with the scheduler parameter.

  ReplayProcessor<Integer> processor = ReplayProcessor.createSizeAndTimeout(
          4,
          Duration.ofSeconds(3), 
          Schedulers.newElastic("myScheduler")
  );