← Back to home

Minimal impact domain events — Tracking domain events from your domain model across an asynchronous execution context

View the full source code on GitHub

As my interest in Domain Driven Design grew I learned more about the aggregate pattern and the event-sourced domain model. After watching many videos on the subject, one thing always bothered me; keeping track of domain events. Every video I watched or person I talked to suggested to make a public collection on the aggregate, so that at the end of your processing pipeline a middleware or behavior can read that collection and either dispatch the domain events right away, or more preferably persist the domain events in an outbox. Nearly every video used EntityFramework’s change tracking mechanism to automate finding the added domain events before dispatching or persisting them in an outbox. The rest of the videos I watched manually dispatched them at the end of the execution pipeline.

What bothered me with both of these approaches is the fact that you need to create a publicly accessible domain event collection on your aggregate, whose sole purpose is to expose the domain events to a dispatching or persistence mechanism. To me this didn’t feel right. Domain events are a crucial part of your domain model, but having a property on your aggregate exposing them purely for application logic (i.e. for dispatching or persisting) did not sit well with me. The domain shouldn’t concern itself with application logic such as dispatching domain events, and thus should not provide functionality to do so. We need a way to notify the application layer about a new domain event, so that it can decide how to handle it, without any impact on the domain model. This means no public domain event collection and no dependency injection into the aggregate.

As domain events are not persisted on the aggregate, but rather in an outbox or dispatched immediately, they exist only in the scope of the processing pipeline. So we also need to find a way to make the domain events only available in the current processing pipeline scope (i.e. an HTTP request, an incoming integration event, a MediatR pipeline, etc.). I wanted to make a scoped dependency that exposes the domain events that were raised in the pipeline. This dependency can then be used to make behaviors and middleware that can be used to handle domain events that were raised in the execution context of that pipeline.

After a bit of thinking, I was thinking of thread-static property bags. The idea being that each HTTP request or incoming integration event uses it’s own thread, so the thread static property bag should only hold data belonging to that processing pipeline. The problem with this solution is that as soon as you leverage the Task Parallel Library, i.e. async and await, you are not guaranteed to continue on the same thread after awaiting asynchronous code, meaning you are not guaranteed access to the same property bag as from before.

This is when I found out about the AsyncLocal class. The AsyncLocal class provides a mechanism to store data across an asynchronous execution context. Once data is stored in the AsyncLocal, a new ExecutionContext is created for the current call chain. This data is propagated down the call stack, but does not flow up the call stack. [1] [2]

What does this all mean? This means that you can initialize a domain event collection at any level in your pipeline, and then append domain events from any method called (indirectly) from there. You don’t even need to worry about memory usage, as the value of the AsyncLocal is automatically cleared as soon as you exit the execution context in which the AsyncLocal was assigned a value. [3]

So back to the original problem. We need a way to notify the application layer about domain events, without changing the design of our aggregate, and the domain events need to exist only in the scope of the execution context in which they were created. What would satisfy both of these conditions? A static AsyncLocal with a domain event collection as value, initialized at the beginning of the execution context using a behavior or middleware, so that at the end of the execution context we can read the domain event collection and handle all newly raised domain events for that execution context. This solution offers us a thread-safe, TPL-compatible, request scoped method of raising and handling domain events without impacting the design of our aggregate.

You might be thinking, sounds good, but how would you go about implementing this? Let’s start with a basic implementation and go from there. Firstly, we define our domain event:


internal interface IDomainEvent { }

internal record OrderPlaced(string Name) : IDomainEvent;
	

Next we implement a basic domain event tracker using AsyncLocal:


internal static class DomainEventTracker
{
    private static readonly AsyncLocal<List<IDomainEvent>> _domainEvents = new();

    public static void Raise(IDomainEvent domainEvent)
    {
        _domainEvents.Value ??= [];
        _domainEvents.Value.Add(domainEvent);
    }

    public static IReadOnlyCollection<IDomainEvent> GetDomainEvents()
    {
        return _domainEvents.Value ?? [];
    }
}
	

This implementation creates and stores a new empty domain event collection in the AsyncLocal if one does not exist already, and then adds the domain event to that collection. We can then create an entity from which we raise domain events:


internal class Product
{
    public void Order()
    {
        DomainEventTracker.Raise(new OrderPlaced("Some product"));
    }
}
	

And then run our execution pipeline that calls the Order method on the entity:


internal class Program
{
    static void Main(string[] args)
    {
        SomeProcessingPipeline();
        Console.ReadLine();
    }

    private static void SomeProcessingPipeline()
    {
        var product = new Product();
        product.Order();

        var events = DomainEventTracker.GetDomainEvents();
        foreach (var @event in events)
        {
            Console.WriteLine(@event);
        }
    }
}
	

If you run this example, you would see the following output:

Program output showing OrderPlaced result
The program’s output

As you can see, this code satisfies the requirement I set earlier about not having a publicly accessible domain event collection or dependency injection. Using a static method to raise domain events, we register the domain events in the asynchronous execution context of the processing pipeline using an AsyncLocal in the DomainEventTracker.

There is one problem with the example code, and that is that we are accessing and handling the domain events from within the processing pipeline. Ideally we would have some sort of behavior or middleware wrapped around your processing pipeline that handles this concern for you. No problem, we just move the accessing and handling higher up in the call stack:


internal class Program
{
    static void Main(string[] args)
    {
        SomeProcessingPipeline();
        var events = DomainEventTracker.GetDomainEvents();
        foreach (var @event in events)
        {
            Console.WriteLine(@event);
        }
        Console.ReadLine();
    }

    private static void SomeProcessingPipeline()
    {
        var product = new Product();
        product.Order();
    }
}
	

If you run this example you get the exact same output. Great! But what if SomeProcessingPipeline leverages the TPL? The AsyncLocal would be instantiated in the asynchronous execution context of the SomeProcessingPipeline method, and cease to exist once we are returned to the Main method.

To work around this, we need to explicitly instantiate the AsyncLocal outside the processing pipeline:


internal static class DomainEventTracker
{
    private static readonly AsyncLocal<List<IDomainEvent>> _domainEvents = new();

    public static void CreateScope()
    {
        _domainEvents.Value ??= [];
    }

    ...
}
	

internal class Program
{
    static async Task Main(string[] args)
    {
        DomainEventTracker.CreateScope();
        await SomeProcessingPipeline();
        var events = DomainEventTracker.GetDomainEvents();
        foreach (var @event in events)
        {
            Console.WriteLine(@event);
        }
        Console.ReadLine();
    }

    private static async Task SomeProcessingPipeline()
    {
        var product = new Product();
        await product.OrderAsync();
    }
}
	

With these simple modifications we can now explicitly create scopes for our behaviors or middleware, and our implementation is safe to use across asynchronous methods. This allows us to register a scoped dependency which we can use in our processing pipelines to get the domain events that were raised in our domain model for that specific processing pipeline execution.

There is a lot more you can do with the AsyncLocal, such as wrapping it up in a disposable wrapper, essentially binding the lifetime of the domain event scope to a using statement. This allows for nested scopes, which is useful when nesting processing pipelines that each require their own scope. The code is a bit long, so I won’t post it here, but you can find it on my GitHub. The project features behaviors and middleware for MediatR, AspNetCore, MongoDb and NServiceBus.


using (var scope = DomainEventTracker.CreateScope())
{
    DomainEventTracker.RaiseDomainEvent(new TestEvent("I was raised in the top scope."));
    using (var nestedScope = DomainEventTracker.CreateScope())
    {
        DomainEventTracker.RaiseDomainEvent(new TestEvent("I was raised in the nested scope."));

        using (var evenMoreNestedScope = DomainEventTracker.CreateScope())
        {
            DomainEventTracker.RaiseDomainEvent(new TestEvent("I was raised in the deepest scope."));
            DomainEventTracker.Peek().Should().HaveCount(1);
        }

        DomainEventTracker.RaiseDomainEvent(new TestEvent("I was also raised in the nested scope."));
        DomainEventTracker.Peek().Should().HaveCount(2);
    }

    DomainEventTracker.Peek().Should().HaveCount(1);
}

DomainEventTracker.Peek().Should().BeEmpty();