The circular queue is a data structure that I have used a lot to solve communication problems. It is as simple as it is useful.

From Wikipedia:

“A circular buffer, circular queue, cyclic buffer or ring buffer is a data structure that uses a single, fixed-size buffer as if it were connected end-to-end. This structure lends itself easily to buffering data streams.

The useful property of a circular buffer is that it does not need to have its elements shuffled around when one is consumed. (If a non-circular buffer were used then it would be necessary to shift all elements when one is consumed.) In other words, the circular buffer is well-suited as a FIFO buffer while a standard, non-circular buffer is well suited as a LIFO buffer.

Circular buffering makes a good implementation strategy for a queue that has fixed maximum size. Should a maximum size be adopted for a queue, then a circular buffer is a completely ideal implementation; all queue operations are constant time. However, expanding a circular buffer requires shifting memory, which is comparatively costly. For arbitrarily expanding queues, a linked list approach may be preferred instead.”

Some years ago I wrote another post about circular queues; in “A simple implementation of a circular queue in C language” you can find a little bit of theory and definitions, as well as the C implementation.

Circular queues in Python

Since I have to synchronize access between threads, the first thing I do is implement the synchronization logic. I explained this decorator in the post Python decorators in the real world. All the methods that can be accessed from outside the class will be decorated.

import threading

LOCK = threading.Lock()

def synchro(lock_id):
    # Decorator: run the wrapped method while holding lock_id.
    def wrap(method):
        def wrapped_function(*args, **kw):
            with lock_id:
                return method(*args, **kw)
        return wrapped_function
    return wrap
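As a quick illustration of what the decorator buys us (the counter and the thread count below are my own example, not from the post; the decorator is repeated so the snippet runs standalone), serialized calls from several threads give a deterministic result:

```python
import threading

LOCK = threading.Lock()

def synchro(lock_id):
    def wrap(method):
        def wrapped_function(*args, **kw):
            with lock_id:
                return method(*args, **kw)
        return wrapped_function
    return wrap

counter = 0

@synchro(LOCK)
def bump(times):
    # A read-modify-write sequence that could interleave without the lock.
    global counter
    for _ in range(times):
        counter += 1

threads = [threading.Thread(target=bump, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000: the lock serializes the four threads
```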

Let’s start with the circular queue implementation. It will be a class with the following properties:

queue: a list of items (initialized to None)
head: the index of the element to be popped from the queue
tail: the index where the next pushed element will be stored
q_size: the size of the underlying buffer (as we will see in enqueue, the queue holds at most q_size - 1 elements)

class CQueue():
    def __init__(self, q_size=32):
        self.queue = [None] * q_size
        self.head = 0
        self.tail = 0
        self.q_size = q_size

The first method I add computes the number of valid items in the queue.
If the tail index is greater than or equal to the head index, the number of valid items is simply their difference. Otherwise the indices have wrapped around, and the number of valid items is computed as the maximum number of items in the queue minus the head index plus the tail index.

    def size(self):
        if self.tail >= self.head:
            return self.tail - self.head
        else:
            return self.q_size - self.head + self.tail
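As a quick sanity check, the two branches above can be exercised with plain integers (the index values below are arbitrary examples of mine, not from the post):

```python
def queue_size(head, tail, q_size):
    # Same arithmetic as CQueue.size(): plain difference when the indices
    # have not wrapped, wrap-around formula otherwise.
    if tail >= head:
        return tail - head
    return q_size - head + tail

# No wrap-around: head = 3, tail = 10 -> 7 valid items.
print(queue_size(3, 10, 32))   # 7
# Wrap-around: head = 30, tail = 2 -> slots 30, 31, 0, 1 are valid.
print(queue_size(30, 2, 32))   # 4
```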

Let’s enqueue an element. An element can be enqueued only if the number of valid elements is less than the maximum.

    @synchro(LOCK)
    def enqueue(self, data):
        res = False
        if self.size() == self.q_size - 1:
            print("Queue Full!")
        else:
            self.queue[self.tail] = data
            self.tail = (self.tail + 1) % self.q_size
            res = True
        return res
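Note that the queue is considered full at q_size - 1 elements, not q_size: since head == tail means "empty", letting the buffer fill completely would wrap tail back onto head and make a full queue indistinguishable from an empty one. A minimal sketch of the ambiguity, using bare indices rather than the class:

```python
q_size = 4
head = tail = 0          # empty queue: head == tail

# Pretend we allowed q_size pushes instead of q_size - 1:
for _ in range(q_size):
    tail = (tail + 1) % q_size

# tail has wrapped all the way around onto head, so a "full"
# queue would now look exactly like an empty one.
print(tail == head)      # True
```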

And the dequeue method….

    @synchro(LOCK)
    def dequeue(self):
        data = None  # None signals an empty queue
        if self.size() == 0:
            print("Queue Empty!")
        else:
            data = self.queue[self.head]
            self.head = (self.head + 1) % self.q_size
        return data

Last, to observe the content of the queue, a simple print function:

    @synchro(LOCK)
    def print_queue(self):
        to_be_printed = list()
        # Walk from head over the valid items, wrapping with modulo.
        for i in range(self.head, self.head + self.size()):
            to_be_printed.append(self.queue[i % self.q_size])
        print(to_be_printed)
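Before adding threads, it is worth checking the behaviour single-threaded. The snippet below is a condensed, standalone sketch of the class (the small queue size and the test values are my own, chosen to hit both the "full" and FIFO-order cases):

```python
import threading

LOCK = threading.Lock()

def synchro(lock_id):
    def wrap(method):
        def wrapped_function(*args, **kw):
            with lock_id:
                return method(*args, **kw)
        return wrapped_function
    return wrap

class CQueue():
    def __init__(self, q_size=32):
        self.queue = [None] * q_size
        self.head = 0
        self.tail = 0
        self.q_size = q_size

    def size(self):
        if self.tail >= self.head:
            return self.tail - self.head
        return self.q_size - self.head + self.tail

    @synchro(LOCK)
    def enqueue(self, data):
        res = False
        if self.size() == self.q_size - 1:
            print("Queue Full!")
        else:
            self.queue[self.tail] = data
            self.tail = (self.tail + 1) % self.q_size
            res = True
        return res

    @synchro(LOCK)
    def dequeue(self):
        data = None  # None signals an empty queue
        if self.size() == 0:
            print("Queue Empty!")
        else:
            data = self.queue[self.head]
            self.head = (self.head + 1) % self.q_size
        return data

q = CQueue(q_size=4)            # holds at most 3 elements
assert q.enqueue('a') and q.enqueue('b') and q.enqueue('c')
assert not q.enqueue('d')       # full: only q_size - 1 slots usable
assert q.dequeue() == 'a'       # FIFO order preserved
assert q.dequeue() == 'b'
assert q.size() == 1
```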

To test our queue we need a couple of threads: a producer thread and a consumer thread that share the queue.


import time

def producer(the_queue, timing):
    # Push 50 items, one every `timing` seconds.
    for counter in range(50):
        the_queue.enqueue(str(counter))
        time.sleep(timing)


def consumer(the_queue, timing):
    for _ in range(20):
        time.sleep(timing)
        print('consumer - get() - ' + str(the_queue.dequeue()))
        print('consumer - list() -')
        the_queue.print_queue()


if __name__ == '__main__':
    C_QUEUE = CQueue(q_size=32)

    CONSUMER_C_QUEUE = threading.Thread(group=None, target=consumer,
                                        name='consumer_queue', args=(C_QUEUE, 0.8))
    CONSUMER_C_QUEUE.start()
    PRODUCER_C_QUEUE = threading.Thread(group=None, target=producer,
                                        name='producer_queue', args=(C_QUEUE, 0.2))
    PRODUCER_C_QUEUE.start()
    PRODUCER_C_QUEUE.join()
    CONSUMER_C_QUEUE.join()

You can find the full source code on my github at cqueue.py

Gg1