Axon

Event Sourcing and CQRS with Axon!

Interested in Event Sourcing? Don’t know where to get started? Don’t want to do all the hard work yourself? You’ve come to the right place!

In this series, we’ll scratch the surface of two frameworks that focus on microservice development in combination with Event Sourcing and Command Query Responsibility Segregation (CQRS).
In this episode, we introduce to you: the Axon framework.

axon framework

This post is related to another episode in this series where we also have a look at the Lagom framework in the context of ES/CQRS.

Let’s have a look at what Axon can offer us in the realms of Event Sourcing and CQRS!

Hey, wait a minute! Event Sourcing? CQRS?

Let’s get started by shortly introducing Event Sourcing and CQRS.

Event Sourcing and CQRS are often used together but one can perfectly exist without the other. However because they play very well together they are often combined.

 is a data-storing technique that is different to more classical data-storage strategies.
More classically, updates to the application state are stored in a database with in-place updates. This means that the previous state is overwritten and only the latest state is preserved. This is sometimes also referred to as destructive updates. Event Sourcing is different in the sense that not the latest state is stored, but all events making up for this state. This is very powerful because we can keep track of how the application state is evolving up until the current point in time. In its core it captures the journey, rather than the destination.

, or short CQRS, is an architectural design pattern where the write side of an application (triggered by Commands) is strictly split from the read side (triggered by Queries).
Commands are used to describe an intent to change the state of the application. Queries on the other side are used to retrieve information from the application.

CQRS splits these concerns which makes it possible to have different models for the read side as for the write side. This makes it possible to decouple them completely and evolve them independently of each other. As typically the read side has different capacity needs as the write side has, splitting the two sides makes it possible to scale the two components independent from each other.

Sounds good… in theory, but where do we start?

A framework that does the heavy lifting for us

Luckily for us, there are frameworks that help us to design exactly this kind of applications. In this series, we will have a look at two of them: Lagom and Axon.

The Axon framework, created by AxonIQ, is mainly focused on building applications that leverage techniques like Event Sourcing, CQRS and Event driven architecture. Also a lot of concepts from Domain Driven Design are 1 to 1 mapped to concepts in Axon.

Let’s get acquainted with them in the context of a real world example: a food ordering application.

axon framework

The food ordering application

Axon has a very good example of a food ordering application. The application is written in Java and showcases the principles of Event Sourcing and CQRS.

The application is about ordering food, obviously, one of the key developments of the modern world! The application allows a user to create a shopping cart, add items to and remove items from the cart and afterwards checkout the order. Simple enough, right?

Let’s jump in by having a look at the application structure using the Axon framework to see Event Sourcing and CQRS in action. Note that we will implement the example application in a plain Scala project to show all the mechanics of an Axon application.

food order application

1| Commands, Events and Queries

As usual, we start by defining the core API of the internal application: the commands, events and queries.

Commands will reflect the intent that users can express to change something in the state of the application, like selecting a product to put into their cart.

case class CreateFoodCartCommand(@(RoutingKey @field) foodCartId: UUID)

case class SelectProductCommand(@(TargetAggregateIdentifier @field) foodCartId: UUID, productId: UUID, quantity: Int)

case class DeselectProductCommand(@(TargetAggregateIdentifier @field) foodCartId: UUID,productId: UUID,quantity: Int)

case class ConfirmOrderCommand(@(TargetAggregateIdentifier @field) foodCartId: UUID)

A command clearly shows the intent of what a user tries to change in the application. Each of the commands requires at least an identifier that will be used to identify the specific aggregate we want to target. The aggregate can be seen as a single state container. In our case this would be a single shopping cart for a given user and is identified by the foodCartId.

These commands will get processed and converted to events (we’ll see how that works in the command handling section). These are the factual changes to the application’s state and can be modeled as follows:

case class FoodCartCreatedEvent(foodCartId: UUID)
case class ProductSelectedEvent(foodCartId: UUID, productId: UUID, quantity: Int)
case class ProductDeselectedEvent(foodCartId: UUID, productId: UUID, quantity: Int)
case class OrderConfirmedEvent(foodCartId: UUID)

Events will get stored in an event store. At that point, the events make up for the factual changes within the system and will form a history. As a reaction to this new fact of life, the application will change the in-memory state in accordance with the event to recreate the actual end state at this point in time.

A last essential piece of our API is a way to get the information out. If a user wants to get information about his specific cart for example, a query can be issued.

case class FindFoodCartQuery(foodCartId: UUID)

Next up, filling the carts with food in the implementation of this API.

2| Command Processing in the Write Model

Commands are received and translated to events to be stored in the event store. The aggregate is the processor that handles all commands for a specific entity in the system, which is identified by the aggregate identifier. In our case, the cart!

First of all, we define the specifics to this container of state:

class FoodCart() {

  @AggregateIdentifier private var foodCartId: UUID = _
  private var selectedProducts = Map.empty[UUID, Int]
  private var confirmed = false
  
  // ...

}

The Axon framework is able to route the commands to the correct container using the TargetAggregateIdentifier field in the command. This is mapped to the correct aggregate, where the identifier is tagged with AggregateIdentifier.

Additionally, we define how this aggregate needs to handle a command. For example, in case of a SelectProductCommand :

@CommandHandler
def handle(c: SelectProductCommand): Unit = {
  AggregateLifecycle.apply(ProductSelectedEvent(c.foodCartId, c.productId, c.quantity))
}

The aggregate receives the command and publishes the ProductSelectedEvent to the Axon life cycle. At this point the heavy lifting starts for Axon. It will make sure the event is stored in the event store in Axon server. As soon as the event is stored, we are sure the state cannot be lost anymore.

It’s time to apply the event to our in-memory state of the cart. In Axon, we do this by defining an EventSourcingHandler:

@EventSourcingHandler
def on(e: ProductSelectedEvent): Unit = {
  selectedProducts = selectedProducts.updatedWith(e.productId) {
    case Some(quantity) =>
      Some(e.quantity + quantity)
    case None =>
      Some(e.quantity)
  }
}

We update the selectedProducts variable in the aggregate to include the newly selected products.

Cool, our write side model is fully up to date and we are ready to receive more events. But what about reading this data now? That’s where the read model comes in.

3| Query Side in the Read Model

The aggregate models a single Cart entity and makes sure the state is up to date to evaluate new incoming commands.
Following the principles of CQRS, we will split query side to a totally separate module. The glue between the two are the events or stateful facts to the application.

We will define a projector that tracks all events and builds a read model:

class FoodCartProjector(foodCartViewRepository: FoodCartViewRepository) {

  @EventHandler
  def on(event: FoodCartCreatedEvent): Unit = {
    val foodCartView = FoodCartView(event.foodCartId, mutable.Map.empty[UUID, Int])
    foodCartViewRepository.save(event.foodCartId, foodCartView)
  }

  @EventHandler
  def on(event: ProductSelectedEvent): Unit = {
    foodCartViewRepository.findById(event.foodCartId).foreach(foodCartView => foodCartView.addProducts(event.productId, event.quantity))
  }
  
  // ...
  
}

The foodCartViewRepositoryrepresents a specific persistence model that is designed to answer queries from the user. The repository can save the results to a separate database to make sure the latest state is immediately available to be served as a response to issued queries to the application.

This can be done by defining a query handler in the Projector that does exactly that:

@QueryHandler
def handle(query: FindFoodCartQuery): FoodCartViewResult = {
  FoodCartViewResult(foodCartViewRepository.findById(query.foodCartId))
}

Time to put it all together!

4| Putting It All Together

The application is now capable of receiving commands and answering queries, while the events underneath model the state of the application. But this is all internal to the application and a user of the application should not care. A last important step is to expose the functionality of the application to the outer world.

Let’s create an HTTP controller that is able to expose the application over REST. Any framework could be used to expose the application, so we’ll use the lightweight Akka-HTTP framework here:

class FoodOrderingController(config: Configuration) {
  // ...
  val route: Route = 
    pathPrefix("foodCart") {
      concat(
        path("create") {
          post {
            val response: CompletableFuture[UUID] = config.commandGateway().send(CreateFoodCartCommand(UUID.randomUUID()))
            complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, s"${response.get}"))
          }
        },
        pathPrefix(JavaUUID) { foodCartId =>
          concat(
            pathPrefix("select" / JavaUUID / "quantity" / IntNumber) { (productId, quantity) =>
              post {
                val _ = config.commandGateway().send(new SelectProductCommand(foodCartId, productId, quantity)).get
                complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, s"Selected"))
              }
            },
            // ...
            pathEnd {
              get {
                val response = config.queryGateway()
                  .query(
                    FindFoodCartQuery(foodCartId),
                    ResponseTypes.instanceOf(classOf[FoodCartViewResult])
                  ).get
                complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, s"$response"))
              }
            }
          )
        }
      )
    }

  Http().bindAndHandle(route, "localhost", 8080)
}

For convenience, we only show the endpoints to add some products to the cart and to query the content of the cart.

The bridge between the REST controller and the Axon framework is the commandGateway and queryGateway. These are the hooks to the Axon framework to allow us to issue a command in the application or query some information about our carts. And that’s about it!

Make sure to checkout the full source code on Github.

Conclusion

If we are looking at frameworks that aim to help us build Event Sourcing applications with CQRS, Axon is designed with these concepts in mind.

Concepts like Commands, Events and Queries are modeled right into the framework which makes it very natural to design an ES/CQRS application. The event store in Axon server is specifically designed for the purpose which makes it tailored to the needs and characteristics of an event sourced application.

All in all, developing an event sourced application feels very natural using the Axon framework. It supplies you with all the necessary tools to easily get started with Event Sourcing and CQRS in your applications today.