Using NServiceBus to connect two webapplications (MVC), with a touch of CQRS

In the previous post I explained what a service bus is designed to do. In particular, I introduced NServiceBus, which is a .NET implementation of a service bus that I quite like. Since the documentation on NServiceBus is quite limited I decided to write a post that shows you how to use NServiceBus with two actual web applications (MVC), a bit of Ninject and a touch of the CQRS design pattern.  In the next post I will also show you some actual code. You can download the source code.

What do we need for NServiceBus?

To get started with NServiceBus, we need at least the following:

  • One or more ‘nodes’ (or ‘endpoints’ in NServiceBus) that send and/or receive messages;
  • The NuGet package for NServiceBus must be installed on all nodes;
  • A binary with a bunch of messages (simple DTO classes with some properties);
  • A persistent store where messages can be stored and that can be polled by the individual nodes (consider this the post office from the previous post). The equivalent of a mailbox is a ‘queue’ in NServiceBus;

Missing from this list, and it’s important to emphasize this, is some sort of central node, proxy, gateway, router or some kind of server that ‘is’ NServiceBus. NServiceBus exists as a package in every endpoint and uses the local store (or stores) to communicate. It is a distributed service bus, unlike BizTalk (and many other buses).

For the example, I chose to use a (for me familiar) MSSQL instance hosted at AppHarbor as my persistent store. You can also use MSMQ, RabbitMQ (default for NServiceBus) or another platform. The store database is automatically populated by NServiceBus, as long as the connection string is configured for all the nodes (the address of the post office, so to speak). You can easily change the store location by changing the connection string.

Let’s get started with a node that can send and receive messages. We will then implement a node that can receive a message and send another one. 

Step 1: Overview

I’ve created two simple applications that are loosely coupled; a webshop with a bunch of products and a CMS for populating the webshop with products. It is also possible for a visitor to purchase a product in the webshop, after which it’s marked as ‘purchased’ in the CMS. Not a very useful webshop, I agree, but it suits the purpose and is more useful than a bunch of console applications (like most examples you can find online). You can download the source and play with it locally (don’t forget the read the Readme.txt), or try out the online [webshop][4] and [CMS][5] (AppHarbor only spins up new instances if there is traffic, so first-load may take 20 seconds or so).

Solution structure

The solution (‘Samples.NServiceBus’) contains website projects for the webshop and the CMS. It is important to note that the projects don’t reference each other anywhere in the code, nor communicate with each other directly. There is also a project called ‘Messages’ which contains the various Messages that can be published on the service bus. 

Step 2: Messages

Messages are an integral part of any real Service Bus. An example of such a message is:

namespace Samples.NServiceBus.Messages.Events
{
    public class ProductWasAdded
    {
        public ProductDto Product { get; set; }
    }
}

The other messages (ProductWasAdded, ProductWasPurchased, ProductWasRemoved, ProductWasUpdated and PurchaseProductCommand) follow this structure, but you can easily add more properties. Because my message concern only products at the moment, I created a ProductDto class that I include as part of nearly all messages:

public class ProductDto
{
    public Guid Id { get; set; }  
    public string Name { get; set; }
    public decimal Price { get; set; }
    public string Description { get; set; } 
    public bool Purchased { get; set; }
} 

The 'Messages' project is referenced by both the webshop and the CMS, and essentially describes the ‘contracts’ of the messages that can exchanged. 

Step 3: Connecting the websites to the service bus

Both the webshop and the CMS are connected to the service bus. This sounds like there is actually a piece of software called a ‘service bus’ running, but that’s not really the case. Both applications automatically start their own NServiceBus-process. But messages are stored - in this case - in a shared SQL server database (the aforementioned post office). The configuration is mostly the same, so I’ll show you how the CMS is configured.

The first step in configuring NServiceBus involves the web.config:

  <configSections>
    <!-- NServiceBus sections -->
    <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="Logging" type="NServiceBus.Config.Logging, NServiceBus.Core" />
    <section name="AuditConfig" type="NServiceBus.Config.AuditConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    &hellip;.
  </configSections>
  <connectionStrings>
    <!-- NServiceBus connectionstrings -->
    <add name="NServiceBus/Persistence" connectionString="Data Source=(localdb)\v11.0;Integrated Security=true;Database=Samples.NServiceBus;Asynchronous Processing=true;Pooling=true;MultipleActiveresultsets=True" />
    <add name="NServiceBus/Transport" connectionString="Data Source=(localdb)\v11.0;Integrated Security=true;Database=Samples.NServiceBus;Asynchronous Processing=true;Pooling=true;MultipleActiveresultsets=True" />
  </connectionStrings>

  <!-- NServiceBus Config -->
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <AuditConfig QueueName="audit" />
  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Messages" Type="Samples.NServiceBus.Messages.Events.ProductWasAdded" Endpoint="CMS#LOCAL" />
      <add Assembly="Messages" Type="Samples.NServiceBus.Messages.Events.ProductWasUpdated" Endpoint="CMS#LOCAL" />
      <add Assembly="Messages" Type="Samples.NServiceBus.Messages.Events.ProductWasRemoved" Endpoint="CMS#LOCAL" />
      <add Assembly="Messages" Type="Samples.NServiceBus.Messages.Events.ProductWasPurchased" Endpoint="CMS#LOCAL" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

[ other configuration ]

The first thing you need is the boilerplate part for the . After that, I’ve added two connection strings; one for the transport and one for the persistence. In this case, I’ve used MSSQL, but you can easily switch to MSMQ, RavenDB, RabbitMQ, etc. More important is the third part, where we configure the queues, error handling, logging and - most importantly - the types of message this application should ‘pick up’. NServiceBus allows this to be a bit less verbose by simply referencing the assembly that contains the messages, but I like the clarity of my approach. You also have to specify from which endpoint the messages should be. In NServiceBus, all messages are sent or published by a single logical endpoint (usually a single application). This endpoint is the authoritative source of the messages. In this case, I’m listening only for messages from the endpoint named ‘CMS#LOCAL’. But I can easily add more messages and endpoints.

After the web.config, you have to actually create a bus. In an MVC application, this snippet in your App_Start folder will do most of the work:

namespace Samples.NServiceBus.Webshop
{
    public class NServiceBusConfig
    {
        public static IBus CreateServiceBus(IKernel kernel)
        {
            var configuration = new BusConfiguration();
            configuration.EndpointName(ConfigurationManager.AppSettings["NServiceBus.EndpointName"]);
            configuration.UseSerialization<JsonSerializer>();
            configuration.UseTransport<SqlServerTransport>();
            configuration.UsePersistence<NHibernatePersistence>(); 
            configuration.Conventions()
                .DefiningEventsAs(p => p.Namespace != null && p.Namespace.StartsWith("Samples.NServiceBus.Messages") && p.Namespace.EndsWith("Events"))
                .DefiningCommandsAs(p => p.Namespace != null && p.Namespace.StartsWith("Samples.NServiceBus.Messages") && p.Namespace.EndsWith("Commands"));
            configuration.UseContainer<NinjectBuilder>(p => p.ExistingKernel(kernel));
            configuration.EnableInstallers();

            return Bus.Create(configuration).Start();
        }
    }
}

The first thing we do is set the endpoint name of this node. I’ve made it configurable through the AppSettings in the web.config in this case. Next, we specify the serialization type for the messages (XML or JSON, usually), followed by the transport and persistence modules. As mentioned before, you can easily switch to MSMQ, RabbitMQ or other providers. 

The next part contains ‘message conventions’. It tells NServiceBus which messages are ‘commands’ and which are ‘events’. This configuration approach is called ‘unobtrusive message mode’. You can also leave out this bit and implement either ICommand or IEvent interfaces in your messages. But the nice thing about manual conventions is that your Messages project doesn’t depend on NServiceBus in any way (one less NuGet package). I will show you the actual (practical) difference between Commands and Events in a bit, which is small in code but large in purpose.

The .UseContainer part specifies that I want to use Ninject for injection of dependencies, instead of the default AutoFac. If you want to know more about dependency injection with Ninject, check out this post. In any case, the UseContainer method requires the Ninject kernel to be passed in so NServiceBus can use it to resolve dependencies for handlers (and also register itself for use in said handlers). I’m not too happy with this approach, but if you install the Ninject.MVC5 NuGet package, it automatically creates a NinjectWebCommon.cs file in your App_Start folder. I added the creation of the service bus there, so I have access to the kernel that’s being created. It works, but it’s not the prettiest code.

private static void RegisterServices(IKernel kernel)
{
    &hellip;. service bindings &hellip;..
    NServiceBusConfig.CreateServiceBus(kernel);
}        

Configuration-wise, we’re done for now! The CMS is connected. If you start it in Visual Studio, NServiceBus will automatically spawn a hidden process that connects to the message store and polls for new messages (very) frequently. All we need now is to actually send and handle messages.

Step 4: Publishing a message through the service bus

Publishing messages is actually really easy once the plumbing is in place. You can find a few examples in the ProductController of the CMS project. This is an example of publishing a ProductWasAdded message whenever a new product is created:

[HttpPost]
public ActionResult Create(Product product)
{
    if (ModelState.IsValid)
    {
        productRepository.Insert(product);
        productRepository.SaveChanges();

        var message = new ProductWasAdded();
        message.Product = Mapper.Map<ProductDto>(product);
        bus.Publish<T>(message);

        return RedirectToAction("Index");
     }
     else
     {
        return View();
     }
 }

Since the ProductWasAdded message only contains a ProductDto, populated with a selection of fields from the CMS’ Product class, I used AutoMapper to do the mapping for me. You could forego this mapping altogether, and just attach the Product instance to the message. The advantage is that this removes one layer of DTOs, and cuts back complexity, but there is a risk of exposing properties (through the message) that you may not want to expose. 

Although this code will work, I refactored it a bit to remove the responsibility of publishing messages to another class instead of my ProductController as it is already tasked with the responsibility of interacting with the views. In the source code, you can see that I’ve moved the publishing logic to a class called ProductEventPublication in a separate project (CMS.EventPublication). As my messages are still very simple, and mostly the same, this class is also very simple:

public class ProductEventPublication : IProductEventPublication
{
    private readonly IBus bus;

    public ProductEventPublication(IBus bus)
    {
        this.bus = bus;
        Mapper.CreateMap<Product, ProductDto>();
    }

    public void ProductWasAdded(Product product)
    {
        PublishProductEvent<ProductWasAdded>(product);
    }

    public void ProductWasUpdated(Product product)
    {
        PublishProductEvent<ProductWasUpdated>(product);
    }

    public void ProductWasRemoved(Product product)
    {
        PublishProductEvent<ProductWasRemoved>(product);
    }

    public void ProductWasPurchased(Product product)
    {
        PublishProductEvent<ProductWasPurchased>(product);
    }

    private void PublishProductEvent<T>(Product product) where T : ProductEventBase, new()
    {
        var message = new T();
        message.Product = Mapper.Map<ProductDto>(product);
        bus.Publish<T>(message);
    }
}

An instance of IBus is injected through Ninject automatically when this class is instantiated. To reduce code duplication in the current version, I created a generic private method that can publish any kind of message that inherits from ProdutEventBase. If I need more custom mappings in the future, I can easily add it without breaking the interface of this class.

If we return to the ProductController, this makes our previous Create handler look like this:

[HttpPost]
public ActionResult Create(Product product)
{
    if (ModelState.IsValid)
    {
        productRepository.Insert(product);
        productRepository.SaveChanges();

        productEventPublication.ProductWasAdded(product);

        return RedirectToAction("Index");
    }
    else
    {
        return View();
    }
}

A similar piece of code can be found when removing a product:

[HttpPost]
public ActionResult Delete(Guid id, Product product)
{
    try
    {
        var dbProduct = productRepository.GetById(id);
        productRepository.Delete(dbProduct);
        productRepository.SaveChanges();

        productEventPublication.ProductWasRemoved(product);

        return RedirectToAction("Index");
    }
    catch
    {
        return View();
    }
}

So we now have a ProductController that can publish a message when a product is created, updated or removed. This means it’s time for our webshop to start listening (or subscribing) to these messages. And since we’re publishing this message instead of sending it as a command to a specific endpoint, we can create an unlimited number of endpoints that subscribe to these product events.

Step 5: Handling messages

The webshop is already configured to interact with the service bus, so handling messages shouldn’t be hard. The first thing we need is a so-called handler class that receives and processes new messages that an endpoint is subscribed to. For the webshop, you can find these in the project ‘Webshop.Handlers’. Although nothing prevents you from putting the Handlers in the ‘Webshop’ project, I like to split my solution into logical units.

Let’s take a look at the ProductEventHandler:

public class ProductEventHandler : IHandleMessages<ProductWasAdded>, IHandleMessages<ProductWasUpdated>, IHandleMessages<ProductWasRemoved>, IHandleMessages<ProductWasPurchased>
{
    private readonly ILocalDataStore localDataStore;

    public ProductEventHandler(ILocalDataStore localDataStore)
    {
        this.localDataStore = localDataStore;
    }

    public void Handle(ProductWasAdded message)
    {
        localDataStore.SaveEntity<ProductDto>(message.Product.Id.ToString(), message.Product);
    }

    public void Handle(ProductWasUpdated message)
    {
        localDataStore.SaveEntity<ProductDto>(message.Product.Id.ToString(), message.Product);
    }

    public void Handle(ProductWasRemoved message)
    {
        localDataStore.DeleteEntity<ProductDto>(message.Product.Id.ToString());
    }

    public void Handle(ProductWasPurchased message)
    {
        localDataStore.SaveEntity<ProductDto>(message.Product.Id.ToString(), message.Product);
    }
}

Now, this all looks very simple, doesn’t it? That’s one of the nice things of NServiceBus. Once you get the plumbing ready (and this takes some effort), the actual messaging and handling is straightforward. In the class definition, you can see that this this class implements a generic interface called ‘IHandleMessage’. This is part of the NServiceBus framework, and needs to be implemented for every message you want to handle in this handler. There is also some injection (Ninject) going on in the constructor. The LocalDataStore is of my own devising, and I use it to extract information for the webshop from the message and persist it to a data store.

What does this handler do, logically? The webshop, when it starts, is completely empty. Although Entity Framework creates a database, it does not populate it. Instead, the webshop subscribes to product events. Whenever a new product is added in the CMS, it receives a ProductWasAdded message. In the handler, I extract the ProductDto and save it to a local datastore for use by the webshop. When a product is updated in the CMS, I simply update the ProductDto that I’ve stored in the data store of the webshop. And when a product is deleted in the CMS, I remove the corresponding ProductDto from the webshop whenever I receive a ProductWasRemoved message. This means that my webshop is kept in sync with my CMS exclusively through NServiceBus. NServiceBus takes care of delivery (and ordering) and makes sure that my webshop receives the message eventually. A message will not get lost, unless I screw up the handling somehow (without throwing exceptions). It may take a few milliseconds, seconds or even minutes, but it will receive the message and become consistent again. This concept is called ‘eventual consistency’, and it is sufficient for most applications. 

The aforementioned implies that NServiceBus is very robust. And it gets better, because NServiceBus will retry delivery if the Handler is unavailable or throws exceptions. This is also the reason why you should never handle unhandled exceptions in a Handler. Just let these exceptions bubble up to NServiceBus. After a few (configurable) retries, NServiceBus will move the message to an Error Queue that you can monitor in the database or with tools like ServicePulse or ServiceInsight. From this it follows that an endpoint is guaranteed to become consistent eventually, even if it was down when the message was broadcast (due to disruptions, updates, load, etc). Not only is this exceptionally robust, it also allows extreme scaling. In my example, I have only one instance of a webshop. But there’s nothing keeping me from publishing a 100 instances of my webshop to different cloud servers. As long as they’re all connected to NServiceBus, they will be kept consistent (with the CMS in this case). And since they're not connected to each other directly, there is no single point of failure. This is one of the strengths of the Command-Query Separation design (CQRS) pattern that NServiceBus allows, and that you (partially) see at work here.

Step 6: ProductDtos and view models

A corollary of the CQRS pattern is that it changes how we think about view models. Although this is not necessary for, nor mandated by NServiceBus, I’ve chosen to use the ProductDto class as my view model for the views in my webshop. Furthermore, the database for the webshop is only a single table (‘LocalDataStoreItems’) with serialized ProductDto classes. You may wonder why I haven’t created a normalized database with a Product table. I could’ve, but what’s the point? I’d just be adding complexity for complexity’s sake. The ProductDto that I receive through NServiceBus contains everything I need in my webshop, so why not use that as a view model directly?

The upside of this approach is that it makes my controllers really simple. Take, for example, the Index and Details actions on the Webshop’s HomeController:

public ActionResult Index()
{
    return View(localDataStore.GetEntities<ProductDto>());
}

public ActionResult Details(string id)
{
    return View(localDataStore.GetEntity<ProductDto>(id));
}

The only thing that happens in the Details action is that the LocalDataStore locates the item with the corresponding key, deserializes it, and returns it. For the index, the LocalDataStore deserializes all products and returns them. Since deserialization is really fast, this will work fine for now. But in the future, I could optimize the Index action by returning a specialized viewmodel (like ‘ProductListViewModel’) that contains a list of products. In my ProductEventHandler, I could simply update the list of products in the view model whenever products are added, changed or removed. That way, my view models are only (re)generated when something meaningful actually happens, instead on every page load. This performance boost and reduction of complexity comes at the cost of data duplication and lack of normalized data. But do we need this in the first place for a read-only web application like this? As long as NServiceBus does it’s job well, the webshop should stay in sync with the CMS. 

But what happens if we have to perform write actions in the webshop? This is where Commands come into play.

Step 7: Sending commands

NServiceBus distinguishes two kinds of messages; events that are published from an endpoint and received by zero to infinite endpoints, and commands that are send from one endpoint to another. This means that a command can be send by multiple endpoints, but it can only be send to one endpoint at a time. If we return to the metaphor of the post office, this is akin to sending a letter to a specific recipient because only they should get the letter. Maybe because it’s private, or because only that person should read the contents.

The difference between publishing (pub/sub) and sending (Commands)

A practical example in our webshop is the purchase of a product. Although I haven’t implemented an actual purchase process, I would like to register somewhere that the product was purchased and is no longer available for purchase. For the purpose of this example, I consider the CMS as my authoritative source for products (and their status). This means that the webshop should send a command to the CMS to mark a product was purchased.

The sending of this command happens in the Webshop’s HomeController:

public ActionResult Purchase(Guid id)
{
   var message = new PurchaseProductCommand() { Id = id };
   bus.Send("CMS", message);
   return RedirectToAction("Index");
}

Note that we’re calling .Send and not .Publish, and we also have to specify an endpoint name. Remember that I described how you have to tell NServiceBus which messages are Events and Commands, either by implementing the ICommand and IEvent interfaces or by using conventions. NServiceBus actually enforces that you can only use .Send for Commands and .Publish for Events, so you can’t make a mistake there.

On the side of the CMS, the ProductEventHandler looks like this:

public class ProductEventHandler : IHandleMessages<PurchaseProductCommand>
{
    private readonly IRepository<Product> productRepository;
    private readonly IProductEventPublication productEventPublication;

    public ProductEventHandler(IRepository<Product> productRepository, IProductEventPublication productEventPublication)
    {
        this.productRepository = productRepository;
        this.productEventPublication = productEventPublication;
    }

    public void Handle(PurchaseProductCommand message)
    {
        var product = productRepository.GetById(message.Id);
        product.Purchased = true;
        productRepository.SaveChanges();

        productEventPublication.ProductWasPurchased(product);
    }
}

Very easy again, don’t you think? But also note that, after marking a product as purchased, I’m actually publishing a message again that the product was purchased. This is intended to synchronize all the webshop instances and make the product unavailable for further purchase. The nice thing here is that NServiceBus can track messages that are ‘correlated’ like this. This allows you to handle cross-transaction applications, including replay and rollback. In NServiceBus these transactions are called ‘sagas’. But that’s for another post.

The flow of messages is nicely visualized in the ServiceInsight tool:

Message flow as visualized by ServiceInsight

Concluding thoughts

Although I was only able to cover the surface of NServiceBus with this post, I hope to have given you some insights into the ‘service bus’ design pattern and how it can be implemented with NServiceBus. Although the example project is a more expansive (and perhaps complex) than most examples you find on the internet (which are often just console apps), I hope that it did give you a better idea on how to implement NServiceBus in multiple web applications. 

Check out the source code

Christiaan Verwijs
Christiaan Verwijs

Scrum Master, Trainer, Developer & founder of Agilistic