rq (redis queue)实践

最近想写个东西,需要用到消息队列。Python支持的消息队列有好多,以前工作中也用过Celery,配置起来挺麻烦的。然后就想到曾经在V2EX上看到过的RQ,RQ是一个Python实现的消息队列系统,使用Redis作为后端存储,依赖挺少的,就这个吧。 RQ有多种使用方式,文档里已有介绍 http://python-rq.org/docs/,我使用了装饰器的方法,这样比较方便。 文档中是这样的,

from rq.decorators import job

@job('low', conn=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print job.result

需要在装饰器中传入配置参数,这样岂不是很麻烦,能不能在一处定义呢?通过查看源码,发现job其实是一个类,传入参数对其实例化,然后在"call"方法里为被装饰的函数增加一个"delay"的方法,这个方法其实就是使用"enqueue_call"方法将函数调用添加到队列中,既然知道了原理就好办了。

from model.connect import redis
from rq.decorators import job

def rq(f):
    _job = job(queue='default', connection=redis)
    return _job(f)

在此,我定义了一个叫rq的函数,作为新的装饰器,传入的函数"f"将被"job"的"call"方法修改,实现了和文档相同的效果,使用很简单

from somewhere import rq

@rq
def count_words_at_url(x):
    return len(x)

调用函数时和普通调用相同,如果需要将其加入消息队列则使用其"delay"方法

from func import count_words_at_url

count_words_at_url.delay('hello')

需要注意的是,无论使用哪种方法加入消息队列,函数的定义和调用不能在一个文件里,不然会报错 "Functions from the main module cannot be processed by workers." 我猜是因为rq调用函数时需要import 函数所在文件,而import时又会运行函数调用,造成死循环。

作者: Lerry
发表时间:2013-05-22