We launched the Rivet Beta a couple of weeks ago in hopes of flushing out issues before we reach production. Thanks to some feedback from one of our users, we’ve made a change that should make several things work more smoothly. This post goes into detail about the problems we discovered and how we handled them.

Geth 1.9.0

Just days before we launched the Rivet Beta, the Geth team announced Geth v1.9.0. We weren’t going to delay our Beta to merge in changes that wouldn’t affect the experience of Rivet’s end users, but we put it high on our list of post-launch tasks. As of now, Ether Cattle is updated to v1.9.0 and Rivet is running with those updates.

Block Consistency Issue

The Bug Report

Twitter user @nipolnipol reached out to bring a problem to our attention. He found that when he tracked block numbers over time, he would occasionally get numbers out of sequence. That is, if he saw block 8,214,560 one second, he might see 8,214,559 a second later.

The reason the user observed this behavior is that we were running multiple Ether Cattle Clusters, each with one master and a pool of replicas containing data processed by the master. While we take measures to make sure our masters are peered with one another and do not fall drastically behind each other, each one has to process blocks, and some blocks take a while to process. As a result, one master might finish processing a given block a second or two before the other master. Because the replicas are behind the same load balancer, one request from a user might go to one pool, while the next request goes to a different pool, resulting in this mixed behavior.
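To make the symptom concrete, here’s a minimal sketch of the kind of poll loop that surfaces it, written with go-ethereum’s ethclient package. The endpoint URL and the one-second interval are placeholders for illustration, not anything specific to Rivet.

```go
// Poll the latest block repeatedly and flag any case where the block number
// goes backwards between two requests. The endpoint URL is a placeholder.
package main

import (
	"context"
	"fmt"
	"log"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("https://example.rivet.cloud") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}

	var last *big.Int
	for i := 0; i < 100; i++ {
		header, err := client.HeaderByNumber(context.Background(), nil) // nil requests the latest block
		if err != nil {
			log.Fatal(err)
		}
		if last != nil && header.Number.Cmp(last) < 0 {
			fmt.Printf("out of order: saw block %v after block %v\n", header.Number, last)
		}
		last = header.Number
		time.Sleep(time.Second)
	}
}
```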

In general, our team wasn’t surprised that these consistency issues existed. This is a distributed system, and we knew we had elected to compromise on consistency in favor of scalability and availability. What surprised us was the magnitude: getting block 8,214,559 a full second after observing the existence of block 8,214,560 was too much.

Background

So why do we run multiple Ether Cattle Clusters in the first place? If we can scale up the number of replicas as high as necessary to meet our capacity demands, what’s the point in running multiple clusters with independent masters?

Well, eventually everything fails. Sooner or later our master server is bound to incur a failure, and when it does, the replicas for that master will stop getting updates. To avoid serving extremely stale data, when the replicas lose touch with the master, they remove themselves from the load balancer and wait for the master to catch them back up. But for that to work, we have to have other replicas that aren’t dependent on the same master, or there will be nothing left in the load balancer to serve users’ requests. Ether Cattle’s replication system enables us to replace masters quickly, but we don’t want downtime while we do it.

Solution

So we need redundant masters, but do they need to be fully independent clusters? Would it be possible for both masters to feed the same set of replicas?

When we started this project, we didn’t really know the answer. We didn’t know if the write operations between masters were consistent enough that a single replica could merge the writes from two different masters without running into problems.

As we’ve wrapped our heads around the problem space, we’ve found that generally the answer is yes: replicas can handle writes from two masters, with a couple of very specific caveats.

In general, when Geth receives a block from a peer it:

1. Validates the block’s header and body
2. Executes the block’s transactions, updating the state trie
3. Writes the block, its receipts, and the state trie updates to the database
4. Updates the LastHeaderHash and LastBlockHash records to mark the block as the new head of the chain

Only once that last stage has happened is the block considered to be current by our replicas, and all of the other stages should write the same data regardless of when they happen (e.g. state trie updates will always update the same records in the database, as will block updates).

The only place a problem would occur is with the LastHeaderHash and LastBlockHash records. Suppose you had two masters, M1 and M2, where M2 runs consistently behind M1. M1 finishes block N-1 and writes its hash to LastHeaderHash and LastBlockHash, then finishes block N and writes that hash. Only then does the slower M2 finish block N-1 and write its hash to the same keys, rolling the replicas’ head back a block, before eventually writing block N as well.

So in this case all the replicas would bounce back and forth as the two masters overwrite the LastHeaderHash and LastBlockHash values.

But there’s a simple solution to this problem: track the values written to those keys, and don’t write values we’ve already seen. When M2 tries to write block N-1 and then block N to LastHeaderHash and LastBlockHash, the replicas have already applied those values from M1, so they quietly ignore the writes.
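To illustrate the idea, here’s a minimal sketch of that tracking logic. It isn’t the actual Ether Cattle code; the key names match the records discussed above, but the types and function names are made up for the example.

```go
// A replica-side filter that drops head-pointer writes whose values it has
// already applied, so a slower master can never roll the head back.
package main

import "fmt"

// headFilter remembers every value ever written to a watched key, so that a
// late write of an already-seen value (e.g. from a slower master) is ignored.
type headFilter struct {
	seen map[string]map[string]bool // key -> set of values already applied
}

func newHeadFilter(keys ...string) *headFilter {
	f := &headFilter{seen: make(map[string]map[string]bool)}
	for _, k := range keys {
		f.seen[k] = make(map[string]bool)
	}
	return f
}

// shouldApply reports whether a write should be applied by the replica.
// Writes to unwatched keys always pass through; writes of previously seen
// values to watched keys are dropped.
func (f *headFilter) shouldApply(key string, value []byte) bool {
	values, watched := f.seen[key]
	if !watched {
		return true
	}
	v := string(value)
	if values[v] {
		return false // another master already wrote this head value
	}
	values[v] = true
	return true
}

func main() {
	f := newHeadFilter("LastHeaderHash", "LastBlockHash")

	// M1, the faster master, writes block N-1 and then block N.
	fmt.Println(f.shouldApply("LastHeaderHash", []byte("hash-of-block-N-1"))) // true
	fmt.Println(f.shouldApply("LastHeaderHash", []byte("hash-of-block-N")))   // true

	// M2, running behind, writes the same values later; both are ignored.
	fmt.Println(f.shouldApply("LastHeaderHash", []byte("hash-of-block-N-1"))) // false
	fmt.Println(f.shouldApply("LastHeaderHash", []byte("hash-of-block-N")))   // false
}
```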

The result is that we can now run multiple masters to support a single pool of replicas. The replicas will reflect the latest block received by any master, without any cases where they would roll back to an earlier block.

Other Benefits

While this change was prompted by the consistency concerns, it has several other convenient side-effects.

Since we’re running multiple masters, the replicas now reflect each block as soon as the first master finishes processing it, meaning we should be able to report the latest block a bit faster than we could previously.

Additionally, we previously had to run each cluster with enough replicas that if any master failed and its replicas dropped out of the load balancer, the remaining replicas would be able to carry the load. Since all of our masters now feed all of our replicas, we don’t have to carry as much extra replica capacity, because the failure of a single master won’t reduce our replica capacity at all.

We’re also able to stand up new masters faster now. Under the old model, if a master failed, the autoscaler would replace it; the new master would sync the latest available data from Kafka, then find peers on the Internet to sync from. But the latest data in Kafka would be capped at the point where the previous master failed. Since we now have multiple masters feeding Kafka, the failure of one master doesn’t stop progress in Kafka, so the new master can sync right up to the current block before it switches to syncing from peers on the Internet. The masters in a cluster also peer directly with each other, so there’s much less peer discovery time before a master is validating blocks on its own.

All told, the multi-master capability is a huge leap in our ability to maintain a fast, reliable cluster.

Outstanding Concerns

While this has drastically narrowed the window for our consistency issues, it doesn’t eliminate them entirely. We’re still working with networked systems. Each master sends its write operations to Kafka, and each replica processes those at its own pace. In general, we expect the various replicas in a pool to process messages from Kafka within about 30 milliseconds of each other, as opposed to a window of potentially multiple seconds for different masters processing the same blocks.

In practice, we see that if we just hammer our servers with requests for the latest block, every once in a while one request returns block N + 1 and a subsequent request returns block N. If we put a 50ms delay between requests, the issue goes away. If you’re polling for the latest block every few seconds (instead of 20+ times per second), it’s very unlikely that two consecutive requests will land close enough to a block boundary for the second one to return an earlier block.
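If you do need to poll aggressively, a thin client-side wrapper is enough to hide the remaining window. The following is just an illustrative sketch using go-ethereum’s ethclient with a placeholder endpoint, not anything Rivet requires: it spaces requests at least 50ms apart and never reports a lower block number than it has already seen.

```go
// A polling helper that enforces a minimum gap between latest-block requests
// and clamps the result to the highest block number observed so far.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ethereum/go-ethereum/ethclient"
)

type latestBlockPoller struct {
	client   *ethclient.Client
	minGap   time.Duration
	lastCall time.Time
	highest  uint64
}

func (p *latestBlockPoller) latest(ctx context.Context) (uint64, error) {
	// Enforce a minimum gap between requests.
	if wait := p.minGap - time.Since(p.lastCall); wait > 0 {
		time.Sleep(wait)
	}
	p.lastCall = time.Now()

	header, err := p.client.HeaderByNumber(ctx, nil) // nil requests the latest block
	if err != nil {
		return 0, err
	}
	// Never report a number lower than one we've already seen.
	if n := header.Number.Uint64(); n > p.highest {
		p.highest = n
	}
	return p.highest, nil
}

func main() {
	client, err := ethclient.Dial("https://example.rivet.cloud") // placeholder endpoint
	if err != nil {
		log.Fatal(err)
	}
	p := &latestBlockPoller{client: client, minGap: 50 * time.Millisecond}
	for i := 0; i < 5; i++ {
		n, err := p.latest(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("latest block:", n)
	}
}
```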

The consistency concern also complicates our software upgrade process a bit. Masters and replicas need to be on the same version of the software. Previously, our upgrade process was simply to add a new cluster running the new software to the load balancer, and some time later remove clusters running older software. But as we’ve seen, running multiple clusters introduces the consistency issues we observed earlier. For now, our plan is to establish a weekly maintenance window during which we may stand up new clusters. During that window, there is a higher chance that users will see out-of-order blocks for at most the few minutes between standing up the new cluster and removing the old one. As we refine our process, the likelihood of seeing out-of-order blocks should fall even during a maintenance window.

Replica Startup Times

As we’ve worked through the Ether Cattle Initiative and into the Rivet Beta, we’ve found that the amount of time it takes for a new replica to start up has been creeping higher and higher. Since our ability to scale effectively depends on being able to stand up new replicas quickly, this became a problem. The issue came down to two main factors, covered below: LevelDB compaction and cold starts on volumes restored from snapshots.

Compaction

A major contributor to replica startup times (and the reason it seemed to be getting worse over time) is LevelDB compaction. LevelDB is a key-value store that Geth uses to store all of the information about the blockchain and the state trie (which includes balances and contract data). When you write a value into LevelDB, it writes the new key/value pair immediately, then relies on a background compaction process to delete older versions of the key later. Compaction runs in the background, and while it slows things down a little, it’s not usually a big deal.

Unfortunately, the Ether Cattle Initiative uses a snapshotting process that doesn’t play very nicely with LevelDB’s compaction process. Once a day we start up a new server, sync all new information from Kafka, take an AWS snapshot, and shut the server down. This ensures that our snapshotting process doesn’t interfere with any of the masters or replicas that are serving users, and that the replica service has shut down cleanly before we take the snapshot.

The problem with this approach is that because we shut down as soon as we’re up to date with Kafka, we never let the compaction process run its course. When we spin up a new server from the snapshot, it gets slowed down by the incomplete compaction. And every time we take a new snapshot the problem gets worse, as we add more records that need compacting without ever giving LevelDB time to finish.

The multi-master system we introduced as a solution to the consistency issue makes the compaction problem even worse, because now the vast majority of keys will be written twice, while previously most of them were only ever written once.

Solution

Because of the cold start problem (which we’ll discuss in the next section), running a full compaction daily would delay our snapshots more than we’re comfortable with. Instead, we’ve opted to do full compactions once a week. We still take our regular daily snapshot, but once a week we run a full compaction against that snapshot and replace it with the compacted version. The snapshot we take the next day then starts from a fully compacted database, and we’ll never have more than a week of compaction outstanding.
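For reference, a full compaction of a Geth chaindata directory can be triggered with the same goleveldb library Geth builds on. This is a simplified sketch with a placeholder path, not our exact tooling; Geth also exposes a debug_chaindbCompact RPC method that performs a full compaction against a running node.

```go
// Run a full compaction pass over a (stopped) Geth chaindata directory so
// that stale versions of keys are removed before the volume is snapshotted.
package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
	db, err := leveldb.OpenFile("/var/lib/ethereum/geth/chaindata", nil) // placeholder path
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// An empty Range means "compact everything": rewrite all levels so the
	// next snapshot starts from a fully compacted database.
	if err := db.CompactRange(util.Range{}); err != nil {
		log.Fatal(err)
	}
}
```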

Cold Starts

In AWS, when you take a snapshot of a disk, all of your block data is stored in Amazon S3. When you create a volume from that snapshot, AWS sort of cheats. It doesn’t copy all of your data onto the new volume up front, because that would take a considerable amount of time before the volume became available. Instead, it makes the volume available immediately, and as you request data off the volume, it goes back to Amazon S3 to get the requested blocks. If your snapshot just includes your operating system and an application you’re going to run, everything gets loaded from S3 onto your volume pretty quickly, and you never notice the extra latency.

On the other hand, when your snapshot is a several-hundred-gigabyte Geth chaindata folder, the latency can be pretty noticeable. When we start up a new server with a cold volume (where most of the snapshot data is still in S3), we have to sync from Kafka and write the data to LevelDB. But far from being a flat, append-only file, LevelDB consists of thousands of files that need to be read, cross-referenced, and rewritten as we pull data in from Kafka. This makes syncing data from Kafka into LevelDB much slower than it would be on a fresh disk. Compounded with compactions (which need to read from all over the disk), it often takes over an hour to sync the latest data from Kafka.

Solution

To solve this, we made a new extension to Geth. Rather than operating off of a single LevelDB database, we now support running two LevelDB databases layered on top of each other. For the top layer, we start a brand new LevelDB database on NVMe local storage that lives with our EC2 instance. Anything we pull from Kafka gets written to this new database. For the bottom layer, we start with our snapshotted volume. When a read request comes in, we check the top layer first. If we find the key there, we return that value immediately. If it’s not there, we check whether the top layer has a deletion marker for that key; if it does, we treat the key as deleted. Only if the top layer has no record of the key at all do we check the bottom layer.
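In code terms, the read path looks roughly like the sketch below. The kvStore interface and all of the names here are illustrative rather than our actual implementation, and the tiny in-memory store at the bottom exists only to make the example runnable.

```go
// Layered key/value store: all writes (including deletion markers) go to a
// fresh "top" database on local NVMe, and reads fall back to the snapshotted
// "bottom" database only when the top layer has no record of the key.
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found")

// kvStore is the minimal key/value interface both layers provide.
type kvStore interface {
	Get(key []byte) ([]byte, error) // returns errNotFound when the key is absent
	Put(key, value []byte) error
	Delete(key []byte) error
}

// layeredDB writes only to the top layer and reads through to the bottom.
type layeredDB struct {
	top    kvStore // empty, well-compacted DB on fast local storage
	bottom kvStore // DB restored from the daily snapshot
}

// markerKey names the tombstone record written when a key is deleted, so that
// reads don't fall through to a stale value in the bottom layer.
func markerKey(key []byte) []byte { return append([]byte("del:"), key...) }

func (db *layeredDB) Put(key, value []byte) error {
	// Clear any earlier tombstone so reads see the new value.
	_ = db.top.Delete(markerKey(key))
	return db.top.Put(key, value)
}

func (db *layeredDB) Delete(key []byte) error {
	_ = db.top.Delete(key)
	return db.top.Put(markerKey(key), []byte{1})
}

func (db *layeredDB) Get(key []byte) ([]byte, error) {
	// 1. A value in the top layer always wins.
	if value, err := db.top.Get(key); err == nil {
		return value, nil
	}
	// 2. A tombstone in the top layer means the key was deleted.
	if _, err := db.top.Get(markerKey(key)); err == nil {
		return nil, errNotFound
	}
	// 3. Otherwise fall back to the snapshotted bottom layer.
	return db.bottom.Get(key)
}

// memStore is a trivial in-memory kvStore used only to exercise the sketch.
type memStore map[string][]byte

func (m memStore) Get(key []byte) ([]byte, error) {
	if v, ok := m[string(key)]; ok {
		return v, nil
	}
	return nil, errNotFound
}
func (m memStore) Put(key, value []byte) error { m[string(key)] = value; return nil }
func (m memStore) Delete(key []byte) error     { delete(m, string(key)); return nil }

func main() {
	db := &layeredDB{
		top:    memStore{},
		bottom: memStore{"LastBlockHash": []byte("hash-from-snapshot")},
	}
	v, _ := db.Get([]byte("LastBlockHash"))
	fmt.Println(string(v)) // served from the bottom (snapshot) layer

	db.Put([]byte("LastBlockHash"), []byte("hash-from-kafka"))
	v, _ = db.Get([]byte("LastBlockHash"))
	fmt.Println(string(v)) // now served from the top layer
}
```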

By taking this layered approach, we’re syncing from Kafka into an empty (and thus perfectly compacted) database on some of the fastest disks AWS has to offer. With these parameters we can sync 24 hours’ worth of data from Kafka in about 10 minutes, and since we take snapshots every 24 hours, on average our snapshots will be 12 hours old, so the average time to sync a brand new replica from Kafka is about 5 minutes.

Upcoming Innovations

A big priority for our team is supporting websockets, which will let clients subscribe to updates such as new blocks instead of polling for them. We’ve had this request from a few of our Beta partners, and it’s the final nail in the coffin of the consistency issue.
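To show why that helps, here’s a rough sketch of what a websocket client could look like, using go-ethereum’s ethclient with a placeholder endpoint: new blocks are pushed to the subscriber as the node learns about them, so there’s no polling at all.

```go
// Subscribe to new block headers over a websocket connection instead of
// polling eth_blockNumber. The endpoint URL is a placeholder.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("wss://example.rivet.cloud/ws") // placeholder websocket endpoint
	if err != nil {
		log.Fatal(err)
	}

	heads := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(context.Background(), heads)
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	// New heads arrive on the channel as the node learns about them.
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case header := <-heads:
			fmt.Println("new block:", header.Number)
		}
	}
}
```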

We’d love to hear from you about what features you’d like to see us focus on next. Some ideas on our roadmap for after websockets include:

Tweet your ideas to @R_I_V_E_T to help us set our priorities.

Join The Beta!

We greatly appreciate the feedback we’ve received from users so far. Sign up for the beta at Rivet.cloud; we’d appreciate your feedback too.