
Saga Patterns in Kalix Part 3 - Exactly-once Delivery with Deduplication

Andrzej Ludwikowski
Software Architect, Lightbend
  • 1 Sept 2023
  • 8 minute read

A good metaphor is worth a thousand words, even better if it's funny. This famous tweet by Mathias Verraes is one of my favorites:

"There are only two hard problems in distributed systems: 2. Exactly-once delivery 1. Guaranteed order of messages 2. Exactly-once delivery" — Mathias Verraes

It's hard to find a better phrase to describe the fundamental challenges in a world of distributed systems.

The good news is that Kalix solves one of those challenges, namely the order of messages. As long as the source of your subscription is a Kalix component like an Event Sourced Entity or a Value Entity, the order of events or state changes is preserved (similar to Apache Kafka partitions).

The second challenge—exactly-once delivery—cannot (in most cases) be generalized and solved by a one-size-fits-all implementation. The solution is usually a mix of business requirements and possible technical tricks in a given context.

Where should I apply this solution? Anywhere there is a chance of duplication that could lead to an incorrect state, for example an HTTP endpoint, an events/messages subscription, etc. Each situation is different, and each time you can use a different strategy.

Let’s continue with the third part of this blog series journey and examine how this can be solved in our Kalix demo application.

Idempotent read models

The easiest way to achieve exactly-once delivery semantics is to have idempotent updates. We already did this when we worked on ReservationEntity (acting as a read model):

@PostMapping("/{showId}")
public Effect<String> create(@PathVariable String showId) {
  String reservationId = commandContext().entityId();
  return effects().updateState(new Reservation(reservationId, showId)).thenReply("...");
}

@DeleteMapping
public Effect<String> delete() {
  return effects().deleteEntity().thenReply("reservation deleted");
}

Running the same request more than once doesn't change anything: the outcome is exactly the same. Nothing to do here; we can move on to the next case.

The alternative read model approach, based on a View component (mentioned in the previous blog), requires a simple change from a List collection to a Set:

public record ShowByReservation(String showId, Set<String> reservationIds) {

  public ShowByReservation add(String reservationId) {
    if (!reservationIds.contains(reservationId)) {
      reservationIds.add(reservationId);
    }
    return this;
  }

  public ShowByReservation remove(String reservationId) {
    reservationIds.remove(reservationId);
    return this;
  }
}

After that, updates are idempotent.
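To see the idempotency in action, here is a tiny self-contained sketch (the record is reproduced inline for the demo; the actual Kalix demo code may differ slightly):

```java
import java.util.HashSet;
import java.util.Set;

public class ShowByReservationDemo {

  // Mirrors the Set-based read model shown above
  record ShowByReservation(String showId, Set<String> reservationIds) {
    ShowByReservation add(String reservationId) {
      reservationIds.add(reservationId); // Set.add is a no-op for an existing element
      return this;
    }

    ShowByReservation remove(String reservationId) {
      reservationIds.remove(reservationId);
      return this;
    }
  }

  public static void main(String[] args) {
    var model = new ShowByReservation("show-1", new HashSet<>());
    // Applying the same update twice leaves the state unchanged
    model.add("res-1").add("res-1");
    System.out.println(model.reservationIds().size()); // prints 1
  }
}
```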

Show Entity deduplication

We need to protect the ShowEntity endpoints against duplicated requests. For some duplicates you may want to return an error (e.g. creating the same Show twice), but for others, like reservation cancellation or confirmation, you may prefer to accept the request and make it idempotent. That way the subscription keeps working in case of redeliveries or restarts.

To make it harder, let's assume you need to distinguish between the case where the record CancelSeatReservation(String reservationId) is a duplicate and the case where it's an invalid command with a wrong reservationId. For this, you need an additional finishedReservations collection in the Show domain class, which is the state of the ShowEntity. It keeps not only all historical reservation ids but also each reservation's status (confirmed or cancelled).
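The post doesn't show the shape of finishedReservations; a minimal sketch of the bookkeeping it describes (names assumed, not taken from the actual repo) could look like this:

```java
import java.util.Map;

public class FinishedReservationsDemo {

  // Hypothetical status tracking for completed reservations
  enum ReservationStatus { CONFIRMED, CANCELLED }

  record FinishedReservation(String reservationId, ReservationStatus status) {
    boolean isCancelled() {
      return status == ReservationStatus.CANCELLED;
    }
  }

  // A cancellation for an already-cancelled reservation is a duplicate;
  // an unknown id is an invalid command.
  static String classifyCancellation(Map<String, FinishedReservation> finished,
                                     String reservationId) {
    FinishedReservation found = finished.get(reservationId);
    return (found != null && found.isCancelled())
      ? "DUPLICATED_COMMAND"
      : "RESERVATION_NOT_FOUND";
  }

  public static void main(String[] args) {
    var finished = Map.of(
      "res-1", new FinishedReservation("res-1", ReservationStatus.CANCELLED));
    System.out.println(classifyCancellation(finished, "res-1")); // prints DUPLICATED_COMMAND
    System.out.println(classifyCancellation(finished, "res-2")); // prints RESERVATION_NOT_FOUND
  }
}
```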

private Either<ShowCommandError, ShowEvent> handleCancellation(CancelSeatReservation cancelSeatReservation) {
  String reservationId = cancelSeatReservation.reservationId();
  return pendingReservations.get(reservationId).fold(
    /*no reservation*/
    () -> {
      if (finishedReservations.get(reservationId).exists(FinishedReservation::isCancelled)) {
        return left(DUPLICATED_COMMAND);
      } else {
        return left(RESERVATION_NOT_FOUND);
      }
    },
    /*matching reservation*/
    seatNumber -> /* processing as before */
  );
}

The handleConfirmation requires very similar modifications. I’m skipping this part, but you can examine the code yourself here.

Reusing an existing business field for deduplication requires extra caution. You need to block a reservation of different seats with the same reservationId. Thus, we also need to check for duplicates in handleReservation by scanning both collections: pendingReservations and finishedReservations.

private Either<ShowCommandError, ShowEvent> handleReservation(ReserveSeat reserveSeat) {
  int seatNumber = reserveSeat.seatNumber();
  if (isDuplicate(reserveSeat.reservationId())) {
    return left(DUPLICATED_COMMAND);
  } else {
    //continue with the reservation
  }
}

private boolean isDuplicate(String reservationId) {
  return pendingReservations.containsKey(reservationId) ||
    finishedReservations.get(reservationId).isDefined();
}

From the domain perspective, duplication is an error like any other ShowCommandError, while at the ShowEntity layer it can be translated into a positive response.

@PatchMapping("/cancel-reservation/{reservationId}")
public Effect<String> cancelReservation(@PathVariable String reservationId) {
  if (currentState() == null) {
    return effects().error("show does not exist", NOT_FOUND);
  } else {
    CancelSeatReservation cancelSeatReservation = new CancelSeatReservation(reservationId);
    return currentState().process(cancelSeatReservation).fold(
      error -> errorEffect(error, cancelSeatReservation),
      showEvent -> persistEffect(showEvent, "reservation cancelled")
    );
  }
}


private Effect<String> errorEffect(ShowCommandError error, ShowCommand showCommand) {
  if (error == DUPLICATED_COMMAND) {
    return effects().reply("duplicated command, ignoring");
  } else {
    logger.error("processing command {} failed with {}", showCommand, error);
    return effects().error(error.name(), BAD_REQUEST);
  }
}

Wallet Entity deduplication

In the previous approach, we reused existing data (reservationId) for deduplication. Even for this simple example, it is easy to forget about some scenarios and introduce a bug. Extending the commands/events model with a dedicated field, just for the purpose of deduplication, is (in most cases) a better option. This way we can apply a more generic deduplication mechanism and be more resilient to domain changes.

For example, any wallet command that requires deduplication could implement the RequiresDeduplicationCommand interface, which adds a commandId field.

sealed interface RequiresDeduplicationCommand extends WalletCommand {
  String commandId();
}

record ChargeWallet(BigDecimal amount, String expenseId, String commandId) implements RequiresDeduplicationCommand {}

record DepositFunds(BigDecimal amount, String commandId) implements RequiresDeduplicationCommand {}

Based on this field, you can deduplicate the command before it hits the domain logic:

public Either<WalletCommandError, WalletEvent> process(WalletCommand command) {
  if (isDuplicate(command)) {
    return Either.left(DUPLICATED_COMMAND);
  } else {
    return switch (command) {
      case CreateWallet create -> handleCreate(create);
      case ChargeWallet charge -> ifExists(() -> handleCharge(charge));
      case DepositFunds depositFunds -> ifExists(() -> handleDeposit(depositFunds));
    };
  }
}

Proper deduplication requires persisting the state update and the deduplication data in one transaction. For an Event Sourced entity like WalletEntity, this means you need to store this information in the event payload: the state recovered from events must contain all used commandIds. This brings us to a very specific problem. For long-lived entities with many potential commands, you can't deduplicate all of them without any time and/or space constraints. Based on the expected load, you can calculate how many command ids you can keep in memory (per entity). You might then use a more sophisticated data structure than a plain Set to enforce space (e.g. the last 10,000 ids) and/or time (e.g. the last 2 hours) boundaries.
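One possible sketch of such a size-bounded structure (not taken from the demo code) pairs a Set for fast lookups with a FIFO queue that evicts the oldest ids once the window is full:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical size-bounded deduplication window: remembers at most
// `capacity` recent command ids and evicts the oldest on overflow.
public class DeduplicationWindow {

  private final int capacity;
  private final Set<String> ids = new HashSet<>();      // O(1) duplicate lookup
  private final Deque<String> order = new ArrayDeque<>(); // insertion order for eviction

  public DeduplicationWindow(int capacity) {
    this.capacity = capacity;
  }

  /** Returns false if the id was already seen inside the window. */
  public boolean add(String commandId) {
    if (ids.contains(commandId)) {
      return false; // duplicate command
    }
    if (ids.size() == capacity) {
      ids.remove(order.removeFirst()); // evict the oldest id
    }
    ids.add(commandId);
    order.addLast(commandId);
    return true;
  }
}
```

Note that in an Event Sourced entity this structure is not stored directly: it is rebuilt from the events during recovery, so eviction must be deterministic, which the FIFO policy above satisfies.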

Although a separate field for deduplication is a much cleaner solution, it has a drawback. What should its value be? The most common choice is some sort of UUID implementation. In most cases this is fine, but if you want to fine-tune the solution, you could use a hash function that produces a shorter hash (less space). The most important aspect is that this value must be the same for the same payload. For HTTP requests you can’t control this, but for our Saga stages, we are in charge of the payloads.

The UUID is generated from the reservationId field, which stays the same across retries; this way we get the same commandId each time.

public Effect<String> charge(SeatReserved seatReserved) {
  String expenseId = seatReserved.reservationId();
  String commandId = UUID.nameUUIDFromBytes(seatReserved.reservationId().getBytes(UTF_8)).toString();
  var chargeWallet = new ChargeWallet(seatReserved.price(), expenseId, commandId);

  var chargeCall = kalixClient.patch("/wallet/" + seatReserved.walletId() + "/charge", chargeWallet, String.class);

  return effects().forward(chargeCall);
}

Very often it's hard to choose a business field that is a good candidate for hashing. If the source of the event is a Kalix Entity, you can use additional event information from the metadata. For Event Sourced Entities (and Value Entities), each event has a unique, monotonically increasing sequence number.

String sequenceNum = contextForComponents().metadata().get("ce-sequence").orElseThrow();
String commandId = UUID.nameUUIDFromBytes(sequenceNum.getBytes(UTF_8)).toString();

You can also find a "ce-id" field in the metadata but, as in the CloudEvents specification, it might be different for the same payload in case of redeliveries, so you can't use it.

Having a monotonically increasing sequence number is a blessing. It's a perfect candidate for deduplication without any additional transformation (like turning it into a UUID). The deduplication logic could be simplified a lot, because you only need to store the last processed sequence number: each new command must carry a higher number, otherwise it's a duplicate and you can skip it. This works great if the only source of changes for an entity is a subscription to another entity. However, the WalletEntity also accepts commands from the outside world, like DepositFunds, which is why the solution is based on UUIDs.
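If the subscription were the entity's only writer, the whole check could shrink to a single comparison against the last processed sequence number. A minimal sketch (assumed names, not the demo's actual implementation):

```java
// Hypothetical sequence-number deduplication: valid only when a single,
// ordered subscription is the entity's sole source of changes.
public class SequenceDeduplication {

  private long lastSeenSeqNum = 0L;

  /** Returns true if the command should be processed, false if it is a duplicate. */
  public boolean tryAccept(long seqNum) {
    if (seqNum <= lastSeenSeqNum) {
      return false; // already processed, e.g. a redelivery after a restart
    }
    lastSeenSeqNum = seqNum;
    return true;
  }
}
```

Unlike the bounded Set approach, this needs constant space per entity, but it breaks as soon as a second, unordered command source appears.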

Summary

I hope that this part clarifies how to achieve exactly-once delivery semantics in Kalix. Technically speaking, it's not possible in a distributed world, but with at-least-once delivery and a proper deduplication strategy, we can achieve effectively-once delivery. The main takeaway is that there is no silver bullet; each situation is different. Still, having a rich toolbox helps a lot, because you can tailor the solution to a given context.

In the fourth blog post of this series, we will focus on error handling and compensation, one of the biggest challenges of the Saga pattern, and on how to make our Saga process bulletproof. Check out the part3 tag from the source code.