In the post Python 3 priority queues – source code examples I described the priority queues provided by Python 3. They are very useful, especially because they come out of the box.
However, some features that are indispensable in multithreaded programming are missing. The most important is a way to avoid starvation. Starvation in the Python 3 PriorityQueue comes from its simple scheduling rule: get() always returns the entry with the lowest priority value.
A low-priority item pushed into the queue can therefore wait forever to be consumed if items with higher priority keep being pushed into the queue.
This behaviour is very simple to reproduce. Take a look at the following code:

"""simple module showing priority queue usage. Full info and description on www.xappsoftware.com
"""
import time
import threading
from random import randint
# python 2.x import Queue
import queue

def producer(the_queue, timing):
    """[Produces and inserts new data into the_queue]

    Arguments:
        the_queue {[queue.PriorityQueue]} -- [the queue used to store data]
        timing {[Integer]} -- [the interval of time between the production of two items]
    """
    while True:
        the_queue.put((0, 'high priority insertion'))
        time.sleep(timing)

def consumer(the_queue, timing):
    """[Consumes data from the queue]

    Arguments:
        the_queue {queue.PriorityQueue} -- [the queue used to get data.]
        timing {Integer} -- [the interval of time between two retrievals.]
    """
    while True:
        time.sleep(timing)
        print('consumer - get() - ' + str(the_queue.get()))


if __name__ == '__main__':
    # python 2.x: Queue.PriorityQueue()
    PRIO_QUEUE = queue.PriorityQueue()
    print('Populating the priority queue')
    for counter in range(10):
        time.sleep(0.4)
        prio = randint(0, 3)
        strn = "data - "+ str(prio) + " " + str(time.asctime())
        PRIO_QUEUE.put((prio, strn))
        print((prio, strn))

    time.sleep(1)

    CONSUMER_PRIO_QUEUE = threading.Thread(group=None, target=consumer,
                                           name='consumer_queue', args=(PRIO_QUEUE, 1))
    CONSUMER_PRIO_QUEUE.start()
    PRODUCER_PRIO_QUEUE = threading.Thread(group=None, target=producer,
                                           name='producer_queue', args=(PRIO_QUEUE, 1))
    PRODUCER_PRIO_QUEUE.start()
    PRODUCER_PRIO_QUEUE.join()
    CONSUMER_PRIO_QUEUE.join()

This simple program first initializes the queue with ten items of random priority. It then launches two threads: one consumes data from the queue, while the other keeps producing new items with the highest priority (0).

The following few lines show the output produced by a run of the script:

(2, 'data - 2 Mon Jan 22 08:29:10 2018')
(3, 'data - 3 Mon Jan 22 08:29:10 2018')
(2, 'data - 2 Mon Jan 22 08:29:11 2018')
(3, 'data - 3 Mon Jan 22 08:29:11 2018')
(1, 'data - 1 Mon Jan 22 08:29:12 2018')
(2, 'data - 2 Mon Jan 22 08:29:12 2018')
(3, 'data - 3 Mon Jan 22 08:29:12 2018')
(2, 'data - 2 Mon Jan 22 08:29:13 2018')
(1, 'data - 1 Mon Jan 22 08:29:13 2018')
(0, 'data - 0 Mon Jan 22 08:29:14 2018')
consumer - get() - (0, 'data - 0 Mon Jan 22 08:29:14 2018')
consumer - get() - (0, 'high priority insertion')
consumer - get() - (0, 'high priority insertion')
consumer - get() - (0, 'high priority insertion')
....
....

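As you can see, once the producer starts, the consumer only ever receives priority-0 items. The lower-priority entries already in the queue (priorities 1 to 3) are never consumed.

There are several algorithms to avoid starvation. The most widely used are aging and round robin.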

Aging

From Wikipedia

“Fixed priority scheduling is a scheduling discipline, in which tasks queued for utilizing a system resource are assigned a priority each. A task with a high priority is allowed to access a specific system resource before a task with a lower priority is allowed to do the same. A disadvantage of this approach is that tasks assigned with a lower priority may be starved when a large number of high priority tasks are queued. Aging is used to gradually increase the priority of a task, based on its waiting time in the ready queue.”

Our queue has a priority range of 0-3, where 0 means the highest priority.
Consider an item with priority 3. If we raise its priority at every dequeue call by lowering its priority value by 0.1, then after thirty dequeue calls it will have aged to priority 0 and will be in pole position to be fetched.
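
The aging rule described above can be sketched as follows. This is only a minimal illustration, not the implementation from the follow-up post: the class name AgingPriorityQueue, the aging_step parameter and the helper fetches_until_low_priority_served are hypothetical names chosen for this example, and the queue re-sorts itself on every get(), which is simple but not efficient.

```python
import heapq
import itertools
import threading


class AgingPriorityQueue:
    """Hypothetical sketch: a priority queue whose waiting items age on every get()."""

    def __init__(self, aging_step=0.1):
        self._heap = []                      # entries are [priority, sequence, item]
        self._sequence = itertools.count()   # tie-breaker: older entries win
        self._aging_step = aging_step
        self._not_empty = threading.Condition()

    def put(self, priority, item):
        with self._not_empty:
            heapq.heappush(self._heap, [float(priority), next(self._sequence), item])
            self._not_empty.notify()

    def get(self):
        with self._not_empty:
            while not self._heap:
                self._not_empty.wait()
            priority, _, item = heapq.heappop(self._heap)
            # Age every item still waiting: lower its priority value, but never below 0.
            for entry in self._heap:
                entry[0] = max(0.0, round(entry[0] - self._aging_step, 10))
            # Clamping at 0 can change the relative order, so rebuild the heap.
            heapq.heapify(self._heap)
            return priority, item


def fetches_until_low_priority_served():
    """Count the fetches needed before a priority-3 item is returned
    while a new priority-0 item is inserted before every fetch."""
    aging_queue = AgingPriorityQueue(aging_step=0.1)
    aging_queue.put(3, 'low priority item')
    fetches = 0
    while True:
        aging_queue.put(0, 'high priority insertion')
        fetches += 1
        if aging_queue.get()[1] == 'low priority item':
            return fetches


if __name__ == '__main__':
    print(fetches_until_low_priority_served())
```

With the values used in the text, the priority-3 item reaches priority 0 after thirty fetches and is returned by the next one, because ties at the same priority are resolved in favour of the older entry.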

Round Robin

In Round Robin, what we want to do is:
1. Most of the time, the important items are fetched.
2. However, we guarantee that some of the time, the less important items are fetched.

To reach goal 1 we can use the priority queue.
To reach goal 2 we have to define when the queue should return higher priority items and when it should return lower priority items. We can do this by giving the queue a round robin list, for example:
round_robin = [0, 0, 1, 0, 0, 2, 0, 0, 1]

The meaning of the list is:

  • For the first and second fetches, take items with priority = 0
  • For the third fetch, take the first item with priority = 1
  • For the fourth and fifth fetches, take items with priority = 0
  • For the sixth fetch, take an item with priority = 2
  • For the seventh and eighth fetches, take items with priority = 0
  • For the ninth fetch, take the first item with priority = 1

  • If no item with the requested priority is available, take the first item in priority order.

In this way we guarantee that, out of every 9 fetches (as long as items of each priority are waiting):

  • 6 times the fetched item has priority 0
  • 2 times the fetched item has priority 1
  • 1 time the fetched item has priority 2
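
The round robin rules listed above can be sketched as follows. This is a minimal illustration under a few assumptions, not the implementation from the follow-up post: the class name RoundRobinPriorityQueue and its parameters are hypothetical, items are kept in one FIFO bucket per priority, and the schedule position advances on every fetch, including fetches that fall back to priority order.

```python
import threading
from collections import deque


class RoundRobinPriorityQueue:
    """Hypothetical sketch: fetches follow a round robin list of priorities."""

    def __init__(self, schedule, priorities=range(4)):
        self._schedule = list(schedule)      # e.g. [0, 0, 1, 0, 0, 2, 0, 0, 1]
        self._position = 0                   # index of the next schedule slot
        self._buckets = {prio: deque() for prio in priorities}
        self._not_empty = threading.Condition()

    def put(self, priority, item):
        with self._not_empty:
            self._buckets[priority].append(item)
            self._not_empty.notify()

    def get(self):
        with self._not_empty:
            while not any(self._buckets.values()):
                self._not_empty.wait()
            wanted = self._schedule[self._position]
            self._position = (self._position + 1) % len(self._schedule)
            if not self._buckets[wanted]:
                # Nothing with the scheduled priority: fall back to the
                # first available item in priority order (0 is highest).
                wanted = min(p for p, bucket in self._buckets.items() if bucket)
            return wanted, self._buckets[wanted].popleft()


if __name__ == '__main__':
    rr_queue = RoundRobinPriorityQueue([0, 0, 1, 0, 0, 2, 0, 0, 1])
    for prio in (0, 1, 2):
        for number in range(10):
            rr_queue.put(prio, 'data - %d #%d' % (prio, number))
    print([rr_queue.get()[0] for _ in range(9)])
```

Note that priority 3 does not appear in the example list, so in this sketch priority-3 items are only returned through the fallback rule, when the scheduled priority has nothing waiting and no higher priority item is available.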

The post Python 3 priority queues with aging – Avoiding starvation shows the code to implement priority queues with aging.

The post Python 3 Round Robin Priority queues – Avoiding starvation shows how to implement a round robin priority queue.

Gg1