Effortless Stream Management: Cancelling and Restarting Streams in Flutter Bloc

A simple guide to efficiently canceling and restarting streams for seamless app development

Introduction

In this article, we'll explore the lesser-known aspects of canceling and restarting streams in Flutter Bloc. This discussion builds on a previous article of mine, which you can find at https://henryadu.hashnode.dev/how-to-use-event-transformers-with-bloc#heading-event-transformers-with-streams. In that article, I briefly covered the interaction between emit.forEach and emit.onEach with streams.

Now, let's dive straight into the topic.

CounterRepository

class CounterRepository {
  const CounterRepository();

  // a stream of integers
  Stream<int> get countStream async* {
    var i = 0;

    while (true) {
      await Future<void>.delayed(const Duration(seconds: 1));
      yield i++;
    }
  }
}

The getter above is an asynchronous generator. Once something listens to it, it yields increasing integers starting from 0, with a one-second delay before each value. Every new listener runs the generator from the top, so counting begins at 0 again.
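To see that restart behaviour in isolation, here is a minimal sketch in plain Dart. It assumes a top-level countStream function with a configurable tick (a hypothetical variation of the getter above, shortened so the demo finishes quickly) and is not part of the original repository.

```dart
import 'dart:async';

// Same shape as the repository getter, but with a configurable tick
// (an assumption made only to keep the demo fast).
Stream<int> countStream({
  Duration tick = const Duration(milliseconds: 10),
}) async* {
  var i = 0;
  while (true) {
    await Future<void>.delayed(tick);
    yield i++;
  }
}

Future<void> main() async {
  // take(3) cancels the subscription after three values, which also stops
  // the otherwise endless generator.
  final first = await countStream().take(3).toList();
  print(first); // [0, 1, 2]

  // Listening again runs the generator body from the top, so the count
  // starts at 0 rather than continuing from 3.
  final second = await countStream().take(2).toList();
  print(second); // [0, 1]
}
```

This is why a restarted stream in the bloc later in this article goes back to 0 instead of resuming.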

We'll listen to countStream inside the bloc, triggered by the following event.

CounterEvent

part of "counter_bloc.dart";

@freezed
class CounterEvent with _$CounterEvent {
  const factory CounterEvent.streamStarted({
    required bool shouldStart,
  }) = _StreamStarted;
}

The shouldStart flag decides whether the bloc starts listening to the stream or only cancels the current subscription. The event is handled in CounterBloc:

class CounterBloc extends Bloc<CounterEvent, CounterState> {
  CounterBloc({required CounterRepository counterRepository})
      : _counterRepository = counterRepository,
        super(const _CounterState()) {
    on<_StreamStarted>(_onStreamStarted, transformer: restartable());
  }

  final CounterRepository _counterRepository;

  FutureOr<void> _onStreamStarted(
    _StreamStarted event,
    Emitter<CounterState> emit,
  ) async {
    final shouldStart = event.shouldStart;

    if (!shouldStart) return;

    await emit.forEach(
      _counterRepository.countStream,
      onData: (count) => state.copyWith(count: count),
    );
  }

  @override
  void onTransition(Transition<CounterEvent, CounterState> transition) {
    super.onTransition(transition);
    debugPrint("CounterBloc: $transition");
  }
}

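When you add the streamStarted event from your widget, shouldStart tells the bloc whether it should listen to the stream. Because the handler is registered with restartable() (from the bloc_concurrency package), every new event cancels the handler that is still running, including its emit.forEach subscription. If shouldStart is false, the new handler returns early, so the previous stream is terminated and nothing replaces it. If it is true, the stream starts if it hasn't started yet, or restarts from the beginning if it has.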
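The cancel-then-maybe-listen behaviour can be sketched without bloc at all. The RestartableListener class below is a hypothetical helper written for illustration; it mimics the effect described above and is not how bloc_concurrency implements restartable().

```dart
import 'dart:async';

/// Hypothetical helper: keeps at most one active subscription.
class RestartableListener<T> {
  RestartableListener(this._onData);

  final void Function(T value) _onData;
  StreamSubscription<T>? _subscription;

  /// Cancels whatever is running, then listens to [source] if one is given.
  /// Passing null plays the role of shouldStart: false.
  Future<void> handle(Stream<T>? source) async {
    await _subscription?.cancel();
    _subscription = source?.listen(_onData);
  }
}

// A fast stand-in for countStream: 0, 1, 2, ... every 10 ms.
Stream<int> ticks() =>
    Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);

Future<void> main() async {
  final seen = <int>[];
  final listener = RestartableListener<int>(seen.add);

  await listener.handle(ticks()); // start
  await Future<void>.delayed(const Duration(milliseconds: 35));

  await listener.handle(ticks()); // restart: counting begins at 0 again
  await Future<void>.delayed(const Duration(milliseconds: 25));

  await listener.handle(null); // stop: cancel without starting a new stream
  final countAtStop = seen.length;
  await Future<void>.delayed(const Duration(milliseconds: 30));

  print(seen); // values from the first run, then a second run starting at 0
  print(seen.length == countAtStop); // true: nothing arrives after the stop
}
```

In the bloc, you get the same effect without managing the subscription yourself: the transformer cancels the old handler, and the early return decides whether a new subscription is created.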

In the CounterPage we have this:

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text("Counter"),
      ),
      body: BlocBuilder<CounterBloc, CounterState>(
        builder: (context, state) {
          return Center(
            child: Text(
              "Count: ${state.count}",
              style: const TextStyle(fontSize: 24),
            ),
          );
        },
      ),
      floatingActionButton: Column(
        mainAxisAlignment: MainAxisAlignment.end,
        children: [
          FloatingActionButton(
            onPressed: () => context.read<CounterBloc>().add(
                  const CounterEvent.increment(),
                ),
            child: const Icon(Icons.add),
          ),
          const SizedBox(height: 10),
          FloatingActionButton(
            onPressed: () => context.read<CounterBloc>().add(
                  const CounterEvent.decrement(),
                ),
            child: const Icon(Icons.remove),
          ),
          const SizedBox(height: 10),
          FloatingActionButton(
            onPressed: () => context.read<CounterBloc>().add(
                  const CounterEvent.streamStarted(
                    shouldStart: true,
                  ),
                ),
            child: const Icon(Icons.start),
          ),
          const SizedBox(height: 10),
          FloatingActionButton(
            onPressed: () => context.read<CounterBloc>().add(
                  const CounterEvent.streamStarted(
                    shouldStart: false,
                  ),
                ),
            child: const Icon(Icons.stop),
          ),
        ],
      ),
    );
  }

So, if shouldStart is true, the handler subscribes to countStream and, for every value the stream produces, emits a new state built with state.copyWith(count: count).

Pressing start yields this:

flutter: CounterBloc: Transition { currentState: CounterState(count: 0), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 0) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 0), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 1) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 1), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 2) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 2), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 3) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 3), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 4) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 4), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 5) }

And when the event is added again with shouldStart being true, we get:

flutter: CounterBloc: Transition { currentState: CounterState(count: 5), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 0) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 0), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 1) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 1), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 2) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 2), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 3) }
flutter: CounterBloc: Transition { currentState: CounterState(count: 3), event: CounterEvent.streamStarted(shouldStart: true), nextState: CounterState(count: 4) }

As you can see, count goes from 5 back to 0, which shows that the stream has restarted. If you pass false to the event instead, the stream is simply canceled and no further state changes are emitted.

This implementation, coupled with the restartable event transformer, provides the flexibility to either cease monitoring the stream or restart it with a new value.

This is particularly valuable when the stream you listen to depends on a certain value, such as when fetching the most recent weather updates or retrieving the latest user information. For example:

Reminder Stream

  Stream<IList<ReminderModel>> getRemindersStream({
    required String userId,
  }) {
    return _firebaseFirestore
        .collection("admins")
        .doc(userId)
        .collection("reminders")
        .snapshots()
        .map((snapshot) {
      return snapshot.docs.map((e) {
        return ReminderModel.fromJson(e.data());
      }).toIList();
    });
  }

In the snippet above, we're listening to a stream from Firebase that returns a list of user-generated reminders. When a user logs out and another user logs in, they expect to see their own reminders.

on<_ReminderGenerated>(_onReminderGenerated, transformer: restartable());

...  

FutureOr<void> _onReminderGenerated(
    _ReminderGenerated event,
    Emitter<ReminderState> emit,
  ) async {
    await emit.forEach(
      _reminderRepository.getRemindersStream(userId: event.userID),
      onData: (reminders) => state.copyWith(reminderList: reminders),
    );
  }

If you pass this stream to your bloc without specifying an event transformer, you'll discover that you can still see the reminders of the previously logged-in user. You can test this using the Counter example above. This occurs because, by default, bloc processes events concurrently, so the handler for the previous event keeps running and its stream is never canceled. Both streams keep emitting into the same state, which corrupts your data.
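The leak is easy to reproduce in plain Dart. The sketch below uses a hypothetical remindersFor function that tags each value with its owner, and a shared list standing in for the bloc state; it only illustrates the overlap and does not use bloc or Firestore.

```dart
import 'dart:async';

// Hypothetical per-user source: each value is tagged with the user's id.
Stream<String> remindersFor(String userId) => Stream<String>.periodic(
      const Duration(milliseconds: 10),
      (i) => '$userId-reminder-$i',
    );

Future<void> main() async {
  final shown = <String>[]; // stands in for the bloc state

  // First user logs in and we start listening.
  final alice = remindersFor('alice').listen(shown.add);
  await Future<void>.delayed(const Duration(milliseconds: 25));

  // Second user logs in. With concurrent handling, a new listener is added
  // while the old one keeps running.
  shown.clear();
  final bob = remindersFor('bob').listen(shown.add);
  await Future<void>.delayed(const Duration(milliseconds: 25));

  // The first subscription was never cancelled, so its data still arrives.
  print(shown.any((r) => r.startsWith('alice'))); // true

  await alice.cancel();
  await bob.cancel();
}
```

Cancelling the first subscription before starting the second removes the overlap, which is what the restartable() transformer takes care of in the bloc.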

Avoid Cloud Firestore Errors

Another situation that might arise is permission-related Cloud Firestore errors, which occur because the current user has logged out but your bloc is still listening to the stream. With restartable() in place, you can check whether the userId is null and return early, which cancels the stream.
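A minimal sketch of that early return, assuming the bloc and bloc_concurrency packages. UserChanged, ReminderBloc and the RemindersFor typedef are hypothetical names chosen for this example, and the state is a plain List<String> rather than the freezed ReminderState used above.

```dart
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical event: the signed-in user's id, or null after logout.
class UserChanged {
  const UserChanged(this.userId);
  final String? userId;
}

// Hypothetical stand-in for a repository method like getRemindersStream.
typedef RemindersFor = Stream<List<String>> Function(String userId);

class ReminderBloc extends Bloc<UserChanged, List<String>> {
  ReminderBloc(this._remindersFor) : super(const []) {
    on<UserChanged>(_onUserChanged, transformer: restartable());
  }

  final RemindersFor _remindersFor;

  Future<void> _onUserChanged(
    UserChanged event,
    Emitter<List<String>> emit,
  ) async {
    final userId = event.userId;

    // restartable() cancels the previous handler when this event arrives,
    // so returning here leaves no active subscription behind.
    if (userId == null) return;

    await emit.forEach<List<String>>(
      _remindersFor(userId),
      onData: (reminders) => reminders,
    );
  }
}

Future<void> main() async {
  final source = StreamController<List<String>>.broadcast();
  final bloc = ReminderBloc((_) => source.stream);

  bloc.add(const UserChanged('u1')); // login: start listening
  await Future<void>.delayed(const Duration(milliseconds: 10));
  source.add(['a']);
  await Future<void>.delayed(const Duration(milliseconds: 10));

  bloc.add(const UserChanged(null)); // logout: cancel and return early
  await Future<void>.delayed(const Duration(milliseconds: 10));
  source.add(['a', 'b']); // nobody is listening any more
  await Future<void>.delayed(const Duration(milliseconds: 10));

  print(bloc.state); // [a]
  await bloc.close();
  await source.close();
}
```

Whether to also clear the reminders on logout is a separate design choice; this sketch leaves the last state untouched and only stops listening.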

That's all for now. Subscribe to my newsletter if you found this valuable.
