in Uncategorized

An API For Distributed Analytics

There are so many APIs to choose from…

Features of the space:

  • Lots of data – which I'll qualify as “bigger than 1 machine” and thus needing parallel i.o, parallel memory, & parallel compute – and distributed algorithms.
  • Ease of programming; hide details (but expose when want to).  High level for ease-of-use, but “under the covers” has to be easy to understand- because no tool solves all problems – so expect extensions & frequent 1-off hacks.
  • Speed: In-memory by default, where memory can range from 2G to 2T and beyond.  Data placement is required (do not schlop data about unless needed, move code to data, no disk i/o by default).
  • Speed By Default: the normal/average/typical programming style will be fast.  You can “trip over yourself” and go slow, but normal usage is fast.  Obvious when you're moving away from “go fast” to “unknown speed”
  • Correct By Default: the normal/average/typical programming style will not be exposed to weird corner cases.  No data-races (unless you ask for them), no weird ordering rules, job-scheduling, nor counting mappers & reducers, no figuring out sharding or data placement, nor other low-level easy-to-get-wrong details.  Resource management simple by design.
  • Accessible for non-expert programmers, scientists, engineers, managers – looking for a tool, not wanting the tool to be more complicated than the problem

 

Design decisions:

Automatic data placement: It's a hard problem, and its been hard for a long time – but technology is changing: networks are fast relative to the size of memory.  You can move a significant fraction of memory in a cluster in relatively little time.  “Disk is the new tape”:  we want to do work in-memory for that 1000x speedup over disk, but this requires loading memory into one of many little slices in the cluster – which implies data-placement.  Start with random placement – while it's never perfect, it's also rarely “perfectly wrong” – you get consistent decent access patterns. Then add local caching to catch the hot common read blocks, and local caching of hot or streaming writes.  For H2O, this is one of the reasons for our K/V store; we get full JMM semantics, exact consistency, but also raw speed: 150ns for a cache-hitting read or write.  Typically a cache miss is a few msec (1 network hop there and back).

Map/Reduce: It's a simple paradigm shown to scale well.  Much of Big Data involves some kind of structure (log files, bit/byte streams, up to well organized SQL/Hive/DB files).  Map is very generic, and allows an arbitrary function on a unit of work/data to be easily scaled.  Reduce brings Big down to Small in a logarithmic number of steps.  Together, they handle a lot of problems.  Key to making this work: simplicity in the API.  Map converts an A to a B, and Reduce combines two B's into one B – for any arbitrary A and B.  No resource management, no counting Map or Reduce slots, no shuffle, no keys.  Just a nice clean Map and Reduce.  For H2O this means: Map reads your data directly (type A) and produces results in a Plain-Olde Java POJO's (type B) – which is also the Map's “this” pointer.  Results returned directly in “this”.  “This is Not Your Father's Map Reduce”

Correct By Default: No multi-threaded access questions.  Synchronization, if needed, is provided already.  No figuring out sharding or data placement; replication (caching) will happen as-needed.  NO resource management, other than Xmx (Java heap size).  Like sync, resource management is notoriously hard to get right so don't require people to do it.  For 0xdata, this means we use very fine grained parallelism (no Map is too small, Doug Lea Fork/Join), and very fine-grained Reduces (so all Big Data shrinks as rapidly as possible).

Fast By Default: Use of the default Map/Reduce API will produce programs that run in parallel & distributed across the cluster, at memory bandwidth speeds for both reads and writes.  Other clustered / parallel paradigms are available but are not guaranteed to go fast.  The API has a simple obvious design, and all calls have a “cost model” associated with them (these calls are guaranteed fast, these calls are only fast in these situations, these calls will work but may be slow, etc.  For 0xdata, code that accesses any number of columns at once (e.g. a single row) – but independent rows – will run at memory bandwidth speeds.  Same for writing any number of columns, including writing subsets (filtering) on rows.  Reductions will happen every so many Maps in a Log-tree fashion.  All filter results are guaranteed to be strongly ordered as well (despite the distributed & parallel execution).

Easy / Multiple APIs – Not all APIs are for all users!  Java & Map/Reduce are good for Java programmers – but people do math in R, Python and a host of other tools.  For 0xdata, we are a team of language implementers as well as mathematicians and systems' engineers.  We have implemented a generic REST/JSON API and can drive this API from R, python, bash, and Excel – with the ability to add more clients easily.  From inside the JVM, we can drive the system using Scala, or a simple REPL with an R-like syntax.

 

Lets get a little more concrete here, and bring out the jargon –

A H2O Data Taxonomy

Primitives – at the bottom level, the data are a Java primitive – be it a byte, char, long or double.  Or at least that's the presentation.  Under the hood we compress aggressively, often seeing 2x to 4x more compression than the GZIP'd file on disk – and we can do math on this compressed form typically at memory bandwidth speeds (i.e., the work to decompress is hidden in the overhead of pulling the data in from memory).  We also support the notion of “missing data” – a crucial notion for data scientists.  It's similar to Double.NaN, but for all data types.

A Chunk – The basic unit of parallel work, typically holding  1e3 to 1e6 (compressed) primitives.  This data unit is completely hidden, unless you are writing batch-Map calls (where the batching is for efficiency).  It's big enough to hide control overheads when launching parallel tasks, and small enough to be the unit of caching.  We promise that Chunks from Vecs being worked on together will be co-located on the same machine.

A Vec – A Distributed Vector.  Just like a Java array, but it can hold more than 2e31 elements – limited only by available memory.  Usually used to hold data of a single conceptual type, such as a person's age or IP address or name or last-click-time or balance, etc.  This is the main conceptual holder of Big Data, and collections of these will typically make up all your data.  Access will be parallel & distributed, and at memory-bandwidth speeds.

A Frame – A Collection of Vecs.  Pattered after R's data.frame (it's worked very well for more than 20 years!).  While Vecs might Big Data (and thus can be expensive to touch all of), Frames are mere pointers to Vecs.  We add & drop columns and reorganize them willy-nilly.  The data munging side of things has a lot of convenience functions here.

 

The Map/Reduce API

(1) Make a subclass of MRTask2, with POJO Java fields that inherent from Iced, or are primitives, or arrays of either.  Why subclassed from Iced?  Because the bytecode weaver will inject code to do a number of things, including serialization & JSON display, and code & loop optimizations.

class Calc extends MRTask2<Calc> {

(2) Break out the small-data Inputs to the Map, and initialize them in an instance of your subclass.  “Small data” will be replicated across the cluster, and available as read-only copies everywhere.  Inputs need to be read-only as they will be shared on each node in the cluster.  “Small” needs to fit in memory and my example is with doubles, but mega-byte sized data is cheap & commonly done.

final double mean;   // Read-only, shared, distributed
  final int maxHisto;  // Read-only, shared, distributed
  Calc( double meanX, int maxHisto ) { this.meanX=meanX; this.maxHisto = maxHisto; } 

(3) Break out the small-data Outputs from your map, and initialize them in the Map call.  Because they are initialized in the Map, they are guaranteed thread-local.  Because you get a new one for every Map call, they need to be rolled-up in a matching Reduce.

long histogram[];
  double sumError;
  void map( ... ) {
    histogram = new long[maxHisto]; // New private histogram[] object

(4) Break out the Big Data (inputs or outputs).  This will be passed to a doAll() call, and every Chunk of the Big Data will get a private cloned instance of the Calc object, distributed across the cluster:

  new Calc(mean,vec.max()).doAll(myBigVector /*or Frame or Vec[] or ....*/);

(5) Implement your Map.  Here we show a Batching-Map, which typically does a plain “for” loop over all the elements in the Chunk.  Access your data with the “at0” (Chunk-relative addressing) call – which is the fastest accessor but requires a chunk (and takes an “int” index).  Plain Vec-relative addressing is a little slower, but takes a full “long” index: “vec.at(long idx)”.

void map( Chunk chk ) {
     histogram = new long[maxHisto];
     for( int i=0; i<chk.len; i++ ) {
       histogram[(int)chk.at0(i)]++;
       double err = chk.at0(i)-mean;
       sumError += err*err;
     }
   }

(6) Implement a Reduce for any output fields.  Note that Reduce has a “type B” in the “this” pointer, and is passed a 2nd one:

void reduce( Calc that ) {
     sumError += that.sumError;
     // Add the array elements with a simple for-loop... we use this
     // simple utility.
     histogram = Utils.add(histogram,that.histogram);
   }

(7) That's it!  Results are in your original Calc object:

Calc results = new Calc(mean,vec.max()).doAll(myBigVector);
System.out.println(results.sumError+" "+Arrays.toString(histogram));

The Rest of the Story

You have to get the data in there – and we'll import from HDFS, S3, NFS, local disk, or through your browser's upload.  You can drive data upload from Java, but more typically from R, python, REST/JSON, or Excel.  Same for outputing Big Data results: we'll write back Big Data to any store, while being driven by any of the above languages. If you build a predictive model, you'll want to eventually use the model in production.  You can use it in-memory as-is, scoring new datasets on the model – and for example constantly streaming new data through the model while at the same time constant churning out new models to be streamed through.  Or you can get a Java version of any model suitable for dropping into your production environment.

And that's the end of my whirl-wind tour of the H2O Distributed Computing API.  Hope you like it!

Comments & suggestions welcome.

Cliff