In the previous post (Python 3 avoid starvation in Priority Queues) I explained the starvation problem you face when using Python priority queues. In this post I’m going to reimplement a queue that avoids the problem.
To implement the new queue class I’ll use a list of tuples, each with three elements: the data, the priority and the aging.
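As a minimal sketch of that layout (the sample entries are made up for illustration), each entry is a `(data, priority, aging)` tuple, and sorting on the last two fields puts the lowest priority number, then the lowest aging, at the head of the list:

```python
# Each queue entry is a (data, priority, aging) tuple; the values are illustrative.
queue = [
    ("write log", 2, 10),
    ("send mail", 0, 7),
    ("resize image", 0, 3),
    ("backup", 1, 10),
]

# Sort by priority first, then by aging: lower numbers come first.
queue = sorted(queue, key=lambda element: (element[1], element[2]))

head = queue[0]
order = [entry[0] for entry in queue]
print(order)
```

Tuples are immutable, so changing the priority or aging of an entry means building a new tuple and putting it back in the list.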
class AgingPriorityQueue(object):
    """
    Class implementing a priority queue with aging.
    """

    def __init__(self, qsize=32, max_aging=10):
        """
        Initializes an empty queue.

        Keyword Arguments:
            qsize {Integer} -- The max size for the queue (default: {32})
            max_aging {Integer} -- The maximum value for the aging (default: {10})
        """
        self.max_aging = max_aging
        self.queue = list()
        self.queue_len = 0
        self.q_size = qsize
Every time a producer enqueues a new item, the class updates the aging of the items in the queue.
    @synchro(LOCK)
    def enqueue(self, data, item_prio=0, aging=10):
        """
        Enqueues a new item into the queue

        Decorators:
            synchro
                Synchronizes the access to the queue

        Arguments:
            data {data} -- the item to be added to the queue

        Keyword Arguments:
            item_prio {Integer} -- The priority of the item (default: {0})
            aging {Integer} -- The initial aging of the item (default: {10})

        Returns:
            Boolean -- False if the queue is full, True if the item has been inserted.
        """
        res = False
        if len(self.queue) == self.q_size:
            print("Queue Full!")
        else:
            self.queue.append((data, item_prio, aging))
            self.queue = sorted(self.queue, key=lambda element: (element[1], element[2]))
            self.queue_len = self.queue_len + 1
            self.__update_aging()
            res = True
        return res
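The `synchro` decorator and the `LOCK` object are not shown in this post. A minimal sketch of how such a decorator could be written, assuming `LOCK` is a module-level `threading.Lock` (the body of `synchro`, the `Counter` class and the `worker` function below are illustrative assumptions, not the code from the repository):

```python
import functools
import threading

LOCK = threading.Lock()  # assumed: one module-level lock shared by all callers


def synchro(lock):
    """Return a decorator that runs the wrapped function while holding lock."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:  # acquired on entry, released on return or exception
                return func(*args, **kwargs)
        return wrapper
    return decorator


class Counter:
    """Hypothetical class used only to exercise the decorator."""

    def __init__(self):
        self.value = 0

    @synchro(LOCK)
    def increment(self):
        self.value += 1


def worker(counter, times):
    for _ in range(times):
        counter.increment()


counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 1000)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(counter.value)
```

With a plain `threading.Lock`, a decorated method must not call another method decorated with the same lock, because the second acquire would block forever; `threading.RLock` is the usual choice when that kind of nesting is needed.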
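The aging update is very simple: it decrements the aging field of every item and, when the new aging value reaches zero, it decrements the priority too. Since the list is sorted in ascending order and items are dequeued from the head, a lower priority number moves the item closer to being served.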
    def __update_aging(self):
        """
        Updates the aging of the items in the list
        """
        for i in range(self.queue_len):
            elem = list(self.queue[i])
            elem[2] = (elem[2] - 1) % self.max_aging
            if elem[2] == 0:
                # decrement the priority if the aging is 0
                elem[1] -= 1
            item = tuple(elem)
            self.queue[i] = item
        return
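To make the wrap-around of the aging field concrete, here is a small standalone sketch of a single aging step. The `age` helper is a hypothetical function written for this example; it applies the same arithmetic as the method above to one tuple:

```python
def age(item, max_aging=10):
    """Apply one aging step to a (data, priority, aging) tuple."""
    data, priority, aging = item
    aging = (aging - 1) % max_aging  # in Python, -1 % 10 == 9, so 0 wraps to 9
    if aging == 0:
        priority -= 1  # a lower number means the item is served sooner
    return (data, priority, aging)


item = ("job", 3, 3)
history = []
for _ in range(4):
    item = age(item)
    history.append(item)

for step in history:
    print(step)
```

Starting from the default aging of 10 with `max_aging=10`, an item gains one priority level every 10 updates. Note that nothing stops the priority from going below zero.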
The dequeue method is simple as well: it removes the item at the head of the list and then updates the aging of the remaining items.
    def dequeue(self):
        if self.queue_len == 0:
            print("Queue is Empty!")
            return False
        else:
            the_tuple = self.queue[0]
            del self.queue[0]
            self.queue_len -= 1
            self.__update_aging()
            return the_tuple[0]
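To see the effect of aging on starvation, here is a compact, single-threaded sketch of the same idea. `TinyAgingQueue` is a hypothetical class written for this example, with one deliberate variation: it sorts after every aging update, so the head always reflects the latest priorities. A low-priority item is enqueued first, then one high-priority item arrives before every dequeue:

```python
class TinyAgingQueue:
    """Compact, single-threaded variant of the queue, for illustration only."""

    def __init__(self, max_aging=10):
        self.max_aging = max_aging
        self.queue = []

    def _update_aging(self):
        aged = []
        for data, prio, aging in self.queue:
            aging = (aging - 1) % self.max_aging
            if aging == 0:
                prio -= 1
            aged.append((data, prio, aging))
        # Variation: sort after aging so the head reflects the new priorities.
        self.queue = sorted(aged, key=lambda e: (e[1], e[2]))

    def enqueue(self, data, item_prio=0, aging=10):
        self.queue.append((data, item_prio, aging))
        self._update_aging()

    def dequeue(self):
        if not self.queue:
            return None
        data = self.queue.pop(0)[0]
        self._update_aging()
        return data


q = TinyAgingQueue(max_aging=2)
q.enqueue("low", item_prio=2, aging=2)

served = []
for idx in range(4):
    # A new high-priority item arrives before every dequeue.
    q.enqueue("high-" + str(idx), item_prio=0, aging=2)
    served.append(q.dequeue())

print(served)
```

Without aging, a fresh priority-0 item would always be ahead of the priority-2 item, so "low" would never be served in this loop. With aging it comes out in the second round.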
The main program is as follows:
import time

if __name__ == '__main__':
    AG_Q = AgingPriorityQueue()
    AG_Q.enqueue("1111111111111111", 1)
    AG_Q.print_queue()
    time.sleep(0.5)
    AG_Q.enqueue("7777777777777777", 2)
    AG_Q.print_queue()
    time.sleep(0.5)
    for idx in range(32):
        AG_Q.enqueue("n"+str(idx), 0)
        AG_Q.print_queue()
        time.sleep(0.5)
    for idx in range(32):
        AG_Q.print_queue()
        print(AG_Q.dequeue())
        time.sleep(0.5)
The full, working source code is available on my GitHub at prio_queue_aging.py