Engineered Composable Flows (approx. 200 BCE) - Khuzestan, Iran
1️⃣ Imagine turning this imperative code:
async Task<Guid> SendWelcomeAsync(int userId)
{
User? user;
try
{
user = await _users.GetAsync(userId);
}
catch (Exception ex)
{
_logger.LogError(ex, "User lookup threw for {UserId}", userId);
_metrics.Increment("emails.lookup.exceptions");
throw;
}
if (user is null)
{
_logger.LogWarning("User not found: {UserId}", userId);
_metrics.Increment("emails.lookup.not_found");
throw new NotFoundException("User");
}
EmailMessage email;
try
{
email = user.IsVip
? Templates.VipWelcomeEmailFor(user)
: Templates.StandardWelcomeEmailFor(user);
}
catch (Exception ex)
{
_logger.LogError(ex, "Templating failed for {UserId}", userId);
_metrics.Increment("emails.template.exceptions");
throw;
}
try
{
var messageId = await _emails.SendAsync(email);
_metrics.Increment("emails.sent");
return messageId;
}
catch (Exception ex)
{
_logger.LogError(ex, "Send failed for {UserId}", userId);
_metrics.Increment("emails.send.failures");
throw;
}
}2️⃣ Into this Flow:
var onboardingFlow =
Flow.Succeed(userId)
.Chain(Users.FindUserFlow)
.Validate(
user => user is not null,
_ => new NotFoundException($"{userId}"))
.Chain(user =>
user.IsVip switch {
true => Flow.Succeed(Templates.VipWelcomeEmailFor(user)),
false => Flow.Succeed(Templates.StandardWelcomeEmailFor(user))
})
.Chain(Emails.SendWelcomeEmailFlow)
.DoOnSuccess(_ =>
_metrics.Increment("emails.sent"))
.DoOnFailure(ex =>
_logger.LogError(ex, "Send failed"));
await FlowEngine.ExecuteAsync(onboardingFlow);3️⃣ Here's a quick glance at what happened above:
| Exceptions | ➡️ | Data | Operators (e.g. Chain) can throw. Flow captures them and returns Failure - no manual try‑catch anymore. |
| Guards | ➡️ | Declarative | Validate encodes the pre/post-conditions. When false, the flow turns into Failure with the exception you choose. |
| Side-Effects | ➡️ | Contained | DoOnFailure/DoOnSuccess log/measure without changing outcomes - they cannot control the flow anymore. |
| Alternatives | ➡️ | Explicit | Recover can branch the whole flow on specific errors. |
| Errors | ➡️ | Unswallowed | If you don't 'recover', ExecuteAsync returns Failure with the original exception. |
If you'd rather just dive deep right in, this section is all you'd need!
The main thing you need to remember is that Flow is built around a small, composable set of types and methods.
Here are the core building blocks. Have fun!
// --- 0️⃣ The Core Types ---
public interface IFlow<T> { /* ... */ }
public abstract record Outcome<T>;
public sealed record Success<T>(T Value) : Outcome<T>;
public sealed record Failure<T>(Exception Exception) : Outcome<T>;
// --- 1️⃣ Starting a Flow ---
var a = Flow.Succeed(42);
var b = Flow.Fail<int>(new Exception("..."));
var c = Flow.Create(() => GetValueFromDatabase()); // Synchronous, failable
var d = Flow.Create(ct => GetValueFromApiAsync(ct)); // Asynchronous, cancellable
// --- 2️⃣ Composing Flows and Operations ---
var initialFlow = Flow.Succeed(123);
var transformed = initialFlow.Select(i => i.ToString()); // T -> U
var sequenced = initialFlow.Chain(i => GetNextFlow(i)); // T -> IFlow<U>
var validated = initialFlow.Validate(i => i > 0,
_ => new Exception("..."));
var recovered = initialFlow.Recover(ex => GetFallbackFlow(ex));
var logged = initialFlow.DoOnSuccess(i => Log(i));
// --- 3️⃣ Adding Resiliency ---
var resilient = initialFlow
.WithRetry(3)
.WithTimeout(TimeSpan.FromSeconds(5));
// --- 4️⃣ Executing the Flow ---
Outcome<string> outcome = await FlowEngine.ExecuteAsync(sequenced);
// --- 5️⃣ Handling the Result ---
string result = outcome switch
{
Success<string> s => $"Got {s.Value}",
Failure<string> f => $"Error: {f.Exception.Message}",
};Want to see this in a complete, runnable project? Check out the "Should I Go Outside?" example application.
The previous example was cool: clean and declarative. But the REAL win is in Flow's plug-and-play design 🔌
- A Flow is just a recipe for your business logic.
- Since it is nothing more than a definition, it can be enriched and reused: cheap and simple.
- You can enhance any Flow with new behaviours and operators without ever touching the original code - no, seriously 😎
Assume there's this flow which sends a notification to a user. Oh, and you do not own the code.
1️⃣ Say, you need a retry logic? Easy - you simply enrich your existing flow!
var resilientGetUserFlow =
GetUserAndNotifyFlow(httpRequestParams.userId)
.WithRetry(3);2️⃣ Maybe you want to add a timeout, too? No problem!
var timeoutGetUserFlow =
resilientGetUserFlow
.WithTimeout(TimeSpan.FromSeconds(5));3️⃣ How about logging the failure? Just do it!
var loggedGetUserFlow =
timeoutGetUserFlow
.DoOnFailure(ex => _logger.LogError(ex, "Failed to get user"));4️⃣ I could go on, but you get the idea 😉
A Flow is a recipe which is:
✅ Composable: Mix and match from various services and libraries.
✅ Enrichable: At the call-site/client-side; add new behaviors at the last second, without ever touching the upstream code.
- .NET CLI:
dotnet add package BahmanM.Flow - PackageReference:
<PackageReference Include="BahmanM.Flow" Version="x.y.z" /> - NuGet page: https://www.nuget.org/packages/BahmanM.Flow/
Let’s look at a more involved example. The goal is to see how Flow allows us to compose complex opoerations from smaller, independent pieces.
Say, we're writing a Kafka consumer which receives a DispatchRequestedMessage, looks up the order, fetches a shipping rate, recovers to a safe default on 404, transforms to a dispatch message, and publishes it to another topic.
Note: Admittedly, this is not a production-grade code. I've made quite a few assumptions to keep the snippet fit the README.
class DispatchRequestedConsumer : IKafkaConsumer
{
async Task Consume(DispatchRequestedMessage message)
{
var consumeFlow =
Flow.Succeed(message)
.Select(_adapters.AsOrderId)
.Chain(orderId =>
_orders.FindOrderFlow(orderId))
.Validate(
order => order is not null,
_ => new NotFoundException("Order not found"))
.DoOnFailure(ex =>
_logger.LogWarning($"Order lookup failed: {ex.Message}"))
.Chain(order =>
_rates
.GetShippingRateFlow(order.ShipTo)
.Recover(ex =>
ex is HttpNotFoundException
? Flow.Succeed(ShippingRate.StandardFallback)
: Flow.Fail<ShippingRate>(ex))
.Select(rate =>
(order, rate)))
.Select(x =>
_adapters.AsDispatchMessage(x.order, x.rate))
.Chain(dispatchMessage =>
_producer.ProduceFlow(dispatchMessage))
.DoOnFailure(ex =>
_logger.LogError(ex, "Produce failed"))
.DoOnSuccess(_ =>
_otel.IncreaseCounter("shipping.dispatch.produced"));
await FlowEngine.ExecuteAsync(consumeFlow);
}
}| 🧠 Compose Behaviours and Operations |
Where they are need, not where they are defined. Policies (timeout, retry) and whole‑flow branching are configured in our code and not in the upstream modules. |
| 🧠 Value Introspection | Validate(order is not null ...) encodes a business invariant right in the Flow. |
| 🧠 Branching | Recover swaps the entire downstream when the carrier returns 404 (fallback to default rate). |
You've had the whirlwind tour!
For the full story, including practical recipes, deep-dive articles, and the complete API reference, head over to 🏡 Flow's Home.

