Project Reactor - CacheMono & CacheFlux with Caffeine Examples

This tutorial shows you how to use CacheMono and CacheFlux in Project Reactor, including how to use them with Caffeine.

If you're using Project Reactor, sometimes you may want to cache a Mono or a Flux. Fortunately, the reactor-extra add-on module already provides two opinionated caching helpers, CacheMono and CacheFlux. They define a cache abstraction for storing and restoring a Mono or a Flux. This tutorial explains how to use both and how to integrate them with Caffeine, a popular caching library.

Using CacheMono

Using CacheMono basically consists of three steps. The first one is looking up the value in the cache based on the given key. The second step is handling the cache miss, which only happens if the lookup finds nothing. The last step is writing the newly generated value to the cache after a cache miss.
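The three steps can be sketched in plain Java, without Reactor, to make the flow concrete. This is only an illustration under simplifying assumptions (a synchronous HashMap and a Supplier); CacheStepsSketch and lookupOrCompute are hypothetical names, not part of the Reactor API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper, not part of Reactor: it mirrors the lookup / miss / write steps.
final class CacheStepsSketch {

    static <K, V> V lookupOrCompute(Map<K, V> cache, K key, Supplier<V> onMiss) {
        V cached = cache.get(key);   // step 1: look up the key in the cache
        if (cached != null) {
            return cached;           // cache hit: steps 2 and 3 are skipped
        }
        V fresh = onMiss.get();      // step 2: handle the cache miss
        cache.put(key, fresh);       // step 3: write the generated value to the cache
        return fresh;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        System.out.println(lookupOrCompute(cache, "a", () -> "value-a")); // prints value-a
        System.out.println(lookupOrCompute(cache, "a", () -> "other"));   // prints value-a (served from the map)
    }
}
```

Calling lookupOrCompute twice with the same key runs the Supplier only once, because the second call is served from the Map.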

There are different ways to use CacheMono. One of them requires you to handle all the steps above (look up the value, handle cache misses, write the value to the cache) manually. Alternatively, you can provide a Map and let Project Reactor handle the lookup and the write to the cache, so that you only need to handle cache misses.

Manually Handle Lookup and Write

Let's start with the first way in which you need to handle lookup and write data to the cache manually. First, you need to handle how to retrieve the value from the cache using the below lookup method.

  public static <KEY, VALUE> MonoCacheBuilderCacheMiss<KEY, VALUE> lookup(Function<KEY, Mono<Signal<? extends VALUE>>> reader, KEY key);

It requires you to pass a Function as the first argument. The passed Function needs to accept a key as the parameter and return a Mono that emits the cached value wrapped in a Signal, or completes empty when the key is not in the cache. In other words, the function is responsible for retrieving a value based on the given key. You can use any source to store the value of each key. For example, you can use Reactor's Context, a Map, a tree, or any other data structure. For the second argument, you need to pass the key of the value to be retrieved.

Using CacheMono also requires you to handle the case where the given key cannot be found in the cache, known as a cache miss. The return type of the lookup method above is MonoCacheBuilderCacheMiss. You can use one of its onCacheMissResume methods to handle cache misses.

  MonoCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Supplier<Mono<VALUE>> otherSupplier);
  MonoCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Mono<VALUE> other);

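There are two onCacheMissResume variants. The first one accepts a Supplier that returns a Mono, while the second one accepts a Mono directly. Either can produce a value that depends on the key: a Supplier can be a lambda that captures the key, such as () -> handleCacheMiss(key). Keep in mind that Java evaluates a method argument before the call, so writing onCacheMissResume(handleCacheMiss(key)) runs the body of handleCacheMiss while the chain is being assembled, even if the lookup later hits the cache. If that method has side effects, wrap the call in Mono.defer so that it only runs when the fallback Mono is actually subscribed to.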

Another thing you need to handle is writing the data to the cache. The MonoCacheBuilderCacheWriter interface has a method called andWriteWith. You need to call it and pass a BiFunction that writes the value produced by the onCacheMissResume fallback to the cache. The BiFunction is only invoked on cache misses.

  Mono<VALUE> andWriteWith(BiFunction<KEY, Signal<? extends VALUE>, Mono<Void>> writer)

With that explained, here is an example of CacheMono using the lookup method above. In this example, we are going to store the values in a Map<String, String>.

  final Map<String, String> mapStringCache = new HashMap<>();

Below are the methods that can be used with onCacheMissResume. The first one (with no parameters) can be passed as a Supplier through a method reference, while the second one returns a Mono that can be passed directly.

  private Mono<String> handleCacheMiss() {
    System.out.println("Cache miss!");

    return Mono.just(ZonedDateTime.now().toString());
  }

  private Mono<String> handleCacheMiss(String key) {
    System.out.println("Cache miss!");

    return Mono.just(key + ": " + Instant.now().toString());
  }
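Both methods print before they return, so the message appears whenever the method is called, which is not necessarily when the returned Mono is used. The plain-Java sketch below (no Reactor involved) shows the underlying language rule: a method argument is evaluated before the call, while a Supplier body only runs when get() is invoked. EagerVersusDeferred, orElseValue, orElseGet, and expensive are hypothetical names chosen for this illustration.

```java
import java.util.function.Supplier;

// Hypothetical class: contrasts an eagerly evaluated argument with a deferred Supplier.
final class EagerVersusDeferred {

    // Number of times the "expensive" computation actually ran.
    static int calls = 0;

    static String expensive(String key) {
        calls++;
        return key + "-value";
    }

    // The fallback is already computed by the time this method is entered.
    static String orElseValue(String cached, String fallback) {
        return cached != null ? cached : fallback;
    }

    // The fallback is computed only if the cached value is missing.
    static String orElseGet(String cached, Supplier<String> fallback) {
        return cached != null ? cached : fallback.get();
    }

    public static void main(String[] args) {
        orElseValue("hit", expensive("k"));      // expensive runs despite the hit
        System.out.println(calls);               // prints 1
        orElseGet("hit", () -> expensive("k"));  // expensive is skipped
        System.out.println(calls);               // prints 1
        orElseGet(null, () -> expensive("k"));   // expensive runs on the miss
        System.out.println(calls);               // prints 2
    }
}
```

In a Reactor chain, wrapping the call as Mono.defer(() -> handleCacheMiss(key)) has a comparable effect: the method body runs when the fallback Mono is subscribed to rather than when the chain is assembled.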

The code below is a complete chain that uses the lookup, onCacheMissResume, and andWriteWith methods.

  final Mono<String> cachedMono1 = CacheMono
      .lookup(
          // Read from the map; an empty Mono signals a cache miss
          k -> Mono.justOrEmpty(mapStringCache.get(k)).map(Signal::next),
          key
      )
//      .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
      .onCacheMissResume(this.handleCacheMiss(key))
      // Store the generated value under the same key
      .andWriteWith((k, sig) -> Mono.fromRunnable(() ->
          mapStringCache.put(k, Objects.requireNonNull(sig.get()))
      ));

Provide a Map

Another way to use CacheMono is by providing a Map. This approach requires you to use one of the lookup methods that accept a Map.

  public static <KEY, VALUE> MonoCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super Signal<? extends VALUE>> cacheMap, KEY key);
  public static <KEY, VALUE> MonoCacheBuilderMapMiss<VALUE> lookup(Map<KEY, ? super Signal<? extends VALUE>> cacheMap, KEY key, Class<VALUE> valueClass);

The first method requires you to pass a Map as the first argument and a key as the second argument. The key type of the Map must be compatible with the type of the passed key, and the Map's values must be Signals. In other words, you have to be able to represent the cached data as a Map<KEY, ? super Signal<? extends VALUE>>. That means you can store any type of Signal, including next, complete, and error.

You also need to handle cache misses. MonoCacheBuilderMapMiss also has two onCacheMissResume methods: one accepts a Supplier and the other accepts a Mono.

  Mono<VALUE> onCacheMissResume(Supplier<Mono<VALUE>> otherSupplier)
  Mono<VALUE> onCacheMissResume(Mono<VALUE> other)

If a given key doesn't exist in the Map, the Mono given to onCacheMissResume is used to generate the value. Unlike the previous lookup method (which doesn't use a Map), the generated value is stored in the Map automatically. Therefore, there is no andWriteWith method and you don't need to save the value to the cache manually.

Let's start with the example of using the first lookup method (the one with two parameters). First, you need to provide a Map whose value type is Signal<? extends String>.

  final Map<String, Signal<? extends String>> mapStringSignalCache = new HashMap<>();

Below is the usage example.

  final Mono<String> cachedMono2 = CacheMono
      .lookup(mapStringSignalCache, key)
//      .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
      .onCacheMissResume(this.handleCacheMiss(key));

The second method (the one with three parameters) is similar to the first one. The difference is it accepts a third argument whose type is Class<VALUE> that indicates the generic class of the resulting Mono. You can use this method if you want to cast the cached signal value to a given type (must be a subtype of the signal value type).

For the second lookup method, we are going to create another Map whose value type is Signal<? extends Object>.

  final Map<String, Signal<? extends Object>> mapObjectSignalCache = new HashMap<>();

Below is a usage example that passes String.class as the third argument of the lookup method. As a result, the value is cast to String. Be careful, as it may throw a ClassCastException if the value cannot be cast to the given class.

  final Mono<String> cachedMono3 = CacheMono
      .lookup(mapObjectSignalCache, key, String.class)
//      .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
      .onCacheMissResume(this.handleCacheMiss(key));
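The failure mode can be reproduced in plain Java. The sketch below does not show how CacheMono performs the conversion internally; it only illustrates, with the hypothetical helpers castCached and failsToCast, what happens when a stored Object is cast to a Class it does not belong to.

```java
// Hypothetical class: demonstrates a checked cast through a Class token.
final class CastSketch {

    // Casts a cached object to the requested type, or throws ClassCastException.
    static <T> T castCached(Object cached, Class<T> type) {
        return type.cast(cached);
    }

    // Returns true when the cast fails with ClassCastException.
    static boolean failsToCast(Object cached, Class<?> type) {
        try {
            castCached(cached, type);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(castCached("cached text", String.class)); // prints cached text
        System.out.println(failsToCast(42, String.class));           // prints true
    }
}
```

A cache that mixes value types under one Map is therefore safest when every reader agrees on the type stored for a given key.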

Using CacheFlux

The usage of CacheFlux is similar to CacheMono. You need to handle the lookup from the cache, handle cache misses, and write data to the cache. The main difference is that you work with a list of values or Signals. With CacheFlux, you can also choose whether to handle the lookup and write process manually or provide a Map, depending on which lookup method you use.

Manually Handle Lookup and Write

The first way is to manually handle lookup and write values to the cache, using the below lookup method.

  public static <KEY, VALUE> FluxCacheBuilderCacheMiss<KEY, VALUE> lookup(
      Function<KEY, Mono<List<Signal<VALUE>>>> reader,
      KEY key
  )

You need to pass a function that accepts a key as the argument. Inside the passed function, you need to obtain the values from the cache based on the given key and return it as a Mono of List of Signal (Mono<List<Signal<VALUE>>>). If the key doesn't exist in the cache, you have to return an empty Mono.

Next, you have to handle how to generate values on cache misses. The FluxCacheBuilderCacheMiss interface has two methods named onCacheMissResume. You can choose to pass a Supplier or a Flux. Either works when the values depend on the given key, since a Supplier can be a lambda that captures the key.

  FluxCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Supplier<Flux<VALUE>> otherSupplier);
  FluxCacheBuilderCacheWriter<KEY, VALUE> onCacheMissResume(Flux<VALUE> other);

Lastly, you need to handle how to store the data generated by the onCacheMissResume method by using FluxCacheBuilderCacheWriter's andWriteWith method.

  Flux<VALUE> andWriteWith(BiFunction<KEY, List<Signal<VALUE>>, Mono<Void>> writer);

Let's start with the example. This time, we have a Map whose key type is Integer and value type is List<Integer>.

  final Map<Integer, List<Integer>> mapIntCache = new HashMap<>();

Below are the methods that can be used with onCacheMissResume. The first one (with no parameters) can be passed as a Supplier through a method reference, while the other returns a Flux that can be passed directly.

  private Flux<Integer> handleCacheMiss() {
    System.out.println("Cache miss!");
    final List<Integer> values = new ArrayList<>();

    for (int i = 1; i <= 5; i++) {
      values.add(i);
    }

    return Flux.fromIterable(values);
  }

  private Flux<Integer> handleCacheMiss(Integer key) {
    System.out.println("Cache miss!");
    final List<Integer> values = new ArrayList<>();

    for (int i = 1; i <= 5; i++) {
      values.add(i * key);
    }

    return Flux.fromIterable(values);
  }

Below is a complete chain that uses the lookup, onCacheMissResume, and andWriteWith methods.

  final Flux<Integer> cachedFlux1 = CacheFlux
    .lookup(
      k -> {
        final List<Integer> cached = mapIntCache.get(k);

        if (cached == null) {
          // An empty Mono signals a cache miss
          return Mono.empty();
        }

        // Wrap each cached value in an onNext Signal
        final Mono<List<Signal<Integer>>> res = Flux.fromIterable(cached)
          .map(Signal::next)
          .collectList();

        return res;
      },
      key
    )
    .onCacheMissResume(this::handleCacheMiss) // Passes a Supplier
//    .onCacheMissResume(() -> Flux.defer(() -> this.handleCacheMiss(key))) // Use this instead to generate values from the key
    .andWriteWith((k, sig) -> Mono.fromRunnable(() ->
      mapIntCache.put(
        k,
        // Keep only the onNext signals, which carry the values
        sig.stream()
          .filter(signal -> signal.getType() == SignalType.ON_NEXT)
          .map(Signal::get)
          .collect(Collectors.toList())
      )
    ));

Provide a Map

Another way to use CacheFlux is by passing a Map. CacheFlux has another static lookup method that allows you to pass a Map.

  public static <KEY, VALUE> FluxCacheBuilderMapMiss<VALUE> lookup(
      Map<KEY, ? super List> cacheMap,
      KEY key,
      Class<VALUE> valueClass
  )

The Map has to be passed as the first argument. The key type of the Map must be compatible with the type of the key passed as the second argument. Because the parameter is declared as Map<KEY, ? super List>, the Map's value type must be the raw List type or one of its supertypes (such as Object); a subtype such as ArrayList is not accepted. Unfortunately, you cannot define a generic type for the List. However, that doesn't mean you can put arbitrary values in the List. The List can only contain Project Reactor's Signal. For the third argument, you have to pass a Class which is used to cast each Flux element.
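The effect of the ? super List bound can be checked with plain Java. In the sketch below, store is a hypothetical method that only mimics the shape of the first lookup parameter; it is not Reactor code, and LowerBoundedMapSketch and demo are likewise made-up names.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical class: shows which Map value types satisfy "? super List".
@SuppressWarnings("rawtypes")
final class LowerBoundedMapSketch {

    // Accepts maps whose value type is the raw List type or one of its supertypes.
    static <K> void store(Map<K, ? super List> cacheMap, K key, List values) {
        cacheMap.put(key, values);
    }

    // Stores one entry in each accepted kind of map and returns the total entry count.
    static int demo() {
        Map<Integer, List> rawLists = new HashMap<>();
        Map<Integer, Collection> collections = new HashMap<>();
        Map<Integer, Object> objects = new HashMap<>();

        store(rawLists, 1, Arrays.asList("a"));
        store(collections, 1, Arrays.asList("a"));
        store(objects, 1, Arrays.asList("a"));

        // These calls would not compile, because the value types are not supertypes of raw List:
        // store(new HashMap<Integer, java.util.ArrayList>(), 1, Arrays.asList("a"));
        // store(new HashMap<Integer, List<String>>(), 1, Arrays.asList("a"));

        return rawLists.size() + collections.size() + objects.size();
    }
}
```

This is why the example Map in this section is declared with a raw List value type rather than a parameterized one.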

Then, you need to handle cache misses by using one of the onCacheMissResume methods. One of the methods requires you to pass a Supplier, while the other requires you to pass a Flux. Both can generate values based on the given key, since a Supplier can be a lambda that captures it. When a cache miss occurs, the Flux given to onCacheMissResume is used and the resulting values are stored in the Map automatically.

  Flux<VALUE> onCacheMissResume(Supplier<Flux<VALUE>> otherSupplier);
  Flux<VALUE> onCacheMissResume(Flux<VALUE> other);

To use CacheFlux by providing a Map, you need to have a Map whose value type is List.

  final Map<Integer, List> mapCache = new HashMap<>();

Below is the usage example.

  final Flux<Integer> cachedFlux2 = CacheFlux
    .lookup(
      mapCache,
      key,
      Integer.class
    )
    .onCacheMissResume(this::handleCacheMiss);
//    .onCacheMissResume(Flux.defer(() -> this.handleCacheMiss(key)));

  return cachedFlux2
    .doOnNext(res -> System.out.println("Value is " + res));

Using CacheMono and CacheFlux with Caffeine

If you need more advanced caching features such as expiration, you will most likely use a caching library that provides them. CacheMono and CacheFlux can be used with any caching library, such as Caffeine. This tutorial doesn't cover Caffeine in detail; it only focuses on how to use it with CacheMono and CacheFlux. First of all, we need to create a Caffeine cache. Caffeine has several cache interfaces, such as Cache, AsyncCache, and LoadingCache. In this example, we are going to use the Cache interface.

  final Cache<String, String> caffeineCache = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ofSeconds(30))
      .recordStats()
      .build();

As explained above, to use CacheMono with a Map, the Map's value type must be Reactor's Signal. Otherwise, you need to pass a Function for looking up the value and write the data to the cache manually. Caffeine's asMap method exposes the cache as a Map, but for the cache above the value type is String rather than Signal, so it isn't compatible with the lookup methods that accept a Map parameter. Therefore, you need to handle the lookup by passing a Function and store the generated values in the Caffeine cache yourself.

  final Mono<String> cachedMonoCaffeine = CacheMono
      .lookup(
          k -> Mono.justOrEmpty(caffeineCache.getIfPresent(k)).map(Signal::next),
          key
      )
  //    .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
      .onCacheMissResume(this.handleCacheMiss(key))
      .andWriteWith((k, sig) -> Mono.fromRunnable(() ->
          caffeineCache.put(k, Objects.requireNonNull(sig.get()))
      ));

The usage for CacheFlux is similar. Below is another Caffeine cache whose value type is List<Integer>.

  final Cache<Integer, List<Integer>> caffeineCache = Caffeine.newBuilder()
      .expireAfterWrite(Duration.ofSeconds(30))
      .recordStats()
      .build();

To use a Caffeine cache with CacheFlux, you need to use the lookup method that accepts a Function as the first parameter. The passed Function is responsible for getting the value from the cache, or returning an empty Mono if the key is not present. You also need to store the generated values in the Caffeine cache.

  final Flux<Integer> cachedFluxCaffeine = CacheFlux
      .lookup(
          k -> {
            final List<Integer> cached = caffeineCache.getIfPresent(k);

            if (cached == null) {
              return Mono.empty();
            }

            return Mono.just(cached)
                .flatMapMany(Flux::fromIterable)
                .map(Signal::next)
                .collectList();
          },
          key
      )
//        .onCacheMissResume(this::handleCacheMiss) // Uncomment this if you want to pass a Supplier
      .onCacheMissResume(this.handleCacheMiss(key))
      .andWriteWith((k, sig) -> Mono.fromRunnable(() ->
          caffeineCache.put(
              k,
              sig.stream()
                  .filter(signal -> signal.getType() == SignalType.ON_NEXT)
                  .map(Signal::get)
                  .collect(Collectors.toList())
          )
      ));

Summary

That's how to use CacheMono and CacheFlux in Project Reactor. Basically, there are two options: handle lookup and store values manually or provide a compatible Map whose value type is a Signal (for CacheMono) or List<Signal> (for CacheFlux). The latter option is preferred if possible because it's simpler. CacheMono and CacheFlux can also be used with any caching library such as Caffeine. The full code of this tutorial is available on GitHub.