A Redis list works great as a processing queue. So great that it has more or less become the standard, and many people use it instead of dedicated queues such as ZeroMQ or RabbitMQ, because Redis offers other useful features on top. Its main idea is to provide the data structures you would normally use in any programming language: variables, lists, sets, hashes (dictionaries), etc. But it can do even more than that if you take advantage of the commands it provides.
The use case
Sometimes items don’t need to be processed in insertion order, or an item only needs to be processed once even though it may be put in the queue more than once. I faced this problem when working with Django signals. Let’s say I want to do something with my parent object whenever a child object is saved. That’s a very common use case for the post_save signal. Now there is usually more than one child class with a foreign key to the same parent object, and I don’t actually need to process the parent every time a child is saved. If you’re familiar with Django, imagine an admin form with inline related objects. Every time I click save, my main object would be updated N + 1 times in the database (N is the number of related objects), absolutely unnecessarily. Instead, I can do the update a) in the background and b) only once.
Priority queue
Sorted sets (ZSET) provide a perfect way to remedy both situations. Let’s say I have a process or cron job that picks up items from the queue. Instead of growing the queue every time a new child object is saved, each save simply increases the priority of the parent object to be processed. You only need three commands to do that: ZINCRBY, ZREVRANGE and ZREM.
```
redis 127.0.0.1:6379[5]> type prio_list
none
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test1
"1"
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test1
"2"
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test1
"3"
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test2
"1"
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test3
"1"
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test4
"1"
redis 127.0.0.1:6379[5]> ZRANGE prio_list 0 -1
1) "test2"
2) "test3"
3) "test4"
4) "test1"
redis 127.0.0.1:6379[5]> ZREVRANGE prio_list 0 0
1) "test1"
redis 127.0.0.1:6379[5]> ZREM prio_list test1
(integer) 1
redis 127.0.0.1:6379[5]> ZREVRANGE prio_list 0 -1
1) "test4"
2) "test3"
3) "test2"
```
Now you have a perfect priority queue implementation using nothing but basic Redis commands.
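To make the semantics of the three commands concrete without a running server, here is a minimal in-memory sketch in Python. `InMemorySortedSet` is a made-up class for illustration, not part of Redis or redis-py; it models a single sorted set and only handles non-negative indexes plus a stop of -1.

```python
class InMemorySortedSet:
    """Tiny stand-in for one Redis sorted set (illustration only)."""

    def __init__(self):
        self._scores = {}  # member -> score

    def zincrby(self, amount, member):
        # Like ZINCRBY: a missing member starts at 0, then amount is added.
        self._scores[member] = self._scores.get(member, 0) + amount
        return self._scores[member]

    def zrevrange(self, start, stop):
        # Like ZREVRANGE: highest score first; members with equal scores
        # come out in reverse lexicographical order, as in Redis.
        ordered = sorted(
            self._scores, key=lambda m: (self._scores[m], m), reverse=True
        )
        end = len(ordered) if stop == -1 else stop + 1
        return ordered[start:end]

    def zrem(self, member):
        # Like ZREM: 1 if the member existed and was removed, otherwise 0.
        return 1 if self._scores.pop(member, None) is not None else 0


queue = InMemorySortedSet()
for _ in range(3):
    queue.zincrby(1, "test1")      # pushed three times, stored once
for member in ("test2", "test3", "test4"):
    queue.zincrby(1, member)

top = queue.zrevrange(0, 0)[0]     # highest priority item
queue.zrem(top)
remaining = queue.zrevrange(0, -1)
```

The point of the sketch is that pushing the same member again never adds a second entry; it only raises that member's score, which is what lets repeated saves collapse into a single unit of work.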
ZREM vs ZREMRANGEBYRANK
ZREM has the following time complexity (quoting redis.io):
O(M*log(N)) with N being the number of elements in the sorted set and M the number of elements to be removed.
ZREMRANGEBYRANK’s time complexity:
O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements removed by the operation.
So hypothetically the latter is better, because its cost is the sum of log(N) and M, while ZREM’s is the product of the two. In our case it doesn’t matter: we only pop one element at a time from the queue, so M = 1 and both come down to O(log(N)).
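A quick back-of-the-envelope calculation shows the difference. The two helper functions below are hypothetical and only evaluate the growth terms from the quoted formulas; the base-2 logarithm and the missing constant factors are assumptions, so the numbers are not timings.

```python
import math


def zrem_cost(n, m):
    # Growth term quoted for ZREM: M * log(N)
    return m * math.log2(n)


def zremrangebyrank_cost(n, m):
    # Growth term quoted for ZREMRANGEBYRANK: log(N) + M
    return math.log2(n) + m


n = 1_000_000  # elements in the sorted set

# Popping one element at a time (our case): the terms differ by exactly 1.
single = (zrem_cost(n, 1), zremrangebyrank_cost(n, 1))

# Removing 1000 elements in one call: here the product really hurts.
batch = (zrem_cost(n, 1000), zremrangebyrank_cost(n, 1000))
```

With M = 1 the two terms are practically the same, so the choice between the commands only starts to matter for bulk removals.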
In Python
```python
import redis


class PriorityQueue(object):
    def __init__(self, queue):
        self.queue = queue
        self._r = redis.StrictRedis(host='localhost', port=6379, db=5)

    def push(self, item):
        # redis-py 3.0+ argument order is (name, amount, value);
        # older StrictRedis releases took (name, value, amount).
        return self._r.zincrby(self.queue, 1, item)

    def pop(self):
        try:
            _item = self._r.zrevrange(self.queue, 0, 0)[0]
            if self._r.zrem(self.queue, _item) == 1:
                # We managed to pop the item from the queue
                return _item
            else:
                # Somebody else also got the same item and removed it before us
                # Try again
                return self.pop()
        except IndexError:
            # Queue is empty
            return None
```
Notice that we don’t need any special locking or concurrency control, because we check the return value of the zrem command. If it managed to remove the element, it returns 1. If it didn’t, we had a race condition and somebody removed the element before our thread/process/script did. In that case we retry.
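The race can be reproduced without a server. The sketch below uses a made-up `RacyClient` that pretends a rival consumer removes the first item we read, right between our ZREVRANGE and our ZREM; the standalone `pop` function mirrors the logic of the pop method above. Both names are assumptions for illustration only.

```python
class RacyClient:
    """Fake client (hypothetical): a rival steals the first item we read."""

    def __init__(self, scores):
        self._scores = dict(scores)  # member -> score
        self._stolen = False

    def zrevrange(self, key, start, stop):
        ordered = sorted(
            self._scores, key=lambda m: (self._scores[m], m), reverse=True
        )
        result = ordered[start:stop + 1]
        if result and not self._stolen:
            # Simulate the rival winning the race for this item.
            del self._scores[result[0]]
            self._stolen = True
        return result

    def zrem(self, key, member):
        # 1 if we removed the member, 0 if it was already gone.
        return 1 if self._scores.pop(member, None) is not None else 0


def pop(client, key):
    try:
        item = client.zrevrange(key, 0, 0)[0]
    except IndexError:
        return None  # queue is empty
    if client.zrem(key, item) == 1:
        return item  # we won the race
    return pop(client, key)  # somebody else removed it first: retry


client = RacyClient({"test1": 3, "test2": 1})
first = pop(client, "prio_list")   # test1 is stolen, so we end up with test2
second = pop(client, "prio_list")  # nothing left
```

Because the rival already took `test1`, our zrem returns 0 and the retry hands us the next item instead, so no item is ever processed by two consumers.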
Removing the top element from a sorted set is also referred to as ZPOP. If you prefer a single EVAL command, there is also a Lua implementation available on RedisGreen: https://www.redisgreen.net/library/zpop.html
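For reference, this is roughly how such a script could be wired up from Python. The Lua source below is my own minimal sketch, not the RedisGreen script, and `make_zpop` is a hypothetical helper; it assumes `client` is a redis-py client, whose `register_script()` returns a callable script object. Redis 5.0 and later also ship a built-in ZPOPMAX command that pops the highest-scored member in one step.

```python
ZPOP_LUA = """
local top = redis.call('ZREVRANGE', KEYS[1], 0, 0)
if not top[1] then
    return false
end
redis.call('ZREM', KEYS[1], top[1])
return top[1]
"""


def make_zpop(client):
    """Return a function that pops the highest-scored member atomically.

    `client` is assumed to be a redis-py client instance.
    """
    script = client.register_script(ZPOP_LUA)

    def zpop(key):
        # Redis runs the whole script atomically, so no retry is needed.
        # An empty queue comes back as a nil reply (None in redis-py).
        return script(keys=[key])

    return zpop


# Usage, assuming a local server and the redis-py package:
#   import redis
#   zpop = make_zpop(redis.StrictRedis(host='localhost', port=6379, db=5))
#   item = zpop('prio_list')
```

The trade-off is one round trip and no retry logic on the client, at the price of keeping a piece of Lua around.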
Update (2016-02-07) based on Itamar’s comment:
The above implementation uses recursion on the assumption that contention will be rare. However, with high concurrency or fast processing times (we did the whole thing to offload some of the heavier updates from web serving time to a background process), the probability of such collisions increases, and then a while loop is a better option than a recursive call.
A while loop version would look like this:
```python
import redis


class PriorityQueue(object):
    def __init__(self, queue):
        self.queue = u'amq:pq:%s' % queue
        self._r = redis.StrictRedis(host='localhost', port=6379, db=5)

    def push(self, item):
        # redis-py 3.0+ argument order is (name, amount, value);
        # older StrictRedis releases took (name, value, amount).
        return self._r.zincrby(self.queue, 1, item)

    @property
    def first(self):
        # Raises IndexError when the queue is empty
        return self._r.zrevrange(self.queue, 0, 0)[0]

    def pop(self):
        try:
            _item = self.first
            while self._r.zrem(self.queue, _item) == 0:
                # Somebody else also got the same item and removed it before us
                # Try again
                _item = self.first
            # We managed to pop the item from the queue
            return _item
        except IndexError:
            # Queue is empty
            return None
```
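The difference between the two variants can be shown with a deliberately exaggerated experiment. `ContendedClient` below is a made-up fake whose ZREM loses the race a fixed number of times before succeeding; the two standalone functions are simplified versions of the recursive and the while loop pop. Real contention will rarely be this bad, so treat it as an illustration of the failure mode rather than a benchmark.

```python
import sys


class ContendedClient:
    """Fake client (hypothetical): ZREM fails `collisions` times, then works."""

    def __init__(self, collisions):
        self.collisions = collisions

    def zrevrange(self, key, start, stop):
        return ["item"]

    def zrem(self, key, member):
        if self.collisions > 0:
            self.collisions -= 1
            return 0
        return 1


def pop_recursive(client, key):
    item = client.zrevrange(key, 0, 0)[0]
    if client.zrem(key, item) == 1:
        return item
    return pop_recursive(client, key)  # one extra stack frame per collision


def pop_loop(client, key):
    item = client.zrevrange(key, 0, 0)[0]
    while client.zrem(key, item) == 0:
        item = client.zrevrange(key, 0, 0)[0]  # stack depth stays constant
    return item


# More collisions in a row than the interpreter allows nested calls.
collisions = sys.getrecursionlimit() + 100

try:
    pop_recursive(ContendedClient(collisions), "prio_list")
    recursive_outcome = "ok"
except RecursionError:
    recursive_outcome = "RecursionError"

loop_outcome = pop_loop(ContendedClient(collisions), "prio_list")
```

The recursive version runs out of stack once the collisions pile up, while the loop just keeps retrying until it wins.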