0xdata.com is building in-memory analytics (no surprise, see 0xdata.com). What may be a surprise, though, is that there’s a full-fledged high-performance Key/Value store built into H2O and that is central to both our data management and our control logic.
We use the K/V store in two main ways:
- All the Big Data is stored striped across the cluster with pseudo-random Keys. For any given Key we can find the node which “homes” it from the Key itself, and cache it locally as needed. Pseudo-random striping in practice means the data is typically well distributed around the cluster. Caching (as opposed to a fixed placement) means we do not need to be perfect about which CPU works on which chunk of data – as long as *most* of the work is CPU-local.
- About 1/2 the control logic goes through the K/V store (and the other half uses the RPC mechanism). Anything which needs cross-node visibility and persistence uses the store, including progress counters, meta-data for temporary distributed vectors, built models, loaded datasets, and results from all kinds of work we wish to cache to avoid doing it again later.
First some simple facts:
- The H2O K/V store supports the full Java Memory Model. Lazy consistency can be requested in places, but it typically only improves performance under certain use-cases (a mix of high-volume reads & writes to the same Keys). Note that the JMM does not order writes to non-volatile variables, so high-volume writes to unrelated Keys (as is common for temp data in the middle of some big calculation) all run in parallel with both the network traffic and the actual Value production. That is, we don’t typically stall-on-write in any common use-case, and we still maintain exact JMM semantics.
- All Keys can be cached locally, meaning a typical hot Key ‘get’ is cached and costs no more than a hashtable ‘get’: about 150ns. Same for a ‘put’: the write is cached locally, then forwarded to another node (AFAIK, this is the fastest K/V get/put on the planet, but see below: the store is not persistent). The forwarding happens in the background by default (unless you’ve specified volatile-like behavior). Meanwhile, local readers will see the write immediately, and the writer is stalled for no more time than the hashtable ‘put’ and a single UDP-packet send.
- H2O also supports transactional K/V updates – the transaction function is forwarded to the Key’s home, where it is run in a loop until the transaction succeeds or is aborted. We often use this for e.g. asynchronous updates to progress bars.
- The H2O Cloud is peer-to-peer. There is no “name-node” nor central Key dictionary. Each Key has a home-node, but the homes are picked pseudo-randomly per-key.
- H2O’s store is not persistent, nor is it an HA solution. We looked at those (and even at one point had a fully functioning Key auto-replicate & repair) and decided that the market was well served by existing technologies. We opted to put our energies into the Math side of things.
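The transactional update from the list above (a function run in a loop at the Key's home until it succeeds or aborts) can be sketched with nothing but java.util.concurrent. This is a minimal single-JVM illustration, not H2O's code: `HomeStore`, `atomicUpdate` and `demo` are hypothetical names, and the "forwarding to the home node" step is left out entirely.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the slice of the store that lives on a Key's home node.
class HomeStore {
    private final ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();

    // Run the update function in a loop until its result is installed by a
    // compare-and-swap, or until the function aborts by returning null.
    // Returns the installed value, or null on abort.
    Long atomicUpdate(String key, UnaryOperator<Long> fn) {
        while (true) {
            Long old = map.get(key);                 // null if the Key is absent
            Long next = fn.apply(old);
            if (next == null) return null;           // the function chose to abort
            boolean installed = (old == null)
                    ? map.putIfAbsent(key, next) == null
                    : map.replace(key, old, next);   // fails if a racing writer got in first
            if (installed) return next;
        }
    }

    Long get(String key) { return map.get(key); }

    // Several threads bump one progress counter; returns the final count.
    static long demo(int threads, int incrementsPerThread) {
        HomeStore home = new HomeStore();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++)
                    home.atomicUpdate("progress", old -> old == null ? 1L : old + 1L);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try { w.join(); }
            catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return home.get("progress");
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // prints 4000: no increment is lost
    }
}
```

Because the function may run more than once before it wins the race, it should be free of side effects other than its return value.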
Some Big Picture Details:
H2O’s K/V store is a classic peer-to-peer Distributed Hash Table, with the Keys distributed around the cluster via a pseudo-random hash function. Pseudo-random because we can (and frequently do) muck with it, to force Keys to ‘home’ to different nodes (usually for load-balance reasons). A Key’s ‘home’ is solely responsible for breaking ties in racing writes and is the “source of truth” for that Key. To repeat: Keys can be cached anywhere, and both reads & writes can be cached (although a write is not complete until it reaches ‘home’, gets ordered with other writes, and an ACK is returned). The ‘home’ is only consulted when breaking ties on conflicting writes or to fetch the value on a Key miss.
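To make the "find the home from the Key itself" idea concrete, here is a minimal sketch, assuming the home is just a hash of the Key bytes modulo the cloud size. `HomedKey`, `forcedHome` and `home(int)` are hypothetical names invented for illustration; keeping the override in a separate field is a simplification of mine, and H2O's actual hash and override encoding are not shown here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical Key: a blob of bytes plus an optional forced home node.
final class HomedKey {
    final byte[] bytes;
    final int forcedHome;   // -1 means "let the hash decide"

    HomedKey(String name) { this(name, -1); }

    HomedKey(String name, int forcedHome) {
        this.bytes = name.getBytes(StandardCharsets.UTF_8);
        this.forcedHome = forcedHome;
    }

    // Any node can compute this locally; no name-node or central dictionary is consulted.
    int home(int cloudSize) {
        if (forcedHome >= 0) return forcedHome % cloudSize;       // "mucking with" the hash
        return Math.floorMod(Arrays.hashCode(bytes), cloudSize);  // floorMod keeps the index non-negative
    }

    public static void main(String[] args) {
        int cloudSize = 4;
        int[] keysPerNode = new int[cloudSize];
        for (int i = 0; i < 10_000; i++)
            keysPerNode[new HomedKey("chunk-" + i).home(cloudSize)]++;
        System.out.println(Arrays.toString(keysPerNode));               // how the Keys spread over nodes
        System.out.println(new HomedKey("model", 2).home(cloudSize));   // prints 2: forced home
    }
}
```

Every node runs the same function on the same bytes, so they all agree on the home without exchanging any messages.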
Keys are not much more than a blob of unique bytes, often char arrays from Strings or random UUID’s.
Values hold bits for maintaining consistency, plus status bits for being on some backing store (for the user-mode swap-to-disk), plus a big blob of bytes. The blob of bytes is typically a serialized POJO, and if so an inflated copy of the POJO is kept around also. We use generics to auto-inflate the POJO form:
MyPOJO pojo = DKV.get(key).get();
This code will set the variable “pojo” to a POJO pulled from the K/V store. If all caches hit, this will take about 150ns. There is a lot of raw compressed data (the Big Data part of big data), so Big Data is read directly from the bytes and its “POJO” form is the byte array itself, i.e., we don’t keep two copies of Big Data (both a serialized and a deserialized form).
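The generic auto-inflate in the one-liner above can be sketched as a Value that holds the serialized bytes plus a lazily built, cached POJO. This is a toy under stated assumptions: `ToyValue`, `ofString` and the `inflater` function are hypothetical, and a UTF-8 String stands in for a real serialized POJO.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical Value: serialized bytes plus a cached inflated form.
class ToyValue {
    private final byte[] bytes;
    private final Function<byte[], ?> inflater;  // assumption: the deserializer is passed in
    private volatile Object pojo;                // cached inflated copy, built on first get()

    ToyValue(byte[] bytes, Function<byte[], ?> inflater) {
        this.bytes = bytes;
        this.inflater = inflater;
    }

    // Convenience factory so callers need no imports of their own.
    static ToyValue ofString(String s) {
        return new ToyValue(s.getBytes(StandardCharsets.UTF_8),
                            b -> new String(b, StandardCharsets.UTF_8));
    }

    // The type parameter is inferred from the assignment target, so callers
    // write no cast. A racing first call may inflate twice; that is harmless
    // here because the inflated form is treated as immutable.
    @SuppressWarnings("unchecked")
    <T> T get() {
        Object p = pojo;
        if (p == null) pojo = p = inflater.apply(bytes);
        return (T) p;
    }

    public static void main(String[] args) {
        ToyValue v = ToyValue.ofString("hello");
        String first = v.get();    // inflates from bytes
        String second = v.get();   // cache hit: the same object comes back
        System.out.println(first + " " + (first == second)); // prints "hello true"
    }
}
```

The unchecked cast trades compile-time safety for brevity: asking for the wrong type fails with a ClassCastException at the call site.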
Inflated POJOs (and their backing Values) are “immutable”. While we cannot enforce this at the Java level, updates to the POJOs will not stick in the K/V store unless a ‘put’ is done. More on this later, but mostly it turns into a coding style issue: if you need to update a POJO AND make the changes globally visible, you need to do a “DKV.put(key,pojo)” at some point.
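The get/modify/put coding style described above looks roughly like this. `ToyStore` and `Progress` are hypothetical; the toy copies objects on the way in and out as a stand-in for serialization, which is not how H2O caches inflated POJOs but gives the same visible rule: a change only sticks after a put.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-node stand-in for the K/V store.
class ToyStore {
    static final class Progress {
        int done;
        Progress(int done) { this.done = done; }
    }

    private final Map<String, Progress> map = new HashMap<>();

    // Store a snapshot, so later edits to the caller's object do not leak in.
    void put(String key, Progress p) { map.put(key, new Progress(p.done)); }

    // Hand out a copy, so edits to it stay private until the next put.
    Progress get(String key) {
        Progress p = map.get(key);
        return p == null ? null : new Progress(p.done);
    }

    public static void main(String[] args) {
        ToyStore dkv = new ToyStore();
        dkv.put("job", new Progress(1));

        Progress p = dkv.get("job");
        p.done = 5;                                // local change only
        System.out.println(dkv.get("job").done);   // prints 1: the store has not changed

        dkv.put("job", p);                         // publish the change
        System.out.println(dkv.get("job").done);   // prints 5
    }
}
```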
The further restriction on POJOs is that they inherit from the class “Iced”. The bytecode weaver will then inject all the code needed to serialize & deserialize (and a JSON pretty-printer, and a bunch of other code). This rules out the default Java collections (although we have some equivalents – that can run *distributed*, because the collection can get larger than what a single node can hold). In practice this hasn’t been an issue. We serialize Iced & primitives, arrays of Iced & primitives and recursive subclasses of Iced.
Reliable Remote Procedure Call
H2O is a clustered solution which requires network communication between JVMs in unrelated processes or machine memory spaces. That network communication can be fast or slow, or may drop packets & sockets (even TCP can silently fail), and may need to be retried. We have implemented a reliable RPC mechanism which retries failed communications at the RPC level. An RPC contains a command (or call) to execute on the remote, plus the call arguments; there is a return value. Both args & returns may be void, small, or may contain gigabytes of data.
Our mechanism has all the obvious optimizations: message data is compressed in a variety of ways (because CPU is cheaper than network). Short messages are sent via 1 or 2 UDP packets; larger messages use TCP for congestion control. RPCalls are retried periodically until we get an ACK back; the ACK also contains the call’s return value. The ACK itself is also retried until the called node gets an ACKACK back (and this ends the cycle of Call/ACK/ACKACK). We handle all the problems with double-sending of tasks & replies. The end experience is the client makes a blocking call, sending the ‘this’ POJO over the wire, and gets back a modified ‘this’ POJO with results filled in.
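The Call/ACK/ACKACK cycle, and why a re-sent call must not run the task twice, can be shown with a small single-threaded simulation. Everything here is an assumption made for illustration: `ReliableRpcSketch`, `Server`, `LossyNet` and the task-id table are hypothetical, packet loss is a seeded random coin flip, and all retries are driven by the caller rather than by timers on both sides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class ReliableRpcSketch {
    // Hypothetical remote side: remembers replies by task id, so a retried
    // call is answered from the table instead of being executed again.
    static final class Server {
        final Map<Integer, Integer> pendingReplies = new HashMap<>();
        int executions = 0;

        int onCall(int taskId, int arg) {
            Integer cached = pendingReplies.get(taskId);
            if (cached != null) return cached;      // duplicate call: just resend the ACK
            executions++;
            int result = arg * arg;                 // the "remote work"
            pendingReplies.put(taskId, result);
            return result;
        }

        // The ACKACK tells the server the reply arrived, so it can forget it.
        void onAckAck(int taskId) { pendingReplies.remove(taskId); }
    }

    // Hypothetical network that drops each packet with probability dropRate.
    static final class LossyNet {
        private final Random rnd;
        private final double dropRate;
        int sends = 0;

        LossyNet(long seed, double dropRate) {
            this.rnd = new Random(seed);
            this.dropRate = dropRate;
        }

        boolean send() { sends++; return rnd.nextDouble() >= dropRate; }
    }

    // Blocking call: keeps retrying until a reply arrives, then confirms it.
    static int call(Server remote, LossyNet net, int taskId, int arg) {
        while (true) {
            if (!net.send()) continue;              // call packet lost: retry
            int result = remote.onCall(taskId, arg);
            if (!net.send()) continue;              // ACK lost: retry the call
            while (!net.send()) { }                 // ACKACK retried until it gets through
            remote.onAckAck(taskId);
            return result;
        }
    }

    public static void main(String[] args) {
        Server remote = new Server();
        LossyNet net = new LossyNet(42L, 0.6);      // drop 60% of packets
        int result = call(remote, net, 1, 7);
        System.out.println("result=" + result + " executions=" + remote.executions
                + " packets=" + net.sends);
    }
}
```

However many packets are lost, the task body runs once, because only the first call to arrive finds an empty slot in the reply table.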
In practice, we can pull cables from a running cluster and plug them back in, and the cluster will recover; or drop >50% of all UDP packets and still have the system work (albeit more slowly, with lots of retries).
And For Next Time, The Gory Details
Been too long since I’ve blogged, but this blog has become quite long already! And I think I’m going to need pictures also… so, next time the gory details (and yes, building on the above parts).