Batch Acknowledged Pipelines with ZeroMQ

Parallel processing with a task ventilator is a common pattern with ZeroMQ. The basics of this pattern are outlined in the “Divide and Conquer” section of the ZeroMQ guide. The pattern consists of the following components:

  • A task ventilator that produces tasks.
  • A number of workers that do the processing work.
  • A sink that collects results from the worker processes.
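
For reference, here is a minimal sketch of how those three components are wired together with PUSH and PULL sockets (the port numbers are arbitrary choices; in practice each piece runs in its own process, as in the full example below):

import zmq

context = zmq.Context()

# ventilator: PUSH tasks out to any connected workers
ventilator_out = context.socket(zmq.PUSH)
ventilator_out.bind("tcp://*:5555")

# worker: PULL tasks in from the ventilator, PUSH results on to the sink
worker_in = context.socket(zmq.PULL)
worker_in.connect("tcp://localhost:5555")
worker_out = context.socket(zmq.PUSH)
worker_out.connect("tcp://localhost:5556")

# sink: PULL results in from the workers
sink_in = context.socket(zmq.PULL)
sink_in.bind("tcp://*:5556")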

This pattern works wonderfully as long as your consumers can outpace your producers. If you start producing tasks faster than you can process them, messages will start backing up in ZeroMQ socket queues. This will drive up the memory utilization of the processes, make a clean shutdown of the distributed processing system difficult, and result in a sizeable number of messages being lost in the event of a crash. This can be avoided by using ZMQ_REQ and ZMQ_REP sockets, but then you lose the speed advantage of a pipeline pattern.
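
As an aside, ZeroMQ can bound that queue growth on its own via the send high-water mark. A minimal sketch, assuming pyzmq with libzmq 3.x or later (where the option is zmq.SNDHWM; 2.x releases used a single zmq.HWM):

import zmq

context = zmq.Context()
send_sock = context.socket(zmq.PUSH)

# cap the number of outbound messages this socket will queue;
# once the high-water mark is reached, send() blocks rather than
# buffering without bound
send_sock.setsockopt(zmq.SNDHWM, 1000)
send_sock.bind("tcp://*:5555")

The high-water mark only bounds memory per socket, though; it tells you nothing about how far behind the sink is, which is what the batched acknowledgements below provide.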

To maintain the speed of a pipeline pattern while allowing for some control over the number of messages in flight at any given time, we can add batch acknowledgements to the basic ventilator / worker / sink pattern.  Accomplishing this only requires a few minor changes:

  • Add a pull socket to the ventilator for receiving acknowledgements
  • Add a push socket to the sink for sending acknowledgements
  • Add a batch size variable

So without further ado, let’s dive into some code. I’m going to keep things simple for this example. I’ll define a function for the ventilator, a function for the worker, and a function for the sink. These will be started using multiprocessing.Process. For any Java programmers in the audience: Python has first-class functions. There is no requirement to wrap a function in a class.

The ventilator uses a ZMQ_PUSH socket to send tasks to listening workers, and a ZMQ_PULL socket to receive acknowledgements from the sink process.  The ventilator will send N messages (where N is the batch size) and then wait for an acknowledgement:

import zmq
from time import time
from multiprocessing import Process

def ventilator(batch_size, test_size):
    """Task ventilator: pushes tasks to the workers in batches."""

    # set up a zeromq context
    context = zmq.Context()

    # create a push socket for sending tasks to workers
    send_sock = context.socket(zmq.PUSH)
    send_sock.bind("tcp://*:5555")

    # create a pull socket for receiving acks from the sink
    recv_sock = context.socket(zmq.PULL)
    recv_sock.bind("tcp://*:5557")

    # counter for tasks sent in the current batch
    current_batch_count = 0

    # send test_size tasks in total, pausing after each full batch
    for x in range(test_size):
        send_sock.send(b"task")
        current_batch_count += 1

        # once a full batch is out, reset the count and block until
        # the sink acknowledges the batch - this could be more
        # sophisticated and support other message types from the
        # sink, but we're keeping it simple for this example
        if current_batch_count == batch_size:
            current_batch_count = 0
            msg = recv_sock.recv()

The workers use a ZMQ_PULL socket to receive tasks from the ventilator, and a ZMQ_PUSH socket to send results to the sink process:

def worker():
    """Task worker: pulls tasks from the ventilator, pushes results to the sink."""

    # set up a zeromq context
    context = zmq.Context()

    # create a pull socket for receiving tasks from the ventilator
    # (connect needs a concrete address - the * wildcard is only valid for bind)
    recv_socket = context.socket(zmq.PULL)
    recv_socket.connect("tcp://localhost:5555")

    # create a push socket for sending results to the sink
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5556")

    # receive tasks and send results
    while True:
        task = recv_socket.recv()
        send_socket.send(b"result")

The sink process uses a ZMQ_PULL socket to receive results from the workers, and a ZMQ_PUSH socket to send batch acknowledgements to the ventilator process:

def sink(batch_size, test_size):
    """Task sink: collects results and acknowledges each completed batch."""

    # set up a zeromq context
    context = zmq.Context()

    # create a pull socket for receiving results from the workers
    recv_socket = context.socket(zmq.PULL)
    recv_socket.bind("tcp://*:5556")

    # create a push socket for sending acknowledgements to the ventilator
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5557")

    result_count = 0
    test_start_time = time()

    for x in range(test_size):
        # receive a result and increment the count
        msg = recv_socket.recv()
        result_count += 1

        # acknowledge that we've completed a batch
        if result_count == batch_size:
            send_socket.send(b"ACK")
            result_count = 0

    duration = time() - test_start_time
    tps = test_size / duration
    print("messages per second: %s" % tps)
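
One design note on the wiring above: in ZeroMQ either end of a connection may bind, and the convention is to bind the stable, well-known endpoints and connect from everything else. Here the ventilator binds both of its sockets (5555 for tasks, 5557 for acks) and the sink binds the results socket (5556), so the transient worker processes only ever connect.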

The main routine for the test allows for a configurable batch size, test size, and number of workers:

if __name__ == '__main__':
    num_workers = 4
    batch_size = 100
    test_size = 1000000

    # use names that don't shadow the functions defined above
    sink_proc = Process(target=sink, args=(batch_size, test_size))
    sink_proc.start()

    workers = []
    for x in range(num_workers):
        worker_proc = Process(target=worker)
        worker_proc.start()
        workers.append(worker_proc)

    ventilator_proc = Process(target=ventilator, args=(batch_size, test_size))
    ventilator_proc.start()

    # wait for the test to complete, then stop the workers,
    # which otherwise loop forever
    ventilator_proc.join()
    sink_proc.join()
    for worker_proc in workers:
        worker_proc.terminate()

For the test, I sent 1,000,000 small messages, in batch sizes of 1,000,000, 1,000, 100, 50, and 10:

[Chart: batched acknowledgement results - messages per second at each batch size]

There are two main conclusions I draw from these results. The first is that the pipeline pattern achieves extremely high throughput precisely because it does not have to ACK individual messages. The second is that if your workers are performing work that takes any appreciable time, you can limit the batch size to a rather small number of messages without hurting overall throughput. Even with a batch size of 10, a throughput of around 81k messages per second was obtained. As long as the combined rate at which your workers can process messages is less than the raw throughput of your pipeline for a given batch size, batched acknowledgements will give you some degree of flow control without compromising your throughput.
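
To make that concrete with some hypothetical numbers: if each task takes 1 ms of actual work, four workers can process at most about 4,000 messages per second combined. That is well below the roughly 81k messages per second the pipeline sustained with a batch size of 10, so in that scenario the workers, not the acknowledgements, are the bottleneck, and batching costs essentially nothing.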

The “batched pipeline pattern” allows tradeoffs to be made between acceptable message loss and throughput, and provides a degree of control that a simple pipeline pattern with no batching does not. In my experience it is an extremely useful starting point for a distributed system that processes streaming data.

Comments

  1. How do you kill your workers after the job is done, making sure that no message is left unprocessed?

    • Flavio – avoiding message loss in cases where you do not know ahead of time how many messages to expect can be tricky with push / pull in ZeroMQ. I’ve done it by implementing a PUB / SUB subscription bus that I use to send commands to the ventilator and workers. Basically you need to be able to notify your ventilator to stop sending new jobs, and to notify your workers to let the ventilator know they are “finished” (finished being defined as not receiving a job within some time period), then have all processes shut down. Note that if you shut down an individual worker while the ventilator is still sending, you will lose messages due to the way ZeroMQ buffers.

      For a higher degree of reliability in a system where individual workers need to be stopped and started, request / reply is recommended over push / pull. See chapter four of the ZeroMQ guide for some good starting points: http://zguide.zeromq.org/page:all#Chapter-Four-Reliable-Request-Reply
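
      A minimal sketch of that control-bus idea, assuming the ventilator also publishes a b"KILL" message on a PUB socket when it is done (port 5558 and the message format are made-up choices for illustration, not part of the original code):

      import zmq

      def worker_with_shutdown():
          """Worker variant that also listens on a control bus (a sketch)."""
          context = zmq.Context()

          # task and result sockets, wired exactly as in the example above
          recv_socket = context.socket(zmq.PULL)
          recv_socket.connect("tcp://localhost:5555")
          send_socket = context.socket(zmq.PUSH)
          send_socket.connect("tcp://localhost:5556")

          # control bus: SUB socket subscribed to everything the
          # ventilator publishes (port 5558 is a made-up choice)
          control = context.socket(zmq.SUB)
          control.connect("tcp://localhost:5558")
          control.setsockopt(zmq.SUBSCRIBE, b"")

          # poll both sockets so waiting on a task never blocks out a command
          poller = zmq.Poller()
          poller.register(recv_socket, zmq.POLLIN)
          poller.register(control, zmq.POLLIN)

          while True:
              socks = dict(poller.poll())
              if recv_socket in socks:
                  task = recv_socket.recv()
                  send_socket.send(b"result")
              if control in socks and control.recv() == b"KILL":
                  break

      Even with a control bus, the caveat above still applies: any messages already buffered in a worker’s PULL socket when it stops are lost unless you drain them first.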

  2. Hi Brian, thank you for this post. I gave your example code a whirl in Scala, but am getting nowhere near the performance you show. Can you elaborate a little on your environment?

    Also, I was wondering if you had given any thought to what the performance would be like if the messages were placed in a buffer and then sent as one big block using REQ / REP?
