Hands-on with Akka Serverless – Part 2

In part 1 of this series we created an application that can store and process data from weather stations around the globe. In this blog we build on the setup from part 1 to aggregate data over multiple weather stations. At the end of this blog, you’ll have a better view of what it takes to create a realistic feature in Akka Serverless and how it helps you develop faster with less hassle.

Let’s get started!

Picking up where we left off

In part 1 we covered the basics of Akka Serverless. There we illustrated the basic building blocks of our serverless application, namely Actions, Views and Entities. Our initial, rather primitive application looked like this:

Even with its limited set of features, this application is already well equipped for extensibility as well as for elasticity and resilience:

    • Write-actions (the POST endpoint) are decoupled from everything else. There is not a lot of processing going on there; the only thing it does is create and store events based on the inbound requests.

    • The processing of those events to generate the view is decoupled from all the rest. Even if that processing were time-consuming, it would not interfere with the throughput of the POST endpoint.

    • The read requests (the GET endpoint) are served from a dedicated database: no matter how many requests come in, they cannot interfere with any other part of the application.

    • Any of the 3 components can fail independently, without tearing down the complete system. Granted, it is possible that the read-state will not have the latest data, but at least something will be available (cf. eventual consistency).

    • For this example, the 3 components are part of the same application, but since they communicate over gRPC (location transparency), they can easily be deployed as separate applications. This means that every component can be scaled independently.

For the second part of this tutorial, we want to take it to the next level: what do we need to add or adapt so that we can query, group and aggregate the data in the most flexible way?

Additional requirements of our reactive application

We want to add 2 ways to view the data:

    • Since we know the location of each weather station, we want to group the measurements per country, so that we can make reports per country.
    • We want to know the most extreme measurements, maximum and minimum, globally (not per weather station).

We want a solution that is reactive (elastic and resilient), but also flexible: it should be easy to add additional ways of grouping and aggregating the data later on.

(The complete source code is available here: https://github.com/reaktika/reactive-weather-code-pt2 )

Message-driven solution

To allow for future extensibility, and to guard the performance of the application, we’ll start by publishing all the measurements of the stations onto a topic. Once the data is there, it can be consumed by other components for further processing. (Akka Serverless allows integration with Google Pub/Sub and Confluent Cloud.)

This approach gives us a few advantages:

    • The data on the topic is published in a format that is independent of the input format. This means that the parts of the application that process the data will not be impacted when the format of the input data changes.

    • The (potentially expensive) processing of the data cannot interfere with the actual input: even if there is a lot of slow processing going on, handling inbound requests remains just as efficient.

    • Processing the data means subscribing to the topic the data is written to. Introducing additional consumers does not change the throughput of other parts of the application. This makes for an extensible design.

The design of the application then looks like this:

Let’s take a look at how we can publish the data onto a topic.

Publishing data onto a topic

We’ll introduce an action that subscribes to the event journal of the WeatherStationEntity. The intent of this action is to publish everything to a topic: “weatherstation_data”.

As with everything in Akka Serverless, we start with the definition of the action that will implement this. The option (akkaserverless.method).eventing allows us to define the input for the action and the destination for the output: the event journal and the topic respectively.

To make it as easy as possible to later create consumers of this data, we’ll convert every type of event in the journal into a new message, WeatherStationData, so that the topic is homogeneous.

service WeatherStationToTopicService {
    option (akkaserverless.service) = {
        type : SERVICE_TYPE_ACTION
    };
    rpc PublishStationRegistered(be.reaktika.weatherstation.domain.StationRegistered) returns (WeatherStationData){
        option (akkaserverless.method).eventing = {
            in: {
                event_sourced_entity: "weatherstation";
            }
            out: {
                topic: "weatherstation_data";
            }
        };
    };
}
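
On the Java side, the generated action class only has to map each journal event to a WeatherStationData message and return it as the reply, which is what ends up on the topic. Below is a minimal sketch; the generated outer class names and the WeatherStationData field names are assumptions, since they are not shown in this post.

@Override
public Effect<WeatherStationPublish.WeatherStationData> publishStationRegistered(WeatherStationDomain.StationRegistered event) {
  // map the journal event onto the homogeneous topic message
  // (the outer classes and field names below are assumptions)
  var data = WeatherStationPublish.WeatherStationData.newBuilder()
      .setStationId(event.getStationId())
      .setStationName(event.getStationName())
      .build();
  // the reply of this action is what gets published onto the weatherstation_data topic
  return effects().reply(data);
}

Similar rpc methods and mappings are needed for the other event types in the journal, such as the temperature and wind measurements.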

Now that we have a topic which contains the data of all weather stations, we can start building functionality that needs a broader scope than just one weather station.

Storing the most extreme measurements

To be able to know what the highest and lowest values are, across all entities, we need a new entity which processes all events and stores only the extreme values. This one is a ValueEntity. In contrast to EventSourced entities, ValueEntities store only the current state. The framework guarantees that all commands sent to a single entity are processed sequentially. This means we don’t need to worry about the problems we could have when multiple weather stations are sending data simultaneously. All of that is taken care of automatically, in the most performant and elastic way, thanks to the underlying technology.

For this case, that is exactly what we need.

message WeatherStationExtremesState {
    TemperatureRecord maxTemperature = 1;
    TemperatureRecord minTemperature = 2;
    WindRecord windspeed = 3;
}
message TemperatureRecord {
    be.reaktika.weatherstation.domain.aggregations.TemperatureMeasurement current = 1;
    be.reaktika.weatherstation.domain.aggregations.TemperatureMeasurement previousRecord = 2;
}
message TemperatureMeasurement {
  string station_id = 1;
  double measuredTemperature = 2;
  google.protobuf.Timestamp measurementTime = 3;
}

The implementation processes every value of every weatherstation and updates the internal state whenever a value is encountered that is higher/lower than the current value.

@Override
public Effect<Empty> registerData(WeatherStationExtremesAggregation.WeatherStationExtremesState currentState, WeatherStationAggregation.AddToAggregationCommand command) {
  //compare the received measurement with the current extremes and build newExtremes
  //(a sketch of this comparison follows after this code block)
  return effects()
        .updateState(newExtremes)
        .thenReply(Empty.getDefaultInstance());
}
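
The elided comparison is plain Java over the protobuf state. A sketch of the temperature part is shown below, assuming the AddToAggregationCommand carries a single TemperatureMeasurement (that accessor is an assumption) and ignoring the empty initial state for brevity; outer-class prefixes are omitted for readability.

// assumed accessor on the command
TemperatureMeasurement measurement = command.getTemperatureMeasurement();
WeatherStationExtremesState.Builder builder = currentState.toBuilder();
// a higher value than the current maximum becomes the new record,
// and the old maximum is kept as 'previousRecord'
if (measurement.getMeasuredTemperature() > currentState.getMaxTemperature().getCurrent().getMeasuredTemperature()) {
  builder.setMaxTemperature(TemperatureRecord.newBuilder()
      .setPreviousRecord(currentState.getMaxTemperature().getCurrent())
      .setCurrent(measurement));
}
// the same comparison, mirrored, for the minimum
if (measurement.getMeasuredTemperature() < currentState.getMinTemperature().getCurrent().getMeasuredTemperature()) {
  builder.setMinTemperature(TemperatureRecord.newBuilder()
      .setPreviousRecord(currentState.getMinTemperature().getCurrent())
      .setCurrent(measurement));
}
WeatherStationExtremesState newExtremes = builder.build();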

If we want to expose this information through the public API, we need another component: we can create a view (extremes_view) which subscribes to the state changes of the value entity we just created.

Alternatively, we could also expose the data of the value entity through an Action which requests its current state. For our application we’re going for the view. This brings us to the following architecture.

For the next aggregation, we need a slightly different approach.

Grouping data per country

If we want to group measurements per country, we need to know in which country every weather station is located. As you might remember from part 1, weather stations can report their location on a separate endpoint. The requests that contain the actual measurements don’t include the location of the weather station.

Let’s introduce a new value-entity which will keep track of the country for every registered weatherstation. It will subscribe to the same weatherstation_data topic and work like this:

    • if the event contains the location of a weather station, the entity calls an external geo-coding service to retrieve the corresponding country for that location. If a country is found, the entity stores the country of that weather station in its local state.

    • if the event contains measurements, the entity retrieves the country of the station from its local state and adds it to the measurements. This enriched event is published onto a new topic: measurements_per_country.

This is the definition of our GeoCodingEntity:

message GeoCodingState {
  map<string,string> stationIdToCountry = 1;
}
//events
message CountryMeasurements {
  string country = 1;
  repeated TemperatureMeasurement temperatures = 3;
}
service GeoCodingEntityService {
  option (akkaserverless.service) = {
    type : SERVICE_TYPE_ENTITY
    component : "be.reaktika.weatherstation.domain.geocoding.GeoCoding"
  };
  rpc RegisterData(be.reaktika.weatherstation.domain.aggregations.AddToAggregationCommand) returns (google.protobuf.Empty);
}

Based on the content of the event, this entity does one of the following two things (sketched in code after this list):

    1. request the country for a certain location, and update its internal state

    2. use its internal state to ‘enrich’ the event with the country in which the weather station is located, and publish that event on a topic.
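
A sketch of how that handler could look is shown below. The command accessors, the geoCodingClient and the publishing helper are assumptions for illustration; how the enriched message actually ends up on the measurements_per_country topic is part of the application’s eventing configuration and is not shown here.

@Override
public Effect<Empty> registerData(GeoCodingDomain.GeoCodingState currentState, WeatherStationAggregation.AddToAggregationCommand command) {
  if (command.hasStationRegistered()) { // assumed accessor: the event carries a location
    // look up the country for the reported coordinates via the external geo-coding service
    String country = geoCodingClient.lookupCountry(            // hypothetical client
        command.getStationRegistered().getLatitude(),
        command.getStationRegistered().getLongitude());
    GeoCodingDomain.GeoCodingState newState = currentState.toBuilder()
        .putStationIdToCountry(command.getStationId(), country)
        .build();
    return effects().updateState(newState).thenReply(Empty.getDefaultInstance());
  } else {
    // enrich the measurements with the country from the local state;
    // the resulting CountryMeasurements ends up on the measurements_per_country topic
    String country = currentState.getStationIdToCountryOrDefault(command.getStationId(), "UNKNOWN");
    publishCountryMeasurements(country, command);              // hypothetical helper
    return effects().reply(Empty.getDefaultInstance());
  }
}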

Now we can create a view which is based on the messages that are published on the measurements_per_country topic.

This gives us the ability to query the average measurements per country.

service CountryAverage {
  option (akkaserverless.service) = {
    type: SERVICE_TYPE_VIEW
  };
  rpc ProcessCountryMeasurement(be.reaktika.weatherstation.action.geocoding.CountryData) returns (AveragePerCountryState) {
    option (akkaserverless.method).eventing.in = {
      topic: "measurements_per_country"
    };
    option (akkaserverless.method).view.update = {
      table: "averagePerCountry"
      transform_updates: true
    };
  }
  rpc GetCountryAverage(google.protobuf.Empty) returns (AveragePerCountryState) {
    option (akkaserverless.method).view.query = {
      query: "SELECT * from averagePerCountry"
    };
    option (google.api.http) = {
      get: "/weather/percountry"
    };
  }
}
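
Because transform_updates is enabled, the view’s update method runs in Java and can maintain a running average per country. Below is a sketch only: it assumes the code-generated view base class (View.UpdateEffect), assumes AveragePerCountryState keeps a running sum and count next to the average, and assumes the CountryData accessors; none of these names come from the post itself.

@Override
public View.UpdateEffect<AveragePerCountryState> processCountryMeasurement(
    AveragePerCountryState state, CountryData countryData) {
  // fold the new measurements into a running sum and count, then recompute the average
  double sum = state.getTemperatureSum();
  long count = state.getMeasurementCount();
  for (TemperatureMeasurement m : countryData.getTemperaturesList()) {
    sum += m.getMeasuredTemperature();
    count += 1;
  }
  AveragePerCountryState updated = state.toBuilder()
      .setCountry(countryData.getCountry())
      .setTemperatureSum(sum)
      .setMeasurementCount(count)
      .setAverageTemperature(count == 0 ? 0.0 : sum / count)
      .build();
  return effects().updateState(updated);
}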

On a diagram, this part of the application looks like this:

Putting everything together

There is one piece missing to turn all of this into a single well-oiled machine. As you might have noticed, both newly introduced entities (ExtremesEntity and GeoCodingEntity) share the same API:

rpc RegisterData(AddToAggregationCommand)

To avoid every entity (we might add additional types of aggregations in the future) having to subscribe to the message bus (Pub/Sub) itself, we’ll introduce a dedicated action for it:

The WeatherStationConsumeAction will subscribe to the weatherstation_data topic and dispatch all messages to every ‘downstream’ entity, as sketched below.
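
Here is a sketch of such a dispatching action. The mapping helper, the two entity client stubs and the asyncReply effect are assumptions about the generated plumbing; the point is simply that the fan-out to downstream entities lives in exactly one place.

@Override
public Effect<Empty> consume(WeatherStationPublish.WeatherStationData data) {
  // translate the topic message into the shared aggregation command (mapping elided)
  WeatherStationAggregation.AddToAggregationCommand command = toAggregationCommand(data); // hypothetical helper

  // dispatch the same command to both downstream entities;
  // 'extremes' and 'geoCoding' are hypothetical client stubs for the two entity services
  CompletionStage<Empty> extremesDone = extremes.registerData(command);
  CompletionStage<Empty> geoCodingDone = geoCoding.registerData(command);

  // reply once both entities have processed the command
  return effects().asyncReply(
      extremesDone.thenCombine(geoCodingDone, (left, right) -> Empty.getDefaultInstance()));
}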

When we put it all together, the application looks like this:

    1. Inbound HTTP POST requests are processed by the ‘WeatherStationApiAction’. The requests are translated into commands for the WeatherStation. This is an event-sourced entity which stores all events in a journal.

    2. The ‘WeatherStationAverageView’ subscribes to the event journal of the ‘WeatherStation’ entity. By consuming the events, it calculates the averages (temperature/wind speed) per weather station and stores them in a dedicated store. Those averages are exposed through the API: they can be queried per station_id.

    3. The ‘WeatherStationPublishAction’ also subscribes to the event journal of the ‘WeatherStation’ entity. Its goal is to publish those events to a topic (weatherstation_data). Every entity event is mapped to a message of type ‘WeatherStationData’.

    4. The ‘WeatherStationConsumeAction’ subscribes to the weatherstation_data topic. Every message is dispatched to 2 entities: ‘ExtremesEntity’ and ‘GeoCodingEntity’.

    5. ‘ExtremesEntity’: this entity, of which there is only 1 instance (with ID ‘EXTREMES’), stores the extreme measurements (minimum and maximum), together with the station_id that measured them. The ‘ExtremesView’ exposes them in the API.

    6. ‘GeoCodingEntity’ is also an entity of which there is only 1 instance (with ID ‘COUNTRY’) and it does 2 things:

        • If the ‘WeatherStationData’ contains the location of the weather station, the location coordinates are used to retrieve (by means of a call to OpenCage) the country in which the weather station is located. That information is stored in the state of this entity.

        • In other cases, the measurements, enriched with the country code, are published on the measurements_per_country topic.

    7. The ‘AveragePerCountryView’ subscribes to the measurements_per_country topic, calculates the averages per country and stores them in a dedicated store. Those averages are made public through the API.

Deployment

Once the Akka Serverless CLI tools are set up correctly (as described here), deploying this application takes just 1 command:

> mvn deploy

As you might have noticed, the project consists only of Java files and protobuf files. There are no:

    • database schemas

    • database connection details

    • configuration to map database records to Java models.

This example, as simple as it is, uses at least 5 separate data stores. Setting them up, configuring them, setting up the connections between the application components and the stores, backing up, … is all taken care of by the Akka Serverless platform. In addition to that, Akka Serverless takes care of the communication plumbing between components too.

This explains where the term databaseless originates.

To wrap up, here are a few screenshots of the application in action:

Conclusion

This example builds further on our previous blog post. The solution is more advanced and illustrates the flexibility of the framework, while at the same time illustrating the advantages of the rigidity of its programming model.

The programming model (based on the 3 types of components) makes it relatively hard to have components depend directly on one another. That is a good thing: components that rely too much on each other eventually lead to an unmaintainable mess.

Because of the API-first approach, it’s awkward to implement inter-component dependencies. So, if you need to combine different actions and entities, you are basically forced to introduce an intermediate communication channel such as a message bus. With this, instead of relying on the internals of a component directly, a component only relies on something that is provided by another component, or on something that implements some API. This might seem like overkill or overcomplicating things, but in fact it almost automatically leads to better elasticity and resilience.

For the sake of simplicity, in this example we chose to have all functionality in a single application. However, it is rather easy to split off parts of it into separate applications. For example, if it turns out that there is a lot of load on the ‘AveragePerCountry’ view, it can be isolated in a separate application and scaled independently to multiple nodes to spread the load. The same can be done with any other part of the application without significant changes to the code.

Another advantage of the structure is that every piece of functionality is independent of the others: if the GeoCodingEntity for some reason would fail, nothing else would be impacted. Even if the WeatherStationEntity would fail, the aggregated views can still be available. And vice versa: even if all aggregating entities fail, the weather stations can still report their measurements.

CURIOUS FOR MORE?

Eager to join Reaktika and help us build the databaseless future? Let’s grab a coffee or check out our open job positions.