How to use event transformers with bloc

A comprehensive guide for the concurrent, sequential, droppable and restartable transformers

Introduction

Event transformers in Flutter's Bloc library control how incoming events are processed, in particular what happens when a new event arrives while an earlier one is still being handled. However, most guides cover only the droppable event transformer, usually as a way to debounce asynchronous events.

Concurrent

The default transformer for all Bloc events is the concurrent one. This event transformer allows multiple events to be processed concurrently in a Bloc.

When you use the concurrent transformer, the Bloc does not wait for the current event to finish processing before it starts processing the next event. Instead, it starts processing each event as soon as it is added. This means that multiple events can be processed at the same time.

Let's take a look at a normal counter bloc

Concurrent CounterBloc

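import 'dart:async';

import 'package:bloc_concurrency/bloc_concurrency.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter_bloc/flutter_bloc.dart';

// CounterEvent and CounterState are assumed to be defined in this library.
class CounterBloc extends Bloc<CounterEvent, CounterState> {
  CounterBloc() : super(const _CounterState()) {
    // concurrent() is the default, so passing it explicitly is optional.
    on<_Increment>(_onIncrement, transformer: concurrent());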
    on<_Decrement>(_onDecrement, transformer: concurrent());
  }

  FutureOr<void> _onIncrement(
    _Increment event,
    Emitter<CounterState> emit,
  ) async {
    debugPrint("CounterBloc: _onIncrement");
    emit(state.copyWith(count: state.count + 1));
  }

  FutureOr<void> _onDecrement(
    _Decrement event,
    Emitter<CounterState> emit,
  ) async {
    debugPrint("CounterBloc: _onDecrement");
    emit(state.copyWith(count: state.count - 1));
  }

  @override
  void onTransition(Transition<CounterEvent, CounterState> transition) {
    super.onTransition(transition);
    debugPrint("CounterBloc: $transition");
  }
}

If we add the _Increment event three times, this is what we get in the debug console:

Concurrent CounterBloc logs without delay

flutter: CounterBloc: _onIncrement

flutter: CounterBloc: Transition { currentState: CounterState(count: 0),
 event: CounterEvent.increment(), nextState: CounterState(count: 1) }

flutter: CounterBloc: _onIncrement

flutter: CounterBloc: Transition { currentState: CounterState(count: 1),
 event: CounterEvent.increment(), nextState: CounterState(count: 2) }

flutter: CounterBloc: _onIncrement

flutter: CounterBloc: Transition { currentState: CounterState(count: 2),
 event: CounterEvent.increment(), nextState: CounterState(count: 3) }

Each time the increment button is pressed, our message, CounterBloc: _onIncrement, is printed to the console, and so is the transition.

Let us add a delay of two seconds before incrementing the count in our CounterState.

Concurrent CounterBloc with delay

class CounterBloc extends Bloc<CounterEvent, CounterState> {
  CounterBloc() : super(const _CounterState()) {
    // concurrent() is the default, so passing it explicitly is optional.
    on<_Increment>(_onIncrement, transformer: concurrent());
    on<_Decrement>(_onDecrement, transformer: concurrent());
  }

  FutureOr<void> _onIncrement(
    _Increment event,
    Emitter<CounterState> emit,
  ) async {
    debugPrint("CounterBloc: _onIncrement");
    await Future<void>.delayed(const Duration(seconds: 2), () {
      debugPrint("Two seconds later...");
    });
    emit(state.copyWith(count: state.count + 1));
  }

  FutureOr<void> _onDecrement(
    _Decrement event,
    Emitter<CounterState> emit,
  ) async {
    debugPrint("CounterBloc: _onDecrement");
    await Future<void>.delayed(const Duration(seconds: 2), () {
      debugPrint("Two seconds later...");
    });

    emit(state.copyWith(count: state.count - 1));
  }

  @override
  void onTransition(Transition<CounterEvent, CounterState> transition) {
    super.onTransition(transition);
    debugPrint("CounterBloc: $transition");
  }
}

Concurrent CounterBloc logs with delay

3 flutter: CounterBloc: _onIncrement

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 0),
 event: CounterEvent.increment(), nextState: CounterState(count: 1) }

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 1),
 event: CounterEvent.increment(), nextState: CounterState(count: 2) }

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 2),
 event: CounterEvent.increment(), nextState: CounterState(count: 3) }

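As you can see, CounterBloc: _onIncrement was printed three times up front (the leading 3 appears to be the console's repeat count for identical lines). Because the events were added in quick succession, all three handlers started before any of them finished its two-second delay.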

This transformer can be useful in situations where you want to optimize for speed and don't care about the order in which events are processed, like in this context where we just want to increment the counter three times.

However, it can also lead to race conditions if the processing of one event depends on the result of another event. In such cases, you might want to use the sequential transformer instead, which ensures that events are processed one after another in the order they were added. For example:

Concurrent BalanceBloc

class BalanceBloc extends Bloc<BalanceEvent, BalanceState> {
  BalanceBloc() : super(BalanceState(balance: 0)) {
    on<Deposit>(_onDeposit, transformer: concurrent());
    on<Withdraw>(_onWithdraw, transformer: concurrent());
  }

  FutureOr<void> _onDeposit(Deposit event, Emitter<BalanceState> emit) async {
    final newBalance = state.balance + event.amount;
    await Future.delayed(Duration(seconds: 2));
    emit(BalanceState(balance: newBalance));
  }

  FutureOr<void> _onWithdraw(Withdraw event, Emitter<BalanceState> emit) async {
    if (state.balance >= event.amount) {
      final newBalance = state.balance - event.amount;
      await Future.delayed(Duration(seconds: 2));
      emit(BalanceState(balance: newBalance));
    } else {
      emit(BalanceState(balance: state.balance, error: 'Insufficient funds'));
    }
  }
}

Imagine the balance is $100, and two events are dispatched almost simultaneously: Withdraw($50) and Withdraw($80).

With the concurrent transformer, these two events can be processed at the same time. Even though _onWithdraw checks the balance, both handlers read $100 before either one emits, so both checks pass. A total of $130 is withdrawn from a $100 account, and the final state is whatever the last handler emits ($20 here) instead of an insufficient funds error.

This is a race condition caused by the concurrent transformer. To avoid it, you could use the sequential transformer, which ensures that the first Withdraw event finishes before the second one starts. When the second Withdraw event is processed, it sees a balance of $50, detects the insufficient funds and responds accordingly.
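The race can be reproduced without Bloc at all. The sketch below is a plain Dart stand-in, not the BalanceBloc itself: the Account class and the shortened 50 ms delay are assumptions made for illustration. It runs the same read, check, wait, write sequence first concurrently and then sequentially.

```dart
import 'dart:async';

/// Hypothetical stand-in for the bloc's state: a single mutable balance.
class Account {
  Account(this.balance);

  int balance;

  /// Mirrors _onWithdraw: read and check, wait, then write.
  Future<bool> withdraw(int amount) async {
    if (balance < amount) return false; // insufficient funds
    final newBalance = balance - amount; // based on a possibly stale read
    await Future<void>.delayed(const Duration(milliseconds: 50));
    balance = newBalance;
    return true;
  }
}

/// Both withdrawals start before either one finishes (concurrent-style).
Future<List<Object>> runConcurrently() async {
  final account = Account(100);
  final results = await Future.wait([
    account.withdraw(50),
    account.withdraw(80),
  ]);
  return [results[0], results[1], account.balance];
}

/// The second withdrawal starts only after the first one completes
/// (sequential-style).
Future<List<Object>> runSequentially() async {
  final account = Account(100);
  final first = await account.withdraw(50);
  final second = await account.withdraw(80);
  return [first, second, account.balance];
}

Future<void> main() async {
  print(await runConcurrently()); // [true, true, 20]
  print(await runSequentially()); // [true, false, 50]
}
```

Run concurrently, both withdrawals report success because each one checked the balance before the other wrote to it. Run sequentially, the second one is rejected.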

Sequential

The sequential transformer ensures that events are processed one after another, in the order they were added. This is useful when the processing of one event depends on the result of a previous one, as in the BalanceBloc above.

  CounterBloc() : super(const _CounterState()) {
    on<_Increment>(_onIncrement, transformer: sequential());
    on<_Decrement>(_onDecrement, transformer: sequential());
  }

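The sequential transformer ensures that each _Increment event waits for the previous _Increment event to finish before it starts, and likewise for _Decrement. A transformer applies to the on<E> registration it is passed to, so the two handlers here do not queue behind each other. We're still delaying our state update by two seconds.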
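If increments and decrements must share a single queue, one option is to register one handler on the base event type, since each on<E> registration gets its own transformer. Below is a minimal sketch. The Increment and Decrement classes, the int state and the 100 ms delay are hypothetical stand-ins for the article's CounterEvent and CounterState.

```dart
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical events for illustration; the article's CounterEvent differs.
abstract class CounterEvent {}

class Increment extends CounterEvent {}

class Decrement extends CounterEvent {}

class SharedQueueCounterBloc extends Bloc<CounterEvent, int> {
  SharedQueueCounterBloc() : super(0) {
    // One registration on the base type: increments and decrements are
    // processed one at a time, in the order they were added.
    on<CounterEvent>(_onEvent, transformer: sequential());
  }

  Future<void> _onEvent(CounterEvent event, Emitter<int> emit) async {
    // Shortened stand-in for the two-second delay.
    await Future<void>.delayed(const Duration(milliseconds: 100));
    if (event is Increment) {
      emit(state + 1);
    } else if (event is Decrement) {
      emit(state - 1);
    }
  }
}
```

The trade-off is that the handler has to branch on the event type itself.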

    await Future<void>.delayed(const Duration(seconds: 2), () {
      debugPrint("Two seconds later...");
    });

Adding the _Increment event three times in a duration shorter than two seconds we get this:

Sequential CounterBloc logs

flutter: CounterBloc: _onIncrement

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 0),
 event: CounterEvent.increment(), nextState: CounterState(count: 1) }

flutter: CounterBloc: _onIncrement

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 1),
 event: CounterEvent.increment(), nextState: CounterState(count: 2) }

flutter: CounterBloc: _onIncrement

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 2),
 event: CounterEvent.increment(), nextState: CounterState(count: 3) }

Each event comes one after the other as you'd expect.

This transformer might be fine in scenarios like the BalanceBloc, but what if you have a chat application?

Let's say you have a ChatBloc that manages sending messages in a chat application. Each time a user sends a message, a SendMessage event is dispatched. The processing of a SendMessage event involves a network request to send the message to the server.

Sequential ChatBloc

class ChatBloc extends Bloc<ChatEvent, ChatState> {
  ChatBloc() : super(ChatInitial()) {
    on<SendMessage>(_onSendMessage, transformer: sequential());
  }

  Future<void> _onSendMessage(SendMessage event, Emitter<ChatState> emit) async {
    emit(MessageSending());
    try {
      await Future.delayed(Duration(seconds: 2));
      emit(MessageSent(event.message));
    } catch (e) {
      emit(MessageFailed(e.toString()));
    }
  }
}

If you use the sequential transformer for the SendMessage event, it means that if a new SendMessage event is dispatched while a previous SendMessage event is still being processed (i.e., a network request is still in progress), the new event will wait for the previous event to finish before it starts processing.

This could make your app less responsive in situations where the network is slow or unreliable. If a user tries to send multiple messages quickly, they might have to wait a long time for all the messages to be sent, because each message has to wait for the previous message to be sent before it can be sent itself.

I know WhatsApp doesn't do this, so it might be best to use the concurrent transformer instead, which allows multiple SendMessage events to be processed at the same time. This would allow all the messages to be sent simultaneously, making the app more responsive. However, it could also lead to messages being sent out of order if the network requests complete in a different order than they were started.
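The ordering trade-off can be seen in a few lines of plain Dart. This is only a sketch: the send function and its latency values are hypothetical stand-ins for a real network call whose duration you can't predict.

```dart
import 'dart:async';

/// Hypothetical send: latencyMs stands in for an unpredictable network delay.
Future<void> send(String message, int latencyMs, List<String> delivered) async {
  await Future<void>.delayed(Duration(milliseconds: latencyMs));
  delivered.add(message);
}

/// Both sends are in flight at once, so the faster one lands first.
Future<List<String>> sendConcurrently() async {
  final delivered = <String>[];
  await Future.wait([
    send('first', 60, delivered),
    send('second', 10, delivered),
  ]);
  return delivered;
}

/// The second send starts only after the first has been delivered.
Future<List<String>> sendSequentially() async {
  final delivered = <String>[];
  await send('first', 60, delivered);
  await send('second', 10, delivered);
  return delivered;
}

Future<void> main() async {
  print(await sendConcurrently()); // [second, first]
  print(await sendSequentially()); // [first, second]
}
```

If delivery order matters, the concurrent approach needs something extra, such as a timestamp or sequence number on each message, so the receiving side can restore the order.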

Droppable

The droppable transformer is useful when you want to ignore any events that are added while an event is currently being processed.

  CounterBloc() : super(const _CounterState()) {
    on<_Increment>(_onIncrement, transformer: droppable());
    on<_Decrement>(_onDecrement, transformer: droppable());
  }

In the context of our CounterBloc, using the droppable transformer for the _Increment event means that if an _Increment event is added while another _Increment event is still being processed (i.e., during the 2-second delay), the new event is ignored and never processed.

Adding the _Increment event three times in less than two seconds, we get this:

Droppable CounterBloc logs

flutter: CounterBloc: _onIncrement

flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 0),
 event: CounterEvent.increment(), nextState: CounterState(count: 1) }

That's all we get. Pressing the increment button three times in a row yields only one state update because the other two were dropped.

This transformer is most useful when repeating an event while the first one is still in flight adds nothing. Consider a scenario where you have Firebase login functionality in your application.

If you call signInWithEmailAndPassword() too frequently from the same IP address, Firebase may start throttling or blocking requests. This is to prevent brute force attacks.

Droppable LoginBloc

class LoginBloc extends Bloc<LoginEvent, LoginState> {
  final FirebaseAuth _firebaseAuth;

  LoginBloc(this._firebaseAuth) : super(LoginInitial()) {
    on<LoginRequested>(_onLoginRequested, transformer: droppable());
  }

  Future<void> _onLoginRequested(LoginRequested event, Emitter<LoginState> emit) async {
    emit(LoginLoading());
    try {
      await _firebaseAuth.signInWithEmailAndPassword(email: event.email, password: event.password);
      emit(LoginSuccess());
    } catch (e) {
      emit(LoginFailure(e.toString()));
    }
  }
}

A LoginRequested event is dispatched whenever the user attempts to log in. The _onLoginRequested method attempts to sign in with Firebase Auth. If a new LoginRequested event is dispatched while a previous event is still being processed (i.e., a sign-in request is still in progress), the new event is ignored thanks to the droppable transformer. This can help prevent rate limiting by Firebase Auth if the user attempts to log in multiple times in quick succession.

Keep in mind that using the droppable transformer can lead to events being lost if they are dispatched too rapidly, for example when you're updating a user's location on a server.

Say you have a LocationBloc where a LocationUpdated event is dispatched every time the user's location changes. If the user is moving quickly because they're in a car, their location could change rapidly, leading to many LocationUpdated events being dispatched in a short period.

Droppable LocationBloc

class LocationBloc extends Bloc<LocationEvent, LocationState> {
  LocationBloc() : super(LocationInitial()) {
    on<LocationUpdated>(_onLocationUpdated, transformer: droppable());
  }

  Future<void> _onLocationUpdated(LocationUpdated event, Emitter<LocationState> emit) async {
    emit(LocationLoading());
    try {
      // Simulate a network request
      await Future.delayed(Duration(seconds: 2));
      emit(LocationSuccess(event.latitude, event.longitude));
    } catch (e) {
      emit(LocationFailure(e.toString()));
    }
  }
}

Using the droppable transformer for the LocationUpdated event means that if a new LocationUpdated event is dispatched while a previous LocationUpdated event is still being processed (i.e., a network request is still in progress to update the user's location on a server), the new event will be ignored.

This could be beneficial in this scenario because it prevents unnecessary network requests and possibly saves money. However, it also means that some location updates could be lost if they occur too rapidly. If every location update must be processed, you might want to use the sequential or concurrent transformer instead, depending on your specific use case.

Restartable

This transformer is best for when you want to cancel the processing of the current event and start processing a new event because you only care about the latest value.

  CounterBloc() : super(const _CounterState()) {
    on<_Increment>(_onIncrement, transformer: restartable());
    on<_Decrement>(_onDecrement, transformer: restartable());
  }

With the same two-second delay in the handler, adding three _Increment events in quick succession produces the logs below.

    await Future<void>.delayed(const Duration(seconds: 2), () {
      debugPrint("Two seconds later...");
    });

Restartable CounterBloc logs

3 flutter: CounterBloc: _onIncrement

3 flutter: Two seconds later...

flutter: CounterBloc: Transition { currentState: CounterState(count: 0),
 event: CounterEvent.increment(), nextState: CounterState(count: 1) }

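Our Two seconds later... message was printed three times, yet there is only one transition. Each new event cancelled the handler that was already running, but cancelling a handler does not interrupt a Future that is already in flight: every delay still ran to completion and printed its message. The bloc simply ignores emit calls from cancelled handlers, so only the third event produced a state change.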

If you have an onChanged callback for a TextField or a search functionality in your app, this transformer fits nicely. In these scenarios the most important event is the latest one, so discarding the one in progress is not an issue.

Restartable SearchBloc

class SearchBloc extends Bloc<SearchEvent, SearchState> {
  SearchBloc() : super(SearchInitial()) {
    on<SearchUpdated>(_onSearchUpdated, transformer: restartable());
  }

  Future<void> _onSearchUpdated(SearchUpdated event, Emitter<SearchState> emit) async {
    emit(SearchLoading());
    try {
      await Future.delayed(Duration(seconds: 2));
      final results = ['Result 1', 'Result 2', 'Result 3'];
      emit(SearchSuccess(results));
    } catch (e) {
      emit(SearchFailure(e.toString()));
    }
  }
}

In the SearchBloc above if a new SearchUpdated event is dispatched while a previous event is still being processed, the processing of the previous event is cancelled and the new event starts processing immediately, thanks to the restartable transformer. Your Algolia queries might be cheaper now.
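One thing to keep in mind is that a cancelled handler keeps running until it returns; only its emit calls are ignored. If the handler does more work after the awaited call, it can check emit.isDone and bail out early. Below is a minimal sketch. QueryChanged, SearchFn and GuardedSearchBloc are hypothetical names, and the search function is injected so that any backend can be plugged in.

```dart
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical event for illustration.
class QueryChanged {
  QueryChanged(this.query);

  final String query;
}

// Hypothetical signature for whatever performs the actual search.
typedef SearchFn = Future<List<String>> Function(String query);

class GuardedSearchBloc extends Bloc<QueryChanged, List<String>> {
  GuardedSearchBloc(this._search) : super(const <String>[]) {
    on<QueryChanged>(_onQueryChanged, transformer: restartable());
  }

  final SearchFn _search;

  Future<void> _onQueryChanged(
    QueryChanged event,
    Emitter<List<String>> emit,
  ) async {
    final results = await _search(event.query);
    // If a newer event cancelled this handler while the request was in
    // flight, emit would be ignored anyway, so skip any follow-up work.
    if (emit.isDone) return;
    emit(results);
  }
}
```

Note that the request itself is still sent for every event. Restartable only decides which result is allowed to reach the state.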

It looks like this transformer might be the easiest to use wrongly so make sure you know what you're doing.
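To see the four transformers side by side, the counter experiment can be condensed into one small program. This is a sketch under assumptions: the Increment class, the int state and the 100 ms delay are simplified stand-ins for the CounterBloc used earlier.

```dart
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical minimal event; the article's CounterEvent is not reproduced.
class Increment {}

class DelayedCounterBloc extends Bloc<Increment, int> {
  DelayedCounterBloc(EventTransformer<Increment> transformer) : super(0) {
    on<Increment>(
      (event, emit) async {
        // Shortened stand-in for the two-second delay.
        await Future<void>.delayed(const Duration(milliseconds: 100));
        emit(state + 1);
      },
      transformer: transformer,
    );
  }
}

/// Adds three events back to back and returns the count once things settle.
Future<int> countAfterThreeAdds(EventTransformer<Increment> transformer) async {
  final bloc = DelayedCounterBloc(transformer);
  bloc
    ..add(Increment())
    ..add(Increment())
    ..add(Increment());
  await Future<void>.delayed(const Duration(milliseconds: 600));
  final count = bloc.state;
  await bloc.close();
  return count;
}

Future<void> main() async {
  print(await countAfterThreeAdds(concurrent())); // 3
  print(await countAfterThreeAdds(sequential())); // 3
  print(await countAfterThreeAdds(droppable())); // 1
  print(await countAfterThreeAdds(restartable())); // 1
}
```

Concurrent and sequential both handle every event and differ only in timing, while droppable keeps the first event and restartable keeps the last.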

Event transformers with Streams

emit.onEach and emit.forEach are the recommended patterns for subscribing to a stream, since they allow the bloc to manage the subscription internally.

This is how they interact with event transformers:

  • Concurrent: With the concurrent event transformer you can have multiple stream subscriptions at once. Keep in mind that if you try to listen to a single-subscription stream more than once, you'll get a StateError.

  • Sequential and droppable: With these transformers only one subscription is active at a time, so you avoid overlapping subscriptions when your stream event is added multiple times. With sequential, new events (i.e. new stream subscriptions) wait until the current stream emits done, so a backlog can build up. With droppable, events added while the current subscription is active are ignored.

  • Restartable: The restartable event transformer will cancel the old subscription before resubscribing. This is good if you want to cancel a stream and listen to an identical one with different parameters or if you just want to stop listening to a stream based on a certain condition.
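As an illustration of the restartable case, here is a minimal sketch of a bloc that re-subscribes to a periodic stream with new parameters. TickerStarted and TickerBloc are hypothetical names, and Stream.periodic stands in for whatever stream your app listens to.

```dart
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';

// Hypothetical event: start (or restart) a ticker with the given period.
class TickerStarted {
  TickerStarted(this.period);

  final Duration period;
}

class TickerBloc extends Bloc<TickerStarted, int> {
  TickerBloc() : super(0) {
    on<TickerStarted>(
      (event, emit) async {
        // emit.forEach subscribes to the stream and emits one state per
        // value. The returned future completes when the stream is done or
        // the handler is cancelled, which restartable() does whenever a new
        // TickerStarted event arrives.
        await emit.forEach<int>(
          Stream<int>.periodic(event.period, (tick) => tick + 1),
          onData: (tick) => tick,
        );
      },
      transformer: restartable(),
    );
  }
}
```

With the default concurrent transformer, a second TickerStarted event would leave both periodic streams emitting into the same state. With restartable, only the newest one stays subscribed.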

Conclusion

Mastering event transformers comes with understanding how streams work. Once you have that understanding, you can create custom event transformers for any use case. Here is a throttling example:

throttleDroppable

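import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';
import 'package:stream_transform/stream_transform.dart';

EventTransformer<E> throttleDroppable<E>(Duration duration) {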
  return (events, mapper) {
    return droppable<E>().call(events.throttle(duration), mapper);
  };
}

With this custom transformer, once an event is let through, any event that arrives within the given duration of it is dropped, as is any event that arrives while a handler is still running.
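A debouncer can be built the same way. The sketch below assumes the debounce extension from the stream_transform package. The debounceRestartable name, the QueryChanged event and the DebouncedQueryBloc are hypothetical and only show where the transformer is passed.

```dart
import 'package:bloc/bloc.dart';
import 'package:bloc_concurrency/bloc_concurrency.dart';
import 'package:stream_transform/stream_transform.dart';

/// Waits until no event has arrived for [duration], then handles only the
/// latest one; restartable() cancels a handler that is still running.
EventTransformer<E> debounceRestartable<E>(Duration duration) {
  return (events, mapper) {
    return restartable<E>().call(events.debounce(duration), mapper);
  };
}

// Hypothetical event and bloc for illustration.
class QueryChanged {
  QueryChanged(this.query);

  final String query;
}

class DebouncedQueryBloc extends Bloc<QueryChanged, String> {
  DebouncedQueryBloc() : super('') {
    on<QueryChanged>(
      (event, emit) => emit(event.query),
      transformer: debounceRestartable(const Duration(milliseconds: 300)),
    );
  }
}
```

Throttling lets the first event through and suppresses the ones that follow closely, whereas debouncing waits for a pause and then lets the last event through, which tends to suit text input better.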

For more ideas take a look at rxdart. Experimenting with custom transformers will not only optimize your application's performance but also enhance its responsiveness and user experience.
