RxDart - Error Handling Examples

This tutorial shows you how to handle error in RxDart.

Error can be thrown in the middle of a reactive chain and it may affect the flow. In some cases, you need to define what should be done if an error occurs, either to catch it and return another value, map the error into another error, or just let the error thrown to the next operator. This tutorial has some operators that can be used to handle error in an RxDart reactive chain along with the examples.

The operators covered in this tutorial include:

For example, there's an Observable that emits value 1, 2, 3, 4 followed by a flatMap that throws error if the value is not less than 4. Then, use the above operators to handle the error.

Using doOnError

doOnError is used to invoke the given callback function when an error is emitted by the stream. It doesn't catch the error, so the error is still uncaught when it goes to the next operator in the chain.

  Observable<T> doOnError(Function onError) =>
      transform(DoStreamTransformer<T>(onError: onError));

The onError function must be a void onError(error) or void onError(error, StackTrace stackTrace). Keep in mind that stackTrace might be null if the received error doesn't have stack trace.

Example:

  Observable.range(1, 4)
      .flatMap((value) {
        return value < 4 ? Observable.just(value) : throw Observable.error(new MyException());
      })
      .doOnError((err, stacktrace) {
        print('doOnError: $err');
        print(stacktrace);
      })
      .listen(print);

Output:

  1
  2
  3
  doOnError: Instance of 'Observable<dynamic>'
  #0      main.<anonymous closure> (file:///home/ivan/try-dart/src/rxdart-error.dart:12:53)
  #1      FlatMapStreamTransformer._buildTransformer.<anonymous closure>.<anonymous closure>.<anonymous closure> (package:rxdart/src/transformers/flat_map.dart:45:45)
  #2      _RootZone.runUnaryGuarded (dart:async/zone.dart:1316:10)
  #3      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:338:11)
  #4      _IterablePendingEvents.handleNext (dart:async/stream_impl.dart:539:18)
  #5      _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:669:7)
  #6      _microtaskLoop (dart:async/schedule_microtask.dart:43:21)
  #7      _startMicrotaskLoop (dart:async/schedule_microtask.dart:52:5)
  #8      _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:118:13)
  #9      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:175:5)
  
  Unhandled exception:
  Instance of 'Observable<dynamic>'
  #0      main.<anonymous closure> (file:///home/ivan/try-dart/src/rxdart-error.dart:12:53)
  #1      FlatMapStreamTransformer._buildTransformer.<anonymous closure>.<anonymous closure>.<anonymous closure> (package:rxdart/src/transformers/flat_map.dart:45:45)
  #2      _RootZone.runUnaryGuarded (dart:async/zone.dart:1316:10)
  #3      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:338:11)
  #4      _IterablePendingEvents.handleNext (dart:async/stream_impl.dart:539:18)
  #5      _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:669:7)
  #6      _microtaskLoop (dart:async/schedule_microtask.dart:43:21)
  #7      _startMicrotaskLoop (dart:async/schedule_microtask.dart:52:5)
  #8      _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:118:13)
  #9      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:175:5)

Using handleError

handleError is used to intercept error from the source Observable by using a callback function. It catches the error, so the next operator will not get the error.

  Observable<T> handleError(Function onError, {bool test(dynamic error)}) =>
      Observable<T>(_stream.handleError(onError, test: test));

The onError function must be a void onError(error) or void onError(error, StackTrace stackTrace). Keep in mind that stackTrace might be null if the received error doesn't have stack trace.

handleError has an optional parameter test which accepts a function bool test(dynamic error). The handle function only intercepts if the test function returns true. By default the test function always returns true if you don't pass it.

Example:

  Observable.range(1, 4)
      .flatMap((value) {
        return value < 4 ? Observable.just(value) : Observable.error(new MyException());
      })
      .handleError((err, stacktrace) {
        print('handleError: $err');
        print(stacktrace);
      },
      test: (dynamic e) {
        return e is MyException;
      })
      .listen(print);

Output:

  1
  2
  3
  handleError: An exception
  null

Using onErrorReturn

onErrorReturn intercepts the error event from the source Observable and returns a particular value to the next operator. The flow will continue normally as the error has already been caught.

  Observable<T> onErrorReturn(T returnValue) =>
      transform(OnErrorResumeStreamTransformer<T>(
          (dynamic e) => Observable<T>.just(returnValue)));

Example:

  Observable.range(1, 4)
      .flatMap((value) {
    return value < 4 ? Observable.just(value) : Observable.error(new MyException());
  })
      .onErrorReturn(-1)
      .listen(print);

Output:

  1
  2
  3
  -1

Using onErrorReturnWith

onErrorReturnWith is similar to onErrorReturn, but instead of providing a particular value, you need to pass a function.

  Observable<T> onErrorReturnWith(T Function(dynamic error) returnFn) =>
      transform(OnErrorResumeStreamTransformer<T>(
          (dynamic e) => Observable<T>.just(returnFn(e))));

The function receives the error and returns a Stream. Inside, you can write your own logic to set what should be returned based on the emitted error.

Example:

  Observable.range(1, 4)
      .flatMap((value) {
        return value < 4 ? Observable.just(value) : Observable.error(new MyException());
      })
      .onErrorReturnWith((dynamic e) {
        return e is MyException ? -1 : 0;
      })
      .listen(print);

Output:

  1
  2
  3
  -1

Using onErrorResume

onErrorResume intercepts the error event from the source Observable and uses a function that returns a Stream to replace the value.

  Observable<T> onErrorResume(Stream<T> Function(dynamic error) recoveryFn) =>
      transform(OnErrorResumeStreamTransformer<T>(recoveryFn));

The function receives a parameter of type dynamic which is the emitted error and it needs to return a Stream. The error is caught and the value returned by the Stream will be passed to the next operator, given the Stream doesn't throw error.

Example:

  Observable.range(1, 4)
      .flatMap((value) {
        return value < 4 ? Observable.just(value) : Observable.error(new MyException());
      })
      .onErrorResume((dynamic e) => new Observable.just(e is MyException ? -1 : 0))
      .listen(print);

Output:

  1
  2
  3
  -1

Using onErrorResumeNext

onErrorResumeNext is similar to onErrorResume, but instead of passing a function, you can only pass the Stream directly.

  Observable<T> onErrorResumeNext(Stream<T> recoveryStream) => transform(
      OnErrorResumeStreamTransformer<T>((dynamic e) => recoveryStream));

Example:

  Observable.range(1, 4)
      .flatMap((value) {
        return value < 4 ? Observable.just(value) : Observable.error(new MyException());
      })
      .onErrorResumeNext(new Observable.just(-1))
      .listen(print);

Output:

  1
  2
  3
  -1