Real-Time Traffic Monitoring with Kalix

The Kalix Team at Lightbend
  • 27 June 2022
  • 20 minute read

Serverless platforms have gained popularity over the last few years thanks to their ability to scale naturally with fluctuating traffic.

However, developers must pay special attention to state management, as serverless platforms tend to automatically create and destroy microservice instances when traffic increases and decreases. This means we must save all persistent data to an external location.

Kalix provides a unique take on microservices by abstracting the underlying data storage from the code developers write. This abstraction allows us to focus on our API while leaving the details of persisting data to the Kalix platform.

In this post, we’ll create a service hosted by Kalix to simulate a real-time vehicle tracking system. The service accepts vehicle position updates from a mock service written using Spring and converts the positions into the General Transit Feed Specification real-time (GTFS-RT) feed. This feed format is an open standard for public transit agencies to publish their transit data.

Download the Sample Code

The Kalix sample project is available from this GitHub repo, and the Spring client is available from this GitHub repo.

Bootstrap the Sample Application

We’ll build the application’s skeleton using a Maven archetype.

Run the following command to create the initial code:

mvn \
  archetype:generate \
  -DarchetypeGroupId=io.kalix \
  -DarchetypeArtifactId=kalix-maven-archetype \
  -DarchetypeVersion=1.0.2

Note: Update 1.0.2 to the latest version of Kalix.

Maven prompts for the group ID, artifact ID, and default package. For this tutorial, we use com.example as both the group ID and the package, and realtimetransitdemo as the artifact ID.

Remove the Sample Files

Kalix applications expose the API as gRPC services, with the API and entities defined in protocol buffer descriptors. The sample application defines a simple counter API with two descriptor files called src/main/proto/org/example/domain/counter_domain.proto and src/main/proto/org/example/counter_api.proto. We don’t use these sample files, so delete them. We replace these files with our own to expose the traffic tracking API.

Define the Domain Entities

Let’s start by defining the entities used by our application. Since our application tracks the position of vehicles, the entity is called VehiclePositionState and defines the fields relating to the position of a vehicle.

Create a file called src/main/proto/com/example/domain/vehicle_position_domain.proto with the following contents:

syntax = "proto3";

package com.example.domain;

option java_outer_classname = "VehiclePositionDomain";

message VehiclePositionState {
  string entity_id = 1;
  float bearing = 2;
  float latitude = 3;
  float longitude = 4;
  float odometer = 5;
  float speed = 6;
}

Let’s break this file down.

We start by defining the syntax we use in the file, which is proto3.

syntax = "proto3";

The package statement defines the package into which we put the Java classes. The base package matches the package passed to the Maven command used to bootstrap the application, with domain appended to indicate that the classes represent our domain entities:

package com.example.domain;

The java_outer_classname option names the outer Java class that wraps the classes generated from the messages in this file:

option java_outer_classname = "VehiclePositionDomain";

The final content in this file defines a message. As we see later, the proto compiler converts messages to Java classes. Our VehiclePositionState message contains a number of fields relating to the position of a vehicle. These fields map to a GTFS position.

Note that while we’ve closely matched the fields in our VehiclePositionState message to the GTFS standard, we don’t use this message directly to create the final GTFS-RT feed. Our VehiclePositionState message defines the structure of the data persisted by Kalix, and the values are read and transformed into a GTFS-RT feed later.

message VehiclePositionState {
  string entity_id = 1;
  float bearing = 2;
  float latitude = 3;
  float longitude = 4;
  float odometer = 5;
  float speed = 6;
}

With the domain entity defined, the next step is to define the API.

Define the API

Like the domain entities, a protobuf file defines our API. Create a file called src/main/proto/com/example/api/vehicle_position_api.proto with the following contents:

syntax = "proto3";
 
import "google/protobuf/empty.proto";
import "kalix/annotations.proto";
import "google/api/annotations.proto";

package com.example.api;

option java_outer_classname = "VehiclePositionApi";

message VehiclePosition {
  float bearing = 1;
  float latitude = 2;
  float longitude = 3;
  float odometer = 4;
  float speed = 5;
}

service VehiclePositionService {
  option (kalix.codegen) = {
    value_entity: {
      name: "com.example.domain.VehiclePosition"
      entity_type: "vehicle_positions"
      state: "com.example.domain.VehiclePositionState"
    }
  };

  rpc AddPosition(VehiclePosition) returns (google.protobuf.Empty) {
    option (kalix.method).entity.key_generator = VERSION_4_UUID;
    option (google.api.http) = {
      post: "/position",
      body: "*"
    };
  }
}
 

This file shares many common statements with the entity definition such as syntax, package, and the java_outer_classname option. The import statements expose the Kalix and gRPC functionality that we use in our API:

import "google/protobuf/empty.proto";
import "kalix/annotations.proto";
import "google/api/annotations.proto";

We then define a message to describe the data sent and received by the API. So, whereas the VehiclePositionState message created earlier defines the persisted data structure, VehiclePosition defines the structure of the data exposed by the API. These two messages are very similar, and it may seem redundant to define this structure twice. However, keeping the public API separate from the persisted data provides a degree of flexibility as the API evolves:

message VehiclePosition {
  float bearing = 1;
  float latitude = 2;
  float longitude = 3;
  float odometer = 4;
  float speed = 5;
}
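To make the benefit of this separation a little more concrete, here is a minimal plain-Java sketch. The two nested classes are hand-written stand-ins for the generated message classes, and the scenario is hypothetical: we assume the public API later switches to reporting speed in km/h while the stored state keeps speed in metres per second. Only the mapping between the two has to change.

```java
final class ApiToStateSketch {
  // Stand-in for the API message, after a hypothetical change to km/h.
  static final class ApiPosition {
    final float latitude;
    final float longitude;
    final float speedKmh;

    ApiPosition(float latitude, float longitude, float speedKmh) {
      this.latitude = latitude;
      this.longitude = longitude;
      this.speedKmh = speedKmh;
    }
  }

  // Stand-in for the persisted state, which keeps its original shape.
  static final class StoredPosition {
    final String entityId;
    final float latitude;
    final float longitude;
    final float speed; // assumed here to be metres per second

    StoredPosition(String entityId, float latitude, float longitude, float speed) {
      this.entityId = entityId;
      this.latitude = latitude;
      this.longitude = longitude;
      this.speed = speed;
    }
  }

  // The mapping is the only place that needs to know about the API change.
  static StoredPosition toState(String entityId, ApiPosition api) {
    return new StoredPosition(entityId, api.latitude, api.longitude, api.speedKmh / 3.6f);
  }

  public static void main(String[] args) {
    StoredPosition stored = toState("vehicle-1", new ApiPosition(2.0f, 3.0f, 36.0f));
    System.out.println(stored.entityId + " speed=" + stored.speed);
  }
}
```

In the real service this mapping lives in the entity's command handler, which we implement later in this post.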

We then define the service that exposes our API:

service VehiclePositionService {

Our service is backed by a Value Entity, whose state Kalix persists for us. The value_entity field tells the code generator to create a Value Entity for this service:

option (kalix.codegen) = {
  value_entity: {

The name field denotes the base name for the Value Entity, which we use to generate the class exposing the API methods and other test classes.

name: "com.example.domain.VehiclePosition"

The entity_type field defines a unique identifier for the entity's state storage. We reference this identifier later when defining the view.

entity_type: "vehicle_positions"

The state field defines the protobuf message representing the Value Entity’s state which Kalix keeps. This was the message we previously defined as the domain entity:

state: "com.example.domain.VehiclePositionState"

We must now define a service method called AddPosition that allows clients to submit their vehicle positions. It takes a VehiclePosition message and returns nothing (indicated by google.protobuf.Empty):

rpc AddPosition(VehiclePosition) returns (google.protobuf.Empty) {

We then instruct Kalix to generate a unique entity ID, in the form of a randomly generated version 4 UUID, with the (kalix.method).entity.key_generator option:

option (kalix.method).entity.key_generator = VERSION_4_UUID;
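If the name of the key generator is unfamiliar, a version 4 UUID is a 128-bit identifier built from random bits. The sketch below uses the JDK's java.util.UUID to show what such an identifier looks like. The class and method names are illustrative only; in our service Kalix generates the key, and we never call anything like this ourselves.

```java
import java.util.UUID;

final class EntityKeySketch {
  // Hypothetical helper: produces the kind of identifier that
  // VERSION_4_UUID describes (a random-based UUID).
  static String newEntityKey() {
    return UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    String key = newEntityKey();
    // version() reports 4 for random-based UUIDs.
    System.out.println(key + " (version " + UUID.fromString(key).version() + ")");
  }
}
```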

By default, gRPC clients consume services exposed by Kalix. However, we can also expose services as regular HTTP endpoints with the (google.api.http) option. Here, we expose this method as an HTTP POST request on the path /position:

option (google.api.http) = {
  post: "/position",
  body: "*"
};
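As a rough illustration of what a client call to this HTTP endpoint could look like, the sketch below builds (but does not send) a POST request with the JDK's java.net.http API. The hostname is the sample one used in the test commands later in this post, the JSON field names are assumed to mirror the VehiclePosition message, and the class name is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class AddPositionRequestSketch {
  // Builds the request only; sending it needs an HttpClient and a deployed service.
  static HttpRequest build(String baseUrl, String json) {
    return HttpRequest.newBuilder(URI.create(baseUrl + "/position"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build();
  }

  public static void main(String[] args) {
    String json =
        "{\"bearing\": 1, \"latitude\": 2, \"longitude\": 3, \"odometer\": 4, \"speed\": 5}";
    HttpRequest request = build("https://frosty-moon-3947.us-east1.kalix.app", json);
    System.out.println(request.method() + " " + request.uri());
  }
}
```

We come back to the HTTP endpoint at the end of the post, where a Spring client posts positions to it on a schedule.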

This API allows our application to persist new vehicle positions with both gRPC and HTTP calls. The next step is to expose the vehicle positions as a collection.

Define a View

Kalix exposes collections of entities through a view. Our view is defined in a file called src/main/proto/com/example/view/vehicle_position_view.proto with the following contents:

syntax = "proto3";
 
package com.example.view;

option java_outer_classname = "VehiclePositionView";

import "com/example/domain/vehicle_position_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/empty.proto";

service AllVehiclePositions {
  option (kalix.codegen) = {
    view: {}
  };

  rpc UpdateVehiclePosition(com.example.domain.VehiclePositionState) returns (com.example.domain.VehiclePositionState) {
    option (kalix.method).eventing.in = {
      value_entity: "vehicle_positions"
    };
    option (kalix.method).view.update = {
      table: "vehicle_positions"
    };
  }

  rpc GetAllVehiclePositions(google.protobuf.Empty) returns (stream com.example.domain.VehiclePositionState) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM vehicle_positions"
    };
  }
}

Like the other protocol buffer descriptors, this view includes common settings like syntax, package, the java_outer_classname option, and imports. We then define a new service called AllVehiclePositions to expose the view:

service AllVehiclePositions {

The (kalix.codegen) option indicates that this definition generates a view:

option (kalix.codegen) = {
  view: {}
};

The UpdateVehiclePosition method defines how to update the view as vehicle position entities are created:

rpc UpdateVehiclePosition(com.example.domain.VehiclePositionState) returns (com.example.domain.VehiclePositionState) {

Here, we define the source entities that make up the view. The value_entity field matches the entity_type field in the API exposing the vehicle position entities:

option (kalix.method).eventing.in = {
  value_entity: "vehicle_positions"
};

We then specify that this method is for updating a view. The table field can be set to anything, and we use this value in the simplified SQL syntax we use next to return the entities that we want the view to expose:

option (kalix.method).view.update = {
  table: "vehicle_positions"
};

We now define a method called GetAllVehiclePositions to return the stream of vehicle positions. This method takes no arguments, indicated by google.protobuf.Empty. It then returns a stream of VehiclePositionState objects, indicated by stream com.example.domain.VehiclePositionState:

rpc GetAllVehiclePositions(google.protobuf.Empty) returns (stream com.example.domain.VehiclePositionState) {

The (kalix.method).view.query option allows us to define a simplified SQL query that selects the entities this view returns. Here, we return all entities, referencing the table defined above:

option (kalix.method).view.query = {
  query: "SELECT * FROM vehicle_positions"
};
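One way to picture what the update and query methods do together is an in-memory table keyed by entity ID: every state change replaces that entity's row, and the query returns all rows. The sketch below is only a mental model in plain Java with hypothetical names. It says nothing about how Kalix actually stores or indexes view data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ViewTableSketch {
  // entity ID -> latest known state (reduced here to a "latitude/longitude" string)
  private final Map<String, String> rows = new LinkedHashMap<>();

  // Analogue of UpdateVehiclePosition: insert or replace the row for this entity.
  void update(String entityId, String state) {
    rows.put(entityId, state);
  }

  // Analogue of "SELECT * FROM vehicle_positions".
  List<String> selectAll() {
    return new ArrayList<>(rows.values());
  }

  public static void main(String[] args) {
    ViewTableSketch table = new ViewTableSketch();
    table.update("a", "1.0/2.0");
    table.update("b", "3.0/4.0");
    table.update("a", "1.5/2.5"); // a later update replaces the earlier row
    System.out.println(table.selectAll()); // prints [1.5/2.5, 3.0/4.0]
  }
}
```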

This view exposes the vehicle positions saved by the service. The next step is to convert these entities into a GTFS-RT stream that other GTFS compatible tools can consume.

Convert the Entities with an Action

An action is a stateless method triggered by an event like a gRPC or an HTTP call. We create an action to query the view for a list of vehicle positions and convert the result to a GTFS-RT feed.

Create a file called src/main/proto/com/example/api/gtfs_feed.proto with the following contents:

syntax = "proto3";
package com.example.api;
 
import "kalix/annotations.proto";
import "google/api/annotations.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/empty.proto";
 
option java_outer_classname = "GtfsFeedApi";
 
service GtfsFeed {
  option (kalix.codegen) = {
    action: {}
  };
 
  rpc GetGtfsFeed(google.protobuf.Empty) returns (google.protobuf.BytesValue);
}

Here, we define a service called GtfsFeed:

service GtfsFeed {

We configure Kalix to generate an action from this definition:

option (kalix.codegen) = {
  action: {}
};

The action exposes a single method called GetGtfsFeed that has no arguments and returns a byte array. This byte array holds the binary blob containing the GTFS feed:

rpc GetGtfsFeed(google.protobuf.Empty) returns (google.protobuf.BytesValue);

We now have everything in place to expose our API to consumers. The final step is to incorporate the GTFS API into our application.

Define the GTFS API

Coincidentally, GTFS encodes its feed as a binary protobuf blob. This means a protobuf descriptor defines the GTFS API. Because our application is already configured to compile protobuf descriptors into Java classes, all we must do is download the GTFS protobuf and save it in our application.

Create a file called src/main/proto/com/example/domain/gtfs-realtime.proto and fill it with the GTFS protobuf.

Notice we have not written any Java code yet. The bulk of the initial work creating services is defining the entities and API in protobuf descriptors. The next step is to generate the Java code from the protobuf files.

Generate the Code

We generate the Java code from the protobuf files using Maven with the following command:

mvn package

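As part of the build process, Maven generates skeleton Java classes for the entity, view, and action defined above, some of which we edit in the following sections.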

While protobuf descriptors are great for describing our API, they often don’t describe how the API is to function. This means we must update some of the generated files to implement our desired functionality.

A number of other supporting classes are also created in the target/generated-sources directory, including the classes representing the GTFS API. However, every build recreates the files in the target directory, so we should not modify them: any changes would be overwritten.

Implement the Vehicle Position API

In the VehiclePosition class, replace the generated emptyState method with the following code, which returns a default instance of our VehiclePositionState class:

@Override
public VehiclePositionDomain.VehiclePositionState emptyState() {
  return VehiclePositionDomain.VehiclePositionState.getDefaultInstance();
}

Then replace the generated addPosition method with the following code:

@Override
public Effect<Empty> addPosition(
  VehiclePositionDomain.VehiclePositionState currentState,
  VehiclePositionApi.VehiclePosition vehiclePosition) {
  LOG.info("Entity Id: " + entityId);

Here we take the supplied VehiclePosition object and construct the internal VehiclePositionState representation of it:

 final VehiclePositionDomain.VehiclePositionState state = VehiclePositionDomain.VehiclePositionState.newBuilder()
    .setEntityId(entityId)
    .setLatitude(vehiclePosition.getLatitude())
    .setLongitude(vehiclePosition.getLongitude())
    .setBearing(vehiclePosition.getBearing())
    .setOdometer(vehiclePosition.getOdometer())
    .setSpeed(vehiclePosition.getSpeed())
    .build();

The returned effect instructs Kalix to update the entity in the persistent data store and then returns an empty object to the caller:

  return effects().updateState(state).thenReply(Empty.getDefaultInstance());
}

Implement the GTFS-RT Feed API

We are now at the point where we implement the most important part of the API, generating the GTFS-RT feed. Before we dive into the code, it is worth looking at what a GTFS-RT feed is and how it differs from the other services we have created.

Like other services, GTFS exposes information in protobufs. However, services mostly hide the protobuf data sent over the wire from developers and consumers of the services. GTFS, on the other hand, treats a protobuf binary blob as the primary artifact created by a feed service and consumed by GTFS clients.

What this means in practice is that, while Kalix services interact with Java classes (constructed for you at build time from the protobuf files) that provide an abstraction over the protobuf messages sent and received via gRPC, the GTFS feed produces the raw protobuf binary blob containing the feed as the output of an HTTP method call. We see this in action with the following code that replaces the generated getGtfsFeed method:

@Override
public Effect<BytesValue> getGtfsFeed(Empty empty) {
  try {

We start by calling the view to retrieve the list of vehicle position entities. The services call each other much the same way an external service would by making either a gRPC or HTTP call. But when calling services defined in the same application, the actionContext().getGrpcClient method provides a convenient way to access a gRPC service stub:

final AllVehiclePositions service = actionContext().getGrpcClient(AllVehiclePositions.class, "realtimetransitdemo");

The collection of vehicle position entities is retrieved by calling the getAllVehiclePositions method:

final Source<VehiclePositionState, NotUsed> positions = service.getAllVehiclePositions(Empty.getDefaultInstance());

The returned object is a stream that we convert to a strict collection (a list in this case):

final CompletionStage<List<VehiclePositionState>> positionsList = positions
  .take(1000)
  .runWith(Sink.seq(), creationContext.materializer());
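The same bound-then-collect idea can be tried without Akka Streams. The following sketch uses java.util.stream purely as an analogy: limit plays the role of take, and collecting to a list plays the role of Sink.seq. The names are hypothetical, and unlike the Akka version this runs synchronously rather than returning a CompletionStage.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BoundedCollectSketch {
  // Caps a possibly unbounded stream at maxItems before turning it into a list.
  static <T> List<T> takeToList(Stream<T> source, int maxItems) {
    return source.limit(maxItems).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // An infinite stream of integers stands in for the stream of positions.
    List<Integer> firstThree = takeToList(Stream.iterate(1, i -> i + 1), 3);
    System.out.println(firstThree); // prints [1, 2, 3]
  }
}
```

Capping the stream before collecting matters because a strict list has to fit in memory, whereas the source stream has no inherent upper bound.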

We must then add the list of vehicle positions to a GTFS FeedMessage.

final Builder feedMessageBuilder = FeedMessage.newBuilder();

The feed message has a header defining high level details of the GTFS-RT feed:

final FeedHeader header = FeedHeader.newBuilder()
  .setGtfsRealtimeVersion("2.0")
  .setIncrementality(Incrementality.FULL_DATASET)
  .setTimestamp(System.currentTimeMillis() / 1000)
  .build();
feedMessageBuilder.setHeader(header);
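The division by 1000 is there because the GTFS-RT header timestamp is expressed in seconds since the Unix epoch, while System.currentTimeMillis() returns milliseconds. A small sketch (with a hypothetical class name) shows that conversion next to java.time.Instant, which yields seconds directly:

```java
import java.time.Instant;

final class FeedTimestampSketch {
  // Same arithmetic as in the header code: milliseconds to whole seconds.
  static long toEpochSeconds(long epochMillis) {
    return epochMillis / 1000;
  }

  public static void main(String[] args) {
    long millis = 1656288000123L; // a fixed sample instant in June 2022
    System.out.println(toEpochSeconds(millis)); // prints 1656288000
    System.out.println(Instant.ofEpochMilli(millis).getEpochSecond()); // prints 1656288000
  }
}
```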

Each vehicle position is then added to the feed:

positionsList.toCompletableFuture().get().forEach(p -> addVehiclePositions(feedMessageBuilder, p));

Once the feed is populated, we call the build method to build the final object:

final FeedMessage feedMessage = feedMessageBuilder.build();

Because binary protobuf blobs are hard to read, we dump the text version of the feed to the logs. This allows us to verify that the feed looks like it should:

LOG.info(feedMessage.toString());

Finally, we return the binary protobuf blob to the sender:

return effects().reply(BytesValue.of(feedMessage.toByteString()));
} catch (Exception e) {

We log any exceptions and rethrow them as unchecked exceptions:

    LOG.error(e + "\n" + ExceptionUtils.getStackTrace(e));
    throw new RuntimeException(e);
  }
}

The addVehiclePositions method converts the internal representation of a vehicle position into an object that we can add to a GTFS feed:

public void addVehiclePositions(
  final Builder builder,
  final VehiclePositionState position) {
 
  final Position gtfsPosition = Position.newBuilder()
    .setBearing(position.getBearing())
    .setLatitude(position.getLatitude())
    .setLongitude(position.getLongitude())
    .setOdometer(position.getOdometer())
    .setSpeed(position.getSpeed())
    .build();
 
  final VehiclePosition vehiclePosition = VehiclePosition.newBuilder()
    .setPosition(gtfsPosition)
    .build();
 
  final FeedEntity entity = FeedEntity.newBuilder()
    .setId(position.getEntityId())
    .setVehicle(vehiclePosition)
    .build();
 
  builder.addEntity(entity);
}

We now have a functional GTFS API. The next step is to expose the service publicly.

Deploy and Expose the Service

Follow the Kalix documentation to build and deploy the service. Note that you need to update the dockerImage element in the pom.xml file to point to a Docker registry that you can push to. Once deployed, the service appears in the Kalix dashboard.

Kalix Console Image One

We also must expose the service on an automatically generated hostname by running the kalix services expose command. Once exposed, the service shows a route:

Kalix Console Image Two

Test the Service

We can use the grpcurl tool to access the gRPC endpoints. We start by adding a sample vehicle position with the following command (make sure to replace frosty-moon-3947.us-east1 with the hostname assigned to your service):

grpcurl -d '{"bearing": 1,"latitude": 2,"longitude": 3,"odometer": 4,"speed": 5}' frosty-moon-3947.us-east1.kalix.app:443 com.example.api.VehiclePositionService/AddPosition

Note that PowerShell users must escape the quotes like this:

grpcurl -d '{\"bearing\": 1,\"latitude\": 2,\"longitude\": 3,\"odometer\": 4,\"speed\": 5}' frosty-moon-3947.us-east1.kalix.app:443 com.example.api.VehiclePositionService/AddPosition

We can return the full list of vehicle positions with the command:

grpcurl -d '{}' frosty-moon-3947.us-east1.kalix.app:443 com.example.view.AllVehiclePositions/GetAllVehiclePositions

We then return the GTFS-RT feed with the command:

grpcurl -d '{}' frosty-moon-3947.us-east1.kalix.app:443 com.example.api.GtfsFeed/GetGtfsFeed

That last command returns a Base64-encoded string containing the protobuf binary blob representing the GTFS-RT feed. To extract the original content, we first strip the JSON string quotes using jq and then decode the string using base64:

grpcurl -d '{}' frosty-moon-3947.us-east1.kalix.app:443 com.example.api.GtfsFeed/GetGtfsFeed | jq -r . | base64 -d
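The same two steps can be reproduced in Java if you would rather decode the response inside a test than in a shell pipeline. This sketch assumes the response body is a single JSON string, as the jq -r . step implies, and uses only the JDK. The class name is hypothetical, and the quote stripping is deliberately naive: it does not handle JSON escape sequences.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class FeedDecodeSketch {
  static byte[] decode(String jsonString) {
    String trimmed = jsonString.trim();
    // Drop the surrounding double quotes of the JSON string value.
    String base64 = trimmed.substring(1, trimmed.length() - 1);
    return Base64.getDecoder().decode(base64);
  }

  public static void main(String[] args) {
    // "aGVsbG8=" is the Base64 encoding of the ASCII text "hello".
    byte[] bytes = decode("\"aGVsbG8=\"");
    System.out.println(new String(bytes, StandardCharsets.US_ASCII)); // prints hello
  }
}
```

With a real response, the decoded bytes are the serialized feed message rather than readable text.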

We can confirm this is a valid GTFS-RT feed by viewing it with a client tool like gtfs-rt-printer. The following command saves the binary blob to a file and then loads it with gtfs-rt-printer:

grpcurl -d '{}' frosty-moon-3947.us-east1.kalix.app:443 com.example.api.GtfsFeed/GetGtfsFeed | jq -r . | base64 -d > positions.pb; java -jar gtfs-rt-printer-1.2.0.jar positions.pb

The output should show something similar to the following:

loading positions.pb
feed contains 35 entities
position {
  latitude: 57.518112
  longitude: 40.318607
  bearing: 288.91757
  odometer: 30779.123046875
  speed: 51.530975
}
...

This output confirms we have successfully generated a valid GTFS-RT feed.

Mock Positions with a Spring Client

To simulate vehicles adding their positions to the traffic tracker, we create a simple Spring Boot application that posts random positions on a fixed schedule. The client is based on the instructions for creating scheduled applications on the Spring website. We then add the following component to the sample app, which uses a web client to post a random position to the tracking service every 30 seconds. This code demonstrates how we can use the HTTP endpoints to interact with the service (remember to replace frosty-moon-3947.us-east1 with the hostname of your service):

@Component
public class VehiclePositionScheduledUpload {
 
  private static final Logger LOG = LoggerFactory.getLogger(VehiclePositionScheduledUpload.class);
 
  @Scheduled(fixedRate = 30000)
  public void reportCurrentTime() {
    LOG.info("Saving a new vehicle position");
 
    final ResponseEntity<Void> response = WebClient.create("https://frosty-moon-3947.us-east1.kalix.app")
      .post()
      .uri("/position")
      .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
      .body(Mono.just(VehiclePosition.builder()
        .bearing((float) Math.random() * 360.0f)
        .latitude((float) Math.random() * 90.0f)
        .longitude((float) Math.random() * 90.0f)
        .speed((float) Math.random() * 100.0f)
        .odometer((float) Math.random() * 50000.0f)
        .build()
      ), VehiclePosition.class)
      .retrieve()
      .toBodilessEntity()
      .block();

    LOG.info("Response was " + response.getStatusCodeValue());
  }
}
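The ranges used above only cover part of the globe, since latitude and longitude both fall between 0 and 90. If fuller coverage matters for your test data, a generator along these lines could be used instead. It is a sketch with hypothetical names that relies only on the JDK, and it takes a java.util.Random so that runs can be made repeatable with a seed.

```java
import java.util.Arrays;
import java.util.Random;

final class RandomPositionSketch {
  // Returns {bearing, latitude, longitude} in degrees:
  // bearing from 0 to 360, latitude from -90 to 90, longitude from -180 to 180.
  static float[] next(Random random) {
    float bearing = random.nextFloat() * 360.0f;
    float latitude = random.nextFloat() * 180.0f - 90.0f;
    float longitude = random.nextFloat() * 360.0f - 180.0f;
    return new float[] {bearing, latitude, longitude};
  }

  public static void main(String[] args) {
    // A fixed seed makes the generated sequence repeatable between runs.
    System.out.println(Arrays.toString(next(new Random(42))));
  }
}
```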

Running this Spring application slowly fills the tracking service with dummy data, allowing us to verify that the GTFS-RT feed creation endpoint successfully builds new feeds with updated positions.

Conclusion

Kalix provides developers with a unique approach to building microservices. By abstracting the underlying persistence layer, Kalix allows developers to focus on creating microservices manipulating plain old Java objects (POJOs), removing much of the boilerplate code typically used to serialize and persist data.

Kalix also provides a platform that automatically scales based on load, so our traffic tracking application could seamlessly scale to support a huge number of vehicles constantly reporting their positions. In our testing, we could submit dozens of positions every second and still promptly download the GTFS-RT feed.

To test Kalix for yourself, sign up for a free account, grab the sample application source code, and see how easy it can be to build highly scalable and resilient microservices.
