Asynchronous Programming in Flutter (Dart) | Streams

Jackie Moraa
6 min readMay 17, 2024

--

In the last article, we introduced the concept of asynchronous operations with a focus on Futures. In case you missed it, read up on it here.

In today’s article, we’ll be focusing on Streams.

As we previously discussed, futures represent a single asynchronous computation. However, unlike futures, streams represent a sequence of asynchronous events. Imagine a pipe or a tube where water flows in from one end and flows out from the other end — that’s essentially the concept behind streams.

Therefore, a stream can be thought of as a data pipe where values are put on one end from a source and on the other end there is a listener (subscriber). When all pieces of data have been emitted (either as values or errors), an event signalling that the stream is done will notify these listeners. You can then consume/process the data as needed.

It is a unidirectional flow which separates the concerns between the event emitters and event consumers and this in turn, promotes efficient data flow management.

Streams are widely used in scenarios like: fetching data over a network or APIs, real-time communication (e.g., chat applications), processing large datasets etc.

Creating a Stream
To create a stream in Dart you can use the Stream class directly or use the StreamController class.

Stream
The Stream class from the dart:async package provides the basics for async programming using streams in Dart. It is especially used when you have a fixed set of events to emit because it is static. That is; one can’t add new elements to the stream dynamically. (If a more dynamic stream of events is required, consider the StreamController instead.)

import 'dart:async';

void main() {
Stream<int> intStream = Stream<int>.fromIterable([1, 2, 3, 4, 5]);
intStream.listen((data) {
print(data); // Prints numbers from 1 to 5
});
}

In the above example, we have a created a stream called intStream that emits data of type int (integers). .fromIterable([1, 2, 3, 4, 5]) is a factory constructor which will create a stream from a list of integers containing the numbers 1 to 5. This stream will then emit these values to its listeners.

The listen method takes a callback function as an argument which receives the emitted data from the stream and prints it to the console one at a time.

StreamController
As mentioned above, if you are working with a dynamic stream where you could be having multiple events at different times, then use the StreamController. With a StreamController you can add events to the stream at any point in time. To send data events to the stream, use add or addStream.

Let’s rewrite our code above using StreamController instead.

import 'dart:async';

void main() {
StreamController<int> controller = StreamController<int>();
Stream<int> intStream = controller.stream;

// Listen to the stream
intStream.listen((data) {
print(data);
});

// Dynamically adding data to the stream
// This data can be from a previous computation
for (int i = 1; i <= 5; i++) {
controller.add(i);
}

// Close the stream to avoid memory leaks
controller.close();
}

In the above example, declare a StreamController that will manage the stream of integers. controller.stream will then get the stream managed by the StreamController and we listen to this stream just like in the previous Stream demo code.

The major difference between the two implementation comes in with controller.add(i) which dynamically adds data to the stream. The integers 1 through 5 are added to the stream sequentially. This could be any data that comes from the application that is computed or received at different points/times.

Finally, controller.close() closes the stream. Just like with other controllers in Dart, it is important to close the stream when done. This allows the resources associated with the controller to be released and used elsewhere in the system.

Key differences between Stream and StreamController
Though clear from the examples above, let us summarise the key differences between the Stream and StreamController below:

Read/Write: A Stream is read-only while a StreamController on the other hand is writable.

Passive/Active: Inline with point #1 above, a Stream is referred to as a Passive entity since you can only listen/subscribe to a stream to get the data emitted but you can’t directly modify the data in the stream. A StreamController is referred to as an Active entity because you have a way to control the data flow within the stream and you can decide when and what data to add.

Usage: Use a Stream when you are working with static data (or if you already have a known data source) where the only activity is listening, processing and/or reacting to the events. Use a StreamController when you are working with dynamic data and you want to control/manage this data over time i.e., dictate how and when the data will be added to the stream of events.

Analogy from Gemini:
Think of a
Stream as a river. It carries water (data) from its source (data generation mechanism) to the destination (listeners). You can't control the flow of water in the river itself, but you can build structures (stream controllers) along the river to regulate the flow (adding or stopping water flow).

StreamBuilder
Recall FutureBuilder from the previous article? Well, we have a similar widget for working with Streams in Flutter called the StreamBuilder.

The StreamBuilder widget takes in a stream and a builder method and rebuilds the UI based on the latest snapshot of that stream. That is, it listens to the stream and rebuilds the widget tree anytime the stream emits a new event.

The StreamBuilder constructor takes in two required parameters:

  • stream: The Stream which the StreamBuilder will listen to, whose result you want to build your UI based on.
  • builder: This is a callback function that takes two arguments: the BuildContext and the AsyncSnapshot of the data returned by the Future. This method is called anytime the state of the Stream snapshot changes, and it is therefore responsible for building the UI based on that state.

connectionState indicates the current state of the connection in relation to the asynchronous computation. It can be none, waiting, active, or done. (I explained them in the previous article)

Below is a simple UI example, showing the StreamBuilder in action.
We are simulating a “heart rate monitor”.

class PulseReaderScreen extends StatefulWidget {
const PulseReaderScreen({super.key});

@override
State<PulseReaderScreen> createState() => _PulseReaderScreenState();
}

class _PulseReaderScreenState extends State<PulseReaderScreen> {
late StreamController _pulseController;
late Stream _pulseStream;
late Timer _timer;
final Random _random = Random();

@override
void initState() {
super.initState();
_pulseController = StreamController<int>();
_pulseStream = _pulseController.stream;
_startPulseSimulation();
}

@override
void dispose() {
_timer.cancel();
_pulseController.close();
super.dispose();
}

void _startPulseSimulation() {
const duration = Duration(seconds: 1);
_timer = Timer.periodic(duration, (_) {
final int pulse = 60 + _random.nextInt(40);
_pulseController.add(pulse);
});
}

@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('Moraa, Jackie (Adult)'),
),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
StreamBuilder(
stream: _pulseStream,
builder: (BuildContext context, AsyncSnapshot snapshot) {
if (snapshot.connectionState == ConnectionState.waiting) {
return const Text(
'Waiting for pulse data...',
style: TextStyle(fontSize: 24),
);
} else if (snapshot.hasError) {
return Text(
'Error: ${snapshot.error}',
style: const TextStyle(fontSize: 24),
);
} else if (!snapshot.hasData) {
return const Text(
'No pulse data available',
style: TextStyle(fontSize: 24),
);
} else {
return Column(
children: [
const Text('Current Pulse:', style: TextStyle(fontSize: 24)),
Text(
'${snapshot.data} BPM',
style: const TextStyle(fontSize: 48, fontWeight: FontWeight.bold),
),
],
);
}
},
),
],
),
),
);
}
}

Code explanation
In the above code, we have a StreamController _pulseController which manages the stream which is our pulse data. We are generating random integers between 60 and 100 to be assigned to the pulse variable (BPM) and this is done every second based on the timer duration.

The StreamBuilder listens to the pulse data stream and updates the UI based on the latest data emitted. You can see we are handling the different states such as waiting for data, error, no data, and finally displaying the pulse data.

The resulting UI is shown below …

That’s it for this article! And just like my disclaimer with the last article: There’s quite a lot of information available on Streams which I haven’t even touched on in this article e.g., single subscription vs broadcast, how to subscribe to a stream etc. So take time to read through the docs and read on them. I have linked some of the docs in the reference section below.

A programmer had a problem. He thought to himself, “I know, I’ll solve it with async!” has Now problems. two he
— Anonymous

Once again, thank you for reading! ❤

--

--