Since I’ve been going on about data science culture stuff lately, I think it’d be nice to nerd out a bit this week.
It's 2020, soon (thankfully) to be 2021. We now largely live in a post "no SQL" (as opposed to noSQL) world, where even ridiculously large, planet-scale distributed database systems are accessible via the SQL language again.
Even just 5 years ago, there was a rather awkward period of time where many “Big Data” operations required the use of esoteric methods through Hadoop and the MapReduce computing framework. Things like writing straight up MapReduce jobs with custom Java code, or SQL-like (but not quite SQL) frameworks like Apache Pig and Hive.
This was the age when people decided that things like full ACID compliance, something traditional relational databases are designed to handle, were unnecessary to varying degrees, often for performance and scalability reasons. It led to a bit of a rejection of the SQL language that is so tightly bound to those databases, because if we're adopting newer forms of data storage and access via noSQL to achieve our scaling goals, do we really need the baggage of SQL? Also, people hadn't yet figured out how to write query parsers that could translate SQL queries into the required distributed operations.
During this time, I found myself writing Python and Golang code to send to the Hadoop cluster’s Streaming MapReduce API to get answers. It was a wild time, though it was really cool to use the cluster as a massively distributed grep processor using just a shell script.
Current industry trends seem to have tilted back toward the realization that SQL the language isn't going anywhere. Too many systems, and too many people, are already familiar with SQL. It is actually easier to create a SQL interface for a massively scaled noSQL system than it is to force people to learn a different language API for every new database. Having a SQL interface means you have access to an immense pre-trained pool of users, which is important if you want your product to gain traction and adoption in the vast datastore marketplace.
Today, you don't hear a ton about MapReduce and Hadoop any more, though the tech stack is definitely still in use all over the world. All that low-level functionality has been papered over with more user-friendly systems on top. You don't write raw MR jobs; you send a SQL query to Spark, Presto, BigQuery, Redshift, etc. Those systems will let you access the terabytes of data you wanted, in the pretty expressive syntax of SQL, without the extra coding overhead.
So, now that MapReduce is largely out of the big picture, why care about it today?
Performance.
More specifically, just like with any tool, there are effective and ineffective ways to apply it. While we're not directly writing MapReduce jobs any more, we're very often interacting with systems that use some variation on MR, even if only for part of the query execution. Just like understanding how indexes work in a traditional relational database like PostgreSQL lets you write better queries, understanding how MapReduce works does the same on these modern distributed database systems.
Overview
MapReduce was introduced to the world in 2004 in a Google paper by Jeff Dean and Sanjay Ghemawat. It's a surprisingly easy to read paper, even for a non-CS person like myself. This paper would lead to an open source implementation of the system it describes, which we all now know as Hadoop. Hadoop would provide many of the things the original MapReduce paper mentions, including the underlying file system (HDFS), all the cluster management facilities, job tracking, etc.
The programming model of MR is so ridiculously simple that it takes up 3 whole paragraphs in the paper. That's it. I'll just copy them here:
2. Programming Model
The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The user of the MapReduce library expresses the computation as two functions: Map and Reduce.
Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.
The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user’s reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.
The rest of the MapReduce paper essentially just talks about all the various aspects of cluster and job management, handling failure, and various implementation details. You could argue that it's all this management stuff that makes MR the horizontally scaling general processing powerhouse that it is. We don't particularly care about a lot of the implementation details, but sometimes they matter.
So let's take this framework and make it more concrete and easier to understand. The simplest program everyone uses as an example is the word count program, so we'll use it here too.
The Standard Flow
All MR jobs follow these basic steps:
Map
Combine (optional)
Shuffle (sorta transparent to the user)
Reduce
Sometimes it's not possible to do the data processing you want with one series of steps, so you have to chain multiple MR jobs together to accomplish your task. In such a case, the final output of MR #1's Reduce would directly feed into MR #2's Map function.
1. Map
This is the general input step. All the worker nodes are assigned chunks of data to read through. In this wordcount example, imagine we’re reading through a gazillion web pages, like a full copy of Wikipedia, and each Wikipedia page is a file. The MR framework would assign a set of files to every worker to apply the Map function to.
Since we’re counting words, the Map function will read every file, split things into words, and emit (word, 1) for every word it encounters. So you get something like: (The,1) (quick,1) (brown,1) (fox,1) (jumps,1) (over,1) (the,1) (lazy,1) (dog,1). (Yes, I’m being naïve here and not even making “The” lowercase)
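To make that concrete, here's a minimal sketch of what such a mapper could look like in Python, written in the Hadoop Streaming style (raw text in on stdin, tab-separated key/value pairs out on stdout). It's an illustration of the idea, not any particular production job.

```python
#!/usr/bin/env python3
# Minimal word-count mapper sketch in the Hadoop Streaming style:
# read raw text from stdin, emit one "word<TAB>1" pair per word.
import sys

for line in sys.stdin:
    for word in line.split():
        # Emit (word, 1) for every word seen; no lowercasing,
        # just like the naive example above.
        print(f"{word}\t1")
```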
It’s important to notice here that there’s no predicting which worker gets which file to read. It can be any file, in any order. This allows for MapReduce’s massive horizontal scaling capability. Just add more workers and you can read more files in parallel. If you had infinite hardware, you could potentially read every single file simultaneously, reducing your initial data read time to the length of the longest page on Wikipedia.
BUT, at the same time, notice that there’s no facility for a more traditional database index. A DB index tells the database “This word exists on line 3, 125, 4414 of this file, which is on this file/section of disk”. MR in this instance can’t read into a specific position of a file. At best, it can potentially split a file into multiple chunks to share amongst workers under very specific situations, but ultimately the entire file must be read.
This then points to some more optimization details of MapReduce. You don’t want your data too clustered. A lot of the horizontal scaling power comes from the ability to seamlessly distribute work evenly amongst workers. You don’t want machines sitting idle waiting for other machines to finish. In a perfect world, every file takes exactly the same amount of time to process.
In practice this often isn't a big deal because MR is typically used on log data, where every line is a data object and logs are rotated regularly, keeping them roughly similar in size. Files are also stored in chunks that usually divide the work naturally and evenly.
2. Combine (optional)
Map and Reduce functions are expected to be executed on completely arbitrary machines, with no context being shared. It could be the same machine, or completely different ones. That means all those tuples of data the Map phase output: (The,1) (quick,1) (brown,1) (fox,1) (jumps,1) (over,1) (the,1) (lazy,1) (dog,1), would have to be transmitted over the network, and possibly written to disk if the data is too large to fit in memory (which it often is).
A very easy way to speed things up is to locally combine word counts on the Mapping machine, before the Map phase ends and the Reduce phase starts. Very often the Combiner here is very similar (if not identical) to the code that would ultimately run in the actual Reduce step.
In fact, you can even optimize the Map phase somewhat before you even get to the Combine step by aggregating a little bit beforehand, as memory allows. For example, you can keep an LRU cache that aggregates values, and when something falls out of the cache, you emit that word's count. You might end up with things like (the, 512), (cat, 3), (hat,4), (cat,1) due to how the cache works, but that's much more compact than 512 separate (the,1) entries.
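As a rough sketch of that in-mapper aggregation idea, here's what it might look like in Python. The bounded OrderedDict standing in for the LRU cache and the cache size are my assumptions, not anything from the paper.

```python
#!/usr/bin/env python3
# Sketch of in-mapper aggregation with a bounded cache (an assumed
# implementation of the LRU idea above). Counts are held in memory per word;
# when the cache is full, the least recently used word is evicted and its
# partial count is emitted immediately.
import sys
from collections import OrderedDict

CACHE_SIZE = 10_000  # arbitrary; tune to available mapper memory
cache = OrderedDict()

def emit(word, count):
    print(f"{word}\t{count}")

for line in sys.stdin:
    for word in line.split():
        if word in cache:
            cache[word] += 1
            cache.move_to_end(word)                          # mark as recently used
        else:
            if len(cache) >= CACHE_SIZE:
                old_word, old_count = cache.popitem(last=False)  # evict LRU entry
                emit(old_word, old_count)                    # partial count, e.g. (the, 512)
            cache[word] = 1

# Flush whatever is still cached at the end of the input split.
for word, count in cache.items():
    emit(word, count)
```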
Depending on the exact workload, it might be worth writing one, or both, of these optimizations.
3. Shuffle (transparent to the user)
This step is largely transparent to the user, but it is good to know about because it's what lets the Reduce code work as cleanly as it does. It also shows up as a very time-intensive phase of an MR job, so users inevitably ask what the shuffle step is actually doing.
The shuffle is something of a footnote in the original Google paper; it's mentioned primarily in a section on Sorting, because the shuffle is the step that provides a kind of sorting functionality in MR.
Officially, the Shuffle is tasked with sorting all the keys coming out of the Map phase. The Google paper mentions that it guarantees the keys (not the values) come out in increasing order: (a,1), (a,5), (a,2), (b,123), (d,55).
This behavior is very similar to the Unix sort command.
All those sorted keys are then divided amongst the various Reducer workers. One quirk that makes the system work is that all the reducers (effectively) see only 1 key at a time. The reducer function is called anew whenever it hits a new key.
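A toy way to see what the shuffle buys you, using plain Python in place of the framework (the tuples are made up):

```python
# Toy illustration of what the shuffle accomplishes: sort mapper output by
# key so that all values for the same key end up adjacent (and routed to
# the same reducer). This is roughly what `sort` does in a hand-rolled
# streaming pipeline: mapper | sort | reducer.
from itertools import groupby
from operator import itemgetter

mapper_output = [("the", 1), ("quick", 1), ("the", 1), ("fox", 1), ("the", 1)]

shuffled = sorted(mapper_output, key=itemgetter(0))   # keys, not values, get sorted
for key, group in groupby(shuffled, key=itemgetter(0)):
    print(key, [value for _, value in group])
# fox [1]
# quick [1]
# the [1, 1, 1]
```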
While the Shuffle is considered a bit of an incidental step, you wind up having to optimize it indirectly while working. The shuffling takes longer when you have to move lots of data around, and it can be made better or worse depending on how you aggregate and emit keys. The combiner step is one way to help optimize the shuffle. Similarly, filtering operations within the Map can also be very effective because they reduce the total amount of data being moved around by the entire MR job.
4. Reduce
The easiest way to think about the reducer function is that it takes all the values of a single key and iterates over them in arbitrary order. In our word counting example, a reducer would get all the values of "the", the hundreds of thousands of '1's, and it would just sum them up. When it runs out of values for the key, it emits a single sum: (the, 154214). This would be the final count of the word in the corpus of data.
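Here's a minimal sketch of such a reducer in the Hadoop Streaming style, leaning on the fact that the shuffle has already sorted its input by key; again, an illustration rather than any specific production code.

```python
#!/usr/bin/env python3
# Minimal word-count reducer sketch in the Hadoop Streaming style. Because
# the shuffle has already sorted the input by key, we can keep a running
# count and emit it the moment the key changes.
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t")
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")  # e.g. (the, 154214)
        current_word = word
        current_count = int(count)

if current_word is not None:
    print(f"{current_word}\t{current_count}")
```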
BUT, that’s a bit of an oversimplification.
If you look back at the original description of the Reduce step, it says it gets "a set of values for that key". It doesn't say any single reducer gets ALL the values at once. How could we get the correct count with such a setup? It relies on the reducer code being idempotent in a specific sense: you can run the Reducer on the whole set of values at once, or on a bunch of subsets and then on the results of those subsets, and still come to the same final answer.
In our case, "sum up the values for the key, report the sum" has that property, so if the list of (the,1) entries is too long, MR can split it across a ton of machines, knowing the same key has been distributed, and run the Reducer again over the partial results until a single value remains. That yields the correct final result.
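A tiny self-contained demonstration of that property, with a made-up count:

```python
# Demonstration that "sum the values" gives the same answer whether you
# reduce everything at once or reduce subsets and then reduce the partial
# results (the property that lets MR split a huge key across machines).
values = [1] * 154_214                              # every (the, 1) emitted by the mappers

all_at_once = sum(values)

chunks = [values[i:i + 10_000] for i in range(0, len(values), 10_000)]
partial_sums = [sum(chunk) for chunk in chunks]     # first pass, spread across workers
re_reduced = sum(partial_sums)                      # second pass over the partial results

assert all_at_once == re_reduced == 154_214
```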
Why subqueries are so important these days
When you think about how to optimize relational database queries, subqueries are pretty high on the list of things you do not want to use, because they typically create un-indexed temporary tables in memory that might spill to disk and be very slow.
This is often the opposite case for a MapReduce-backed noSQL database.
Since indexes don't really exist in MR, and intermediate results are usually spilled to disk between steps anyway, the downsides of a subquery don't really matter. Instead, you get one big upside: early filtering.
Most subqueries wind up cutting down the number of entries that get processed. That means less data is transmitted during the shuffle and reduce steps, which has knock-on effects on the amount of work done later in the job.
So how do joins work?
So far we've been dealing with a single data source, but what if we need to join in data from a second table (or more) to do our analysis? Say we only want to count the words that someone searched for on Wikipedia yesterday. We need a JOIN of some sort. How would we do this?
There are two classes of ways to get the job done: Map-side and Reduce-side. These are named for when the information from the outside data source is attached to the main data being analyzed.
Map-side join
This method involves having the mapper be the one joining in the external data. It only works for small joining tables that fit in the worker's memory, but it saves extra computational work if you can do it.
When the mapper initializes, you can tell it to load a small amount of data into memory: read a CSV, run a query against a server, whatever. Then you can use that information to adjust how you emit results out of the Mapper.
For example, imagine you loaded up a 500-entry list of "words people searched yesterday". You can put that list into a quick hashmap, and the Mapper will only emit the words it reads that appear in the hashmap. This is effectively equivalent to an INNER JOIN between the file and the table.
Obviously you can do more complicated things like modify the keys emitted, etc. Pretty much anything goes since you're just spitting out key-value pairs.
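Here's a sketch of that map-side join idea in Python, again in the Streaming style. The searched_words.txt file name is a hypothetical stand-in for whatever small lookup data you load at mapper start-up.

```python
#!/usr/bin/env python3
# Sketch of a map-side join in the Streaming style. The small "words people
# searched yesterday" table (hypothetical file searched_words.txt, one word
# per line) is loaded into a set when the mapper starts; the mapper then
# only emits words present in that set, which behaves like an INNER JOIN.
import sys

with open("searched_words.txt") as f:           # small enough to fit in memory
    searched = {line.strip() for line in f}

for line in sys.stdin:
    for word in line.split():
        if word in searched:                    # the "join" happens right here
            print(f"{word}\t1")
```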
Reduce-side join
As the name suggests, this one has the JOIN code running on the Reducer. However, what does that entail? While in theory you could have the Reducer read in external data like a Map-side join does (it's an arbitrary program after all), that isn't what reduce-side means in this instance.
A reduce-side join means having all the join data sent to the reducer normally, as key-value pairs from the mappers. A common situation where this comes up is if you have your mappers reading two files, say one CSV with sales volume data and one JSON file with customer data. Both files are too large to fit into worker memory, but you want to join the two together. What do you do?
Remember that during the shuffle phase, all the keys are sorted and equal keys go together and effectively wind up in the same place for the reducing step. So you need to make sure that all the sales and customer information for the same customer gets to the same reducer by making sure they’re emitted with the same key in the Map phase.
Effectively, this is the ON clause of the JOIN statement, declaring a condition of customer.id = sales.customer_id. Your join key is the same as if it were in SQL.
Once you make sure all the relevant information is sent to the reducer properly, you THEN have to make sure the reducer can tell the two kinds of records apart and complete the actual join logic. Remember that since reducer input comes in arbitrary order within a key, you have to do the work to make sure everything is handled correctly.
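To make the mechanics a little more concrete, here's a toy, in-memory sketch of the reduce-side join logic in Python, with made-up customer and sales records standing in for the two files. The source tags and record shapes are my assumptions.

```python
# Toy sketch of reduce-side join logic (pure Python, no cluster). Each mapper
# emits the join key (customer id) plus a tag saying which source the record
# came from; the reducer then groups by key and pairs things up.
from itertools import groupby
from operator import itemgetter

# What the two sets of mappers might emit: (join_key, (source_tag, payload))
mapper_output = [
    (101, ("customer", {"name": "Ada"})),
    (102, ("customer", {"name": "Grace"})),
    (101, ("sale", {"amount": 25})),
    (101, ("sale", {"amount": 40})),
    (102, ("sale", {"amount": 7})),
]

# The shuffle sorts by key, so each reducer call sees one customer at a time.
for key, group in groupby(sorted(mapper_output, key=itemgetter(0)), key=itemgetter(0)):
    customers, sales = [], []
    for _, (tag, payload) in group:
        (customers if tag == "customer" else sales).append(payload)
    for c in customers:
        for s in sales:
            print(key, c["name"], s["amount"])   # the joined rows
```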
Yes, this is a major pain in the butt, and sometimes it can be very difficult to come up with a good join plan, so your code becomes a mess.
It's thanks to this increased pain in having to do joins that you see quite a bit of denormalization within log data. You don't want to have to join and code against multiple data files to get an answer all the time. It's much faster and easier to have commonly used fields stored together, even if it's a bit repetitious. The extra cost of storage is often much less than the cost of someone's annual salary burned up doing messy joins.
Wait, but how do non-equi joins work?
You may have noticed that JOINs with exact matches translate really well into MapReduce algorithms: you just take all the equality conditions, glob them together, and you have your join key. But what about conditions that aren't equalities? Where are my >, <, !=, and friends?
Imagine we had to do an inequality join, say A JOIN B ON A.date < B.date, and both tables are too big to do a map-side join. A horribly naive implementation would be to create the Cartesian product: brute force compare (and emit) every A entry against every B entry, an O(|A| × |B|) operation, sending out pairs of (<A.date, B.date>, <A.*, B.*>) along with the fields from both entries I want to join, and then have the reducer do the work of trimming it down. Obviously this is MASSIVELY inefficient.
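For illustration, here's what that naive approach boils down to, sketched with a couple of made-up rows in plain Python:

```python
# Toy illustration of the naive Cartesian-product approach to A.date < B.date,
# with made-up rows. Every A row is compared against every B row, which is
# O(|A| * |B|) work and that many candidate pairs shipped around: fine for
# a handful of rows, catastrophic for billions.
a_rows = [{"date": 1, "val": "a1"}, {"date": 3, "val": "a2"}]
b_rows = [{"date": 2, "val": "b1"}, {"date": 4, "val": "b2"}]

joined = [
    (a, b)
    for a in a_rows
    for b in b_rows
    if a["date"] < b["date"]      # the theta condition, checked after the fact
]
print(joined)
```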
In fact, this problem was hard enough that when I was writing MR jobs on the job in 2015, we had tested out Apache Hive for query work, and it flat out did not support non-equi joins. You just couldn't do them. Part of the massive appeal of Apache Spark at the time was that its new RDD-based processing methods and SparkSQL allowed for non-equi joins.
Apparently in the realm of relational algebra, all these joins fall under the term "theta join", meaning they use the symbol theta to represent an operator between two sets, A θ B, where θ could be any comparison operator: =, !=, <, <=, >, >=. The one we've been able to handle thus far with MR uses the = operator and is called the equi join. So if you want to read about the current research around all the other comparison operators, it seems that "MapReduce theta join" is the set of keywords to search for.
This paper from 2011, “Processing Theta-Joins using MapReduce” (link) by Okcan and Riedewald gives various algorithms for producing theta joins, using statistics about the input cardinality to select the best algorithm for the situation.
This other paper, "Efficient Multi-way Theta-Join Processing Using MapReduce" (arXiv) from 2012 by Zhang, Chen, and Wang goes quite a bit over my head and takes the problem even further by considering multi-table joins. From what little I can understand (what the heck is a Hilbert Space? O.o), there are potentially strategies to enumerate and trim down the number of pairwise comparisons made, which would offer a performance boost over the naive solution.
Luckily, technology has marched onwards since 2015. BigQuery announced theta-join support in 2016, and massively distributed data platforms now all essentially support SQL, with all the potential for crazy non-equi joins. As the problem has become more and more well-studied since 2011, it has become less of a problem as researchers found newer, more efficient ways of handling the theta-join.
And so, knowing MapReduce has rapidly dropped out of favor
As you can see in the chart, knowing how to write MapReduce jobs isn’t particularly cool or useful anymore. Anyone who knows how to write these things now mostly has a weird parlor trick they can show off at really boring parties and interviews.
I feel this is generally a good thing.
About this newsletter
I'm Randy Au, currently a quantitative UX researcher, former data analyst, and general-purpose data and tech nerd. The Counting Stuff newsletter is a weekly data/tech blog about the less-than-sexy aspects of data science, UX research, and tech, with occasional excursions into other fun topics.
Comments and questions are always welcome, they often give me inspiration for new posts. Tweet me. Always feel free to share these free newsletter posts with others.
All photos/drawings used are taken/created by Randy unless otherwise noted.