Never trust a profiler

A week or so ago I mentioned to Timon that, for the first time, a profiler had actually pointed me in a direction that directly led to a performance improvement. Initially Timon just gave me that “you’re just a crotchety old man” look (which, in most cases, is the correct response). I pointed him to Josh Bloch’s Performance Anxiety presentation, which dives into why it is so hard (in fact “impossible”, in Josh’s words) to benchmark modern applications. It also references the interesting paper “Evaluating the Accuracy of Java Profilers”.

Just last week I was trying to track down a severe performance degradation in my snapshot recovery code. I was under some time pressure, so I turned on my profiler to point me in the right direction. The result it gave was clear, repeatable and unambiguous: it pointed at the linear probing algorithm of the hash table that I am using. Since I had recently moved to a new hash table (one that allowed for rehashing and resizing), it was possible that this was in fact the root of my performance problem, but I had my doubts. (More on this in a future post.) I swapped out my hash table and re-profiled. The profiler again gave me a clear, repeatable and unambiguous result that my performance woes were solved, so I moved on. When we were able to test the snapshot recovery code on production snapshots, we found that the performance problems still existed.

My profiler lied to me. Never trust your profiler.

A bit of Show and Tell

The show:

[Chart: Events per second -- Oct 21, 2011]

[Chart: Events per second -- Nov 7, 2011]

The tell:

We are in the process of ramping up the traffic on the new architecture that we’ve been blogging about. We recently reached a new milestone: 2 billion events in a single day. The system poked its head up, looked around a little, shrugged and went back to sleep. It performed very well, yawning the whole time.

Information Sharing

In a startup there is always a worry that revealing too much about your infrastructure will let competitors “simply copy it” and erode your competitive advantage. I will readily admit that from time to time I suffer from that worry too, but then the logical side takes over and says “a single technology / idea / approach does not make a scalable infrastructure”. This morning I was reading Alex Payne’s blog post “Node and Scaling in the Small vs Scaling in the Large”. There are a number of passages that resonated with me but I will focus on one in particular:

In a system of significant scale, there is no magic bullet.

When your system is faced with a deluge of work to do, no one technology is going to make it all better. When you’re operating at scale, pushing the needle means a complex, coordinated dance of well-applied technologies, development techniques, statistical analyses, intra-organizational communication, judicious engineering management, speedy and reliable operationalization of hardware and software, vigilant monitoring, and so forth. Scaling is hard. So hard, in fact, that the ability to scale is a deep competitive advantage of the sort that you can’t simply go out and download, copy, purchase, or steal.

Either the ability to scale is in the DNA of your technologists or it’s not. If you don’t have that DNA, then you can pick up any technology, no matter how performant, and make it fail. If you do have the DNA, then you’re going to find a way to be successful.

Building a Big Analytics Infrastructure

There is much buzz about “big data” and “big analytics” but precious little information exists about the struggle of building an infrastructure to tackle these problems. Some notable exceptions are Facebook, Twitter’s Rainbird and Metamarkets’ Druid. In this post we provide an overview of how we built Aggregate Knowledge’s “big analytics” infrastructure: how we mix rsyslog, 0MQ and our in-house streaming key-value store to route hundreds of thousands of events per second and efficiently answer reporting queries over billions of events per day.

Overview and Goal

Recording and reporting on advertising events (impressions, interactions, conversions) is the core of what we do at Aggregate Knowledge. We capture information about:

  • Who: audience, user attributes, browser
  • What: impression, interaction, conversion
  • Where: placement, ad size, context
  • When: global time, user’s time, day part

just to name a few. We call these, or any combination of these, our keys. The types of metrics (or values) that we support include, but aren’t limited to:

  • Counts: number of impressions, number of conversions, number of unique users (unique cookies)
  • Revenue: total inventory cost, data cost
  • Derived: click-through rate (CTR), cost per action (CPA)

Our reports support drill-downs and roll-ups across all of the available dimensions — we support many of the standard OLAP functions.

We are architecting for a sustained event ingest rate of 500k events per second over a 14-hour “internet day”, yielding around 30 billion events per day (or around 1 trillion events a month). Our daily reports, which run over billions of events, should take seconds, and our monthly or lifetime reports, which run over hundreds of billions of events, should take at most minutes.
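As a rough back-of-the-envelope check (assuming the 500k/s rate holds only during the 14-hour peak and that off-peak hours contribute the remainder):

    500,000 events/s × 14 h × 3,600 s/h ≈ 25 billion events

which, with off-peak traffic layered on top, lands in the neighborhood of 30 billion events per day and roughly 1 trillion a month.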

Over the past few years we have taken a few different paths to produce our reports with varying degrees of success.

First Attempt: Warehouses, Map-Reduce and Batching

When I first started at Aggregate Knowledge we had a multi-terabyte distributed warehouse that used Map-Reduce to process queries. The events were gathered from the edge where they were recorded and batch-loaded into the warehouse at regular intervals. It stored hundreds of millions of facts (events) and took hours to generate reports. Some reports on unique users would take longer than a day to run. We had a team dedicated to maintaining and tuning the warehouse.

At the time our event recorders were placed on many high-volume news sites and it was quite common for us to see large spikes in the number of recorded events when a hot news story hit the wires. It was common for a 5-minute batch of events from a spike to take longer than 5 minutes to transfer, process and load, which caused many headaches. Since the time it took to run a report was dependent on the number of events being processed, whenever a query hit one of these spikes, reporting performance would suffer. Because we provided 30-, 60- and 90-day reports, a spike would cause us grief for a long time.

After suffering this pain for a while, we concluded that this traditional approach of storing and aggregating facts was inappropriate for our use. Because our data is immutable once written, it seemed clear that we needed to pre-compute and store aggregated summaries. Why walk over hundreds of millions of facts summing along some dimension more than once if the answer is always a constant — simply store that constant. The summaries are bounded in size by the cardinality of the set of dimensions rather than by the number of events. Our worries would move from something we could not control — the number of incoming events — to something that we could control — the dimensionality and number of our keys.
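To make the idea concrete, here is a minimal sketch of pre-aggregation (the key layout and metric fields are invented for illustration; this is not our actual schema):

import java.util.HashMap;
import java.util.Map;

// Illustration only: dimensions are flattened into a string key such as
// "campaign|placement|day", and the Summary fields are made up.
public class PreAggregationSketch {
    static class Summary {
        long impressions;
        double cost;
    }

    private final Map<String, Summary> summaries = new HashMap<String, Summary>();

    // Each immutable fact is folded into its summary exactly once; reports read
    // the summaries (bounded by key cardinality) instead of re-walking the facts.
    public void onImpression(String key, double cost) {
        Summary s = summaries.get(key);
        if (s == null) {
            s = new Summary();
            summaries.put(key, s);
        }
        s.impressions++;
        s.cost += cost;
    }
}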

Second Attempt: Streaming Databases and Better Batching

Having previously worked on a financial trading platform, I had learned much about streaming databases and Complex Event Processing (e.g. Coral8, StreamBase, Truviso). Our second approach would compute our daily summaries in much the same way that a financial exchange keeps a running tally of trades. The event ingest of the streaming database would be the only part of our infrastructure affected by spikes in the number of events, since everything downstream worked against the summaries. Our reporting times went from hours to seconds or sub-seconds. If we were a retail shop with well-known dimensionality then we would likely still be using a streaming database today. It allowed us to focus on immediate insights and actionable reports rather than on the warehouse falling over or a Map-Reduce query taking 12 hours.

Once worrying about individual events was a thing of the past, we started to look at the dimensionality of our data. We knew from our old warehouse data that the hypercube of dimensional data was very sparse but we didn’t know much else. The initial analysis of the distribution of keys yielded interesting results:

[Figure: Key frequency (Zipf)]

Keys are seen with frequencies that tend to follow Zipf’s Law:

the frequency of any key is inversely proportional to its rank in the frequency table

Put simply: there are a large number of things that we see very infrequently and a small number of things that we see very often. Decile (where the black line is 50%) and CDF plots of the key frequency provide additional insights:

[Figure: Key Count Distribution Decile]  [Figure: Key Frequency CDF]

60% of our keys have been seen hundreds of times or less, and around 15% of our keys have been seen only once. (The graph only covers one set of our dimensions. As we add more dimensions to the graph, the CDF curve gets steeper.) This told us that not only is the hypercube very sparse but the values tend to be quite small and are updated infrequently. If these facts could be exploited then the hypercube could be highly compressed and stored very efficiently, even for many dimensions with high cardinality.

We improved our transfer, transform and load batch processes to better cope with event volume spikes, which resulted in fewer headaches, but it still felt wrong. The phrase “batching into a streaming database” reveals the oxymoron. We also didn’t progress much in computing unique user counts. Some of the streaming databases provided custom support for unique user counting, but not at the volume and rate that we required. Another solution was needed.

Third Attempt: Custom Key-Value Store and Streaming Events

From our work with streaming databases we knew a few things:

  • Out-of-order data was annoying (this is something that I will cover in future blog posts);
  • Counting unique sets (unique users, unique keys) was hard;
  • There was much efficiency to be gained in our distribution of keys and key-counts;
  • Structured (or semi-structured) data suited us well;
  • Batching data to a streaming database is silly.

Unfortunately, none of the existing NoSQL solutions covered all of our cases. We built a Redis prototype and found that the majority of our code was in ingesting events from our event routers, doing key management and exporting the summaries to our reporting tier. Building the storage in-house gave us the opportunity to create custom types for aggregation and sketches for the cardinality of large sets (e.g. unique user counting). Once we had these custom types, it was a small leap to go from the Redis prototype to a full-featured in-house key-value store. We call this beast “The Summarizer”. (“The Aggregator” was already taken by our Ops team for event aggregation and routing.)

The summarizer simply maps events into summaries by key and updates the aggregates. Streaming algorithms are a natural fit in this type of data store: many O(n^m) algorithms have streaming O(n) counterparts that provide sufficiently accurate results. (We’ll be covering streaming algorithms in future posts.) The summarizer provides us with a succinct (unsampled) summary that can be accessed in O(1). Currently we can aggregate more than 200,000 events per second per core (saturating just shy of 1 million events per second), with some events aggregated into more than ten summaries.
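As a rough illustration of that mapping (the key combinations and event fields below are hypothetical, not the Summarizer’s actual interface), a single event gets folded into several keyed summaries in one streaming pass:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one event updates one summary per key combination
// that we report on, and each summary is just a small fixed-size aggregate.
public class SummarizerSketch {
    static class Event {
        String campaign, placement, day;
        long cost;   // e.g. in micro-dollars, so the aggregate stays integral
    }

    private final Map<String, long[]> summaries = new HashMap<String, long[]>();

    public void aggregate(Event e) {
        update("campaign:" + e.campaign + "|day:" + e.day, e);
        update("placement:" + e.placement + "|day:" + e.day, e);
        update("campaign:" + e.campaign + "|placement:" + e.placement + "|day:" + e.day, e);
    }

    private void update(String key, Event e) {
        long[] agg = summaries.get(key);   // agg[0] = impressions, agg[1] = cost
        if (agg == null) {
            agg = new long[2];
            summaries.put(key, agg);
        }
        agg[0] += 1;
        agg[1] += e.cost;
    }
}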

Our summaries are computed per day. (Future blog posts will provide more information about how we treat time.) They are designed such that they rarely contain more than 10M rows and are stored in CSV format. Initially we used CSV simply because we already had all of the code written for ingesting 3rd-party CSV data. We quickly found other uses for the summaries: our analysts gobble them up and use them in Excel, our data scientists use them directly in R, and even our engineers use them for back-of-the-envelope calculations. Having manageable summaries and/or sketches has enabled agile analytics.

To get the events into our summarizer we completely rethought how events move through our infrastructure. Instead of batching events, we wanted to stream them. To deal with spikes and to simplify maintenance, we wanted to allow the events to be queued if downstream components became unavailable or unable to meet the current demand. We needed to be able to handle our desired ingest rate of 500k events per second. The answer was right under our noses: rsyslog and 0MQ. (See the “Real-Time Streaming for Data Analytics” and “Real-Time Streaming with Rsyslog and ZeroMQ” posts for more information.)
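For flavor, a consumer that drains events from a 0MQ socket and hands them to the summarizer might look roughly like this (a sketch using the standard Java 0MQ bindings; the PULL socket pattern and the endpoint are illustrative assumptions, not our production configuration):

import org.zeromq.ZMQ;

// Illustrative only: a PULL socket draining events that the upstream routers
// push downstream; the endpoint is made up.
public class EventConsumer {
    public static void main(String[] args) throws Exception {
        ZMQ.Context ctx = ZMQ.context(1);
        ZMQ.Socket events = ctx.socket(ZMQ.PULL);
        events.bind("tcp://*:5555");
        while (!Thread.currentThread().isInterrupted()) {
            byte[] msg = events.recv(0);             // one event per message
            String line = new String(msg, "UTF-8");
            // parse the event line and feed it to the summarizer here
        }
    }
}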

Wrap Up

Our challenge was to be able to produce reports on demand over billions of events in seconds, and over hundreds of billions in minutes, while ingesting at most 500,000 events per second. Choosing the de facto technology du jour caused us to focus on fixing and maintaining the technology rather than on solving business problems and providing value to our customers. We could have stayed with our first approach, immediately scaling to hundreds of nodes and taking on the challenges that solution presents. Instead, we looked at the types of answers we wanted and worked backwards until we could provide them easily, on minimal hardware and with little maintenance cost. Opting for the small, agile solution allowed us to solve business problems and provide value to our customers much more quickly.

Footnote

Astute readers may have noticed the point on the far left of the CDF graph and wondered how it was possible for a key to have been seen zero times, or why we would store keys that have no counts associated with them. We only summarize what is recorded in an event. The graph shows the frequency of keys as defined by impressions and it doesn’t include the contribution of any clicks or conversions. In other words, these “zero count keys” mean that for a given day there are clicks and/or conversions but no impressions. (This is common when a campaign ends.) In hindsight we should have summed the counts of impressions, clicks and conversions and used that total in the graph, but this provided the opportunity to show a feature of the summarizer — we can easily find days for which clicks and conversions have no impressions without running a nasty join.

For the bit hacker in you

My colleagues and I are wrapping up version 2 of our unique counting algorithms. The last pieces are the database tools that our analysts use — some custom functions in Postgres 9 to manipulate our custom types. One step in our algorithm is to find the least-significant set bit of a bit array. In Java we use a lookup technique (a 256-entry lookup table used to look up each byte) and in C we can use the bsf assembly instruction. (Both techniques can be researched further on Sean Eron Anderson’s awesome Bit Twiddling Hacks.)
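For the curious, a rough Java sketch of the lookup-table technique (illustrative only, not our production code):

// Sketch of the 256-entry lookup-table technique for finding the position of
// the least-significant set bit; not our production implementation.
public class LowestSetBit {
    private static final int[] LSB = new int[256];
    static {
        LSB[0] = -1;                             // a zero byte has no set bit
        for (int i = 1; i < 256; i++) {
            int pos = 0;
            while (((i >> pos) & 1) == 0) pos++; // position of the lowest set bit of i
            LSB[i] = pos;
        }
    }

    // Returns the 0-based position of the lowest set bit of value, or -1 if value == 0.
    public static int lowestSetBit(long value) {
        for (int shift = 0; shift < 64; shift += 8) {
            int b = (int) ((value >>> shift) & 0xFF);
            if (b != 0) {
                return shift + LSB[b];
            }
        }
        return -1;
    }
}

(On modern JVMs, Long.numberOfTrailingZeros typically compiles down to a single bsf/tzcnt-style instruction, so in practice it is usually the simpler choice.)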

Since I’m a hacker to the core I wanted to do a little more research to see what other folks were doing. Here are some additional references:

In the last reference, there is a response that mentions the fascinating technique of using the floating-point representation to extract the exponent:

unsigned GetLowestBitPos(unsigned value)
{
   /* value ^ (value - 1) produces a mask of the lowest set bit and all bits
      below it; converting that mask to a double and reading the biased exponent
      out of the upper 32 bits gives the bit position. (!!value guards the
      value == 0 case, and the pointer cast assumes a little-endian layout.) */
   double d = value ^ (value - !!value);
   return (((int*)&d)[1]>>20)-1023;
}

This response refers you to another post that explains the technique in a bit more detail. In that response, Tony Lee mentions one of my favorite constants: 0x5f3759df. I spent many years doing graphics and gaming programming (some of my first “real” programs in college were visual simulations on SGIs) and I remember the first time that I came across 0x5f3759df in the Quake source code. I’m always amazed at the bit techniques that people come up with to solve a problem.

I’ll end this post with a link to one of my favorite articles on the internet: a history of the origins of 0x5f3759df in the Quake source. It’s a wonderful walk down memory lane.

Working with large sets

We do a lot of work with unique user counting and we have developed some techniques for accurate counting in small bounded-size structures. Periodically I like to make sure that all of our assumptions still hold as the world changes around us. I was recently running a number of experiments on large sets to get our science folks some data to analyze. It involved getting the cardinality, union and intersection of large sets of user ids (which are 64-bit values) the brute-force way.

Since I spend a good deal of my time writing Java, I figured I would just quickly whip something up. For set sizes of 1M or less, the “standard techniques” work well enough — java.util.HashSet will do the trick. In general, for larger collections of primitives it’s a good idea to use one of the 3rd-party libraries that are specifically tailored to primitives, such as Trove or Colt, to cut down on the time and memory bloat of autoboxing primitives into objects. (There are a number of postings around which collections are “best” for a given circumstance, such as this one on StackOverflow.) You can get to around 10M entries in a traditional set before union and intersection start to take prohibitively long, because they work element by element.
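For reference, the brute-force version with the standard collections looks something like the sketch below; this element-by-element work is exactly what stops scaling past ~10M entries:

import java.util.HashSet;
import java.util.Set;

// Brute-force set operations over 64-bit user ids; fine up to roughly 10M elements.
public class BruteForceSets {
    public static Set<Long> union(Set<Long> a, Set<Long> b) {
        Set<Long> result = new HashSet<Long>(a);
        result.addAll(b);          // element by element
        return result;
    }

    public static Set<Long> intersection(Set<Long> a, Set<Long> b) {
        Set<Long> result = new HashSet<Long>(a);
        result.retainAll(b);       // element by element
        return result;
    }
}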

Working with sets of more than 10M entries requires different techniques. The most common approach is to use bit arrays. Bit arrays are not only compact (each element takes only one bit of RAM), they are also very fast for set operations. The set operations are typically performed by taking chunks of the bit array and performing regular instructions on them. For example, the bit array can be chunked up into longs (64-bit “words”) and bitwise or / bitwise and operations performed pairwise on the two “sets”. Java provides java.util.BitSet, which does all of the heavy lifting for you. (There are 3rd-party bit arrays available too.)
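A quick sketch of what that looks like with java.util.BitSet, where union and intersection become word-at-a-time or/and operations:

import java.util.BitSet;

// Set operations as word-at-a-time bitwise instructions via java.util.BitSet.
public class BitSetOps {
    public static BitSet union(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.or(b);              // bitwise OR, 64 bits at a time
        return result;
    }

    public static BitSet intersection(BitSet a, BitSet b) {
        BitSet result = (BitSet) a.clone();
        result.and(b);             // bitwise AND, 64 bits at a time
        return result;
    }
}

(BitSet.cardinality() then gives the size of the resulting set via a population count over the backing words.)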

When using a hash-based set, one simply gives it the element that is to be stored (in our case, a 64-bit user id). With a bit array the interface is basically setBit(index, value) and getBit(index). The problem comes down to: what is the index? A naive approach would simply use the bit array in the same way as the hash set — pass it the element. Unfortunately, this would require a bit array that is 2^64-1 bits long. If you were a little dangerous with your knowledge, you could exploit some of the RLE (run-length encoding) techniques for compressing your bit array, such as Word-Aligned Hybrid (WAH — there’s even a Java implementation, as well as others found here). Another, call it “sane”, approach is to use a map together with the bit array to provide the index. The map maps each element to a sequential index, and that index is used in the bit array.

To insert (pseudo code):

index = elementToIndexMap.get(elementId)
if(index does not exist)
    index = sequence++
    elementToIndexMap.put(elementId, index)
endif
bitarray.setBit(index, true)

To retrieve (pseudo code):

index = elementToIndexMap.get(elementId)
if(index does not exist)
    return false/*assume not present means 'not set'*/
endif
return bitarray.getBit(index)
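
A minimal Java version of the same map-plus-bit-array idea (a sketch; in practice a primitive long-to-int map such as Trove’s would be a better fit than HashMap):

import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Sketch of the map + bit array approach: the map assigns each element a
// sequential index and the bit array records membership at that index.
public class IndexedBitSet {
    private final Map<Long, Integer> elementToIndex = new HashMap<Long, Integer>();
    private final BitSet bits = new BitSet();
    private int sequence = 0;

    public void add(long elementId) {
        Integer index = elementToIndex.get(elementId);
        if (index == null) {
            index = sequence++;
            elementToIndex.put(elementId, index);
        }
        bits.set(index);
    }

    public boolean contains(long elementId) {
        Integer index = elementToIndex.get(elementId);
        return index != null && bits.get(index);   // absent from the map means "not set"
    }
}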

With this approach, you can easily get to 100M or even billions of elements in a set, given that you have enough RAM.

Going beyond 1B elements usually requires disk-based techniques. Unix join and sort, for example, can get you a long way.

Even though the examples that I gave were Java-based, the techniques presented here are universal. Underlying all of this is the necessity for a little profiling, a bit of understanding of the algorithms involved, and the realization that sometimes the default facilities provided are insufficient for all cases.

Very large heaps in Java

I’ve been wanting to write up a juicy post on how we deal with very large heaps in Java to reduce GC pauses. Unfortunately I keep getting sidetracked getting the data together. The latest bump in the road is due to a JVM bug of sorts.

Backstory: Todd Lipcon’s Twitter post pointed me to the JVM option -XX:PrintFLSStatistics=1 as a way to get some good information about heap fragmentation. He was even kind enough to provide the Python and R scripts! I figured that it would be a few minutes of fiddling and I’d have some good data for a post. No such luck. Our JVM GC/heap options are -XX:+UseConcMarkSweepGC -Xms65g -Xmx65g. When -XX:PrintFLSStatistics=1 is added, the following output is seen:

Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: -1824684952
Max   Chunk Size: -1824684952
Number of Blocks: 1
Av.  Block  Size: -1824684952
Tree      Height: 1

A few seconds of digging into the Hotspot source reveals:

void BinaryTreeDictionary::reportStatistics() const {
  verify_par_locked();
  gclog_or_tty->print("Statistics for BinaryTreeDictionary:\n"
         "------------------------------------\n");
  size_t totalSize = totalChunkSize(debug_only(NULL));
  size_t    freeBlocks = numFreeBlocks();
  gclog_or_tty->print("Total Free Space: %d\n", totalSize);
  gclog_or_tty->print("Max   Chunk Size: %d\n", maxChunkSize());
  gclog_or_tty->print("Number of Blocks: %d\n", freeBlocks);
  if (freeBlocks > 0) {
    gclog_or_tty->print("Av.  Block  Size: %d\n", totalSize/freeBlocks);
  }
  gclog_or_tty->print("Tree      Height: %d\n", treeHeight());
}

in hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/binaryTreeDictionary.cpp. (“%d” just doesn’t cut it for a “long”’s worth of data: the 64-bit size_t values get mangled when printed as 32-bit ints, hence the negative numbers above.) I filed a HotSpot bug, so hopefully it will be fixed in some release in the not-too-distant future.

I can work around this but it has slowed down my getting to the juicy blog post. Stay tuned!

2eight7

When I was writing my post yesterday I was reflecting on how much time we were spending making our code int-friendly — specifically, dealing with the problems that arise when you’re working with values around Integer.MAX_VALUE, or 2147483647 (2^31-1). I likened it to i18n (internationalization) or l10n (localization). Much in that same vein, I’d like to coin the term “2eight7” to represent the problems one runs into when working with a signed 32-bit integer and any code that depends (implicitly or explicitly) on it.

Let the debates begin!

Billions of anything

In most programming languages an int is 32 bits wide, providing for values up to 4294967295 (2^32-1), or 2147483647 (2^31-1) if signed. In the case of Java, which we use for a number of components in our infrastructure, many of the fundamental components use ints: array indexes, NIO, IO, most collections (as they are commonly based on arrays), etc. When you’re working with billions of anything, it’s easy to run into these bounds, which result in subtle bugs that are hard to track down because the exceptions aren’t what they seem. The most common cases that we run into are due to the roll-over that occurs when you add any positive value to 2147483647 — the value becomes negative (since Java’s ints are signed). Sometimes this results in an ArrayIndexOutOfBoundsException and sometimes in a seemingly impossible call path from deep inside of some java.* class.
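A two-line illustration of the roll-over (plain Java, nothing hypothetical here):

public class RollOver {
    public static void main(String[] args) {
        int count = Integer.MAX_VALUE;   // 2147483647
        count = count + 1;               // silently wraps; no exception is thrown
        System.out.println(count);       // prints -2147483648 (Integer.MIN_VALUE)
    }
}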

I remember working on my first few i18n (internationalization) and l10n (localization) projects, where I learned the do’s and don’ts of writing code that worked seamlessly (or at least was easy to work with) in multiple locales. Working with “big data” feels exactly like that — you have to slowly build up a set of techniques: instead of a single array, you need to keep around arrays of arrays (since each dimension is limited to 2147483647 elements); you have to know how to shard your collections so that they do not exceed the maximum allowed capacity (e.g. HashMap is limited to 1073741824 (2^30) buckets); if(value > Integer.MAX_VALUE) doesn’t do what you think it does (and most of the time it’s hard to tell that that’s the code you wrote). The list goes on.
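As an example of the arrays-of-arrays technique, here is a sketch of a long-indexable array built from int-indexable chunks (the chunk size and class name are arbitrary choices for illustration):

// Sketch of a long-indexable array built from chunks of ordinary int-indexed arrays.
public class BigLongArray {
    private static final int CHUNK_SIZE = 1 << 27;   // 134,217,728 longs per chunk
    private final long[][] chunks;

    public BigLongArray(long size) {
        int numChunks = (int) ((size + CHUNK_SIZE - 1) / CHUNK_SIZE);
        chunks = new long[numChunks][];
        long remaining = size;
        for (int i = 0; i < numChunks; i++) {
            chunks[i] = new long[(int) Math.min(remaining, CHUNK_SIZE)];
            remaining -= chunks[i].length;
        }
    }

    public long get(long index) {
        return chunks[(int) (index / CHUNK_SIZE)][(int) (index % CHUNK_SIZE)];
    }

    public void set(long index, long value) {
        chunks[(int) (index / CHUNK_SIZE)][(int) (index % CHUNK_SIZE)] = value;
    }
}

And on the comparison pitfall: if value is declared as an int, if(value > Integer.MAX_VALUE) can never be true, so the guard has to be applied to a long before any narrowing cast.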

One interesting development was “announced” at EclipseCon: there’s talk about “big data” support in Java 9 (see Reinhold Already Talking About Java 9, for example). This is something that I will keep my eye on. Unfortunately, it won’t help us for the next few years.

Welcome!

Welcome to the AK Tech Blog!

This blog comes to you from down in the trenches of building a massively scalable data management platform. Through this blog we will be detailing some of the behind-the-scenes science, engineering and hacks that we deal with every day, including:

  • Probabilistic data analytics
  • Streaming algorithms
  • Attribution
  • Real-time data management
  • Tips, tricks and techniques for working with billions of events

We welcome your comments and will try to respond as appropriate. Feel free to forward our blog postings to all your peers and keep the communication flow open.

If you have questions that you don’t want posted, you can always send them to blog@aggregateknowledge.com.

Again, welcome, and enjoy.