event souring cqrs lagom

Event Sourcing and CQRS with Lagom!

Interested in Event Sourcing? Don’t know where to get started? Don’t want to do all the hard work yourself? You’ve come to the right place!

In this series, we’ll scratch the surface of two frameworks that focus on microservice development in combination with Event Sourcing and Command Query Responsibility Segregation (CQRS).

In this episode, we introduce to you: the Lagom framework.

event souring cqrs lagom

This post is related to another episode in this series where we also have a look at the Axon framework in the context of ES/CQRS.

But without further ado, let’s dive into the magical world of Event Sourcing and CQRS!

Hey, wait a minute! Event Sourcing? CQRS?

Let’s get started by shortly introducing Event Sourcing and CQRS.

Event Sourcing and CQRS are often used together but one can perfectly exist without the other. However because they play very well together they are often combined.

 is a data-storing technique that is different to more classical data-storage strategies.
More classically, updates to the application state are stored in a database with in-place updates. This means that the previous state is overwritten and only the latest state is preserved. This is sometimes also referred to as destructive updates. Event Sourcing is different in the sense that not the latest state is stored, but all events making up for this state. This is very powerful because we can keep track of how the application state is evolving up until the current point in time. In its core it captures the journey, rather than the destination.

, or short CQRS, is an architectural design pattern where the write side of an application (triggered by Commands) is strictly split from the read side (triggered by Queries).
Commands are used to describe an intent to change the state of the application. Queries on the other side are used to retrieve information from the application.

CQRS splits these concerns which makes it possible to have different models for the read side as for the write side. This makes it possible to decouple them completely and evolve them independently of each other. As typically the read side has different capacity needs as the write side has, splitting the two sides makes it possible to scale the two components independent from each other.

Sounds good… in theory, but where do we start?

A framework that does the heavy lifting for us

Luckily for us, there are frameworks that help us to design exactly this kind of applications. In this series, we will have a look at two of them: Lagom and Axon.

While Axon is designed with a clear focus on Event Sourcing and CQRS, the Lagom framework, created by Lightbend, is a more general microservice framework which includes some abstractions to build ES/CQRS applications.

Let’s get acquainted with them in the context of a real world example: a food ordering application.

axon framework

The food ordering application

Lagom has a very good example of a food ordering application. The application is written in Java and showcases the principles of Event Sourcing and CQRS.

The application is about ordering food, obviously, one of the key developments of the modern world! The application allows a user to create a shopping cart, add items to and remove items from the cart and afterwards checkout the order. Simple enough, right?

Let’s jump in by having a look at the application structure using the Lagom framework to see Event Sourcing and CQRS in action.

Conceptual model of the Food Ordering application

1| Commands, Events and Queries

In the philosophy of ES/CQRS, we start by defining the core API of the internal application: the commands, events and queries.

Commands will reflect the intent that users can express to change something in the state of the application, like selecting a product to put into their cart.

sealed trait FoodCartCommand extends FoodCartCommandSerializable

case class CreateCartCommand(foodCartId: UUID, replyTo: ActorRef[CartCreated])
  extends FoodCartCommand  
case class SelectProductCommand(foodCartId: UUID, productId: UUID, quantity: Int, replyTo: ActorRef[Confirmation])
  extends FoodCartCommand  
case class DeselectProductCommand(foodCartId: UUID, productId: UUID, quantity: Int, replyTo: ActorRef[Confirmation])
  extends FoodCartCommand  
case class ConfirmOrderCommand(foodCartId: UUID, replyTo: ActorRef[Confirmation])
  extends FoodCartCommand

A command clearly shows the intent of what a user tries to change in the application. Each of the commands requires at least an identifier that will be used to identify the specific aggregate we want to target. The aggregate can be seen as a single state container. In our case this would be a single shopping cart for a given user and is identified by the foodCartId.

These commands will get processed and converted to events (we’ll see how that works in the command handling section). These are the factual changes to the application’s state and can be modeled as follows:

sealed trait FoodCartEvent extends AggregateEvent[FoodCartEvent] {
  override def aggregateTag: AggregateEventTag[FoodCartEvent] = FoodCartEvent.Tag
}

case class FoodCartCreatedEvent(foodCartId: UUID)
  extends FoodCartEvent
case class ProductSelectedEvent(foodCartId: UUID, productId: UUID, quantity: Int) 
  extends FoodCartEvent
case class ProductDeselectedEvent(foodCartId: UUID, productId: UUID, quantity: Int)
  extends FoodCartEvent
case class OrderConfirmedEvent(foodCartId: UUID)
  extends FoodCartEvent

Events will get stored in an event store. At that point, the events make up for the factual changes within the system and will form a history. As a reaction to this new fact of life, the application will change the in-memory state in accordance with the event to recreate the actual end state at this point in time.

A last essential piece of our API is a way to get the information out. If a user wants to get information about his specific cart for example, a query can be issued. This aspect is modeled using a Query repository:

trait FoodCartRepository {
  def getCatalog(foodCartId: UUID): Option[FoodCartView]
}

We’ll get back to the query side later.

Next up, filling the carts with food in the implementation of this API.

2| Command Processing in the Write Model

Commands are received and translated to events to be stored in the event store. The aggregate is the processor that handles all commands for a specific entity in the system, which is identified by the aggregate identifier. In our case, the cart! A lot of the heavy lifting is handled by the framework, such as routing the command to the correct aggregate in the application.

First of all, we define the specifics to this container of state:

def create(persistenceId: PersistenceId) = 
  EventSourcedBehavior
    .withEnforcedReplies[FoodCartCommand, FoodCartEvent, FoodCartState](
      persistenceId = persistenceId,
      emptyState = FoodCartState.initial,
      commandHandler = (state, cmd) => state.applyCommand(cmd),
      eventHandler = (state, evt) => state.applyEvent(evt)
    )

We create an EventSourcedBehavior which models the aggregate. As we can see we define the scope of the aggregate (with the persistenceId), the initial state and the handlers for command and event messages.

When looking at the command handler below, we see that all types of commands can be processed and a reply is issued to the requester. Scala has this nice feature called pattern matching which warns you at compile time if you have forgotten to implement handling one of the Command types. We have also included the specific handling of the SelectProductCommand:

def applyCommand(cmd: FoodCartCommand): ReplyEffect[FoodCartEvent, FoodCartState] =
    cmd match {
      case x: CreateCartCommand => onCreation(x)
      case x: SelectProductCommand => selectProduct(x)
      case x: DeselectProductCommand => deselectProduct(x)
      case x: ConfirmOrderCommand => confirmOrder(x)
    }
  // ...

private def selectProduct(cmd: SelectProductCommand): ReplyEffect[FoodCartEvent, FoodCartState] =
  Effect
    .persist(ProductSelectedEvent(cmd.foodCartId, cmd.productId, cmd.quantity))
    .thenReply(cmd.replyTo) { _ =>
      Accepted
    }

In the handler we first make sure the event gets persisted in the event store using Effect.persist. In our implementation we use Cassandra as the event store. Lagom comes with a plugin to make persisting to Cassandra effortless.
As soon as the event is stored, the aggregate replies with Accepted to the original requester which is referenced by the replyTo address. (psst, an indication that Lagom uses Akka under the hood)

After persisting the event, the framework makes sure the event is fed into the aggregate so that the state of the aggregate can be updated.

def applyEvent(evt: FoodCartEvent): FoodCartState = {
    evt match {
      case FoodCartCreatedEvent(_) => 
        this
      case ProductSelectedEvent(_, productId, quantity) => 
        updateProducts(productId, quantity)
      case ProductDeselectedEvent(_, productId, quantity) => 
        deselectProducts(productId, quantity)
      case OrderConfirmedEvent(_) => 
        confirmOrder()
    }
  }

private def updateProducts(productId: UUID, quantity: Int): FoodCartState = {
  val newProductsMap = products.updatedWith(productId) {
    case Some(value) => Some(value + quantity)
    case _ => Some(quantity)
  }
  copy(products = newProductsMap)
}

The event is processed and the internal state is adapted to take the latest event into account. For the functional programming enthusiasts: mind the pure function applyEvent that receives an event and produces a new FoodCartState. This is a nice indication of the functional focus of the Scala API in Lagom.

This wraps up the aggregate implementation and thus the write side of the application. Now let’s have a look at how Lagom handles the read side of things, a.k.a. the query side.

3| Query Side in the Read Model

In Lagom, the read side is covered by a concept called Read Side Processor.
This means that we can create a separate model for the read side (CQRS) and the data is fed into this read model using the events that are created. Any persistence type can be implemented to be optimal for the queries at hand. Lagom provides a couple of adapters out of the box like a Cassandra plugin and a plugin for relational database support.

For now, we will implement an in-memory repository similar to the Axon application:

class FoodCartProcessor()(implicit ec: ExecutionContext)
  extends ReadSideProcessor[FoodCartEvent]{

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[FoodCartEvent] = {
    new ReadSideHandler[FoodCartEvent] {
      override def globalPrepare(): Future[Done] =
        FoodCartViewRepository.createTables()

      override def prepare(tag: AggregateEventTag[FoodCartEvent]): Future[Offset] =
        FoodCartViewRepository.loadOffset(tag)

      override def handle(): Flow[EventStreamElement[FoodCartEvent], Done, NotUsed] =
        Flow[EventStreamElement[FoodCartEvent]]
          .mapAsync(1) { eventElement =>
            FoodCartViewRepository.handleEvent(eventElement.event, eventElement.offset)
          }
    }
  }

  override def aggregateTags: Set[AggregateEventTag[FoodCartEvent]] =
    Set(FoodCartEvent.Tag)
}

The FoodCartRepository the responsibility to keep the query model in-memory for fast serving.
As we can see, updates in the form of events are fed into the repository one by one to update the read side model.

Time to put it all together!

4| Putting It All Together

Lagom is in its core a microservice oriented framework. Let’s have a look at how Lagom abstracts over the interface of the application to the outside world. In the end, it’s our end goal to expose the business functionality to the users that want to order food.

The FoodOrderingService provides a blueprint of what the interface of the application looks like:

trait FoodOrderingService extends Service {

  def createCart(): ServiceCall[NotUsed, UUID]

  def selectItem(foodCartId: UUID, productId: UUID, quantity: Int): ServiceCall[NotUsed, Done]

  def deselectItem(foodCartId: UUID, productId: UUID, quantity: Int): ServiceCall[NotUsed, Done]

  def confirmCart(foodCartId: UUID): ServiceCall[NotUsed, Done]

  def getFoodCartItems(foodCartId: UUID): ServiceCall[NotUsed, FoodCartViewResult]

  override final def descriptor: Descriptor = {
    import Service._
    named("food-ordering")
      .withCalls(
        pathCall("/foodCart/create", createCart _),
        pathCall("/foodCart/:foodCartId/select/:productId/quantity/:quantity", selectItem _),
        pathCall("/foodCart/:foodCartId/deselect/:productId/quantity/:quantity", deselectItem _),
        pathCall("/foodCart/:foodCartId/confirm", confirmCart _),
        pathCall("/foodCart/:foodCartId", getFoodCartItems _)
      )
      .withAutoAcl(true)
  }
}

The endpoints are defined but are decoupled from the transport protocol (e.g. REST / gRPC) that will be used.
Each service call is typed and defines the structure of the Request as well as the Response.

If we take a look at the implementation, we see how we can wire the service calls to the command model:

class FoodOrderingServiceImpl(clusterSharding: ClusterSharding,persistentEntityRegistry: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends FoodOrderingService {

  private def aggregateRef(id: UUID): EntityRef[FoodCartCommand] =
    clusterSharding.entityRefFor(FoodCartState.typeKey, id.toString)

  // ...

  override def selectItem(foodCartId: UUID, productId: UUID, quantity: Int): ServiceCall[NotUsed, Done]= ServiceCall { _ =>
    aggregateRef(foodCartId)
      .ask[Confirmation](
        replyTo => SelectProductCommand(foodCartId, productId, quantity, replyTo)
      )
      .map {
        case Accepted => Done
        case _        => throw BadRequest("Can't select the specified products.")
      }
  }

In the case of adding a product to the cart, we’ll first lookup the reference to the correct aggregate by using the entityRefFor method. Again a part where the framework is doing the heavy lifting for us. Since the state container is a cart, the lookup is based on the foodCartId. The command is sent to the aggregate for processing. Afterwards the message Done will be returned to the original requester.

On the query side, we see that the implementation can use the read model directly to fulfill the query request:

override def getFoodCartItems(foodCartId: UUID): ServiceCall[NotUsed, FoodCartViewResult] = { _ =>
  val content = FoodCartViewRepository.getCatalog(foodCartId)
  Future.successful(FoodCartViewResult(content))
}

There we have it! A Lagom application to help us create a ES/CQRS application. Lagom also comes with a convenient build command runAll to launch the application, as well as the Cassandra database for local testing.

Make sure to checkout the full source code on Github.

Conclusion

If we are looking at frameworks that aim to help us build Event Sourcing applications with CQRS, Lagom is clearly capable of doing exactly that.

Lagom is less opinionated with regard to Event Sourcing and CQRS as Axon is, which is for example visible in the fact that queries are less explicitly modeled in the Lagom API. Also the wiring of commands to aggregates needs a bit more manual work, while this is abstracted away in Axon. Nonetheless is Event Sourcing very explicit when using the EventSourcedBehavior and is CQRS specifically supported with the Read Side Processor API to split the read model from the write model. For testing purpose it’s really helpful to leverage the pure functional constructs that are present in the Lagom API.

All in all, Lagom is a framework that does a great job in supplying the necessary tools to easily get started with Event Sourcing and CQRS in your applications today.