RxDart - Using Subject (Publish, Behavior, Replay)

This tutorial explains what is Subject in RxDart and types of Subject along with usage examples.

In ReactiveX, the term Subject refers to a sort of bridge or proxy that acts as both Observable and Observer. As an Observable, it can emit items. As an observer, it can subscribe to one or more Observables.

ReactiveX has some types of Subject: AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject, UnicastSubject, and SingleSubject. However, AsyncSubject, UnicastSubject, and SingleSubject are not implemented yet in RxDart. So, I will only give examples for the Subject types available in RxDart: BehaviorSubject, PublishSubject, and ReplaySubject.

Those Subject types have some differences especially in terms of how items are stored and delivered to listeners. However all of them are broadcast (hot) controllers which means the stream can be listened to multiple times. They all use Dart's StreamController and have onlisten, onCancel, and sync parameters that will be passed to StreamController.

By reading the below examples, you should be able to understand the differences between Subject types in RxDart.

Using PublishSubject

This is like a standard StreamController, but the stream returns an Observable instead of a Stream.

  factory PublishSubject(
      {void onListen(), void onCancel(), bool sync = false})

Example:

  PublishSubject s = new PublishSubject<int>();

  // Observer1 will receive all data and done events
  s.stream
      .listen((value) {
        print('Observer1: $value');
      });
  s.add(1);
  s.add(2);

  // Observer2 will only receive 3 and done event
  s.stream
      .listen((value) {
        print('Observer2: $value');
      });
  s.add(3);
  s.close();

Output:

  Observer1: 1
  Observer2: 3
  Observer1: 2
  Observer1: 3

Using BehaviorSubject

BehaviorSubject captures only the latest added item. When a new listener starts to listen to the controller, it will receive the stored item. After that, any new event will be sent to all listeners. Optionally you can set a seed value that will be used only if no item has been added to the Subject.

  factory BehaviorSubject({
    void onListen(),
    void onCancel(),
    bool sync = false,
  })

Example:

  BehaviorSubject s = new BehaviorSubject<int>();

  // Observer1 will receive all data and done events
  s.stream
      .listen((value) {
    print('Observer1: $value');
  });
  s.add(1);
  s.add(2);

  // Observer2 will only receive 2, 3,  and done event
  s.stream
      .listen((value) {
    print('Observer2: $value');
  });
  s.add(3);
  s.close();

Output:

  Observer1: 1
  Observer2: 2
  Observer2: 3
  Observer1: 2
  Observer1: 3

Below is an example with seeded value which will be set as the current value

  factory BehaviorSubject.seeded(
    T seedValue, {
    void onListen(),
    void onCancel(),
    bool sync = false,
  })

Replacing the constructor call in the previous example with this one:

  BehaviorSubject s = new BehaviorSubject<int>.seeded(-1);

will change the output to:

  Observer1: -1
  Observer1: 1
  Observer2: 2
  Observer2: 3
  Observer1: 2
  Observer1: 3

When Observer1 listens to the subject, the current value has already been set to -1 (instead of null). As the result, you will see -1 emitted first before 1. But when Observer2 listens to the subject, the current value has already been replaced with 2.

Using ReplaySubject

ReplaySubject captures all items that have been added. When a new listener starts to listen to the controller, it will receive all items. After that, any new event will be sent to all listeners.

  factory ReplaySubject({
    int maxSize,
    void onListen(),
    void onCancel(),
    bool sync = false,
  })

Example:

  ReplaySubject s = new ReplaySubject<int>();

  // Observer1 will receive all data and done events
  s.stream
      .listen((value) {
    print('Observer1: $value');
  });
  s.add(1);
  s.add(2);

  // Observer2 will receive all data and done events
  s.stream
      .listen((value) {
    print('Observer2: $value');
  });
  s.add(3);
  s.close();

Output:

  Observer1: 1
  Observer2: 1
  Observer1: 2
  Observer2: 2
  Observer1: 3
  Observer2: 3

Below is another example, this time with maxSize parameter set to 1. The maxSize parameter is used to set the maximum events to be stored. Replace the constructor call with the below code:

    ReplaySubject s = new ReplaySubject<int>(maxSize: 1);

Output:

  Observer1: 1
  Observer2: 2
  Observer2: 3
  Observer1: 2
  Observer1: 3

That makes Obserer2 only get '2' (because there is only one stored value) and 3 (the value emitted after it listens to the Subject).