
Saga Patterns in Kalix Part 4 - Error Handling, DLQ

Andrzej Ludwikowski
Software Architect, Lightbend
  • 1 Sept 2023
  • 6 minute read

Error handling is one of the trickiest parts of working with long-lived business processes. Even our basic example, with just two steps, is a good playground for summarizing what can go wrong.

Even without any extra modifications, the current implementation is fairly solid. Thanks to the Kalix Subscription mechanism and its at-least-once delivery guarantee, the process will recover after any restart or crash.
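At-least-once delivery also means that the same event can be delivered more than once (for example, when the process restarts before its progress is stored), so every step should tolerate duplicates. A minimal, framework-free sketch, assuming each command carries a unique and deterministic id (the hypothetical commandId below), is to remember which ids have already been applied:

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

// Hypothetical command: commandId is assumed to be derived deterministically
// from the triggering event, so a redelivered event produces the same id.
record ChargeCommand(String commandId, BigDecimal amount) {}

// In-memory stand-in for a wallet that deduplicates commands by id.
class IdempotentWallet {
  private BigDecimal balance;
  private final Set<String> processedCommandIds = new HashSet<>();

  IdempotentWallet(BigDecimal initialBalance) {
    this.balance = initialBalance;
  }

  /** Applies the charge once; returns false if this commandId was already applied. */
  boolean charge(ChargeCommand cmd) {
    if (!processedCommandIds.add(cmd.commandId())) {
      return false; // duplicate delivery: must not charge twice
    }
    balance = balance.subtract(cmd.amount());
    return true;
  }

  BigDecimal balance() {
    return balance;
  }
}
```

In an event-sourced wallet, the set of processed ids would be part of the entity state, so deduplication also survives restarts.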

The main (green) path consists of three steps: seat reservation, charging, and confirmation (1).

Image 1: seat reservation, charging and confirmation

In the case of insufficient funds, we need to apply a compensating action and cancel the reservation (2).

Image 2: insufficient funds

Because the event stream is ordered, we don’t have to worry about the reservation cancellation being consumed before the actual charge rejection. While ordered processing is a significant advantage, it is also a major challenge in choreography-based Sagas.
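To make flow (2) concrete, here is a minimal in-memory sketch of how the reaction to the wallet's answer could look: a successful charge leads to confirmation, while a rejection triggers the compensating cancellation. The event and command names (WalletCharged, WalletChargeRejected, ShowCommand) are hypothetical stand-ins, not the Kalix types from the source code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical outcomes of charging a wallet for a reservation.
interface WalletEvent {}
record WalletCharged(String reservationId) implements WalletEvent {}
record WalletChargeRejected(String reservationId, String reason) implements WalletEvent {}

// Hypothetical command sent back to the Show entity.
record ShowCommand(String type, String reservationId) {}

class ChargeOutcomeHandler {
  final List<ShowCommand> sent = new ArrayList<>();

  void handle(WalletEvent event) {
    if (event instanceof WalletCharged charged) {
      // happy path (1): confirm the reservation
      sent.add(new ShowCommand("ConfirmReservation", charged.reservationId()));
    } else if (event instanceof WalletChargeRejected rejected) {
      // compensation (2): cancel the reservation and release the seat
      sent.add(new ShowCommand("CancelReservation", rejected.reservationId()));
    }
  }
}
```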

Poison message

All SeatReserved events for a given Show are processed one by one. Now imagine that, for some reason, charging a single wallet (or a small subset of wallets) keeps failing with an unexpected exception. This could be caused by a bug in the code or by a problem in an external service (if we used different wallets or payment providers). Even though such an error occurs relatively rarely, the whole process is blocked and can’t move forward. This is the poison message (event) problem, also called head-of-line blocking. We consume an ordered stream of events, so we can’t simply skip an event when it fails, because this could lead to an inconsistent system state, e.g. a paid reservation that is never confirmed.
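A toy, in-memory consumer illustrates the blocking: it keeps a single offset and refuses to move past a failing event. SeatReserved here is a simplified, hypothetical record, and the handler is just a Predicate that returns false on failure:

```java
import java.util.List;
import java.util.function.Predicate;

// Simplified, hypothetical event: one SeatReserved per reservation, in stream order.
record SeatReserved(String reservationId, String walletId) {}

class OrderedConsumer {
  private int offset = 0; // position of the next event to process (committed progress)

  // Processes events strictly in order. A failing event is retried on the next
  // poll, so every event after it has to wait: head-of-line blocking.
  int poll(List<SeatReserved> stream, Predicate<SeatReserved> handler) {
    while (offset < stream.size()) {
      if (!handler.test(stream.get(offset))) {
        break; // cannot skip: skipping would break the ordering guarantee
      }
      offset++;
    }
    return offset;
  }
}
```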

We can mitigate this problem with stream partitioning, similar to partitions in Apache Kafka. In case of an error, only one or a few partitions are affected, and the rest can continue processing. In fact, Kalix does something like that for us under the hood.
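A rough sketch of the idea, assuming a simple hash of the event key picks the partition (an illustration only, not how Kalix or Kafka actually assign partitions): each partition keeps its own order and offset, so a poison event blocks only the events that share its partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

record Event(String key, String payload) {}

// Splits one stream into N partitions by key hash; each partition has its own offset.
class PartitionedConsumer {
  private final List<List<Event>> partitions = new ArrayList<>();
  private final int[] offsets;

  PartitionedConsumer(int partitionCount) {
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new ArrayList<>());
    }
    offsets = new int[partitionCount];
  }

  int partitionOf(String key) {
    // floorMod keeps the index non-negative for negative hash codes
    return Math.floorMod(key.hashCode(), partitions.size());
  }

  void append(Event e) {
    partitions.get(partitionOf(e.key())).add(e);
  }

  // Drains every partition independently; a failure stops only its own partition.
  int pollAll(Predicate<Event> handler) {
    int processed = 0;
    for (int p = 0; p < partitions.size(); p++) {
      List<Event> events = partitions.get(p);
      while (offsets[p] < events.size() && handler.test(events.get(offsets[p]))) {
        offsets[p]++;
        processed++;
      }
    }
    return processed;
  }
}
```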

Dead letter queue

Partitioning won’t solve the problem completely, though. Alternatively, we can leave the safe space of ordered event processing and address the issue with a dead-letter queue (DLQ). If the error is still present after a few retries, we divert the poison event for later processing and release the main processing stream. Kalix has no built-in dead-letter queue, but you can easily simulate one with existing components.
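The retry-then-divert idea can be sketched without any framework. This hypothetical DlqConsumer retries each event synchronously up to maxAttempts times (with no delay between attempts, unlike a real retry policy) and then parks it in a dead-letter list, so the following events are still processed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

record Event(String id, String payload) {}

// Retries each event up to maxAttempts times, then diverts it to a dead-letter
// list so that the rest of the stream can move on.
class DlqConsumer {
  final List<Event> deadLetters = new ArrayList<>();
  final List<Event> processed = new ArrayList<>();
  private final int maxAttempts;

  DlqConsumer(int maxAttempts) {
    this.maxAttempts = maxAttempts;
  }

  void consume(List<Event> stream, Predicate<Event> handler) {
    for (Event event : stream) {
      boolean ok = false;
      for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++) {
        ok = handler.test(event);
      }
      if (ok) {
        processed.add(event);
      } else {
        deadLetters.add(event); // handled later by a separate process
      }
    }
  }
}
```

The price is the one discussed in this section: events after the diverted one are processed before it, so ordering is no longer guaranteed.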


Our DLQ is based on another Event Sourced Entity: WalletFailureEntity. The implementation is simple; we just need to record the fact that a failure occurred. The entity state, WalletFailureState, can be empty, but we can also use it to collect some statistics, such as a failure counter. The event(s) should contain all the information necessary to recover from the failure. In our case, the WalletChargeFailureOccurred event reuses the ChargeWallet command (the source of the problem) and adds an error message (the command processing result) for debugging.

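import kalix.javasdk.annotations.EventHandler;
import kalix.javasdk.eventsourcedentity.EventSourcedEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;

// Entity id/type and @RequestMapping annotations are omitted for brevity.
public class WalletFailureEntity
    extends EventSourcedEntity<WalletFailureEntity.WalletFailureState, WalletFailureEntity.WalletFailureEvent> {

  // Optional state: a simple counter of registered failures.
  public record WalletFailureState(int numberOfFailures) {
    public WalletFailureState inc() {
      return new WalletFailureState(numberOfFailures + 1);
    }
  }

  interface WalletFailureEvent {}

  // The failed ChargeWallet command plus the error message, enough to recover later.
  public record WalletChargeFailureOccurred(ChargeWallet source, String msg) implements WalletFailureEvent {}

  @Override
  public WalletFailureState emptyState() {
    return new WalletFailureState(0);
  }

  // Registers a failure; the emitted event is what the DLQ consumer subscribes to.
  @PostMapping
  public Effect<String> registerChargeError(@RequestBody ChargeWallet source, @RequestParam String msg) {
    return effects().emitEvent(new WalletChargeFailureOccurred(source, msg))
        .thenReply(__ -> "registered");
  }

  // Each recorded failure increments the counter.
  @EventHandler
  public WalletFailureState apply(WalletChargeFailureOccurred __) {
    return currentState().inc();
  }
}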

Let’s focus on the ChargeForReservation Action. We don’t want to register a failure after the first attempt, because it could just be a temporary, recoverable glitch. It’s better to have a retry policy that retries the call a few times before finally giving up and registering the failure. For that, we can use Akka Futures patterns (Patterns.retry) or any similar solution.

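import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import java.time.Duration;
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;

@Subscribe.EventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class ChargeForReservation extends Action {

  // Retry policy and ActorSystem; their values and wiring (e.g. via the constructor) are omitted.
  private int attempts;
  private Duration retryDelay;
  private ActorSystem actorSystem;

  public Effect<String> charge(SeatReserved seatReserved) {
    // Assumed field names: the wallet id and the ChargeWallet command come from the SeatReserved event.
    String walletId = seatReserved.walletId();
    ChargeWallet chargeWallet = new ChargeWallet(seatReserved.price(), seatReserved.reservationId());

    return effects().asyncReply(
      // retry the wallet call `attempts` times, waiting `retryDelay` between attempts
      Patterns.retry(() -> chargeWallet(walletId, chargeWallet),
          attempts,
          retryDelay,
          actorSystem)
        // still failing after all retries: register the failure in WalletFailureEntity (the DLQ)
        .exceptionallyComposeAsync(throwable ->
          registerFailure(throwable, walletId, chargeWallet)
        )
    );
  }

  // chargeWallet(...) and registerFailure(...) helpers omitted.
}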
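Without Akka, the JDK alone is enough for a similar policy. The following is a minimal sketch of an asynchronous retry with a fixed delay, built on CompletableFuture.delayedExecutor (Java 9+). Retry.retry is a hypothetical helper, not part of Kalix or Akka, and unlike Patterns.retry it does not use an ActorSystem scheduler:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical plain-JDK retry helper: invokes `call` up to `attempts` times,
// waiting `delayMillis` between attempts, and fails with the last error.
final class Retry {
  private Retry() {}

  static <T> CompletionStage<T> retry(Supplier<CompletionStage<T>> call, int attempts, long delayMillis) {
    return call.get().<CompletionStage<T>>handle((value, error) -> {
      if (error == null) {
        return CompletableFuture.completedFuture(value);
      }
      if (attempts <= 1) {
        return CompletableFuture.failedFuture(error); // give up: the caller can register the failure
      }
      // wait without blocking a thread, then try again with one attempt fewer
      return CompletableFuture
          .runAsync(() -> {}, CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS))
          .thenCompose(ignored -> retry(call, attempts - 1, delayMillis));
    }).thenCompose(stage -> stage);
  }
}
```

In ChargeForReservation, Retry.retry(() -> chargeWallet(walletId, chargeWallet), attempts, retryDelay.toMillis()) could then take the place of the Patterns.retry call, keeping the same exceptionallyComposeAsync fallback.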

Processing DLQ events in the HandleWalletFailures Action works like any other Action subscription. In our case, we want to cancel the reservation.

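import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;

@Subscribe.EventSourcedEntity(value = WalletFailureEntity.class)
public class HandleWalletFailures extends Action {

  // Compensation for a charge that kept failing: cancel the reservation.
  public Effect<String> handle(WalletChargeFailureOccurred walletChargeFailureOccurred) {
    // the expenseId of the failed ChargeWallet command is the reservation id
    String reservationId = walletChargeFailureOccurred.source().expenseId();

    // find the Show that owns the reservation, then cancel the reservation there
    return effects().asyncReply(getShowIdBy(reservationId).thenCompose(showId ->
        cancelReservation(reservationId, showId)
    ));
  }

  // getShowIdBy(...) and cancelReservation(...) helpers omitted.
}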

And now the hard part. With the DLQ in place, we can no longer be sure about the order of processing. Besides the basic scenarios mentioned above (1, 2), we need to be ready for more twisted flows, like a reservation confirmation (the wallet was charged) arriving after the cancellation (3), or a duplicated cancellation (4).

Image 3: reservation confirmed after cancellation
Image 4: duplicated cancellation

This can happen with wallet timeouts: we don’t know what the result of the ChargeWallet command was, or whether the command was processed at all. After the reservation has been cancelled, we may still receive the missing event saying that the wallet was actually charged.
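One way to reason about flows (3) and (4) is to make the reservation state machine explicit. The sketch below is a simplified, hypothetical model (not the Reservation entity from the source code): a repeated cancellation is ignored, and a late charge on a cancelled reservation produces a single refund instead of a confirmation:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical reservation lifecycle.
enum ReservationStatus { RESERVED, CONFIRMED, CANCELLED }

class Reservation {
  ReservationStatus status = ReservationStatus.RESERVED;
  boolean refunded = false;
  final List<String> sideEffects = new ArrayList<>();

  // Cancellation, e.g. triggered from the DLQ. Repeating it is a no-op (4).
  void cancel() {
    if (status == ReservationStatus.RESERVED) {
      status = ReservationStatus.CANCELLED;
      sideEffects.add("seat released");
    }
    // already CANCELLED: duplicate; CONFIRMED: too late to cancel, ignored
  }

  // Information that the wallet was charged; may arrive late or more than once.
  void walletCharged() {
    switch (status) {
      case RESERVED -> {
        status = ReservationStatus.CONFIRMED;
        sideEffects.add("confirmed");
      }
      case CANCELLED -> {
        // flow (3): paid after cancellation, so refund exactly once
        if (!refunded) {
          refunded = true;
          sideEffects.add("refund");
        }
      }
      case CONFIRMED -> { } // duplicate confirmation, nothing to do
    }
  }
}
```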

The refund in flow (3) is triggered by an additional CancelledReservationConfirmed event emitted by the Show entity.

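import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;

@Subscribe.EventSourcedEntity(value = ShowEntity.class, ignoreUnknown = true)
public class RefundForReservation extends Action {

  // Triggered when a cancelled reservation turns out to have been paid for (flow 3).
  public Effect<String> refund(CancelledReservationConfirmed cancelledReservationConfirmed) {
    ...

    // load the reservation (read model) to find the wallet and the price, then refund the wallet
    return effects().asyncReply(
        getReservation(cancelledReservationConfirmed.reservationId()).thenCompose(reservation ->
            refund(reservation.walletId(), reservation.price(), commandId)
        )
    );
  }
}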

Implementing such flows requires additional changes to the event model and the Show aggregate logic, as well as a more detailed Reservation entity (acting as a read model). All changes can be reviewed in this diff.

Summary

Poison messages are among the most complex errors to handle when working with choreography-based Sagas, especially when the implementation relies on the event stream being ordered. The proposed dead-letter queue, based on an Event Sourced Entity, solves the technical challenge, but it is not the only way to address this problem.

In the Kalix ecosystem, there is a dedicated Workflow component for implementing orchestration-based Sagas, where the flow is more request-driven, so a single poison event can't stop the whole processing. However, it brings other challenges that may surprise you, which will be the topic of the fifth part of the series. In the meantime, check out the part4 tag of the source code and play with the shouldConfirmCancelledReservationAndRefund integration test.