Building a Big Analytics Infrastructure

There is much buzz about “big data” and “big analytics” but precious little information exists about the struggle of building an infrastructure to tackle these problems. Some notable exceptions are Facebook, Twitter’s Rainbird and Metamarkets’ Druid. In this post we provide an overview of how we built Aggregate Knowledge’s “big analytics” infrastructure: how we mix rsyslog, 0MQ and our in-house streaming key-value store to route hundreds of thousands of events per second and efficiently answer reporting queries over billions of events per day.

Overview and Goal

Recording and reporting on advertising events (impressions, interactions, conversions) is the core of what we do at Aggregate Knowledge. We capture information about:

  • Who: audience, user attributes, browser
  • What: impression, interaction, conversion
  • Where: placement, ad size, context
  • When: global time, user’s time, day part

just to name a few. We call these, or any combination of these, our keys. The types of metrics (or values) that we support include, but aren’t limited to:

  • Counts: number of impressions, number of conversions, number of unique users (unique cookies)
  • Revenue: total inventory cost, data cost
  • Derived: click-through rate (CTR), cost per action (CPA)

Our reports support drill-downs and roll-ups across all of the available dimensions; we support many of the standard OLAP functions.

We are architecting for a sustained event ingest rate of 500k events per second over a 14-hour “internet day”, yielding around 30 billion events per day (or around 1 trillion events a month). Our daily reports, which run over billions of events, should take seconds, and our monthly or lifetime reports, which run over hundreds of billions of events, should take at most minutes.
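
For the curious, the back-of-the-envelope arithmetic behind those targets looks roughly like this:

events_per_second = 500 * 1000
internet_day_seconds = 14 * 60 * 60

events_per_day = events_per_second * internet_day_seconds
events_per_month = events_per_day * 30

print("events per day:   %.1f billion" % (events_per_day / 1e9))
print("events per month: %.2f trillion" % (events_per_month / 1e12))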

Over the past few years we have taken a few different paths to produce our reports with varying degrees of success.

First Attempt: Warehouses, Map-Reduce and Batching

When I first started at Aggregate Knowledge we had a multi-terabyte distributed warehouse that used Map-Reduce to process queries. The events were gathered from the edge where they were recorded and batch loaded into the warehouse on regular intervals. It stored hundreds of millions of facts (events) and took hours to generate reports. Some reports on unique users would take longer than a day to run. We had a team dedicated to maintaining and tuning the warehouse.

At the time our event recorders were placed on many high-volume news sites and it was quite common for us to see large spikes in the number of recorded events when a hot news story hit the wires. It was common for a 5-minute batch of events from a spike to take longer than 5 minutes to transfer, process and load, which caused many headaches. Since the time it took to run a report was dependent on the number of events being processed, whenever a query would hit one of these spikes, reporting performance would suffer. Because we provided 30-, 60- and 90-day reports, a spike would cause us grief for a long time.

After suffering this pain for a while, this traditional approach of storing and aggregating facts seemed inappropriate for our use. Because our data is immutable once written, it seemed clear that we needed to pre-compute and store aggregated summaries. Why walk over hundreds of millions of facts summing along some dimension more than once if the answer is always a constant — simply store that constant. The summaries are bounded in size by the cardinality of the set of dimensions rather than the number of events. Our worries would move from something we could not control — the number of incoming events — to something that we could control — the dimensionality and number of our keys.

Second Attempt: Streaming Databases and Better Batching

Having previously worked on a financial trading platform, I had learned much about streaming databases and Complex Event Processing (e.g. Coral8, StreamBase, Truviso). Our second approach would compute our daily summaries in much the same way that a financial exchange keeps track and tally of trades. The event ingest of the streaming database would be the only part of our infrastructure affected by spikes in the number of events since everything downstream worked against the summaries. Our reporting times went from hours to seconds or sub-seconds. If we were a retail shop that had well-known dimensionality then we would likely still be using a streaming database today. It allowed us to focus on immediate insights and actionable reports rather than the warehouse falling over or an M-R query taking 12 hours.

Once worrying about individual events was a thing of the past, we started to look at the dimensionality of our data. We knew from our old warehouse data that the hypercube of dimensional data was very sparse but we didn’t know much else. The initial analysis of the distribution of keys yielded interesting results:

[Figure: Zipf distribution of key frequency]

Keys are seen with frequencies that tend to follow Zipf’s Law:

the frequency of any key is inversely proportional to its rank in the frequency table

Put simply: there are a large number of things that we see very infrequently and a small number of things that we see very often. Decile (where the black line is 50%) and CDF plots of the key frequency provide additional insights:

[Figures: key count distribution decile plot and key frequency CDF]

60% of our keys have been seen hundreds of times or less and around 15% of our keys have been seen only once. (The graph only covers one set of our dimensions. As we add more dimensions to the graph the CDF curve gets steeper.) This told us that not only is the hypercube very sparse but also that the values tend to be quite small and are updated infrequently. If these facts could be exploited then the hypercube could be highly compressed and stored very efficiently, even for many dimensions with high cardinality.
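
Reproducing that analysis from a day of key counts is cheap; a minimal sketch, assuming the per-key counts have already been pulled into a dict (the example keys and thresholds are made up):

def key_count_cdf(key_counts, thresholds):
    """Return, for each threshold, the fraction of keys seen at most that many times."""
    total = float(len(key_counts))
    counts = key_counts.values()
    return [(t, sum(1 for c in counts if c <= t) / total) for t in thresholds]

# key_counts maps a dimension tuple to the number of times that key was seen
key_counts = {("placement-1", "300x250"): 1, ("placement-2", "728x90"): 412}
print(key_count_cdf(key_counts, [1, 10, 100, 1000]))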

We improved our transfer, transform and loading batch processes to better cope with event volume spikes, which resulted in fewer headaches, but it still felt wrong. The phrase “batching into a streaming database” is itself an oxymoron. We didn’t progress much in computing unique user counts. Some of the streaming databases provided custom support for unique user counting but not at the volume and rate that we required. Another solution was needed.

Third Attempt: Custom Key-Value Store and Streaming Events

From our work with streaming databases we knew a few things:

  • Out-of-order data was annoying (this is something that I will cover in future blog posts);
  • Counting unique sets (unique users, unique keys) was hard;
  • There was much efficiency to be gained in our distribution of keys and key-counts;
  • Structured (or semi-structured) data suited us well;
  • Batching data to a streaming database is silly.

Unfortunately none of the existing NoSQL solutions covered all of our cases. We built a Redis prototype and found that the majority of our code was in ingesting our events from our event routers, doing key management and exporting the summaries to our reporting tier. Building the storage in-house provided us the opportunity to create custom types for aggregation and sketches for the cardinality of large sets (e.g. unique user counting). Once we had these custom types it was a small leap to go from the Redis prototype to a full-featured in-house key-value store. We call this beast “The Summarizer”. (“The Aggregator” was already taken by our Ops team for event aggregation and routing.)
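
We won’t reproduce our custom types here, but to give a flavor of what a cardinality sketch buys you, here is a toy K-minimum-values (KMV) estimator; it is purely illustrative and not the sketch the summarizer actually uses:

import hashlib

class KMVSketch(object):
    """Toy K-minimum-values sketch: estimate the number of distinct values
       seen while storing only the k smallest hash values."""

    def __init__(self, k=256):
        self.k = k
        self.mins = set()

    def _hash(self, value):
        """Hash a value to a float uniformly distributed in [0, 1)."""
        digest = int(hashlib.md5(str(value).encode("utf-8")).hexdigest(), 16)
        return digest / float(2 ** 128)

    def add(self, value):
        h = self._hash(value)
        if len(self.mins) < self.k:
            self.mins.add(h)
        elif h < max(self.mins) and h not in self.mins:
            self.mins.remove(max(self.mins))
            self.mins.add(h)

    def estimate(self):
        if len(self.mins) < self.k:
            return len(self.mins)              # exact below k distinct values
        return (self.k - 1) / max(self.mins)   # standard KMV estimator

sketch = KMVSketch(k=256)
for i in range(100000):
    sketch.add("user-%d" % i)
print("estimated uniques: %d" % sketch.estimate())

The memory is fixed at k values no matter how many billions of users stream past, and two sketches can be unioned (keep the k smallest of the combined sets), which is exactly what roll-ups across keys or days need.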

The summarizer simply maps events into summaries by key and updates the aggregates. Streaming algorithms are a natural fit in this type of data store. Many O(n^m) algorithms have streaming O(n) counterparts that provide sufficiently accurate results. (We’ll be covering streaming algorithms in future posts.) It provides us with a succinct (unsampled) summary that can be accessed in O(1). Currently we can aggregate more than 200,000 events per second per core (saturating just shy of 1 million events per second) where some events are aggregated into more than ten summaries.
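
Conceptually the hot path is nothing more than a hash map from key tuples to aggregates. A stripped-down sketch of the idea (the field names are hypothetical, and a plain set stands in for a cardinality sketch like the toy above):

from collections import defaultdict

def new_summary():
    """One summary row: additive aggregates plus a unique-user set."""
    return {"impressions": 0, "revenue": 0.0, "uniques": set()}

summaries = defaultdict(new_summary)

def summarize(event):
    """Fold a single event into the summary row selected by its key."""
    key = (event["campaign"], event["placement"], event["ad_size"])
    row = summaries[key]
    row["impressions"] += 1
    row["revenue"] += event["cost"]
    row["uniques"].add(event["user_id"])   # a cardinality sketch in practice

summarize({"campaign": "c1", "placement": "p9", "ad_size": "300x250",
           "cost": 0.002, "user_id": "u-42"})
print(summaries[("c1", "p9", "300x250")])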

Our summaries are computed per day. (Future blog posts will provide more information about how we treat time.) They are designed such that they rarely contain more than 10M rows and are stored in CSV format. Initially we used CSV simply because we already had all of the code written for ingesting 3rd party CSV data. We quickly found other uses for them: our analysts gobble them up and use them in Excel, our data scientists use them directly in R, and even our engineers use them for back-of-the-envelope calculations. Having manageable summaries and/or sketches enabled agile analytics.
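
Because a summary is just a small CSV file, pulling a derived metric like CTR out of one takes only the standard library (the file name and column names below are made up for illustration):

import csv

with open("summary-2012-06-01.csv") as summary_file:
    for row in csv.DictReader(summary_file):
        impressions = int(row["impressions"])
        clicks = int(row["clicks"])
        ctr = clicks / float(impressions) if impressions else 0.0
        print("%s,%s,%.4f" % (row["campaign"], row["placement"], ctr))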

To get the events into our summarizer we completely rethought how events move through our infrastructure. Instead of batching events, we wanted to stream them. To deal with spikes and to simplify maintenance, we wanted to allow the events to be queued if downstream components became unavailable or unable to meet the current demand. We needed to be able to handle our desired ingest rate of 500k events per second. The answer was right under our noses: rsyslog and 0MQ. (See the “Real-Time Streaming for Data Analytics” and “Real-Time Streaming with Rsyslog and ZeroMQ” posts for more information.)

Wrap Up

Our challenge was to be able to produce reports on demand over billions of events in seconds and over hundreds of billions in minutes while ingesting at most 500,000 events per second. Choosing the de facto technology du jour caused us to focus on fixing and maintaining the technology rather than solving business problems and providing value to our customers. We could have stayed with our first approach, immediately scaling to hundreds of nodes and taking on the challenges that solution presents. Instead, we looked at the types of answers we wanted and worked backwards until we could provide them easily on minimal hardware and with little maintenance cost. Opting for the small, agile solution allowed us to solve business problems and provide value to our customers much more quickly.

Footnote

Astute readers may have noticed the point on the far-left of the CDF graph and wondered how it was possible for a key to have been seen zero times or wondered why we would store keys that have no counts associated with them. We only summarize what is recorded in an event. The graph shows the frequency of keys as defined by impressions and it doesn’t include the contribution of any clicks or conversions. In other words, these “zero count keys” mean that for a given day there are clicks and/or conversions but no impressions. (This is common when a campaign ends.) In hindsight we should have summed the count of impressions, clicks and conversions and used that total in the graph but this provided the opportunity to show a feature of the summarizer — we can easily find days for which clicks and conversions have no impressions without running a nasty join.

Real-Time Streaming with Rsyslog and ZeroMQ

How do we go about streaming data in real time? At AK, we use Rsyslog in conjunction with ZeroMQ and a little AK secret sauce. Why Rsyslog? We looked for technology that exists in the world today and solves >90% of the problem. System logging has been part of modern UNIX operating systems since their beginning and has evolved into real-time log routers and aggregators.

  • Rsyslogd allows for multiple inputs and outputs.
  • Rsyslogd allows for multiple routes based on stream type, origination (location and/or application), and destination.

As such, AK has written a ZeroMQ Rsyslog module:

  • ZeroMQ input/output interface (connect/bind, push/pull)
  • pub/sub type coming soon

Simply put, we at AK have moved to a real-time data streaming process by integrating the Rsyslog service with the ZeroMQ library. This has allowed us to move from a brittle system of large scheduled data migrations and deferred processing to a lighter-weight model of real-time streaming data and processing. There are many benefits to this, including high scalability, durability and verification, real-time streaming among multiple data centers, and efficiency. We believe that others who have the same issues of counting and providing insights into massive data sets will follow in moving to a real-time data analytics platform.

The pub-sub ZeroMQ integration is beyond cool since it basically allows us to expose a tap into the event stream. You want to simply connect to the event stream and try out some new algorithm? It’s trivial. Put ZeroMQ on the front and start listening. You want to grab a few minutes’ worth of events as they come in? Just connect and take what you need. No more going off to the log server, finding the logs, parsing them, breaking them up, etc, etc, etc. Just tap and go.
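
To make that concrete, a tap is just a vanilla ZeroMQ socket pointed at the stream. A rough sketch with pyzmq, assuming the pub/sub flavor of the module and a made-up endpoint:

import zmq

context = zmq.Context()
tap = context.socket(zmq.SUB)
tap.connect("tcp://event-router.example:5571")   # hypothetical endpoint
tap.setsockopt(zmq.SUBSCRIBE, b"")               # subscribe to everything

while True:
    event = tap.recv()
    # prototype an algorithm, sample a few minutes of traffic, whatever -
    # the raw event is right here
    print(event)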

Batch Acknowledged Pipelines with ZeroMQ

Parallel processing with a task ventilator is a common pattern with ZeroMQ.  The basics of this pattern are outlined in the “Divide and Conquer” section of the ZeroMQ guide.  The pattern consists of the following components:

  • A task ventilator that produces tasks.
  • A number of workers that do the processing work.
  • A sink that collects results from the worker processes.

This pattern works wonderfully as long as your consumers can outpace your producers. If you start producing tasks faster than you can process them, then messages will start backing up in ZeroMQ socket queues.  This will drive the memory utilization of the processes up, make a clean shutdown of the distributed processing system difficult, and result in a sizeable number of messages lost in the event of a crash.  This can be avoided by using ZMQ_REP and ZMQ_REQ sockets, but in that case you lose the speed advantage of a pipeline pattern.

To maintain the speed of a pipeline pattern while allowing for some control over the number of messages in flight at any given time, we can add batch acknowledgements to the basic ventilator / worker / sink pattern.  Accomplishing this only requires a few minor changes:

  • Add a pull socket to the ventilator for receiving acknowledgements
  • Add a push socket to the sink for sending acknowledgements
  • Add a batch size variable

So without further ado, let’s dive into some code.  I’m going to keep things simple for this example.  I’ll define a function for the ventilator, a function for the worker, and a function for the sink.  These will be started using multiprocessing.Process. For any Java programmers in the audience: Python has first class functions.  There is no requirement to wrap a function in a class.

The ventilator uses a ZMQ_PUSH socket to send tasks to listening workers, and a ZMQ_PULL socket to receive acknowledgements from the sink process.  The ventilator will send N messages (where N is the batch size) and then wait for an acknowledgement:

import zmq
from time import time
from multiprocessing import Process

def ventilator(batch_size, test_size):
    """task ventilator function"""

    """set up a zeromq context"""
    context = zmq.Context()

    """create a push socket for sending tasks to workers"""
    send_sock = context.socket(zmq.PUSH)
    send_sock.bind("tcp://*:5555")

    """create a pull socket for receiving acks from the sink"""
    recv_sock = context.socket(zmq.PULL)
    recv_sock.bind("tcp://*:5557")

    """initiate counter for tasks sent"""
    current_batch_count = 0

    """start the message loop"""
    for x in range(test_size):

        """send the next task"""
        send_sock.send("task")
        current_batch_count += 1

        """once a full batch has been sent, reset the count and wait
           (blocking) for an acknowledgement from the sink -
           note this could be more sophisticated and provide
           support for other message types from the sink,
           but keeping it simple for this example"""
        if current_batch_count == batch_size:
            current_batch_count = 0
            msg = recv_sock.recv()

The workers use a ZMQ_PULL socket to receive tasks from the ventilator, and a ZMQ_PUSH socket to send results to the sink process:

def worker():
    """task worker function"""

    """set up a zeromq context"""
    context = zmq.Context()

    """create a pull socket for receiving tasks from the ventilator"""
    recv_socket = context.socket(zmq.PULL)
    recv_socket.connect("tcp://localhost:5555")

    """create a push socket for sending results to the sink"""
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5556")

    """receive tasks and send results"""
    while True:
        task = recv_socket.recv()
        send_socket.send("result")

The sink process uses a ZMQ_PULL socket to receive results from the workers, and a ZMQ_PUSH socket to send batch acknowledgements to the ventilator process:

def sink(batch_size, test_size):
    """task sink function"""

    """set up a zmq context"""
    context = zmq.Context()

    """create a pull socket for receiving results from the workers"""
    recv_socket = context.socket(zmq.PULL)
    recv_socket.bind("tcp://*:5556")

    """create a push socket for sending acknowledgements to the ventilator"""
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5557")

    result_count = 0
    batch_start_time = time()
    test_start_time = batch_start_time

    for x in range(test_size):
        """receive a result and increment the count"""
        msg = recv_socket.recv()
        result_count += 1

        """acknowledge that we've completed a batch"""
        if result_count == batch_size:
            send_socket.send("ACK")
            result_count = 0
            batch_start_time = time()

    duration = time() - test_start_time
    tps = test_size / duration
    print "messages per second: %s" % (tps)

The main routine for the test allows for a configurable batch size, test size, and number of workers:

if __name__ == '__main__':
    num_workers = 4
    batch_size = 100
    test_size = 1000000

    workers = {}
    ventilator = Process(target=ventilator, args=(batch_size, test_size,))
    sink = Process(target=sink, args=(batch_size, test_size,))

    sink.start()

    for x in range(num_workers):
        workers[x] = Process(target=worker, args=())
        workers[x].start()

    ventilator.start()
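
One small note on the example: the workers loop forever, so the program will not exit on its own once the test completes. Waiting for the ventilator and sink and then terminating the workers at the bottom of the main routine cleans that up:

    """wait for the ventilator and sink to finish, then stop the workers"""
    ventilator.join()
    sink.join()
    for x in range(num_workers):
        workers[x].terminate()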

For the test, I sent 1,000,000 small messages, in batch sizes of 1,000,000, 1,000, 100, 50, and 10:

[Figure: batched acknowledgement throughput results]

There are two main conclusions I draw from these results.  The first is that the pipeline pattern obtains an extremely high throughput due to the fact that it does not have to ACK messages.  The second is that if your workers are performing work that takes any amount of time, you can limit the batch size to a rather small number of messages without impacting your overall throughput.  Even with a batch size of 10, a throughput of around 81k messages a second was obtained.  As long as the combined messages per second your workers can process is less than the raw throughput of your pipeline for a given batch size, batched acknowledgements will give you some degree of flow control without compromising your throughput.

The “batched pipeline pattern” allows tradeoffs to be made between acceptable message loss and throughput, and offers a great deal of control that a simple pipeline pattern with no batching does not offer. In my experience it is an extremely useful starting point for a distributed system that processes streaming data.

Something Cool From AK on GitHub

Today we at AK have released an rsyslog module for 0MQ on GitHub! We are very excited about how we have moved to a real-time processing model and wanted to share. Keep checking back to this blog for more info!

0MQ input and output modules for rsyslog
