Stacked Ensembles and Word2Vec now available in H2O!

Prepared by: Erin LeDell and Navdeep Gill

Stacked Ensembles


H2O’s new Stacked Ensemble method is a supervised ensemble machine learning algorithm that finds the optimal combination of a collection of prediction algorithms using a process called stacking or “Super Learning.” This method currently supports regression and binary classification, and multiclass support is planned for a future release. A full list of the planned features for Stacked Ensemble can be viewed here.

H2O has previously supported the creation of ensembles of H2O models through a separate implementation, the h2oEnsemble R package, which is still available and will continue to be maintained; however, for new projects we recommend using the native H2O version. Native support for stacking in the H2O backend brings ensembles to all the H2O APIs.

Creating ensembles of H2O models is now dead simple: you simply pass a list of existing H2O model ids to the stacked ensemble function and you are ready to go. This list of models can be a set of manually created H2O models, a random grid of models (of GBMs, for example), or a set of grids of different algorithms. Typically, the more diverse the collection of base models, the better the ensemble performance, so using H2O’s Random Grid Search to generate a collection of random models is a handy way of quickly generating a set of base models for the ensemble.

R:

ensemble <- h2o.stackedEnsemble(x = x, y = y, training_frame = train, base_models = my_models)

Python:

from h2o.estimators.stackedensemble import H2OStackedEnsembleEstimator

ensemble = H2OStackedEnsembleEstimator(base_models=my_models)
ensemble.train(x=x, y=y, training_frame=train)

Full R and Python code examples are available on the Stacked Ensembles docs page. Kagglers rejoice!

Word2Vec


H2O now has a full implementation of Word2Vec. Word2Vec is a group of related models used to produce word embeddings (a language modeling/feature engineering technique in natural language processing where words or phrases are mapped to vectors of real numbers). The word embeddings can subsequently be used in a machine learning model, for example a GBM. This allows users to utilize text-based data with existing H2O algorithms in a very efficient manner. An R example is available here.

Technical Details

H2O’s Word2Vec is based on the skip-gram model. The training objective of skip-gram is to learn word vector representations that are good at predicting a word’s context in the same sentence. Mathematically, given a sequence of training words $w_1, w_2, \dots, w_T$, the objective of the skip-gram model is to maximize the average log-likelihood

$$\frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t)$$

where $k$ is the size of the training window.
In the skip-gram model, every word $w$ is associated with two vectors, $u_w$ and $v_w$, which are the vector representations of $w$ as a word and as context, respectively. The probability of correctly predicting word $w_i$ given word $w_j$ is determined by the softmax model:

$$p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})}$$

where $V$ is the vocabulary size.
The skip-gram model with softmax is expensive because the cost of computing $\log p(w_i | w_j)$ is proportional to $V$, which can easily be in the order of millions. To speed up training of Word2Vec, we used hierarchical softmax, which reduces the complexity of computing $\log p(w_i | w_j)$ to $O(\log(V))$.
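For reference, here is the form hierarchical softmax takes; this is the standard formulation from the original Word2Vec papers, with the notation adapted to match the vectors above, and is a sketch rather than a description of H2O's exact internals. The vocabulary is arranged as a binary tree with the words at the leaves, and each probability becomes a product over the path from the root to the target word:

$$p(w \mid w_j) = \prod_{i=1}^{L(w)-1} \sigma\!\left( [\![\, n(w, i+1) = \mathrm{ch}(n(w, i)) \,]\!]\; u_{n(w,i)}^{\top} v_{w_j} \right)$$

where $n(w,i)$ is the $i$-th node on the path from the root to the leaf $w$, $L(w)$ is the length of that path, $\mathrm{ch}(n)$ is an arbitrary but fixed child of node $n$, $[\![ x ]\!]$ is $1$ if $x$ is true and $-1$ otherwise, and $\sigma(x) = 1/(1+e^{-x})$. Each inner node of the tree carries a vector $u_n$ in place of the per-word output vectors, so evaluating (and differentiating) $\log p(w_i \mid w_j)$ touches only the $O(\log V)$ nodes on one path instead of all $V$ words.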

Tverberg Release (H2O 3.10.3.4)

Below is a detailed list of all the items that are part of the Tverberg release.

List of New Features:

PUBDEV-2058- Implement word2vec in h2o (To use this feature in R, please visit this demo)
PUBDEV-3635- Ability to Select Columns for PDP computation in Flow (With this enhancement, users will be able to select which features/columns to render Partial Dependence Plots from Flow. (R/Python supported already). Known issue PUBDEV-3782: when nbins < categorical levels, PDP won't compute. Please visit also this post.)
PUBDEV-3881- Add PCA Estimator documentation to Python API Docs
PUBDEV-3902- Documentation: Add information about Azure support to H2O User Guide (Beta)
PUBDEV-3739- StackedEnsemble: put ensemble creation into the back end.

List of Improvements:

PUBDEV-3989- Decrease size of h2o.jar
PUBDEV-3257- Documentation: As a K-Means user, I want to be able to better understand the parameters
PUBDEV-3741- StackedEnsemble: add tests in R and Python to ensure that a StackedEnsemble performs at least as well as the base_models
PUBDEV-3857- Clean up the generated Python docs
PUBDEV-3895- Filter H2OFrame on pandas dates and time (python)
PUBDEV-3912- Provide way to specify context_path via Python/R h2o.init methods
PUBDEV-3933- Modify gen_R.py for Stacked Ensemble
PUBDEV-3972- Add Stacked Ensemble code examples to Python docstrings

List of Bugs:

PUBDEV-2464- Using asfactor() in Python client cannot allocate to a variable
PUBDEV-3111- R API's h2o.interaction() does not use destination_frame argument
PUBDEV-3694- Errors with PCA on wide data for pca_method = GramSVD which is the default
PUBDEV-3742- StackedEnsemble should work for regression
PUBDEV-3865- h2o gbm : for an unseen categorical level, discrepancy in predictions when score using h2o vs pojo/mojo
PUBDEV-3883- Negative indexing for H2OFrame is buggy in R API
PUBDEV-3894- Relational operators don't work properly with time columns.
PUBDEV-3966- java.lang.AssertionError when using h2o.makeGLMModel
PUBDEV-3835- Standard Errors in GLM: calculating and showing specifically when called
PUBDEV-3965- Importing data in python returns error - TypeError: expected string or bytes-like object
Hotfix: Remove StackedEnsemble from Flow UI. Training is only supported from Python and R interfaces. Viewing is supported in the Flow UI.

List of Tasks:

PUBDEV-3336- h2o.create_frame(): if randomize=True, value param cannot be used
PUBDEV-3740- REST: implement simple ensemble generation API
PUBDEV-3843- Modify R REST API to always return binary data
PUBDEV-3844- Safe GET calls for POJO/MOJO/genmodel
PUBDEV-3864- Import files by pattern
PUBDEV-3884- StackedEnsemble: Add to online documentation
PUBDEV-3940- Add Stacked Ensemble code examples to R docs

Download here: http://h2o-release.s3.amazonaws.com/h2o/rel-tverberg/4/index.html

Red herring bites

At the Bay Area R User Group in February I presented progress on big-join in H2O, which is based on the algorithm in R’s data.table package. The presentation had two goals: i) describe one test in great detail so everyone understands what is being tested and can judge whether it is relevant to them or not; and ii) show how it scales with data size and number of nodes.

These were the final two slides:

[Slides: big-join benchmark timings by data size and number of nodes]

I left a blank for 1e10 (10 billion high-cardinality rows joined with 10 billion high-cardinality rows, returning 10 billion high-cardinality rows) because it didn’t work at that time. Although each node has 256GB RAM (and 32 cores), the 10 billion row test involves joining two 10 billion row tables (each 200GB) and returning a third table (also ~10 billion rows) of 300GB, 700GB in total. I was giving 200GB to each of the 4 H2O nodes (to leave 50GB on each node for the operating system and my forgiving colleagues), which meant the H2O cluster had 800GB RAM. The join algorithm needed more than the mere 100GB that left over as working memory and hence failed. Given that a common rule of thumb is “3x data size” for working memory, failing with roughly 0.1x the data size as working memory is very reasonable. Internally we refer to H2O as a fast calculator. Like R and Python it is in-memory. Unlike R and Python, a single data frame can be bigger than a single node.

So I scaled up to 10 nodes: 2TB RAM and 320 cores.

But it still didn’t work. It ran for 30 minutes and then failed with out-of-memory. I felt that 2TB of RAM really should be enough to complete this task: 200GB joined with 200GB returning 300GB. 2TB-700GB = 1.3TB; that amount of working memory should be enough. It pointed to something being wrong somewhere. Indeed with help from colleagues we identified one point where data was being duplicated and shouldn’t have been. This reduced the working memory needed and then it worked.

But it took 20 minutes.

This didn’t fit with my gut feeling, given I knew how the algorithm works (or was supposed to work). It should be nearer to linear scaling: 1 billion rows took under a minute, so 10 billion should take 10 minutes. But it was taking twice that: 20 minutes. More to the point, I’m not just having a cup of coffee or lunch while it’s running; I’m watching the CPUs and the network traffic between the nodes. The puzzling thing was that the network wasn’t saturated and the CPUs were often cold. Something was wrong.

There ensued many proposals and discussions as to what it might be, focusing on the algorithm and its implementation.

Until one day I went back to the 1 billion row test and ran it on 4 nodes, but a different set of 4 nodes. I usually run on servers 6-9. There was no reason for picking those servers; the first time I did it I asked which ones I could use and then never changed it. This time I ran on servers 1-4 instead. It was 3 times slower. At first I thought there must be a difference in the libraries on the servers or in my code. After ruling out many things I rubbed my eyes, ran it a few times again, and then again on servers 6-9. It was repeatable and confirmed. How on earth could it make a difference which servers I ran on? All 10 servers are of identical spec, in the same rack, connected to the same switch. Here’s a photo:

[Photo: the front of the server rack]

I was monitoring CPU and network usage. Nothing else (either human or artificial) was using the cluster. I was the only one. I had this physical cluster to myself. For sure.

I realized we might have been barking up the wrong tree when we were looking at the algorithm and its implementation. When I scaled up to 10bn rows and 10 nodes, perhaps I didn’t just scale up as intended, but also included a server that was somehow faulty?!

I reran on servers 2-5 and it was 3 times faster than on servers 1-4. The culprit appears to be server 1, then. We have a network tester in H2O that I had already run, but I ran it again. Our server names correspond to the last digit of their IP addresses: server 1 = .181, server 2 = .182, server 3 = .183, etc. Here was the image:

[Image: results from H2O’s network tester]

There doesn’t appear to be much wrong here. I asked colleagues and they advised me to check the network speeds more thoroughly, as they’d seen problems in the past. I Googled how to test network speed, which quickly returned iperf. I knew the problem might be server 1, so I made server 3 the iperf server and compared speeds from servers 1, 2 and 4 as clients connecting to it. Here’s the result:

[Image: iperf throughput for servers 1, 2 and 4 as clients of server 3]

So server 1 is more than 10 times slower than the others. I trotted over to our server room again and had a look at the back of server 1.

[Photos: the back of the rack and the orange link light on server 1]

See that orange light? That’s what was wrong. Either the switch or the network card had auto negotiated itself down to 1G speed when all its friends in the rack are happy at 10G speed. Despite it being up 57 days, it hadn’t auto-negotiated itself back up to 10G speed. Or something like that. We think.

What’s the solution? Old school: I unplugged the ethernet cable and plugged it back in. The orange light turned green. I went back to my laptop and tested again with iperf. This time iperf reported 10G speed for server 1, consistent with the other servers. The non-physical way to do this is to use ethtool. Next time a problem occurs I’ll try that to save some footsteps to the server room.

Rerunning the 10 billion row to 10 billion row high-cardinality join test now comes in twice as fast: 10 minutes instead of 20. I’m not really sure why that made such a big difference, since the network wasn’t saturated enough for it to be a pure data transfer speed issue. I’ll chalk it up to something to do with that network card or switch and move on. I’ll ensure that iperf reports 10G speed between all nodes going forward.

In my talk in Chicago this week I connected to these servers and gave a live demo. That video along with all our other presentations that day are available here.

The event itself was quite packed as seen in the picture below:
[Photo: the audience at the Chicago event]

Big Joins, Scalable Data Munging and Data.Table

Matt Dowle, Hacker, H2O.ai

I’ll be presenting on the same topic at Data by the Bay on Monday 16 May.

Fast csv writing for R

R has traditionally been very slow at reading and writing csv files of, say, 1 million rows or more. Getting data into R is often the first task a user needs to do, and if they have a poor experience (either hard to use, or very slow) they are less likely to progress. The data.table package in R solved csv import convenience and speed in 2013 by implementing data.table::fread() in C. The examples at that time were 30 seconds down to 3 seconds to read 50MB, and over 1 hour down to 8 minutes to read 20GB. In 2015 Hadley Wickham implemented readr::read_csv() in C++.

But what about writing csv files?

It is often the case that the end goal of work in R is a report, a visualization or a summary of some form, not anything large. So we don’t really need to export large data, right? It turns out that the combination of speed, expressiveness and advanced abilities of R (particularly data.table) means that it can be faster (so I’m told) to export the data from other environments (e.g. a database), manipulate it in R, and export it back to the database than it is to keep the data in the database. However, we increasingly hear from practitioners in the field that the export step out of R is the biggest bottleneck preventing that workflow. The export step may be needed to send clean data or derived features to other teams or systems in the user’s organization.

Yes: feather, I hear you thinking! Indeed feather is a new fast uncompressed binary format, which is also cross-language, by Wes McKinney and Hadley Wickham. As Roger Peng pointed out, R has had the fast XDR binary format for many years, but the innovation of feather is that it is compatible with Python and Julia, and other systems will adopt the format too. This is a great thing. Note that it is very important to set compress=FALSE in save() and saveRDS() to achieve comparable times to feather using base R.

I was thinking about jumping on board with feather when, to my surprise, two weeks ago the data.table project received a new user contribution from Otto Seiskari: fwrite(). Analogous to fread(), it writes a data.frame or data.table to .csv and is implemented in C. I looked at the source and realized a few speed improvements could be made. Over the years I’ve occasionally mulled over different approaches, and this contribution gave me the nudge to give those ideas a try.

The first change I made was to stop using the fprintf() function in C. This takes a file handle and a format specification string and writes a field to the file. If you have 1 million rows and 6 columns, that’s 6 million calls to that function. I’ve never looked at its source code, but I guessed that because it was being called 6 million times, some tasks, like interpreting the format specification string and checking that the file handle is valid, were being performed over and over again wastefully. I knew it maintains its own internal buffer automatically to save writing too many very small chunks of data to disk. But I wondered how much time was spent writing data to the disk versus how much time was spent formatting it. So I added my own in-memory buffer, wrote to it directly, and moved from fprintf() to the lower-level C function write(). Note: not the C function fwrite() but write(). This writes a buffer of data to a file and doesn’t have an internal buffer in the way. I did this so I could wrap the write() call with a timing block and see how much time was being spent writing to file versus formatting.
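To make that concrete, here is a minimal hand-written C sketch of the idea (illustrative only, not the actual fwrite() source): fields are still formatted with snprintf() at this stage, but into a private in-memory buffer which is flushed with the low-level write() call, so the formatting time and the writing time can be measured separately. The single numeric column, buffer size and timing code are assumptions made for the sketch.

#include <fcntl.h>      /* open */
#include <stdio.h>      /* snprintf, fprintf */
#include <time.h>       /* clock_gettime */
#include <unistd.h>     /* write, close */

/* Illustrative sketch: write one double column of nrow values to a csv file,
 * formatting into an in-memory buffer and flushing it with write(), while
 * timing the formatting and the writing separately. Error handling is minimal. */
void csv_write_sketch(const char *path, const double *x, int nrow)
{
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    char buf[1 << 20];                             /* the one in-memory buffer */
    size_t used = 0;
    double format_s = 0, write_s = 0;
    struct timespec a, b, c;

    for (int i = 0; i < nrow; i++) {
        clock_gettime(CLOCK_MONOTONIC, &a);
        used += (size_t) snprintf(buf + used, sizeof(buf) - used, "%.15g\n", x[i]);
        clock_gettime(CLOCK_MONOTONIC, &b);
        format_s += (b.tv_sec - a.tv_sec) + (b.tv_nsec - a.tv_nsec) / 1e9;

        if (used > sizeof(buf) - 64) {             /* buffer nearly full: flush it */
            if (write(fd, buf, used) < 0) break;
            used = 0;
            clock_gettime(CLOCK_MONOTONIC, &c);
            write_s += (c.tv_sec - b.tv_sec) + (c.tv_nsec - b.tv_nsec) / 1e9;
        }
    }
    if (used && write(fd, buf, used) < 0)          /* flush the tail */
        fprintf(stderr, "final write failed\n");
    close(fd);
    fprintf(stderr, "formatting %.2fs, writing %.2fs\n", format_s, write_s);
}

Wrapping only the write() calls in the timing block is what gives the formatting-versus-writing split reported next.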

Here is the result:

[Image: timing split between formatting and writing to disk]

Wow: 99% of the time is being spent formatting! The time is not spent writing to disk at all. Not one ounce.

Can we parallelize it then? On first glance, the data has to be written to the file in a serial fashion: row 1, then row 2, then row 3. Perhaps the data could be divided into N subsets, each subset written out in parallel, then joined together afterwards. Before rushing ahead and doing it that way, let’s think about what the problems might be. First, the N subsets could be written at the same time (to different files), but that could saturate the internal bus (I don’t know much about that). Once the N files are obtained, they still have to be joined together into one file. I searched online and asked my colleagues, and it appears there is no way to create a file by reference to existing files; you have to concatenate the N files into a new single file (a copy). That step would be serial and would also churn through disk space. It’s possible there would not be enough disk space available to complete the task. Having anticipated these problems, is there a better way I could implement from the get-go?

What did I have one of that I could make N of? Well, I had one buffer: the input to write(). If I make N threads and give each thread its own buffer, can it be made to work? This is inherently thread-safe since the only memory writes are to a thread’s private buffer. The only tricky aspect is ensuring that each piece is written to the output csv file in the correct order. I’m on good terms with Norman Matloff, who has recommended OpenMP to me in the past and kindly gave me a copy of his excellent book Parallel Computing for Data Science. I turned to Google and quickly found that OpenMP has exactly the construct I needed:

[Image: the OpenMP ordered construct used in the fwrite C code]

OpenMP does a lot of work for the programmer. It directs the team of threads to work on each iteration of the for loop. The code inside the #pragma omp ordered block gets executed one-at-a-time and in the order of the for loop, automatically. So if iteration 7 finishes quickly for some reason, that thread will wait for iterations 1-6 to finish before it writes its buffer and receives the next piece to work on.
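As a rough, self-contained illustration of that construct (again a sketch rather than data.table’s actual code; format_row() here is a made-up stand-in for the real field formatting), the shape of the loop is:

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

enum { PIECE_ROWS = 10000, BUF_BYTES = 1 << 20 };  /* ~1MB buffer per piece */

/* Stand-in for the real per-row formatting: writes one fake two-column row. */
static size_t format_row(char *p, int row)
{
    return (size_t) sprintf(p, "%d,%d\n", row, row * 2);
}

static void csv_write_parallel(int fd, int nrow)
{
    int npieces = (nrow + PIECE_ROWS - 1) / PIECE_ROWS;

    #pragma omp parallel for ordered schedule(dynamic)
    for (int p = 0; p < npieces; p++) {
        char buf[BUF_BYTES];                       /* each thread's private buffer */
        size_t used = 0;
        int start = p * PIECE_ROWS;
        int end = (start + PIECE_ROWS < nrow) ? start + PIECE_ROWS : nrow;

        for (int i = start; i < end; i++)          /* formatting runs in parallel */
            used += format_row(buf + used, i);

        #pragma omp ordered                        /* buffers leave in loop order */
        {
            if (write(fd, buf, used) < 0)
                fprintf(stderr, "write failed\n");
        }
    }
}

int main(void)
{
    int fd = open("out.csv", O_WRONLY | O_CREAT | O_TRUNC, 0644);
    csv_write_parallel(fd, 1000000);
    close(fd);
    return 0;
}

Compiled with something like cc -fopenmp, the pieces are formatted on all cores while the ordered block serializes only the write() calls, in the original row order. The approach described above gives each thread its own long-lived buffer; this sketch simply declares one per piece for brevity.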

I chose the buffer size for each thread to be small (1MB) to fit in each core’s cache. This makes the number of pieces N (size of data / 1MB) much larger than the number of threads, so each piece is neither so big that it gets stuck and causes a blockage, nor so small that it incurs wasteful startup and scheduling overhead. Here is the result on my 4-core MacBook Pro (running bare-metal Ubuntu, of course):

[Image: timing after parallelizing with OpenMP]

This was one of those rare moments where the speedup factor was the best I hoped for. An ideal 4x improvement: 3.6 seconds down to 0.9 seconds.

I thought I’d have to add lots of tests for fwrite() but I looked at the test suite and found that Otto had already added 26 tests in his pull request that I’d merged. My changes had broken quite a few of them so I fixed the code to pass his tests. Then I pushed to GitHub for others to test on MacOS and Windows. Within hours feedback came back that the speedup and correctness were confirmed.

One outstanding itch was that each field was being written to the buffer by the C function sprintf(). Although this writes to an in-memory buffer directly, I wondered how big the overhead of interpreting the format string and actually calling the library function was. I knew from fread() that specialized code tuned to be iterated can make a big difference. So I created specialized writeNumeric and writeInteger functions in C, using base R’s C source code (e.g. format.c:scientific()) as a guide to give me some clues.
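To give a flavor of what “specialized” means here, below is a deliberately simplified integer writer (an illustration only, not data.table’s actual writeInteger, and it ignores details such as NA and INT_MIN that the real code must handle): the field is appended to the buffer with plain arithmetic and no format string at all.

#include <stdio.h>

/* Simplified illustration: append the decimal text of x at *p and advance the
 * pointer. No format-string parsing and no library call per field. */
static void writeInt(char **p, int x)
{
    char *ch = *p;
    if (x < 0) { *ch++ = '-'; x = -x; }            /* sketch: ignores INT_MIN and NA */
    char tmp[12];
    int n = 0;
    do { tmp[n++] = (char)('0' + x % 10); x /= 10; } while (x);
    while (n) *ch++ = tmp[--n];                    /* digits were produced in reverse */
    *p = ch;
}

int main(void)
{
    char buf[32], *p = buf;
    writeInt(&p, -4096);
    *p++ = ',';                                    /* field separator, then the next field */
    writeInt(&p, 123);
    *p = '\0';
    puts(buf);                                     /* prints -4096,123 */
    return 0;
}

Floating-point output is the harder case, which is why base R’s format.c:scientific() served as a guide; the principle is the same, though: a tight loop of arithmetic writing characters straight into the buffer.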

[Image: timing with the specialized C writers]

That’s better. Another 3x speedup: 0.9 seconds down to 0.3. This feels like the lower bound to me. So far we’ve been working with 1 million rows and an output csv file of around 150MB. Quite small by many standards. Let’s scale up 10x to 10 million rows on my laptop with SSD and 100x to 100 million rows on one of our 32 core / 256 GB physical servers. The reproducible code is at the end of this article. On the server we test writing to ram disk (/run/shm) versus writing to hard disk. Ram disk may be appropriate for ephemeral needs; e.g. transferring to a database where there is no requirement to keep the transfer file.

Results

To my surprise, writing a csv file using data.table::fwrite() appears to be faster than writing a binary file with feather. Your mileage may of course vary. If you ever need to quickly inspect the data using familiar command-line text tools like head, tail, wc, grep and sed, then it seems that csv files are still in the running. Sometimes a csv format can turn out to be quite efficient. Consider the number 1.2, for example. In R that is an 8-byte ‘double’, but in the csv it’s just 3 characters wide, a 62% saving. This may be why the csv size (7.5GB) is slightly smaller than the binary (9.1GB).

Here’s a small display of the data being tested:

[Image: a small display of the data being tested]

fwrite is available to test now in data.table v1.9.7 in development. Check out our installation notes for how to install the dev version and get OpenMP where necessary. Please try it out and let us know your findings. It works just as well on both data.frame and data.table. Note there are some outstanding items to complete before release to CRAN, collated here.

fwrite() is a great improvement over write.csv(): 63 seconds down to 2 seconds for the 10 million row test on my laptop. write.csv() converts the input to character format first, in-memory. This creates new string objects for each and every unique integer or numeric, which takes compute cycles, hits the global character cache (a hash table) and uses more RAM. There is an advantage to doing it that way, though: the character conversion is object-oriented, so many user- or package-defined classes with their own print methods in R code will just work automatically, with no C programming required.

What’s the connection to H2O? Usually you would import data into H2O using h2o.importFile() called from either the R or Python H2O packages. h2o.importFile() has been parallel and distributed for some years. To test big-join I’m using it to load two 200GB files into a 10 node cluster with 2TB of RAM, for example. I’ll write more about this in a future article. Sometimes however, it’s necessary or convenient to transfer data between H2O and the R client. This step currently uses base R’s write.csv and read.csv. We intend to replace these calls with fwrite/fread. We’ll also look at the H2O Python package and see if we can improve that similarly.

If you’d like to hear more about big-join in H2O I’ll be presenting at our Open H2O Tour in Chicago on May 3rd, 2016 together with lots of other amazing people including Leland Wilkinson, Jeff Chapman, Erin LeDell, Mark Landry and others. And use the promo code H2OYEAH to get a 50% discount. Also please catch up with me at Data-by-the-Bay where I’ll be presenting on May 16th, 2016.

This article was also published on R-Bloggers here: http://www.r-bloggers.com/fast-csv-writing-for-r/

Appendix

Reproducible code adapted from this Stack Overflow question

install.packages("data.table", type="source", repos="https://Rdatatable.github.io/data.table")
# For Mac and Windows, see https://github.com/Rdatatable/data.table/wiki/Installation
require(data.table) # v1.9.7

devtools::install_github("wesm/feather/R")
require(feather) # v0.0.0.9000

require(readr) # CRAN v0.2.2

DTn = function(N) data.table( # or data.frame. Both work just as well.
  str1=sample(sprintf("%010d",sample(N,1e5,replace=TRUE)), N, replace=TRUE),
  str2=sample(sprintf("%09d",sample(N,1e5,replace=TRUE)), N, replace=TRUE),
  str3=sample(sapply(sample(2:30, 100, TRUE), function(n) paste0(sample(LETTERS, n, TRUE), collapse="")), N, TRUE),
  str4=sprintf("%05d",sample(sample(1e5,50),N,TRUE)),
  num1=sample(round(rnorm(1e6,mean=6.5,sd=15),2), N, replace=TRUE),
  num2=sample(round(rnorm(1e6,mean=6.5,sd=15),10), N, replace=TRUE),
  str5=sample(c("Y","N"),N,TRUE),
  str6=sample(c("M","F"),N,TRUE),
  int1=sample(ceiling(rexp(1e6)), N, replace=TRUE),
  int2=sample(N,N,replace=TRUE)-N/2
)
set.seed(21)
DT = DTn(1e6)

# Either ram disk:
setwd("/dev/shm")
# or HDD/SSD (run one of these two setwd calls):
setwd("~")

system.time(fwrite(DT,"fwrite.csv"))
system.time(write_feather(DT, "feather.bin"))
system.time(save(DT,file="save1.Rdata",compress=F))
system.time(save(DT,file="save2.Rdata",compress=T))
system.time(write.csv(DT,"write.csv.csv",row.names=F,quote=F))
system.time(readr::write_csv(DT,"write_csv.csv"))

NB: It works just as well on both data.frame and data.table