We Love Rsyslog

Every day we’re growing to appreciate rsyslog more as we scale up. Yesterday I captured this shot of iptraf running on one of our event routing hosts: 158mbit/s in and 583mbit/s out, for a combined 741mbit/s of traffic being handled by a single rsyslog server. The server is both filtering and routing by various message properties. At this rate of traffic the hardware isn’t even breaking a sweat. We realize we’re a bit off the beaten path with our use of rsyslog for message routing at AK. This screen capture is part of the story as to why:

rsyslog routing some serious traffic
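
To give a rough sense of the kind of property-based filtering and routing involved, here is a minimal sketch of what such rules can look like in rsyslog.conf. The program name, match string, and hostnames are invented for illustration and are not our actual configuration:

# illustrative only - program names, hostnames, and match values are made up
# forward messages from a particular program to the event-processing tier
# over TCP (@@ is TCP, a single @ would be UDP), then discard them locally
# (& applies another action to the same filter, ~ is the discard action)
:programname, isequal, "eventd"    @@events1.example.com:514
& ~

# route on message content: anything mentioning "checkout" also goes to a
# second destination
:msg, contains, "checkout"         @@analytics1.example.com:514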

Batch Acknowledged Pipelines with ZeroMQ

Parallel processing with a task ventilator is a common pattern with ZeroMQ. The basics of this pattern are outlined in the “Divide and Conquer” section of the ZeroMQ guide. The pattern consists of the following components:

  • A task ventilator that produces tasks.
  • A number of workers that do the processing work.
  • A sink that collects results from the worker processes.

This pattern works wonderfully as long as your consumers can outpace your producers. If you start producing tasks faster than you can process them, then messages will start backing up in ZeroMQ socket queues.  This will drive the memory utilization of the processes up, make a clean shutdown of the distributed processing system difficult, and result in a sizeable number of messages lost in the event of a crash.  This can be avoided by using ZMQ_REP and ZMQ_REQ sockets, but in that case you lose the speed advantage of a pipeline pattern.
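
To make that tradeoff concrete, here is a minimal sketch of what the producer side looks like with a ZMQ_REQ socket (the endpoint and message bodies here are made up for illustration, and this function is not part of the example that follows). Each send must be matched by a reply before the next send, so only one task is ever in flight and the pipeline’s speed advantage disappears:

import zmq

def req_ventilator(test_size):
    """illustrative ZMQ_REQ producer - not part of the example below"""

    context = zmq.Context()

    """a REQ socket enforces a strict send/recv alternation"""
    send_sock = context.socket(zmq.REQ)
    send_sock.connect("tcp://localhost:5558")

    for x in range(test_size):
        send_sock.send("task")

        """this recv blocks until a REP worker replies, so every task
           is a full round trip and only one message is ever in flight"""
        reply = send_sock.recv()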

To maintain the speed of a pipeline pattern while allowing for some control over the number of messages in flight at any given time, we can add batch acknowledgements to the basic ventilator / worker / sink pattern.  Accomplishing this only requires a few minor changes:

  • Add a pull socket to the ventilator for receiving acknowledgements
  • Add a push socket to the sink for sending acknowledgements
  • Add a batch size variable

So without further ado, let’s dive into some code.  I’m going to keep things simple for this example.  I’ll define a function for the ventilator, a function for the worker, and a function for the sink.  These will be started using multiprocessing.Process. For any Java programmers in the audience: Python has first class functions.  There is no requirement to wrap a function in a class.

The ventilator uses a ZMQ_PUSH socket to send tasks to listening workers, and a ZMQ_PULL socket to receive acknowledgements from the sink process.  The ventilator will send N messages (where N is the batch size) and then wait for an acknowledgement:

import zmq
from time import time
from multiprocessing import Process

def ventilator(batch_size, test_size):
    """task ventilator function"""

    """set up a zeromq context"""
    context = zmq.Context()

    """create a push socket for sending tasks to workers"""
    send_sock = context.socket(zmq.PUSH)
    send_sock.bind("tcp://*:5555")

    """create a pull socket for receiving acks from the sink"""
    recv_sock = context.socket(zmq.PULL)
    recv_sock.bind("tcp://*:5557")

    """initiate counter for tasks sent"""
    current_batch_count = 0

    """start the message loop"""
    for x in range(test_size):

        """send the next task"""
        send_sock.send("task")
        current_batch_count += 1

        """once a full batch has been sent, block until the sink
           acknowledges it before sending more - note this could be
           more sophisticated and provide support for other message
           types from the sink, but keeping it simple for this example"""
        if current_batch_count == batch_size:
            current_batch_count = 0
            msg = recv_sock.recv()

The workers use a ZMQ_PULL socket to receive tasks from the ventilator, and a ZMQ_PUSH socket to send results to the sink process:

def worker():
    """task worker function"""

    """set up a zeromq context"""
    context = zmq.Context()

    """create a pull socket for receiving tasks from the ventilator"""
    recv_socket = context.socket(zmq.PULL)
    recv_socket.connect("tcp://localhost:5555")

    """create a push socket for sending results to the sink"""
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5556")

    """receive tasks and send results"""
    while True:
        task = recv_socket.recv()
        send_socket.send("result")

The sink process uses a ZMQ_PULL socket to receive results from the workers, and a ZMQ_PUSH socket to send batch acknowledgements to the ventilator process:

def sink(batch_size, test_size):
    """task sink function"""

    """set up a zmq context"""
    context = zmq.Context()

    """create a pull socket for receiving results from the workers"""
    recv_socket = context.socket(zmq.PULL)
    recv_socket.bind("tcp://*:5556")

    """create a push socket for sending acknowledgements to the ventilator"""
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5557")

    result_count = 0
    batch_start_time = time()
    test_start_time = batch_start_time

    for x in range(test_size):
        """receive a result and increment the count"""
        msg = recv_socket.recv()
        result_count += 1

        """acknowledge that we've completed a batch"""
        if result_count == batch_size:
            send_socket.send("ACK")
            result_count = 0
            batch_start_time = time()

    duration = time() - test_start_time
    tps = test_size / duration
    print "messages per second: %s" % (tps)

The main routine for the test allows for a configurable batch size, test size, and number of workers:

if __name__ == '__main__':
    num_workers = 4
    batch_size = 100
    test_size = 1000000

    workers = {}
    ventilator_proc = Process(target=ventilator, args=(batch_size, test_size,))
    sink_proc = Process(target=sink, args=(batch_size, test_size,))

    sink_proc.start()

    for x in range(num_workers):
        workers[x] = Process(target=worker, args=())
        workers[x].start()

    ventilator_proc.start()

    """wait for the run to finish, then clean up the worker
       processes, which otherwise loop forever"""
    ventilator_proc.join()
    sink_proc.join()
    for x in range(num_workers):
        workers[x].terminate()

For the test, I sent 1,000,000 small messages, in batch sizes of 1,000,000, 1,000, 100, 50, and 10:

batched acknowledgement results

There are two main conclusions I draw from these results. The first is that the pipeline pattern achieves extremely high throughput because it does not have to acknowledge individual messages. The second is that if your workers are performing work that takes any real amount of time, you can limit the batch size to a rather small number of messages without impacting your overall throughput. Even with a batch size of 10, a throughput of around 81k messages a second was obtained. As long as the combined messages per second your workers can process is less than the raw throughput of your pipeline for a given batch size, batched acknowledgements will give you some degree of flow control without compromising your throughput. To make that concrete: if each of the four workers in this test needed a millisecond per task, they could only handle about 4,000 messages a second between them, well below the roughly 81k messages a second the pipeline sustained at a batch size of 10.

The “batched pipeline pattern” allows tradeoffs to be made between acceptable message loss and throughput, and offers a degree of control that a simple pipeline pattern with no batching does not. In my experience it is an extremely useful starting point for a distributed system that processes streaming data.

Greetings!

My co-workers have successfully dragged me kicking and screaming away from my work to write some articles, so first let me introduce myself: I’m Brian, the Service Delivery Data Architect for Aggregate Knowledge. Everything I do at AK is in service of one goal: allowing customers to gain deeper insights from more data in less time. My work ranges from designing message formats and data models to researching new data storage and transport technologies. I will be writing here from time to time about my experiences in helping transition the AK platform into a first-class real-time data processing platform.
