Part 3: Concurrency in State Management

How do popular state management libraries handle concurrent async operations? Spoiler: not all of them do.

All analysis is based on reading the source code of these libraries.

In this article:

  • flutter_bloc — 4 EventTransformer strategies, per-event granularity
  • riverpod — why there's no built-in concurrency for methods
  • control — clean Mutex-based mixins as a reference implementation
  • Side-by-side: "Add to Cart" with rapid clicks
  • Concurrency comparison matrix and pitfalls

#flutter_bloc: Concurrency via Stream Transformers

Event Processing Pipeline

Every add(Event) passes through:

add(Event) → StreamController.broadcast
  → .where(type)
  → EventTransformer
  → handler(event, emit)
  → emit(state)

Emitter: Protection Against Invalid Emits

The _Emitter wraps emit() with safeguards:

  • _isCanceled — if emitter is cancelled (restartable), call(state) is a no-op
  • _isCompleted — handler already finished → assert
  • isDone — user check: if (!emit.isDone) emit(newState)

When restartable cancels a previous handler, all its emit() calls are silently ignored.
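
To make the three flags concrete, here is a minimal sketch of an emitter with the same safeguards. It is not bloc's actual _Emitter: the class name SketchEmitter and the helper runEmitterDemo are hypothetical, and only the flag names follow the list above.

```dart
// Hypothetical stand-in for bloc's private _Emitter; illustrative only.
class SketchEmitter<S> {
  SketchEmitter(this._sink);

  final void Function(S state) _sink;
  bool _isCanceled = false;
  bool _isCompleted = false;

  /// True once the handler was cancelled or has finished.
  bool get isDone => _isCanceled || _isCompleted;

  void call(S state) {
    // Emitting after the handler finished is a programming error.
    assert(!_isCompleted, 'emit was called after the handler completed');
    // A cancelled handler may still be running; its emits are dropped.
    if (!_isCanceled) _sink(state);
  }

  void cancel() => _isCanceled = true;
  void complete() => _isCompleted = true;
}

List<int> runEmitterDemo() {
  final states = <int>[];
  final emit = SketchEmitter<int>(states.add);
  emit(1); // delivered
  emit.cancel(); // what a restartable transformer does to the old handler
  emit(2); // silently ignored
  return states;
}

void main() {
  print(runEmitterDemo()); // [1]
}
```

The second emit is dropped without an error, which is why a cancelled handler can keep running to the end without corrupting state.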

EventTransformers: 4 Strategies

1. Concurrent (default)

// flatMap — all handlers run in parallel
static EventTransformer<dynamic> transformer = (events, mapper) {
  return events.map(mapper)
    .transform<dynamic>(const _FlatMapStreamTransformer<dynamic>());
};

add(A) → handler_A starts ──────── emit(stateA)
add(B) →   handler_B starts ──────── emit(stateB)
add(C) →     handler_C starts ──────── emit(stateC)

Emit order is non-deterministic: states are emitted in whichever order the handlers finish, not in the order the events were added.

2. Sequential

EventTransformer<Event> sequential<Event>() {
  return (events, mapper) => events.asyncExpand(mapper);
}

One line. asyncExpand does not start the next handler until the current handler's stream has completed, so events are processed in strict FIFO order.

add(A) → handler_A ── emit ── done |
add(B)                              → handler_B ── emit ── done |
add(C)                                                          → handler_C
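
The FIFO behaviour can be checked with plain dart:async, without bloc. The sketch below assumes a hypothetical handle function standing in for an event handler and uses artificial delays, with the first event being the slowest.

```dart
import 'dart:async';

// Stand-in for an event handler: finishes after [ms] milliseconds.
Stream<String> handle(String event, int ms, List<String> log) async* {
  log.add('start $event');
  await Future<void>.delayed(Duration(milliseconds: ms));
  log.add('done $event');
  yield event;
}

Future<List<String>> runSequentialDemo() async {
  final log = <String>[];
  // A is the slowest, yet B cannot start until A has finished.
  final delays = {'A': 30, 'B': 10, 'C': 1};
  await Stream.fromIterable(delays.keys)
      .asyncExpand((e) => handle(e, delays[e]!, log))
      .drain<void>();
  return log;
}

Future<void> main() async {
  print(await runSequentialDemo());
  // [start A, done A, start B, done B, start C, done C]
}
```

Even though B and C would finish sooner on their own, neither starts before the previous handler is done.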

3. Droppable

EventTransformer<Event> droppable<Event>() {
  return (events, mapper) =>
    events.transform(_ExhaustMapStreamTransformer(mapper));
}

If a handler is executing → new event is ignored.

add(A) → handler_A ──── emit ──── done
add(B) → DROPPED
add(C) → DROPPED
add(D)                             → handler_D ──── emit

4. Restartable

EventTransformer<Event> restartable<Event>() {
  return (events, mapper) => events.switchMap(mapper);
}

switchMap — cancel previous, subscribe to new.

add(Search("a"))   → handler_A ── CANCELLED
add(Search("ab"))  →   handler_B ── CANCELLED
add(Search("abc")) →     handler_C ──────── emit(results)

The HTTP request is NOT cancelled automatically: the previous handler keeps running and only its emit() calls are discarded. To abort the request itself, pass a cancellation handle such as dio's CancelToken.
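
The difference between dropping a result and aborting the work can be shown with a small latest-wins guard. This is a sketch of the idea, not how bloc or switchMap is implemented: SearchSketch, its generation counter, and the fake _fetch call are all hypothetical.

```dart
import 'dart:async';

class SearchSketch {
  int _generation = 0; // bumped on every new query
  int completedRequests = 0; // requests that ran to completion
  String? results; // last applied result

  // Hypothetical backend call; stands in for an HTTP request.
  Future<String> _fetch(String query) async {
    await Future<void>.delayed(const Duration(milliseconds: 10));
    completedRequests++;
    return 'results for "$query"';
  }

  Future<void> search(String query) async {
    final mine = ++_generation;
    final data = await _fetch(query);
    // A newer query started while we were waiting: drop the stale result.
    if (mine != _generation) return;
    results = data;
  }
}

Future<void> main() async {
  final s = SearchSketch();
  await Future.wait([s.search('a'), s.search('ab'), s.search('abc')]);
  print(s.completedRequests); // 3: none of the requests was aborted
  print(s.results); // results for "abc"
}
```

All three requests still consume network and server time; only the application of stale results is suppressed.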

Per-Event-Type Granularity

A key Bloc advantage — transformer is set per event type:

class SearchBloc extends Bloc<SearchEvent, SearchState> {
  SearchBloc() : super(SearchInitial()) {
    on<SearchQueryChanged>(_onQuery, transformer: restartable());
    on<FilterChanged>(_onFilter, transformer: sequential());
    on<SearchCleared>(_onCleared); // concurrent (default)
  }
}

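Neither riverpod nor control offers this: riverpod has no built-in strategies for notifier methods, and control applies one strategy to the whole controller.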


#riverpod: Concurrency (or Lack Thereof)

Two Levels of Async

Riverpod has two distinct async contexts:

  1. build() method — provider initialization/recreation
  2. Notifier methods — user actions (addTodo, increment, etc.)

They have fundamentally different concurrency behavior.

build() — Pseudo-Restartable

On invalidation (ref.invalidate(), dependency change), build() runs again. The old build() continues executing, but its result is ignored (if (!running) return).

Similar to restartable, but with an important difference: cancellation is cooperative. The Future isn't killed. The HTTP request keeps running, its result just won't be applied.

User Methods — NO Protection

For addTodo(), increment(), and other user methods there is no built-in concurrency mechanism.

@riverpod
class TodoList extends _$TodoList {
  @override
  Future<List<Todo>> build() async => _fetchTodos();

  Future<void> addTodo(Todo todo) async {
    state = const AsyncLoading();
    state = await AsyncValue.guard(() async {
      await http.post(/* ... */);
      return _fetchTodos();
    });
  }
}

3 rapid addTodo calls:

All three POST requests fire in parallel.
Completion order is NOT guaranteed!

addTodo(B) completes first → state = [A,B]
addTodo(A) completes after → state = [A]     ← REGRESSION!
addTodo(C) completes last  → state = [A,B,C]

UI: Loading → [A,B] → [A] → [A,B,C]  ← flickering!
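
The regression can be reproduced deterministically without riverpod. The sketch below is a simulation under stated assumptions: TodoListSketch is a hypothetical stand-in for the notifier, the server is a plain list, and each call's response arrival is controlled by a Completer so the order B, A, C is forced.

```dart
import 'dart:async';

// Hypothetical stand-in for a notifier; no riverpod types are used.
class TodoListSketch {
  final _server = <String>[]; // pretend backend storage
  List<String> state = const [];
  final history = <List<String>>[]; // every value assigned to state

  // [reply] completes when this call's response "arrives".
  Future<void> addTodo(String todo, Future<void> reply) async {
    _server.add(todo); // POST is applied on the server
    final snapshot = List.of(_server); // list the server sends back
    await reply; // response is in flight
    state = snapshot; // last response to arrive wins
    history.add(state);
  }
}

Future<List<List<String>>> runRaceDemo() async {
  final n = TodoListSketch();
  final a = Completer<void>(), b = Completer<void>(), c = Completer<void>();
  final fa = n.addTodo('A', a.future);
  final fb = n.addTodo('B', b.future);
  final fc = n.addTodo('C', c.future);
  // Responses arrive out of order: B, then A, then C.
  b.complete();
  await fb;
  a.complete();
  await fa;
  c.complete();
  await fc;
  return n.history;
}

Future<void> main() async {
  print(await runRaceDemo()); // [[A, B], [A], [A, B, C]]
}
```

The final state happens to be correct here only because C's response arrived last; if A's had arrived last, the list would have ended as [A].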

Fix Attempt 1: ref.onDispose

Future<void> addTodo(Todo todo) async {
  var cancelled = false;
  ref.onDispose(() => cancelled = true);
  // ...
  if (!cancelled) state = result;
}

Problems:

  • ref.onDispose fires on provider invalidation, not on repeated addTodo calls
  • Can't cancel one addTodo from another
  • If the notifier is recreated, assigning state on the old instance throws

Fix Attempt 2: Mutex

@riverpod
class TodoList extends _$TodoList {
  final _mutex = Mutex();

  Future<void> addTodo(Todo todo) async {
    await _mutex.synchronize(() async { /* ... */ });
  }
}

Lifecycle problem: on ref.invalidate(), the Notifier is recreated → new Mutex. Old Mutex orphaned.

TodoList v1: _mutex(v1).lock() → addTodo(A) → awaiting...
ref.invalidate() → v1 disposed, v2 created
TodoList v2: _mutex(v2) — empty, no protection

Practical Workaround

Future<void> addTodo(Todo todo) async {
  if (state is AsyncLoading) return; // manual droppable
  state = const AsyncLoading();
  // ...
}

Works, but requires boilerplate in every method. It also drops calls made while build() is still loading, because state is AsyncLoading then too.


#control: Concurrency via Mutex Mixins

Note: control is not a mainstream state management package. It's a well-architected utility library — an excellent reference for how concurrency primitives integrate cleanly into state management.

Sequential

mixin SequentialControllerHandler on Controller {
  final Mutex _$mutex = Mutex();

  @override
  Future<T?> handle<T>(
    Future<T> Function() handler, { ... }
  ) => _$mutex.synchronize<T?>(
    () => super.handle<T>(handler, ...),
  );
}

The entire strategy in one expression.

Droppable

mixin DroppableControllerHandler on Controller {
  final Mutex _$mutex = Mutex();

  @override
  Future<T?> handle<T>(
    Future<T> Function() handler, { ... }
  ) {
    if (_$mutex.locked) return Future<T?>.value(null);
    return _$mutex.synchronize<T?>(
      () => super.handle<T?>(handler, ...),
    );
  }
}

The _$mutex.locked check is synchronous — no race between check and acquisition. In single-threaded Dart, no await means no interleaving.

Clean, minimal, correct.
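
For readers who want to see the moving parts, here is a minimal sketch of a mutex with a synchronous locked getter and the two strategies built on top of it. It is not control's implementation: SketchMutex and HandlerSketch are hypothetical names, and the mutex is a simple future chain written for illustration.

```dart
import 'dart:async';

// Minimal illustrative mutex; NOT the implementation used by control.
class SketchMutex {
  Future<void> _tail = Future<void>.value();
  int _pending = 0;

  /// True while any action is running or queued.
  bool get locked => _pending > 0;

  Future<T> synchronize<T>(Future<T> Function() action) async {
    // Everything before the first await runs synchronously in the caller.
    final previous = _tail;
    final done = Completer<void>();
    _tail = done.future;
    _pending++;
    try {
      await previous; // wait for the action queued before this one
      return await action();
    } finally {
      _pending--;
      done.complete(); // release the next waiter, even on error
    }
  }
}

class HandlerSketch {
  final _mutex = SketchMutex();
  final log = <String>[];

  Future<void> _work(String name) async {
    log.add('start $name');
    await Future<void>.delayed(const Duration(milliseconds: 5));
    log.add('done $name');
  }

  Future<void> sequential(String name) =>
      _mutex.synchronize(() => _work(name));

  /// Returns false when the call was dropped.
  Future<bool> droppable(String name) async {
    if (_mutex.locked) return false; // synchronous check, no await before it
    await _mutex.synchronize(() => _work(name));
    return true;
  }
}

Future<void> main() async {
  final seq = HandlerSketch();
  await Future.wait([seq.sequential('A'), seq.sequential('B')]);
  print(seq.log); // [start A, done A, start B, done B]

  final drop = HandlerSketch();
  print(await Future.wait([drop.droppable('A'), drop.droppable('B')]));
  // [true, false]
  print(drop.log); // [start A, done A]
}
```

The droppable variant relies on the point made above: the locked check and the lock acquisition happen in the same synchronous stretch, so a second call cannot slip in between them.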


#Side-by-Side: "Add to Cart" with Rapid Clicks

flutter_bloc

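class CartBloc extends Bloc<CartEvent, CartState> {
  // Field backing `this._repo`; the CartRepository type name is assumed.
  final CartRepository _repo;

  CartBloc(this._repo) : super(const CartState.initial()) {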
    on<AddToCart>((event, emit) async {
      emit(state.copyWith(status: CartStatus.loading));
      await _repo.addToCart(event.product);
      final cart = await _repo.getCart();
      emit(state.copyWith(status: CartStatus.loaded, items: cart));
    }, transformer: sequential());
  }
}

3 clicks → 3 sequential requests → [A, B, C]. Correct.

riverpod

@riverpod
class CartNotifier extends _$CartNotifier {
  @override
  Future<CartState> build() async => CartState.initial();

  Future<void> addToCart(Product product) async {
    state = const AsyncLoading();
    state = await AsyncValue.guard(() async {
      await _repo.addToCart(product);
      return CartState.loaded(await _repo.getCart());
    });
  }
}

3 clicks → 3 parallel requests → state depends on completion order. Possible data loss.

control

class CartController extends StateController<CartState>
    with SequentialControllerHandler {

  Future<void> addToCart(Product product) => handle(() async {
    setState(state.copyWith(status: CartStatus.loading));
    await _repo.addToCart(product);
    final cart = await _repo.getCart();
    setState(state.copyWith(status: CartStatus.loaded, items: cart));
  });
}

3 clicks → 3 sequential requests → [A, B, C]. Correct. Same as Bloc, via Mutex mixin.


#Concurrency Matrix

Strategies

Strategy    | flutter_bloc    | riverpod          | control
Concurrent  | ✅ Default      | ✅ Default (only) | ✅ Default
Sequential  | ✅ asyncExpand  | ❌ Manual         | ✅ Mutex mixin
Droppable   | ✅ exhaustMap   | ❌ Manual         | ✅ Mutex mixin
Restartable | ✅ switchMap    | ⚠️ build() only   |
Custom      | ✅ Any operator |                   |

Key Characteristics

Aspect            | flutter_bloc     | riverpod        | control
Granularity       | Per-event-type   | None            | Per-controller
Emit cancellation | ✅ Automatic     | ⚠️ build() only |
HTTP cancellation | Manual           | Manual          | Manual
Mechanism         | Stream operators | None            | LinkedMutex
Lifecycle-safe    | ✅ close()       | ⚠️ invalidate() | ✅ dispose()

#Pitfalls

flutter_bloc

  1. Default is concurrent. No transformer = parallel handlers = race conditions in async handlers.
  2. One handler per event type. Need subtypes for different logic.
  3. Stream overhead. Each on<E>() creates StreamController + Subscription + filter + Emitter.

riverpod

  1. No built-in concurrency for methods. Most serious for write-heavy apps.
  2. Lifecycle vs Mutex. Notifier recreation kills Mutex state.
  3. Complex ref.invalidateSelf() control flow.

control

  1. Per-controller, not per-method. Can't mix strategies within one controller.
  2. No restartable. Can't auto-cancel previous searches.
  3. Handler runs to completion even after dispose (setState becomes no-op).

#Conclusions

Read-heavy apps (data loading, display)

Any library works. Riverpod shines with reactive dependency graph and AsyncValue.

Write-heavy apps (forms, carts, chats, real-time)

flutter_bloc — strongest choice. 4 strategies, per-event granularity, automatic emit cancellation.

control — clean reference for Mutex-based concurrency. Elegant per-controller solution. Great for studying the pattern.

riverpod — requires manual work. No built-in strategies, lifecycle complications with Mutex.

Recommendation

  1. Need flexible concurrency control → flutter_bloc with bloc_concurrency
  2. Want to study clean Mutex architecture → control as a reference
  3. Using riverpod → manage concurrency consciously: state is AsyncLoading checks, UI-level debounce, or Mutex at repository layer

Concurrent by default is dangerous for async operations that depend on state. Conscious strategy choice is a sign of mature architecture.