Flutter - Using StreamController and StreamSubscription

Sometimes you may want to send data in a stream. A stream consists of data sequence. Data can be pushed to a stream, while one or more listeners subscribe to the stream and get aware every time a new data added to the stream. In this tutorial, I'm going to show you how to send data using a stream, including how to create a stream controller, send data to the stream, and create the listener which includes function for handling events like data received, error occurred and task finished. This tutorial is for Flutter, but it can be implemented on any framework using Dart language.

Dependencies

Stream is a built-in Dart library. So, we don't need to install any dependency. To be able to use stream, import the async library.

  import 'dart:async';

Code

Below is the code structure of this tutorial. We need to store a variable streamController which will be explained later.

  import 'dart:async';
  import 'package:flutter/material.dart';
  
  void main() => runApp(MyApp());
  
  class MyApp extends StatelessWidget {
    @override
    Widget build(BuildContext context) {
      return MaterialApp(
        title: 'Stream Example',
        home: StreamExample(),
      );
    }
  }
  
  class StreamExampleState extends State<StreamExample> {
    StreamController<String> streamController;
  
    @override
    void initState() {
      super.initState();

// Implement stream here } @override Widget build(BuildContext context) { print("build"); return Scaffold( appBar: AppBar( title: Text('Stream Example'), ), body: Center( child: Text( "Woolha.com" ) ), ); } Future<String> getData() async { await Future.delayed(Duration(seconds: 5)); //Mock delay print("Fetched Data"); return "This a test data"; } } class StreamExample extends StatefulWidget { @override StreamExampleState createState() => new StreamExampleState(); }

Create a StreamController

 To create a data stream, we can use StreamController. The constructor supports the following parameters.

Parameter Type
onListen Function
onPause Function
onResume Function
onResume Function
onCancel Function
sync boolean (default: false)

Below is the constructor example.

  class StreamExampleState extends State<StreamExample> {
void onPauseHandler() { print('on Pause'); } StreamController streamController; @override void initState() { super.initState(); streamController = new StreamController( onPause: onPauseHandler, ); }
}

To send data, use add method with the data as the argument.

  streamController.add("This a test data");

If error occurs, instead of sending data, you can send error by using addError method.

  streamController.addError(new Exception('An exception'));

Create a StreamSubscription

After creating a StreamController, We need something that can subscribe to the stream. We can create a StreamSubscription<T>, with T is data type (String in this example). To create a StreamSubscription, use streamController.stream.listen.

  StreamSubscription<String> subscription;
  subscription = streamController.stream.listen((data) {
    print("DataReceived: " + data);
  }, onDone: () {
    print("Task Done");
  }, onError: (error) {
    print("Some Error");
  });

The first parameter must be onListen function which will be executed every time a data received. There are three optional parameters: onDone onError and cancelOnError

Below is the list of supported parameters

Parameter Type
onListen (Required as first parameter) Function
onDone (Optional) Function
onError (Optional) Function
cancelOnError (Optional) bool

Below is the full code

  import 'dart:async';
  import 'package:flutter/material.dart';
  
  void main() => runApp(MyApp());
  
  class MyApp extends StatelessWidget {
    @override
    Widget build(BuildContext context) {
      return MaterialApp(
        title: 'Stream Example',
        home: StreamExample(),
      );
    }
  }
  
  class StreamExampleState extends State {
    void onPauseHandler() {
      print('on Pause');
    }
  
    StreamController streamController;
  
    @override
    void initState() {
      super.initState();
  
      streamController = new StreamController(
        onPause: onPauseHandler,
      );
  
      StreamSubscription subscription;
      subscription = streamController.stream.listen((data) {
        print("DataReceived: " + data);

        // Add 5 seconds delay
        // It will call onPause function passed on StreamController constructor
        subscription.pause(Future.delayed(const Duration(seconds: 5)));
      }, onDone: () {
        print("Task Done");
      }, onError: (error) {
        print("Some Error");
      });
  
      streamController.add("This a test data");
      streamController.addError(new Exception('An exception'));
      streamController.add("This a test data 2");
      streamController.close(); //Streams must be closed when not needed
      streamController.add("This a test data 3");
    }
  
    @override
    void dispose() {
      streamController.close();
      super.dispose();
    }
  
    @override
    Widget build(BuildContext context) {
      print("build");
  
      return Scaffold(
        appBar: AppBar(
          title: Text('Stream Example'),
        ),
        body: Center(
            child: Text(
                "Woolha.com"
            )
        ),
      );
    }
  }
  
  class StreamExample extends StatefulWidget {
    @override
    StreamExampleState createState() => new StreamExampleState();
  }

That's how to use StreamController along with StreamSubscription.