A simple guide to efficiently canceling and restarting streams for seamless app development
Introduction
In this article, we'll explore the lesser-known aspects of canceling and restarting streams
in Flutter Bloc. This discussion builds on a previous article of mine, which you can find at https://henryadu.hashnode.dev/how-to-use-event-transformers-with-bloc#heading-event-transformers-with-streams. In that article, I briefly covered the interaction between emit.forEach
and emit.onEach
with streams
.
Now, let's dive straight into the topic.
CounterRepository
class CounterRepository {
const CounterRepository();
// a stream of integers
Stream<int> get countStream async* {
var i = 0;
while (true) {
await Future<void>.delayed(const Duration(seconds: 1));
yield i++;
}
}
}
The above method is an asynchronous generator that continuously yields
a stream
of increasing integers, starting from 0, with a delay of one second between each yield.
We're going to listen to that with the help of this event
.
CounterEvent
part of "counter_bloc.dart";
@freezed
class CounterEvent with _$CounterEvent {
const factory CounterEvent.streamStarted({
required bool shouldStart,
}) = _StreamStarted;
}
The shouldStart variable is supposed to help us cancel the stream
or not. We're listening to it in the CounterBloc
class CounterBloc extends Bloc<CounterEvent, CounterState> {
CounterBloc({required CounterRepository counterRepository})
: _counterRepository = counterRepository,
super(const _CounterState()) {
on<_StreamStarted>(_onStreamStarted, transformer: restartable());
}
final CounterRepository _counterRepository;
FutureOr<void> _onStreamStarted(
_StreamStarted event,
Emitter<CounterState> emit,
) async {
final shouldStart = event.shouldStart;
if (!shouldStart) return;
await emit.forEach(
_counterRepository.countStream,
onData: (count) => state.copyWith(count: count),
);
}
@override
void onTransition(Transition<CounterEvent, CounterState> transition) {
super.onTransition(transition);
debugPrint("CounterBloc: $transition");
}
}
When you call the StreamStarted
event in your widget, you can specify if the stream bloc should start listening to the stream or not. If false
is passed to the bloc event, the previous stream is terminated. If it's the value is true
the stream starts if it hasn't started yet or it restarts if it has started.
In the CounterPage
we have this:
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text("Counter"),
),
body: BlocBuilder<CounterBloc, CounterState>(
builder: (context, state) {
return Center(
child: Text(
"Count: ${state.count}",
style: const TextStyle(fontSize: 24),
),
);
},
),
floatingActionButton: Column(
mainAxisAlignment: MainAxisAlignment.end,
children: [
FloatingActionButton(
onPressed: () => context.read<CounterBloc>().add(
const CounterEvent.increment(),
),
child: const Icon(Icons.add),
),
const SizedBox(height: 10),
FloatingActionButton(
onPressed: () => context.read<CounterBloc>().add(
const CounterEvent.decrement(),
),
child: const Icon(Icons.remove),
),
const SizedBox(height: 10),
FloatingActionButton(
onPressed: () => context.read<CounterBloc>().add(
const CounterEvent.streamStarted(
shouldStart: true,
),
),
child: const Icon(Icons.start),
),
const SizedBox(height: 10),
FloatingActionButton(
onPressed: () => context.read<CounterBloc>().add(
const CounterEvent.streamStarted(
shouldStart: false,
),
),
child: const Icon(Icons.stop),
),
],
),
);
}
So, if shouldStart
is true, the function starts monitoring the countStream
and for every data event
on the stream
, it updates the state
by employing the state.copyWith(count: count)
method and broadcasts this updated state
.
Pressing start yields this:
flutter: CounterBloc: Transition { currentState: CounterState(count: 0), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 0) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 0), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 1) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 1), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 2) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 2), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 3) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 3), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 4) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 4), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 5) }
And when the event is added again with shouldStart being true, we get:
flutter: CounterBloc: Transition { currentState: CounterState(count: 5), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 0) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 0), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 1) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 1), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 2) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 2), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 3) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 3), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 4) }
As you can see the count
variable moves from 5 to 0, showing that the stream
has restarted. If you pass false
to the event, the stream
is simply canceled and there's no state
change.
This implementation, coupled with the restartable event transformer, provides the flexibility to either cease monitoring the stream or restart it with a new value.
This feature is particularly valuable in stream based scenarios, where you might need to keep track of a data stream based on a certain value. For example, fetching the most recent weather updates or retrieving the latest user information. For example;
Reminder Stream
Stream<IList<ReminderModel>> getRemindersStream({
required String userId,
}) {
return _firebaseFirestore
.collection("admins")
.doc(userId)
.collection("reminders")
.snapshots()
.map((snapshot) {
return snapshot.docs.map((e) {
return ReminderModel.fromJson(e.data());
}).toIList();
});
}
In the snippet above, we're listening to a stream from Firebase that returns a list of user-generated reminders. When a user logs out and another user logs in, they expect to see their own reminders.
on<_ReminderGenerated>(_onReminderGenerated, transformer: restartable());
...
FutureOr<void> _onReminderGenerated(
_ReminderGenerated event,
Emitter<ReminderState> emit,
) async {
await emit.forEach(
_reminderRepository.getRemindersStream(userID: event.userID),
onData: (reminders) => state.copyWith(reminderList: reminders),
);
}
If you pass this stream
to your bloc without specifying an event transformer, you'll discover that you can still see the reminders of the previously logged-in user. You can test this using the Counter
example mentioned above. This occurs because, by default, bloc events are concurrently added, so the previously running stream
is never canceled and continues running, consequently corrupting your data.
Avoid Cloud Firestore Errors
Another situation that might arise is you getting permission-related cloud firestore errors because the current user has logged but you're still listening to the stream
in your bloc. Now, you can check if the userId is null and then cancel the stream
by returning early.
That's all for now, subscribe to my newsletter if you found this valuable.