All analysis is based on reading the source code of these libraries.
In this article:
- flutter_bloc — 4 EventTransformer strategies, per-event granularity
- riverpod — why there's no built-in concurrency for methods
- control — clean Mutex-based mixins as a reference implementation
- Side-by-side: "Add to Cart" with rapid clicks
- Concurrency comparison matrix and pitfalls
#flutter_bloc: Concurrency via Stream Transformers
Event Processing Pipeline
Every add(Event) passes through:
add(Event) → StreamController.broadcast → .where(type) → EventTransformer → handler(event, emit) → emit(state)
Emitter: Protection Against Invalid Emits
The _Emitter wraps emit() with safeguards:
- _isCanceled: if the emitter has been cancelled (e.g. by restartable), call(state) is a no-op
- _isCompleted: the handler has already finished, so a late emit trips an assert
- isDone: a check available to user code: if (!emit.isDone) emit(newState)
When restartable cancels a previous handler, all its emit() calls are silently ignored.
EventTransformers: 4 Strategies
1. Concurrent (default)
// flatMap — all handlers run in parallel
static EventTransformer<dynamic> transformer = (events, mapper) {
  return events
      .map(mapper)
      .transform<dynamic>(const _FlatMapStreamTransformer<dynamic>());
};
add(A) → handler_A starts ──────── emit(stateA)
add(B) → handler_B starts ──────── emit(stateB)
add(C) → handler_C starts ──────── emit(stateC)
Emit order is non-deterministic.
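A minimal pure-Dart sketch of why the order is unpredictable. It uses only dart:async, not the bloc API; handle, its delays, and the emitted list are hypothetical stand-ins for event handlers and their emit calls.

```dart
import 'dart:async';

/// Hypothetical handler: simulates async work, then records an "emit".
Future<void> handle(String name, int delayMs, List<String> emitted) async {
  await Future<void>.delayed(Duration(milliseconds: delayMs));
  emitted.add(name);
}

/// Starts every handler immediately, the way a concurrent strategy would.
Future<List<String>> runConcurrently() async {
  final emitted = <String>[];
  await Future.wait([
    handle('A', 30, emitted),
    handle('B', 10, emitted),
    handle('C', 20, emitted),
  ]);
  return emitted;
}

Future<void> main() async {
  // The result follows completion time, not the order of the calls.
  print(await runConcurrently()); // [B, C, A] with these delays
}
```

With real network latency the delays are not known in advance, so the final state depends on whichever handler finishes last.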
2. Sequential
EventTransformer<Event> sequential<Event>() {
  return (events, mapper) => events.asyncExpand(mapper);
}
One line. asyncExpand waits for the current handler to complete before starting the next, so events are processed in strict FIFO order.
add(A) → handler_A ── emit ── done | add(B) → handler_B ── emit ── done | add(C) → handler_C
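The same behaviour can be observed with plain dart:async streams. This is a sketch outside of bloc: handler and the delays are hypothetical, chosen so that a concurrent run would finish out of order.

```dart
import 'dart:async';

/// Hypothetical handler: a one-element stream that completes after [delayMs].
Stream<String> handler(String name, int delayMs) async* {
  await Future<void>.delayed(Duration(milliseconds: delayMs));
  yield name;
}

/// asyncExpand subscribes to the next inner stream only after the
/// previous one has completed, so results keep the input order.
Future<List<String>> runSequentially() {
  final events = Stream.fromIterable(const {'A': 30, 'B': 10, 'C': 20}.entries);
  return events.asyncExpand((e) => handler(e.key, e.value)).toList();
}

Future<void> main() async {
  print(await runSequentially()); // [A, B, C]
}
```

Even though A is the slowest handler, B does not start until A is done.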
3. Droppable
EventTransformer<Event> droppable<Event>() {
  return (events, mapper) =>
      events.transform(_ExhaustMapStreamTransformer(mapper));
}
If a handler is executing → new event is ignored.
add(A) → handler_A ──── emit ──── done
add(B) → DROPPED
add(C) → DROPPED
add(D) → handler_D ──── emit
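The drop-while-busy idea can be sketched without streams. The Droppable class below is a hypothetical helper written for illustration, not the transformer that bloc_concurrency ships.

```dart
import 'dart:async';

/// Hypothetical helper: runs [action] unless a previous call is in flight.
class Droppable {
  bool _busy = false;

  /// Completes with true if the action ran, false if it was dropped.
  Future<bool> run(Future<void> Function() action) async {
    if (_busy) return false; // checked synchronously, before any await
    _busy = true;
    try {
      await action();
      return true;
    } finally {
      _busy = false;
    }
  }
}

Future<List<bool>> threeRapidCalls() {
  final droppable = Droppable();
  Future<void> work() =>
      Future<void>.delayed(const Duration(milliseconds: 20));
  return Future.wait([
    droppable.run(work),
    droppable.run(work),
    droppable.run(work),
  ]);
}

Future<void> main() async {
  print(await threeRapidCalls()); // [true, false, false]
}
```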
4. Restartable
EventTransformer<Event> restartable<Event>() {
  return (events, mapper) => events.switchMap(mapper);
}
switchMap — cancel previous, subscribe to new.
add(Search("a")) → handler_A ── CANCELLED
add(Search("ab")) → handler_B ── CANCELLED
add(Search("abc")) → handler_C ──────── emit(results)
The HTTP request is NOT cancelled automatically; only the cancelled handler's emits are suppressed. To cancel the request itself, pass a cancellation handle to the HTTP client, such as dio's CancelToken.
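A small analogy in plain Dart shows the difference between discarding a result and cancelling the work. It is not how switchMap is implemented: LatestOnly, its generation counter, and the delayed future standing in for an HTTP call are all assumptions made for the sketch.

```dart
import 'dart:async';

/// Hypothetical "latest wins" holder. Superseded calls still finish
/// their work; only their result is thrown away.
class LatestOnly {
  int _generation = 0;
  int requestsCompleted = 0;
  String? state;

  Future<void> search(String query, int delayMs) async {
    final mine = ++_generation;
    // Stand-in for an HTTP request; nothing here aborts it.
    await Future<void>.delayed(Duration(milliseconds: delayMs));
    requestsCompleted++;
    if (mine != _generation) return; // superseded: result discarded
    state = 'results for $query';
  }
}

Future<LatestOnly> typeQuickly() async {
  final holder = LatestOnly();
  await Future.wait([
    holder.search('a', 30),
    holder.search('ab', 20),
    holder.search('abc', 10),
  ]);
  return holder;
}

Future<void> main() async {
  final holder = await typeQuickly();
  print(holder.state); // results for abc
  print(holder.requestsCompleted); // 3
}
```

All three simulated requests ran to completion, which is the cost that real request cancellation avoids.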
Per-Event-Type Granularity
A key Bloc advantage — transformer is set per event type:
class SearchBloc extends Bloc<SearchEvent, SearchState> {
  SearchBloc() : super(SearchInitial()) {
    on<SearchQueryChanged>(_onQuery, transformer: restartable());
    on<FilterChanged>(_onFilter, transformer: sequential());
    on<SearchCleared>(_onCleared); // concurrent (default)
  }
}
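Neither riverpod nor control offers an equivalent: riverpod has no built-in strategies for notifier methods, and control applies its strategy to the whole controller.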
#riverpod: Concurrency (or Lack Thereof)
Two Levels of Async
Riverpod has two distinct async contexts:
- build() method: provider initialization/recreation
- Notifier methods: user actions (addTodo, increment, etc.)
They have fundamentally different concurrency behavior.
build() — Pseudo-Restartable
On invalidation (ref.invalidate(), dependency change), build() runs again. The old build() continues executing, but its result is ignored (if (!running) return).
Similar to restartable, but with an important difference: cancellation is cooperative. The Future isn't killed. The HTTP request keeps running, its result just won't be applied.
User Methods — NO Protection
For addTodo(), increment() — no built-in concurrency mechanism.
@riverpod
class TodoList extends _$TodoList {
  @override
  Future<List<Todo>> build() async => _fetchTodos();

  Future<void> addTodo(Todo todo) async {
    state = const AsyncLoading();
    state = await AsyncValue.guard(() async {
      await http.post(/* ... */);
      return _fetchTodos();
    });
  }
}
3 rapid addTodo calls:
All three POST requests fire in parallel. Completion order is NOT guaranteed!
addTodo(B) completes first → state = [A,B]
addTodo(A) completes after → state = [A] ← REGRESSION!
addTodo(C) completes last → state = [A,B,C]
UI: Loading → [A,B] → [A] → [A,B,C] ← flickering!
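The regression can be reproduced without riverpod. In this sketch the in-memory server, the latencies, and the history list (standing in for successive state assignments) are all hypothetical; the latencies are picked so that A's fetch reads the list early but its response arrives late.

```dart
import 'dart:async';

/// Hypothetical in-memory "server" and a log of every state assignment.
final server = <String>[];
final history = <List<String>>[];

Future<void> post(String todo, int latencyMs) async {
  await Future<void>.delayed(Duration(milliseconds: latencyMs));
  server.add(todo);
}

Future<List<String>> fetch(int latencyMs) async {
  final snapshot = List<String>.of(server); // the read happens now...
  await Future<void>.delayed(Duration(milliseconds: latencyMs));
  return snapshot; // ...but the response arrives later
}

Future<void> addTodo(String todo,
    {required int postMs, required int fetchMs}) async {
  await post(todo, postMs);
  history.add(await fetch(fetchMs)); // stands in for `state = ...`
}

Future<List<List<String>>> rapidClicks() async {
  server.clear();
  history.clear();
  await Future.wait([
    addTodo('A', postMs: 10, fetchMs: 40),
    addTodo('B', postMs: 20, fetchMs: 10),
    addTodo('C', postMs: 35, fetchMs: 40),
  ]);
  return history;
}

Future<void> main() async {
  print(await rapidClicks()); // [[A, B], [A], [A, B, C]]
}
```

The middle entry is the stale response from A overwriting the newer list from B.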
Fix Attempt 1: ref.onDispose
Future<void> addTodo(Todo todo) async {
  var cancelled = false;
  ref.onDispose(() => cancelled = true);
  // ...
  if (!cancelled) state = result;
}
Problems:
- ref.onDispose fires on provider invalidation, not on repeated addTodo calls
- Can't cancel one addTodo from another
- If the notifier is recreated, state = on the old instance throws
Fix Attempt 2: Mutex
@riverpod
class TodoList extends _$TodoList {
  final _mutex = Mutex();

  Future<void> addTodo(Todo todo) async {
    await _mutex.synchronize(() async { /* ... */ });
  }
}
Lifecycle problem: on ref.invalidate(), the Notifier is recreated → new Mutex. Old Mutex orphaned.
TodoList v1: _mutex(v1).lock() → addTodo(A) → awaiting...
ref.invalidate() → v1 disposed, v2 created
TodoList v2: _mutex(v2) is empty, no protection
Practical Workaround
Future<void> addTodo(Todo todo) async {
  if (state is AsyncLoading) return; // manual droppable
  state = const AsyncLoading();
  // ...
}
Works, but requires boilerplate in every method.
#control: Concurrency via Mutex Mixins
Note: control is not a mainstream state management package. It's a well-architected utility library — an excellent reference for how concurrency primitives integrate cleanly into state management.
Sequential
mixin SequentialControllerHandler on Controller {
  final Mutex _$mutex = Mutex();

  @override
  Future<T?> handle<T>(
    Future<T> Function() handler, { ... }
  ) => _$mutex.synchronize<T?>(
    () => super.handle<T>(handler, ...),
  );
}
The entire strategy in one expression.
Droppable
mixin DroppableControllerHandler on Controller {
  final Mutex _$mutex = Mutex();

  @override
  Future<T?> handle<T>(
    Future<T> Function() handler, { ... }
  ) {
    if (_$mutex.locked) return Future<T?>.value(null);
    return _$mutex.synchronize<T?>(
      () => super.handle<T?>(handler, ...),
    );
  }
}
The _$mutex.locked check is synchronous — no race between check and acquisition. In single-threaded Dart, no await means no interleaving.
Clean, minimal, correct.
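To make the primitive itself concrete, here is a minimal FIFO mutex in plain Dart. It is a sketch under assumptions, not the control package's Mutex: SimpleMutex and the droppable function are hypothetical names, and the only point is that the lock is taken synchronously inside synchronize, which is what makes a locked check race-free.

```dart
import 'dart:async';

/// Minimal FIFO mutex (illustrative only).
class SimpleMutex {
  Future<void> _tail = Future<void>.value();
  int _pending = 0;

  bool get locked => _pending > 0;

  Future<T> synchronize<T>(Future<T> Function() action) {
    _pending++; // acquired synchronously, before any await
    final previous = _tail;
    final done = Completer<void>();
    _tail = done.future;
    // Run after the previous holder; release even if the action throws.
    return previous.then<T>((_) => action()).whenComplete(() {
      _pending--;
      done.complete();
    });
  }
}

/// Droppable on top of the mutex: check and acquire with no await between.
Future<String?> droppable(SimpleMutex m, Future<String> Function() action) {
  if (m.locked) return Future<String?>.value(null);
  return m.synchronize<String?>(action);
}

Future<List<String>> sequentialDemo() async {
  final m = SimpleMutex();
  final log = <String>[];
  Future<void> job(String name, int ms) => m.synchronize<void>(() async {
        await Future<void>.delayed(Duration(milliseconds: ms));
        log.add(name);
      });
  await Future.wait([job('A', 30), job('B', 10), job('C', 20)]);
  return log;
}

Future<List<String?>> droppableDemo() {
  final m = SimpleMutex();
  Future<String> work() => Future<String>.delayed(
      const Duration(milliseconds: 20), () => 'done');
  return Future.wait([droppable(m, work), droppable(m, work)]);
}

Future<void> main() async {
  print(await sequentialDemo()); // [A, B, C]
  print(await droppableDemo()); // [done, null]
}
```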
#Side-by-Side: "Add to Cart" with Rapid Clicks
flutter_bloc
class CartBloc extends Bloc<CartEvent, CartState> {
  CartBloc(this._repo) : super(const CartState.initial()) {
    on<AddToCart>((event, emit) async {
      emit(state.copyWith(status: CartStatus.loading));
      await _repo.addToCart(event.product);
      final cart = await _repo.getCart();
      emit(state.copyWith(status: CartStatus.loaded, items: cart));
    }, transformer: sequential());
  }
}
3 clicks → 3 sequential requests → [A, B, C]. Correct.
riverpod
@riverpod
class CartNotifier extends _$CartNotifier {
  @override
  Future<CartState> build() async => CartState.initial();

  Future<void> addToCart(Product product) async {
    state = const AsyncLoading();
    state = await AsyncValue.guard(() async {
      await _repo.addToCart(product);
      return CartState.loaded(await _repo.getCart());
    });
  }
}
3 clicks → 3 parallel requests → state depends on completion order. Possible data loss.
control
class CartController extends StateController<CartState>
    with SequentialControllerHandler {
  Future<void> addToCart(Product product) => handle(() async {
    setState(state.copyWith(status: CartStatus.loading));
    await _repo.addToCart(product);
    final cart = await _repo.getCart();
    setState(state.copyWith(status: CartStatus.loaded, items: cart));
  });
}
3 clicks → 3 sequential requests → [A, B, C]. Correct. Same as Bloc, via Mutex mixin.
#Concurrency Matrix
Strategies
| Strategy | flutter_bloc | riverpod | control |
|---|---|---|---|
| Concurrent | ✅ Default | ✅ Default (only) | ✅ Default |
| Sequential | ✅ asyncExpand | ❌ Manual | ✅ Mutex mixin |
| Droppable | ✅ exhaustMap | ❌ Manual | ✅ Mutex mixin |
| Restartable | ✅ switchMap | ⚠️ build() only | ❌ |
| Custom | ✅ Any operator | ❌ | ❌ |
Key Characteristics
| Aspect | flutter_bloc | riverpod | control |
|---|---|---|---|
| Granularity | Per-event-type | None | Per-controller |
| Emit cancellation | ✅ Automatic | ⚠️ build() only | ❌ |
| HTTP cancellation | Manual | Manual | Manual |
| Mechanism | Stream operators | None | LinkedMutex |
| Lifecycle-safe | ✅ close() | ⚠️ invalidate() | ✅ dispose() |
#Pitfalls
flutter_bloc
- Default is concurrent. Without a transformer, handlers run in parallel, which invites race conditions in async handlers that depend on state.
- One handler per event type. Need subtypes for different logic.
- Stream overhead. Each on<E>() creates StreamController + Subscription + filter + Emitter.
riverpod
- No built-in concurrency for methods. Most serious for write-heavy apps.
- Lifecycle vs Mutex. Notifier recreation kills Mutex state.
- Complex ref.invalidateSelf() control flow.
control
- Per-controller, not per-method. Can't mix strategies within one controller.
- No restartable. Can't auto-cancel previous searches.
- Handler runs to completion even after dispose (setState becomes no-op).
#Conclusions
Read-heavy apps (data loading, display)
Any library works. Riverpod shines here with its reactive dependency graph and AsyncValue.
Write-heavy apps (forms, carts, chats, real-time)
flutter_bloc — strongest choice. 4 strategies, per-event granularity, automatic emit cancellation.
control — clean reference for Mutex-based concurrency. Elegant per-controller solution. Great for studying the pattern.
riverpod — requires manual work. No built-in strategies, lifecycle complications with Mutex.
Recommendation
- Need flexible concurrency control → flutter_bloc with bloc_concurrency
- Want to study clean Mutex architecture → control as a reference
- Using riverpod → manage concurrency consciously: state is AsyncLoading checks, UI-level debounce, or Mutex at repository layer
Concurrent by default is dangerous for async operations that depend on state. Conscious strategy choice is a sign of mature architecture.