As you know, Event Sourcing and CQRS are important topics for microservices. But have you ever thought about what happens if we fail to publish an event or lose some events during a transaction?

Neşeli Günler

These days we are replacing legacy systems with microservices and getting rid of them. Microservices have many advantages, but they also bring many management difficulties. One of the most important problems when microservices communicate with each other through events is that an event may never be published, the message may be lost on the message broker side, or the receiving microservice may need the message again because of an error or some other reason. So what do we do in this situation?

The Transactional Outbox pattern, one of the microservice patterns, meets this need. In brief, it says:

If you are going to send a message to another service, don’t hand it directly to the message broker; write it to a table and read it from there.

The advantage is that an event is stored in the same database transaction as the data change that caused it, so it is not lost if the broker is unavailable, and we can publish it again if needed. The disadvantage is that writing and reading events is an extra cost. Of course, whether to apply this pattern depends on your needs.
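To make the idea concrete, here is a minimal in-memory sketch of the pattern. It is not the project's code: `InMemoryStore`, `OutboxMessage`, `OutboxRelay` and the `user-registered` topic are hypothetical names, and plain lists stand in for the database tables and the message broker.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical outbox row: what to publish and where.
public record OutboxMessage(Guid Id, string Topic, string Body);

// Hypothetical stand-in for a database with a Users table and an Outbox table.
public class InMemoryStore
{
    public List<string> Users { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Simulates one transaction: validation happens first, so either both
    // the business row and the outbox row are written, or neither is.
    public void RegisterUser(string name)
    {
        if (string.IsNullOrWhiteSpace(name))
            throw new ArgumentException("Name is required.", nameof(name));

        Users.Add(name);
        Outbox.Add(new OutboxMessage(
            Guid.NewGuid(), "user-registered", JsonSerializer.Serialize(new { name })));
    }
}

// Reads pending rows and hands them to the broker (here: any callback).
public class OutboxRelay
{
    private readonly InMemoryStore _store;
    private readonly Action<OutboxMessage> _publish;

    public OutboxRelay(InMemoryStore store, Action<OutboxMessage> publish)
    {
        _store = store;
        _publish = publish;
    }

    // A row is removed only after its publish call returned without throwing,
    // so a failed publish leaves the row in place for the next attempt.
    public int PublishPending()
    {
        var published = 0;
        foreach (var message in _store.Outbox.ToList())
        {
            _publish(message);
            _store.Outbox.Remove(message);
            published++;
        }
        return published;
    }
}
```

If the publish callback throws, the row stays in the outbox and is picked up again on the next call to `PublishPending`. That is also why the same event can occasionally be delivered more than once.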

So how do we implement it?

First, I start by creating the project structure. I preferred to use Onion Architecture because it suits DDD well and lets you separate the layers and their responsibilities more cleanly. For more information on Onion Architecture, look here.

Understanding Onion Architecture

As seen in the Onion Architecture diagram above, the Application Core consists of the Domain, Domain Services and Application Services, so I created a matching structure as follows.

Onion Architecture Layers

I already wrote about the parts that come before the point where we raise the events in the previous article, Take a look at CQRS and Event Sourcing with DDD — Part 2. So I’m skipping these sections, but you can read them here if you want.

I am using Entity Framework in the project, and I have overridden the SaveChangesAsync method as follows. With this change, the events raised by an entity are written to the Outbox table only when the record itself is saved to the db, that is, as part of the same transaction.

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
// Entity, User, Event and MessageAttribute come from the project's own namespaces (not shown here).

namespace FB.TransactionalOutbox.Persistence
{
    public class ApplicationDbContext : DbContext
    {
        public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options) : base(options)
        {
        }

        public DbSet<User> Users { get; set; }

        // Outbox table: one row per domain event waiting to be published.
        public DbSet<Event> Events { get; set; }

        public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess,
                CancellationToken cancellationToken = default)
        {
            // Find tracked entities that have raised at least one domain event.
            var domainEntities = this.ChangeTracker
                                     .Entries<Entity>()
                                     .Where(x => x.Entity.DomainEvents != null && x.Entity.DomainEvents.Any())
                                     .ToList();

            var domainEvents = domainEntities
                               .SelectMany(x => x.Entity.DomainEvents)
                               .ToList();

            // Clear the events so a later save does not write them again.
            domainEntities.ForEach(entity => entity.Entity.ClearDomainEvents());

            foreach (var domainEvent in domainEvents)
            {
                // Events without a MessageAttribute are not meant to leave the service.
                var message = MessageAttribute.Get(domainEvent.GetType());
                if (message == null)
                    continue;

                var eventBody = JsonConvert.SerializeObject(domainEvent, new JsonSerializerSettings
                {
                    ContractResolver = new CamelCasePropertyNamesContractResolver(),
                });
                Console.WriteLine($"\n------\nA domain event has been published!\n" +
                                  $"Event: {domainEvent.GetType().Name}\n" +
                                  $"TopicName: {message.Name}\n" +
                                  $"EventBody: {eventBody}\n");

                // Queue the outbox row; it is inserted by the same SaveChanges call below.
                Events.Add(new Event(domainEvent.GetType().Name, eventBody, message.Name));
            }

            // Entity changes and outbox rows are committed together.
            return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
        }
    }
}
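The listing above relies on `MessageAttribute.Get`, which is not shown in this post. Here is a minimal sketch of what such an attribute could look like. The shape is an assumption inferred from how `SaveChangesAsync` uses it (a static `Get` that returns null for unmarked types, and a `Name` property). The `UserCreated` and `InternalOnlyEvent` classes are hypothetical examples.

```csharp
using System;
using System.Reflection;

// Assumed shape: marks an event class with the exchange, topic or queue name
// it should be published to.
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class MessageAttribute : Attribute
{
    public MessageAttribute(string name) => Name = name;

    public string Name { get; }

    // Returns the attribute declared on the event type, or null when the
    // type is not marked (such events are skipped by the outbox).
    public static MessageAttribute Get(Type eventType) =>
        eventType.GetCustomAttribute<MessageAttribute>(inherit: false);
}

// Hypothetical event that should reach other services.
[Message("user-created")]
public sealed class UserCreated
{
    public UserCreated(string email) => Email = email;

    public string Email { get; }
}

// Hypothetical event that stays inside the service: no attribute, no outbox row.
public sealed class InternalOnlyEvent
{
}
```

With this shape, `MessageAttribute.Get(typeof(UserCreated)).Name` gives the topic name, while an unmarked event type returns null and is skipped.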

So far, everything is great! We finished the transaction and wrote the events to the Outbox table. Now it’s time to publish the events to the related exchange, topic or queue.

You can find many approaches for this part. Solutions such as the Polling Publisher pattern, Eventuate Tram or Debezium are well-established ways of reading the outbox table. To keep things simple and focus on the subject, I used a background job that reads the table from the db every 5 seconds. The job works as follows.

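using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MediatR;
using RabbitMQ.Client;
// IJob, GetEventsQuery and DeleteThrewEventsCommand come from the project's own namespaces (not shown here).

namespace FB.TransactionalOutbox.Infrastructure.Hangfire.Jobs
{
    public class OutboxTableReadJob : IJob
    {
        private readonly IMediator _mediator;
        private readonly IModel _channel;

        public OutboxTableReadJob(IMediator mediator, IModel channel)
        {
            _mediator = mediator;
            _channel = channel;
        }

        public async Task Start()
        {
            // Read the events waiting in the Outbox table.
            var events = await _mediator.Send(new GetEventsQuery());
            if (events.Count == 0)
                return;

            foreach (var @event in events)
            {
                // EventBody already holds the JSON written in SaveChangesAsync,
                // so it is sent as-is; serializing it again would double-encode it.
                _channel.BasicPublish(exchange: @event.TopicName,
                        routingKey: "",
                        basicProperties: _channel.CreateBasicProperties(),
                        body: Encoding.UTF8.GetBytes(@event.EventBody));
            }

            // Mark the published events so they are not picked up again.
            await _mediator.Send(new DeleteThrewEventsCommand(events.Select(e => e.Id).ToList()));
        }
    }
}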

Here I query all the events that were previously written to the Outbox table, and then I publish them to the message broker. After publishing is complete, I set these events to IsDeleted status.
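Something still has to call the job every 5 seconds. The namespace of the job suggests Hangfire does that in this project, but the scheduling code is not shown here, so the following is only a plain-loop sketch of the same idea built on `Task.Delay`. `OutboxPoller` and its parameters are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class OutboxPoller
{
    // Runs readAndPublish, waits for the interval, and repeats until
    // cancellation is requested.
    public static async Task RunAsync(
        Func<Task> readAndPublish, TimeSpan interval, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await readAndPublish();
            }
            catch (Exception ex)
            {
                // A failed run must not stop the loop: rows that were not
                // marked as deleted are simply read again on the next run.
                Console.Error.WriteLine($"Outbox run failed: {ex.Message}");
            }

            try
            {
                await Task.Delay(interval, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Cancellation arrived while waiting; leave the loop.
                break;
            }
        }
    }
}
```

Assuming a `job` instance of the class above, a call could look like `await OutboxPoller.RunAsync(() => job.Start(), TimeSpan.FromSeconds(5), stoppingToken);`. Because a run can fail after publishing but before the events are marked, consumers may see the same event twice and should tolerate that.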

And the result:

Console Result

RabbitMq Result

I write when I learn, I learn as I write. I may have made mistakes in many places because I am only human. I would be glad if you correct me when you see any mistakes.

You can access the source code from here.

Thank you for your interest. See you in the next post :)