NServiceBus has been a well-known messaging and workflow solution for the .NET platform for over a decade. It offers reliability, maintainability, and scalability for distributed workflows. NServiceBus supports various queuing technologies (transports) such as MSMQ, Azure Service Bus, RabbitMQ, and Amazon SQS. It also supports several data stores for persistence, such as SQL, Cosmos DB, MongoDB, and RavenDB.
Cosmos DB, on the other hand, is a fully managed NoSQL database service on Azure. Cosmos DB is robust and evolving rapidly, so much so that my two-year-old post on Cosmos DB contains a lot of outdated information and needs a complete refresh.
This post will discuss how you can use these two world-class products to build your distributed system.
You can download the sample application that I work through in this post from my GitHub repository.
The post assumes that you have a basic understanding of distributed systems, NServiceBus, and Cosmos DB. To run the solution on your local machine, you would need the following:
As part of this exercise, we will build a new social platform called Twitbook (naming is hard).
Twitbook is still in beta and only offers two features:
- It allows the author to create a new post
- People can comment on the post
We will offer these two features through an API and save the data to Cosmos DB asynchronously using a background worker. We will also use NServiceBus to make our distributed system reliable and scalable.
Here is the architecture.
The solution structure
Our solution consists of the following three projects:
- Api: The Api project receives a request from the client and sends a message to the service bus using NServiceBus. To keep the demo simple, we use the NServiceBus Learning Transport instead of a real service bus. In the real world, we would use a transport such as Azure Service Bus.
- Messages: This project contains the commands and events that we will send or publish using NServiceBus.
- Worker: The worker receives a message from the service bus, saves it to Cosmos DB, and publishes an event when the save succeeds. Downstream services can subscribe to the published event and use it for purposes such as updating the state of an external system, reporting, and so on.
Building the sample
Api Project – Setting up NServiceBus and Swagger
Let us start by creating the Web API. We will first add NServiceBus using the NServiceBus.Extensions.Hosting NuGet package.
For API discovery, we will use Swagger.
Api Project – Create API endpoints
Next, we will add API endpoints to create a post and add comments to the post.
We are using the shiny new minimal APIs to develop our endpoints.
It is important to note that the endpoint will not create the post synchronously. It will put the command on the bus and return Accepted to the client.
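To make the flow concrete, here is a minimal sketch of what such an endpoint could look like. It is not the repository's code: it assumes an ASP.NET Core web project with implicit usings, the NServiceBus 7-style configuration API, the Swashbuckle.AspNetCore package for Swagger, and hypothetical names for the route, the request type, the endpoint names, and every property other than PostId.

```csharp
// Program.cs (Api project). A sketch under the assumptions stated above.
using NServiceBus;

var builder = WebApplication.CreateBuilder(args);

builder.Host.UseNServiceBus(_ =>
{
    // "Api" and "Worker" are assumed endpoint names.
    var endpointConfiguration = new EndpointConfiguration("Api");
    var transport = endpointConfiguration.UseTransport<LearningTransport>();

    // Commands need a destination; route AddPost to the worker endpoint.
    transport.Routing().RouteToEndpoint(typeof(AddPost), "Worker");

    // The API only sends messages, it does not process any.
    endpointConfiguration.SendOnly();
    return endpointConfiguration;
});

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();
app.UseSwagger();
app.UseSwaggerUI();

// IMessageSession is registered in the container by NServiceBus.Extensions.Hosting.
app.MapPost("/posts", async (CreatePostRequest request, IMessageSession messageSession) =>
{
    var command = new AddPost
    {
        PostId = Guid.NewGuid().ToString(),
        Title = request.Title,
        Content = request.Content,
        Author = request.Author
    };

    // Nothing is written to Cosmos DB here; the worker does that later.
    await messageSession.Send(command);

    // 202 Accepted: the request was queued, not completed.
    return Results.Accepted(value: new { command.PostId });
});

app.Run();

// Hypothetical request and command shapes for this sketch.
public record CreatePostRequest(string Title, string Content, string Author);

public class AddPost : ICommand
{
    public string PostId { get; set; } = "";
    public string Title { get; set; } = "";
    public string Content { get; set; } = "";
    public string Author { get; set; } = "";
}
```

Returning the generated PostId in the response body lets the client use it later, for example when adding a comment.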
Messages Project – Commands
Both commands, AddPost and AddComment, inherit from an interface, IPostIdPartitionKey. The interface contains a single property, PostId, which will be the partition key for Post and Comment documents in Cosmos DB. I will talk more about this later. But for now, here is what our messages look like:
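The full listing lives in the repository. As a rough sketch of the shape described here, assuming string identifiers and hypothetical property names for everything except PostId, the messages could be declared like this:

```csharp
// Messages project. A sketch; only PostId, AddPost, AddComment,
// PostCreated, and IPostIdPartitionKey are names taken from the post.
using NServiceBus;

// Every message that touches a post exposes the partition key value.
public interface IPostIdPartitionKey
{
    string PostId { get; set; }
}

public class AddPost : ICommand, IPostIdPartitionKey
{
    public string PostId { get; set; } = "";
    public string Title { get; set; } = "";
    public string Content { get; set; } = "";
    public string Author { get; set; } = "";
}

public class AddComment : ICommand, IPostIdPartitionKey
{
    public string PostId { get; set; } = "";
    public string CommentId { get; set; } = "";
    public string Content { get; set; } = "";
    public string CommentBy { get; set; } = "";
}

// Published by the worker once the post is saved.
public class PostCreated : IEvent
{
    public string PostId { get; set; } = "";
}
```

Because both commands share the interface, a single pipeline behavior can read the partition key without knowing the concrete message type.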
Cosmos DB Data structure
We will keep the original post and its associated comments in separate JSON documents that share the same partition key. This is because comments are an unbounded collection, and a hierarchical data structure is better suited to storing them. You can read more about design considerations when working with Cosmos DB here.
We will use PostId as our partition key in Cosmos.
Worker Project – Domain
Before we go further, let us first look at the domain objects that we save to Cosmos.
As you can see, both the Post and Comment domain objects have the property PostId, which serves as the partition key in Cosmos.
Worker Project – NServiceBus Behavior
We will create an NServiceBus behavior that will set the Cosmos DB partition key in the incoming message context in the NServiceBus pipeline. It is a crucial step to get the Cosmos DB transaction batch to work with the NServiceBus persistence session.
The above code is inspired by the NServiceBus Cosmos DB sample. If you are not familiar with NServiceBus behaviors, please see this post.
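For readers who do not have the repository open, a behavior along these lines might look like the following. This is a sketch modeled on the public NServiceBus Cosmos DB sample rather than the exact code from the project; it assumes PostId is a string and repeats the IPostIdPartitionKey interface only so that the snippet is complete.

```csharp
// Worker project. A sketch, assuming the NServiceBus and
// Microsoft.Azure.Cosmos packages are referenced.
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using NServiceBus.Pipeline;

// Declared in the Messages project; repeated here for completeness.
public interface IPostIdPartitionKey
{
    string PostId { get; set; }
}

public class PostIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Only messages that carry a PostId take part in the transactional batch.
        if (context.Message.Instance is IPostIdPartitionKey message)
        {
            // The Cosmos persistence looks up the PartitionKey in the context
            // bag when it builds the storage session for this message.
            context.Extensions.Set(new PartitionKey(message.PostId));
        }

        // Continue with the rest of the pipeline.
        return next();
    }
}
```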
Next, we will configure NServiceBus for our worker. We need to import the NServiceBus.Persistence.CosmosDB NuGet package along with the usual NServiceBus packages.
Worker Project – NServiceBus Setup
As you can see in the above code, there are three major points in this setup:
- We use NServiceBus Cosmos persistence. The Cosmos DB connection string points to the emulator for testing purposes. This allows us to use CosmosPersistenceSession in our handler.
- We have enabled the NServiceBus outbox. The outbox deduplicates incoming messages, so handlers behave idempotently even when a message is delivered more than once. It also guarantees that the business data stored in Cosmos DB and the messages sent from the message handler are committed atomically. Note that enabling the outbox creates outbox documents within the Cosmos DB partition. The default time-to-live (TTL) for these documents is 7 days.
- We register the PostIdAsPartitionKeyBehavior that we created in the previous step in the NServiceBus pipeline.
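The three points above could be wired up roughly as follows. Treat this as a sketch, not the project's actual Program.cs: it assumes the NServiceBus 7-style API, and the endpoint name, database name, container name, and the COSMOS_CONNECTION_STRING environment variable are all hypothetical. PostIdAsPartitionKeyBehavior is the behavior class from the previous section and is expected to exist elsewhere in the Worker project.

```csharp
// Program.cs (Worker project). A sketch under the assumptions stated above.
using System;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Hosting;
using NServiceBus;

var host = Host.CreateDefaultBuilder(args)
    .UseNServiceBus(_ =>
    {
        var endpointConfiguration = new EndpointConfiguration("Worker");
        endpointConfiguration.UseTransport<LearningTransport>();

        // Hypothetical setting; point it at the Cosmos DB emulator for local runs.
        var connectionString =
            Environment.GetEnvironmentVariable("COSMOS_CONNECTION_STRING")
            ?? throw new InvalidOperationException("COSMOS_CONNECTION_STRING is not set.");

        // 1. Cosmos DB persistence, partitioned on /PostId.
        endpointConfiguration.UsePersistence<CosmosPersistence>()
            .CosmosClient(new CosmosClient(connectionString))
            .DatabaseName("Twitbook")
            .DefaultContainer("Posts", "/PostId");

        // 2. Outbox: deduplication plus atomic "save data and send messages".
        endpointConfiguration.EnableOutbox();

        // 3. Tell the persistence which partition each incoming message targets.
        endpointConfiguration.Pipeline.Register(
            new PostIdAsPartitionKeyBehavior(),
            "Uses PostId as the Cosmos DB partition key");

        // Lets NServiceBus create what it needs on startup.
        endpointConfiguration.EnableInstallers();
        return endpointConfiguration;
    })
    .Build();

await host.RunAsync();
```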
Worker Project – Message Handler
Next, we will consume the messages AddPost and AddComment sent by the API in the worker handlers.
Let us start with the handler for AddPost.
AddPostHandler gets the Cosmos persistence session from the message handler context. We use the Cosmos DB TransactionalBatch to create a new Post in Cosmos and then publish the PostCreated event to the service bus. The NServiceBus outbox ensures that this operation is idempotent and the transaction is atomic.
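As an illustration of that description, a handler of this kind might be sketched as below. The message and document shapes are hypothetical apart from PostId, AddPost, PostCreated, and Post, and the sketch assumes the document id is the same value as PostId and that the default Newtonsoft.Json-based Cosmos serializer is in use.

```csharp
// Worker project. A sketch, assuming NServiceBus, NServiceBus.Persistence.CosmosDB,
// and Newtonsoft.Json are referenced.
using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NServiceBus;

// Hypothetical shapes; in the real solution these live in other files.
public class AddPost : ICommand
{
    public string PostId { get; set; } = "";
    public string Title { get; set; } = "";
    public string Content { get; set; } = "";
    public string Author { get; set; } = "";
}

public class PostCreated : IEvent
{
    public string PostId { get; set; } = "";
}

public class Post
{
    // Cosmos DB requires a lowercase "id" property on every document.
    [JsonProperty("id")]
    public string Id { get; set; } = "";
    public string PostId { get; set; } = "";
    public string Title { get; set; } = "";
    public string Content { get; set; } = "";
    public string Author { get; set; } = "";
    public DateTime LastUpdatedDate { get; set; }
}

public class AddPostHandler : IHandleMessages<AddPost>
{
    public async Task Handle(AddPost message, IMessageHandlerContext context)
    {
        // The session shares one TransactionalBatch with the outbox.
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();

        var post = new Post
        {
            Id = message.PostId,
            PostId = message.PostId,
            Title = message.Title,
            Content = message.Content,
            Author = message.Author,
            LastUpdatedDate = DateTime.UtcNow
        };

        // Queued in the batch; NServiceBus executes it after the handler returns.
        session.Batch.CreateItem(post);

        // With the outbox enabled, the event is dispatched only if the batch commits.
        await context.Publish(new PostCreated { PostId = message.PostId });
    }
}
```

Note that the handler never executes the batch itself. NServiceBus commits it together with the outbox record, which is what makes the save and the publish atomic.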
Now, let us look at the handler for AddComment.
AddCommentHandler is slightly more involved.
- We first check whether the parent post exists in Cosmos. Messages may arrive out of order, so an add-comment request can be received before the post exists in the data store. If the post does not exist, we throw an exception. This lets the retry policy kick in, and NServiceBus retries the message.
- We then update the LastUpdatedDate of the post. In the request options, we require that the Etag of the post we are updating matches the one we retrieved. By default, Cosmos DB follows a last-write-wins strategy. Setting IfMatchEtag enforces optimistic concurrency and prevents accidentally overwriting the document.
- Next, we add the comment in the same transaction batch to ensure atomicity.
- Last but not least, we publish the corresponding event.
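The four steps above can be sketched as follows. This is an approximation, not the repository's handler: the CommentAdded event name and all property names other than PostId and LastUpdatedDate are hypothetical, the post document id is assumed to equal PostId, and the ETag is taken from the read response rather than from a property on the document.

```csharp
// Worker project. A sketch, assuming NServiceBus, NServiceBus.Persistence.CosmosDB,
// Microsoft.Azure.Cosmos, and Newtonsoft.Json are referenced.
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using NServiceBus;

// Hypothetical shapes; in the real solution these live in other files.
public class AddComment : ICommand
{
    public string PostId { get; set; } = "";
    public string CommentId { get; set; } = "";
    public string Content { get; set; } = "";
    public string CommentBy { get; set; } = "";
}

public class CommentAdded : IEvent // hypothetical event name
{
    public string PostId { get; set; } = "";
    public string CommentId { get; set; } = "";
}

public class Post
{
    [JsonProperty("id")]
    public string Id { get; set; } = "";
    public string PostId { get; set; } = "";
    public DateTime LastUpdatedDate { get; set; }
}

public class Comment
{
    [JsonProperty("id")]
    public string Id { get; set; } = "";
    public string PostId { get; set; } = "";
    public string Content { get; set; } = "";
    public string CommentBy { get; set; } = "";
}

public class AddCommentHandler : IHandleMessages<AddComment>
{
    public async Task Handle(AddComment message, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();

        // Step 1: read the parent post. ReadItemAsync throws a CosmosException
        // (404) when the post is missing, which triggers NServiceBus retries.
        var response = await session.Container.ReadItemAsync<Post>(
            message.PostId, new PartitionKey(message.PostId));

        // Step 2: update the post only if nobody changed it since we read it.
        var post = response.Resource;
        post.LastUpdatedDate = DateTime.UtcNow;
        session.Batch.ReplaceItem(post.Id, post,
            new TransactionalBatchItemRequestOptions { IfMatchEtag = response.ETag });

        // Step 3: add the comment in the same batch and partition.
        session.Batch.CreateItem(new Comment
        {
            Id = message.CommentId,
            PostId = message.PostId,
            Content = message.Content,
            CommentBy = message.CommentBy
        });

        // Step 4: publish an event for downstream subscribers.
        await context.Publish(new CommentAdded
        {
            PostId = message.PostId,
            CommentId = message.CommentId
        });
    }
}
```

If the ETag no longer matches when the batch runs, the whole batch fails, so neither the post update nor the comment is written and the message is retried.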
Running the sample application
To run the sample application, we need to start the Api and Worker projects. Let us create our first post by calling the create post endpoint through Swagger.
Next, we use the generated PostId and add our first comment by calling the add comment endpoint.
This will save our post and comment to Cosmos DB.
You will also notice a couple of outbox documents created by NServiceBus in Cosmos.
As mentioned earlier, the default TTL for the outbox document is seven days. It ensures that duplicate messages are not reprocessed within this window.
A word of caution
Enabling outbox with NServiceBus Cosmos persistence can lead to duplicate messages when the Cosmos DB RU limit is reached. It is an open issue with NServiceBus Cosmos DB SDK. As a workaround, you can increase your Cosmos DB RUs or switch to Cosmos DB Serverless. Alternatively, you can keep your handlers idempotent instead of relying on the outbox feature.
In this post, I have gone into detail about how you can use NServiceBus and Azure Cosmos DB together. I hope you find it helpful. If you have any further questions on this topic, please feel free to reach out to me on my Twitter handle or through the comments section below.