Migrated to DynamoDB!

Or how I wasted time and broke things?!

Hi Folks,

I've recently migrated rr to use DynamoDB. I thought I'd share the why and how for technical folks who might be interested.

The Tradeoff

The original tradeoff was this: save $5 per month in exchange for potentially not being able to charge for some instances run.

When I originally threw this service together I wasn't sure if it was going to be worth my time, or if other people would find value in it, so I cut a few corners. I had decided on an event sourcing approach and found that Axon Framework would do a fair bit out of the box for me; I just had to give it a relational database. I had a look at AWS's managed RDS offerings and baulked at the minimum cost of $5/month. So instead I used an embedded H2 database writing to the local filesystem, and periodically backed up a copy to S3, which cost just a few cents. The catch: if I lost the server, I'd lose any data written since the last backup. Since payments are collected by stripe.com, I could check Stripe's records to reinstate any missing payment data; it was just the records of instance time I had delivered that were really at risk. So the tradeoff I made was saving $5/month in exchange for potentially not being able to charge folks for some instances run (which did happen a couple of times).

Having my cake and eating it too

I had heard that DynamoDB is AWS's 'serverless' database and wondered if it would work for rr. The promise of serverless is that I wouldn't need to manage DynamoDB myself, and could count on it being there while only paying for what I'd used.

Some blog posts describing how folks had used DynamoDB for event sourcing, and explaining its optimistic concurrency mechanism, gave me some confidence.

The key to optimistic concurrency is a condition expression that prevents a transaction from being committed if there are any sequence number conflicts.

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.*;
...
DynamoDbClient ddb;
...
// items: the new events for one aggregate, each carrying a 'sequenceNumber' attribute.
// The condition fails the whole transaction if any sequence number already exists,
// i.e. if another writer appended events concurrently.
ddb.transactWriteItems(TransactWriteItemsRequest.builder()
        .transactItems(items.stream()
                .map(i -> TransactWriteItem.builder().put(Put.builder()
                                .tableName(m.getType())
                                .item(i)
                                .conditionExpression("attribute_not_exists(sequenceNumber)")
                                .build()
                        ).build()
                ).collect(Collectors.toList()))
        .build());
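To illustrate the mechanism outside of DynamoDB (this is my own in-memory analogue, not the production code), `putIfAbsent` on an (aggregate, sequence number) key plays the role of `attribute_not_exists(sequenceNumber)`: the append fails if any slot in the batch is already taken.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal in-memory analogue of the conditional append (illustrative only -
// a real DynamoDB transaction is atomic, the rollback here is just a sketch).
class InMemoryEventStore {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    // Append events at consecutive sequence numbers; fail the whole batch
    // if any slot is already taken - like attribute_not_exists(sequenceNumber).
    boolean append(String aggregateId, long firstSequenceNumber, List<String> events) {
        for (int i = 0; i < events.size(); i++) {
            String key = aggregateId + "#" + (firstSequenceNumber + i);
            if (store.putIfAbsent(key, events.get(i)) != null) {
                // Conflict: undo what this batch wrote so far.
                for (int j = 0; j < i; j++) {
                    store.remove(aggregateId + "#" + (firstSequenceNumber + j));
                }
                return false;
            }
        }
        return true;
    }
}
```

A second writer that read the same event history would compute the same next sequence number, and its batch would be rejected rather than silently overwriting.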

Jamming it in

I thought I might be able to adapt DynamoDB to fit Axon, as it already had an adapter for MongoDB, which is similar to DynamoDB in that both are NoSQL databases. But coming up with something similar for DynamoDB turned out to be more work than I was prepared for. The documentation also pointed out that Axon's design had evolved in a way that wasn't ideal for MongoDB:

"In pre Axon Framework 3 release we found MongoDb to be a very good fit as an Event Store. However with the introduction of Tracking Event Processors and how they track their events, we have encountered some inefficiencies in regards to the Mongo Event Store implementation."

I think this is referring to MongoDB not having a global ordering; DynamoDB appears to be similar in that records are only ordered within a partition. So I decided to remove Axon and roll my own event sourcing in order to make use of DynamoDB.
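At its core, rolling your own event sourcing means rebuilding an aggregate's state by replaying its events in sequence-number order, i.e. a left fold over the event stream. A minimal sketch (the `Event` shape and event names here are invented for illustration, not rr's actual types):

```java
import java.util.stream.Stream;

// Illustrative sketch: an aggregate's state is a function of its events,
// applied in sequence-number order.
record Event(long sequenceNumber, String type) {}

class MissionState {
    boolean running;

    // A left fold over the event stream: each event moves the state forward.
    static MissionState replay(Stream<Event> events) {
        MissionState state = new MissionState();
        events.sorted((a, b) -> Long.compare(a.sequenceNumber(), b.sequenceNumber()))
              .forEach(e -> {
                  switch (e.type()) {
                      case "Started" -> state.running = true;
                      case "Stopped" -> state.running = false;
                      default -> { } // ignore unknown event types
                  }
              });
        return state;
    }
}
```

Because events are immutable, replaying the same stream always yields the same state, which is also what makes the caching below safe.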

Caching

One aspect of rolling my own was caching. A nice property of events is that, since they represent the past, by definition they don't change. This makes them ideal for caching: queries only need to check for new events, reducing the exchanges with DynamoDB and consequently its cost.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
...
// Cache of each aggregate's (immutable) event history, keyed by aggregate id.
private final Cache<String, List<GenericDomainEventMessage<?>>> cache = Caffeine.newBuilder()
            .build();
...
Stream<GenericDomainEventMessage<?>> aggregateEvents(String type, String aggregateIdentifier, long firstSequenceNumber) {
    List<GenericDomainEventMessage<?>> cached = Optional.ofNullable(cache.getIfPresent(aggregateIdentifier)).orElseGet(List::of);
    // Next sequence number to fetch: one past the last cached event, or 0 if nothing is cached.
    Long next = cached.stream().reduce((a, b) -> b).map(e -> e.getSequenceNumber() + 1).orElse(0L);
    // Only events newer than the cache are read from DynamoDB.
    List<GenericDomainEventMessage<?>> newEvents = readEvents(type, aggregateIdentifier, next).collect(Collectors.toList());
    List<GenericDomainEventMessage<?>> merged = Stream.of(cached.stream(), newEvents.stream()).flatMap(s -> s).collect(Collectors.toList());
    cache.put(aggregateIdentifier, merged);
    return merged.stream().filter(e -> e.getSequenceNumber() >= firstSequenceNumber);
}
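The effect can be seen with a toy version that counts how many events it reads from the backing store (my own simplified stand-in, not rr's code): after the first call, subsequent calls only fetch events past what's already cached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of the caching pattern: events never change, so the cache
// only needs topping up with events beyond its last cached position.
class CachedEventReader {
    final List<String> store = new ArrayList<>();    // stand-in for the DynamoDB table
    final Map<String, List<String>> cache = new HashMap<>();
    int eventsFetched = 0;                           // events read from the store so far

    List<String> aggregateEvents(String aggregateId) {
        List<String> cached = cache.getOrDefault(aggregateId, List.of());
        // Fetch only events beyond what is cached.
        List<String> fresh = store.subList(cached.size(), store.size());
        eventsFetched += fresh.size();
        List<String> merged = new ArrayList<>(cached);
        merged.addAll(fresh);
        cache.put(aggregateId, merged);
        return merged;
    }
}
```

The second call for the same aggregate reads only the events appended since the first call, which is where the DynamoDB cost saving comes from.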

Concurrency

Another aspect was tracking scheduled missions. With Axon this was performed by queries sent to a view model: essentially an object that maintains the state it needs to answer queries as a function of published events. One service Axon performed here, though, was serialising the events and queries (much like an actor model) in order to avoid concurrency-related races. The alternative I found was to wrap the view object as an actor using actr, a simple actor model implementation.

I found the API exposed by actr surprisingly pleasant to use. What was

CompletableFuture<List<MissionId>> f = queryGateway.query(
    new FindInstancesOverdueForTermination(),
    ResponseTypes.multipleInstancesOf(MissionId.class)
);

became

CompletableFuture<List<MissionId>> f = actor.ask(
    model -> model.query(new FindInstancesOverdueForTermination())
);

The latter form more directly expresses the dependency on the model object, and doesn't require the command object pattern to carry the query message. The return type also doesn't need to be supplied, since it's driven from the direct call to the model object. In my opinion these characteristics make the code easier to work with.
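The serialisation that Axon (and actr) provide can be illustrated with nothing but the JDK: funnel every query through a single-threaded executor, so the view model is only ever touched by one thread at a time. This is my own simplified stand-in, not actr's implementation:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// A view model wrapped so all access is serialised through one thread,
// avoiding races without any locking inside the model itself.
class SerialisedView<M> {
    private final M model;
    // Daemon thread so this sketch doesn't block JVM shutdown.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    SerialisedView(M model) { this.model = model; }

    // Analogous to actor.ask(..): run the query on the model's single thread.
    <R> CompletableFuture<R> ask(Function<M, R> query) {
        return CompletableFuture.supplyAsync(() -> query.apply(model), executor);
    }
}
```

Like `actor.ask(model -> model.query(..))`, the caller gets a `CompletableFuture` back while the model processes one message at a time.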

Some false starts

I took an iterative approach to migrating, where for a period the application was running some parts from H2 and other parts from DynamoDB. It didn't prevent me from making mistakes though.

The biggest mistake I made, which unfortunately resulted in a bit of frustration for some folks (thanks again for your patience everyone :)), was an incompatibility in the way I changed the serialisation for DynamoDB around map data, i.e. Map<String, Object>. This impacted the generation of the configuration files used to start the DCS server (options.lua), so the result was the server not starting.

Essentially some values that had been Java Boolean values were now String values, so

Map<String, Object> lotatcOptions;
...
// After the serialisation change, "enabled" could be the String "false"
// rather than a Boolean, so this cast throws ClassCastException.
if((boolean) lotatcOptions.getOrDefault("enabled", false)) {..}

resulted in an exception, and no options.lua file for the server.

I've since updated to a more tolerant form:

if(Boolean.parseBoolean(lotatcOptions.getOrDefault("enabled", false).toString())) {..}
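The tolerant form works because toString() normalises both representations before parsing: a Boolean and its String form give the same result, and anything that isn't exactly "true" simply parses as false. A quick check (the helper name here is mine, for illustration):

```java
import java.util.Map;

// Tolerant reading of a flag that may be stored as a Boolean or a String.
class TolerantBoolean {
    static boolean enabled(Map<String, Object> options) {
        // toString() handles Boolean and String alike; parseBoolean yields
        // false for anything that isn't (ignoring case) "true".
        return Boolean.parseBoolean(options.getOrDefault("enabled", false).toString());
    }
}
```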

Victory! I hope.

So I've had rr running for a while now without any Axon or RDS dependencies. I haven't been billed for DynamoDB yet. Hopefully usage will stay within the free tier and rr continues to operate smoothly. Time will tell.

Cheers,

Noisy

Ready Web Services Pty Ltd ABN: 28 657 568 977