SuperPumpup (dot com)
General awesomeness may be found here.

12 January 2017

Protecting Long-running streams


We have long-running entities that we want to operate on with confidence.


  • We need to be able to restart a system in a reasonable timeframe * We really want snapshots in-band (ES)

Canonical question

We have an account with 100k events (Hold, Capture, Release, Deposit) to process. Because of distributed systems, we will receive "duplicate" commands.

Solutions examined

Re-projecting all history to build state

Keeping 'rich' snapshots

  • All 100k transaction ids (UUID, 16 bytes each gives ~ 1.6 mb)
  • The 1.6 mb is too big to put "in-band"
  • This grows monotonically with account usage
  • "Hotspots"

Expiring old (vector) transactions

  • Only keeping, say, the last 1k transactions (16kb is in line with the upper limit we have seen of ~25kb for reasonable performance for ES) in the snapshot
  • This is fine under normal use, but in a "bursty" scenario, your wall time for margin-of-error goes down just when your system comes under load (and presumably more failure-prone)
  • Busy accounts may generate more than 1000 transactions in a single payroll run

Expiring old (wall clock) transactions

  • To do this, you need to also store a timestamp in addition to the UUID in memory, making the per-record size more than 20 bytes
  • On a bursty day, you wind up needing more space, and each record takes more space, so the general snapshotting performance deteriorates

Keeping transaction IDs in small, "external" streams

  • The first implementation we came up with, we could not figure out how to get integrity assured
  • The "command" that gets replayed writes that it was received (transaction-, event 0), this triggers a handler to try to process the request (does an NSF check, then a hold or reply that it's not-held), which either writes a hold in the account stream and then a "held" in the transaction stream, or writes a "NotHeld" in the transaction stream
  • Handler of the transaction stream dispatches a reply to the requesting process

The problem here is subtle.

If the system crashes here:

And handler 1 picks up event A, it will need to check to see whether the account stream has the event already. However, the account stream has a million events. Searching for that event is problematic. If the stream has 1mm events already, scanning for that one event is prohibitively expensive.

ES has a provision for an "Event ID" A unique identifier representing this event. This is used internally for idempotency if you write the same event twice you should use the same identifier both times. So if you can generate an Event ID (GUID) deteriministically, then actually you would be ok. However, that couples more tightly to an ES implementation detail than we would prefer.

But if we write the version number of the account projection in the "HoldRequested" event, then we know that the subsequent "Held" event must be in the events following that, and we can search only that space to ensure that our work is not repeated.

So in the end, we wind up with a messaging pattern that looks like this:

Account Messaging

You'll see that to process a Hold command, we write three events within the Account Service, and then communicate that back out by writing another.

To Capture the held funds, we also write three messages.

One thing that is striking about this pattern is that the handler that is handling Hold events projects entities from the Account stream, does business logic, and writes to the Account stream. These processes are coupled, for all intents and purposes - which is reasonable. They are coupled as a business process, so their implementation is coupled as well. Decoupling is not always necessary (or desirable).

Categories: Software