Saga Patterns in Kalix Part 2 - Read Models
- Andrzej Ludwikowski.
- Software Architect, Lightbend.
- 1 Sept 2023,
- 7 minute read
In Part 1 of this series, Saga Patterns in Kalix - Event Choreography, I showed how easy it is to build event-driven Saga choreography with Kalix. We continue our journey with the Saga pattern in Kalix - read models.
Previously, our implementation was left with a hardcoded showId value. If you are used to CRUD-style applications with ORM frameworks like Hibernate, Spring Data, or similar, you might be surprised by the fact that you can only access a Kalix Entity by its entity ID. It doesn’t matter if it's an Event Sourced Entity or a Value Entity (which emulates a CRUD-style approach).
What may look like a limitation at first glance is actually a very conscious decision to promote the CQRS pattern in Kalix from the very beginning. From a domain code perspective, it’s very healthy to have one model for your writes and a separate model for your reads. For Kalix, CQRS allows very flexible scaling of the application under the hood, because different strategies can be used to scale the write side and the read side of the system. As with the Saga in the previous blog post, CQRS may sound like a lot of extra work. With Kalix components, it’s just a matter of connecting the dots.
Just to recap, we ended with this implementation:
public Effect<String> confirmReservation(WalletCharged walletCharged) {
    String reservationId = walletCharged.reservationId();
    String showId = "show1"; // hardcoded for now
    return effects().forward(confirmReservation(showId, reservationId));
}

public Effect<String> cancelReservation(WalletChargeRejected walletChargeRejected) {
    String reservationId = walletChargeRejected.reservationId();
    String showId = "show1"; // hardcoded for now
    return effects().forward(cancelReservation(showId, reservationId));
}
One of the solutions could be to extend the Wallet events model with an additional showId field, similar to reservationId. This is tempting, but from a system architecture perspective it’s an implicit cyclic dependency between two separate domains. The Show domain knows about the Wallet domain and vice versa. Changing one will affect the other. Cyclic dependencies should be avoided whenever possible. If we want to build a more generic Wallet domain, then even a reservationId is a cyclic dependency. Charging the wallet for something other than a reservation, such as groceries, would not involve a reservationId. A better solution would be to rename reservationId to expenseId and map this value in the action implementation.
public Effect<String> confirmReservation(WalletCharged walletCharged) {
    // In the Show context, the wallet's expenseId is the reservationId.
    String reservationId = walletCharged.expenseId();
    String showId = "show1";
    return effects().forward(confirmReservation(showId, reservationId));
}
Now, the Wallet domain is not aware of any reservation system and could be used for other purposes. If you are interested in learning more, look into the Context Mapping topic from DDD.
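To make the direction of the dependency concrete, here is a minimal sketch in plain Java, without any Kalix dependency. The record shapes and the `ReservationMapping` helper are simplified stand-ins invented for illustration, not the event model from the sample project. The point is only that the translation from expenseId to reservationId lives on the Show side.

```java
// Hypothetical, simplified event owned by the Wallet domain.
// It only knows about a generic expense, not about reservations.
record WalletCharged(String walletId, String expenseId) {}

// Hypothetical command owned by the Show domain.
record ConfirmReservation(String showId, String reservationId) {}

// The translation lives in the Show domain, so the dependency points one way only.
final class ReservationMapping {
    private ReservationMapping() {}

    static ConfirmReservation toCommand(WalletCharged event, String showId) {
        // In the Show context, the wallet's expenseId is the reservationId.
        String reservationId = event.expenseId();
        return new ConfirmReservation(showId, reservationId);
    }
}
```

With this shape, a hypothetical groceries domain could reuse the same Wallet events and provide its own mapping, without the Wallet code changing.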
Back to the main task. Since we only have an expenseId (a.k.a. reservationId), a mapping from reservationId to showId is required to implement our Saga correctly. You’re working with Event Sourced Entities, so it’s very natural to build a read model based on the event stream. From the CQRS perspective, Show and ShowByReservation are your write and read models, respectively.
View as a read model
A View component is the most natural way to achieve full CQRS in the Kalix ecosystem. Each View is optimized for a specific query (or queries). We can retrieve our data with an SQL-like query syntax. It’s worth mentioning that a View can be updated using events emitted from an Event Sourced Entity or state changes from a Value Entity. When it comes to the implementation, it’s just a matter of adding some annotations and event handlers, and exposing the query via an HTTP endpoint:
@ViewId("show_by_reservation_view")
@Table("show_by_reservation")
@Subscribe.EventSourcedEntity(value = ShowEntity.class)
public class ShowByReservationView extends View<ShowByReservation> {

    // Placeholder only; the actual query is implemented by the Kalix engine.
    @GetMapping("/show/by-reservation-id/{reservationId}")
    @Query("SELECT * FROM show_by_reservation WHERE :reservationId = ANY(reservationIds)")
    public ShowByReservation getShow(String reservationId) {
        return null;
    }

    public UpdateEffect<ShowByReservation> onEvent(ShowCreated created) {
        return effects().updateState(new ShowByReservation(created.showId(), new ArrayList<>()));
    }

    public UpdateEffect<ShowByReservation> onEvent(SeatReserved reserved) {
        return effects().updateState(viewState().add(reserved.reservationId()));
    }

    // A paid or cancelled reservation no longer needs to be looked up.
    public UpdateEffect<ShowByReservation> onEvent(SeatReservationPaid paid) {
        return effects().updateState(viewState().remove(paid.reservationId()));
    }

    public UpdateEffect<ShowByReservation> onEvent(SeatReservationCancelled cancelled) {
        return effects().updateState(viewState().remove(cancelled.reservationId()));
    }
}
The getShow method returns null because it is only a placeholder that associates the Query annotation with the endpoint defined by the GetMapping annotation. The actual implementation is provided by the Kalix engine. Let's have a detailed look at the ShowByReservation implementation, which basically keeps the reservationIds list up to date.
public record ShowByReservation(String showId, List<String> reservationIds) {

    public ShowByReservation(String showId, String reservationId) {
        this(showId, new ArrayList<>());
        reservationIds.add(reservationId);
    }

    public ShowByReservation add(String reservationId) {
        // Skip ids that are already present, so the list has no duplicates.
        if (!reservationIds.contains(reservationId)) {
            reservationIds.add(reservationId);
        }
        return this;
    }

    public ShowByReservation remove(String reservationId) {
        reservationIds.remove(reservationId);
        return this;
    }
}
Instead of simply mapping one showId to one reservationId, we have one showId with many reservationIds. That’s because there can be only one entry for a single entity ID in a view model. A collection like a Java List materializes as an SQL-like array, which is why we use the ANY operator in the query definition.
It’s a very specific view model, which may be difficult to understand at first glance. If you're not comfortable with this solution, or you're concerned about query performance with larger collections (a show with 1000+ seats), I suggest a different approach.
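If the shape of this view model is hard to picture, the following plain Java sketch emulates it in memory. It says nothing about how the Kalix engine stores or executes the query. `ShowRow` and `InMemoryShowTable` are hypothetical names, and the row is written as an immutable variant of the record above, which is a design choice of this sketch rather than the original implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Immutable stand-in for one view row: one entry per show (entity ID).
record ShowRow(String showId, List<String> reservationIds) {
    ShowRow {
        reservationIds = List.copyOf(reservationIds); // unmodifiable defensive copy
    }

    ShowRow add(String reservationId) {
        if (reservationIds.contains(reservationId)) {
            return this; // already present: nothing to change
        }
        List<String> updated = new ArrayList<>(reservationIds);
        updated.add(reservationId);
        return new ShowRow(showId, updated);
    }

    ShowRow remove(String reservationId) {
        List<String> updated = new ArrayList<>(reservationIds);
        updated.remove(reservationId);
        return new ShowRow(showId, updated);
    }
}

// Hypothetical in-memory table, keyed by entity ID (showId).
final class InMemoryShowTable {
    private final Map<String, ShowRow> rows = new LinkedHashMap<>();

    void onShowCreated(String showId) {
        rows.put(showId, new ShowRow(showId, List.of()));
    }

    void onSeatReserved(String showId, String reservationId) {
        rows.computeIfPresent(showId, (id, row) -> row.add(reservationId));
    }

    // Used for both paid and cancelled reservations.
    void onReservationClosed(String showId, String reservationId) {
        rows.computeIfPresent(showId, (id, row) -> row.remove(reservationId));
    }

    // Rough equivalent of: WHERE :reservationId = ANY(reservationIds)
    Optional<ShowRow> findByReservationId(String reservationId) {
        return rows.values().stream()
            .filter(row -> row.reservationIds().contains(reservationId))
            .findFirst();
    }
}
```

In this emulation the lookup walks every show and scans its list, which is the intuition behind the performance concern for large shows. The real query engine may behave differently.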
Value Entity as a read model
The real power of Kalix is how flexibly you can combine existing components. The View component is the default candidate for a read model, but not the only one. A ValueEntity could also be used as a read model. The domain part is much simpler:
record Reservation(String reservationId, String showId) {}
After wrapping it with the ValueEntity component:
@EntityKey("id")
@EntityType("reservation")
@RequestMapping("/reservation/{id}")
public class ReservationEntity extends ValueEntity<Reservation> {

    @GetMapping
    public Effect<Reservation> get(@PathVariable String id) {
        if (currentState() == null) {
            return effects().error("reservation not found", Status.Code.NOT_FOUND);
        } else {
            return effects().reply(currentState());
        }
    }

    @PostMapping("/{showId}")
    public Effect<String> create(@PathVariable String id, @PathVariable String showId) {
        return effects().updateState(new Reservation(id, showId)).thenReply("reservation created");
    }

    @DeleteMapping
    public Effect<String> delete() {
        return effects().deleteEntity().thenReply("reservation deleted");
    }
}
The only missing part is the subscription to the Show events:
@Subscribe.EventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class FoldShowEventsToReservation extends Action {

    public Effect<String> onEvent(SeatReserved reserved) {
        return effects().forward(createReservation(reserved.reservationId(), reserved.showId()));
    }

    // A paid or cancelled reservation no longer needs to be looked up.
    public Effect<String> onEvent(SeatReservationPaid paid) {
        return effects().forward(deleteReservation(paid.reservationId()));
    }

    public Effect<String> onEvent(SeatReservationCancelled cancelled) {
        return effects().forward(deleteReservation(cancelled.reservationId()));
    }
}
Although it looks like more code, the solution is more explicit about what we want to achieve.
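Stripped of the Kalix components, the Value Entity approach boils down to one entry per reservationId. The sketch below emulates that with an in-memory map. `InMemoryReservations` is a hypothetical name, and the class is only an illustration of the data shape, not of how Kalix persists Value Entities.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

record Reservation(String reservationId, String showId) {}

// Hypothetical in-memory stand-in for the set of ReservationEntity instances,
// with exactly one entry per reservationId.
final class InMemoryReservations {
    private final Map<String, Reservation> byId = new HashMap<>();

    void onSeatReserved(String showId, String reservationId) {
        // Create; writing the same reservation twice leaves the same state.
        byId.put(reservationId, new Reservation(reservationId, showId));
    }

    // Used for both paid and cancelled reservations.
    void onReservationClosed(String reservationId) {
        byId.remove(reservationId);
    }

    // Direct lookup by key, no scan over other reservations.
    Optional<Reservation> get(String reservationId) {
        return Optional.ofNullable(byId.get(reservationId));
    }
}
```

Compared with the list-per-show model, the lookup here is a plain access by key, at the cost of one small entry per reservation.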
To finish our confirmation (or cancellation) Saga, we can use both solutions in the same way:
public class CompleteReservation extends Action {

    public Effect<String> confirmReservation(WalletCharged walletCharged) {
        String reservationId = walletCharged.expenseId();
        // Look up the showId first, then confirm the reservation.
        return effects().asyncReply(
            getShowIdBy(reservationId).thenCompose(showId ->
                confirmReservation(showId, reservationId)
            ));
    }

    // Value Entity as a read model
    private CompletionStage<String> getShowIdBy(String reservationId) {
        return componentClient.forValueEntity(reservationId).call(ReservationEntity::get).execute()
            .thenApply(Reservation::showId);
    }

    // View as a read model
    private CompletionStage<String> getShowIdBy2(String reservationId) {
        return componentClient.forView().call(ShowByReservationView::getShow).params(reservationId).execute()
            .thenApply(ShowByReservation::showId);
    }
}
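The two-step flow (resolve the showId, then confirm) relies on composing asynchronous calls. Here is a minimal sketch of that composition using only the JDK, with both remote calls replaced by stubs. `CompleteReservationSketch`, its map-backed lookup, and the reply strings are assumptions made for this example and are not part of the Kalix API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class CompleteReservationSketch {
    // Hypothetical read model contents: reservationId -> showId.
    private final Map<String, String> showIdByReservation;

    CompleteReservationSketch(Map<String, String> showIdByReservation) {
        this.showIdByReservation = Map.copyOf(showIdByReservation);
    }

    // Stub for the read model call (View or Value Entity).
    CompletionStage<String> getShowIdBy(String reservationId) {
        String showId = showIdByReservation.get(reservationId);
        if (showId == null) {
            return CompletableFuture.failedFuture(
                new IllegalStateException("reservation not found: " + reservationId));
        }
        return CompletableFuture.completedFuture(showId);
    }

    // Stub for the call to the Show entity.
    CompletionStage<String> confirmReservation(String showId, String reservationId) {
        return CompletableFuture.completedFuture(
            "confirmed " + reservationId + " for " + showId);
    }

    // Step 1 resolves the showId, step 2 confirms the reservation.
    // thenCompose chains the second asynchronous call onto the first.
    CompletionStage<String> confirm(String expenseId) {
        return getShowIdBy(expenseId)
            .thenCompose(showId -> confirmReservation(showId, expenseId));
    }
}
```

If the lookup fails, the composed stage fails as well and the confirmation step is skipped. How the Saga should react to a missing mapping is outside the scope of this sketch.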
Summary
Today we introduced two more Kalix components, the View and the ValueEntity. I hope you liked the elasticity of connecting different puzzle pieces together to deliver a complete asynchronous CQRS implementation.
The third post in this series will cover deduplication. All Kalix subscriptions, in addition to preserving the order of events (per aggregate), guarantee at-least-once delivery semantics. This means that we should be ready for duplicate events and messages in our Saga choreography. Check out the part2 tag in the source code.