rq (redis queue)实践
最近想写个东西,需要用到消息队列。Python支持的消息队列有好多,以前工作中也用过Celery,配置起来挺麻烦的。然后就想到曾经在V2EX上看到过的RQ,RQ是一个Python实现的消息队列系统,使用Redis作为后端存储,依赖挺少的,就这个吧。 RQ有多种使用方式,文档里已有介绍 http://python-rq.org/docs/,我使用了装饰器的方法,这样比较方便。 文档中是这样的,
from rq.decorators import job
@job('low', conn=my_redis_conn, timeout=5)
def add(x, y):
return x + y
job = add.delay(3, 4)
time.sleep(1)
print job.result
需要在装饰器中传入配置参数,这样岂不是很麻烦,能不能在一处定义呢?通过查看源码,发现job其实是一个类,传入参数对其实例化,然后在"call"方法里为被装饰的函数增加一个"delay"的方法,这个方法其实就是使用"enqueue_call"方法将函数调用添加到队列中,既然知道了原理就好办了。
from model.connect import redis
from rq.decorators import job
def rq(f):
_job = job(queue='default', connection=redis)
return _job(f)
在此,我定义了一个叫rq的函数,作为新的装饰器,传入的函数"f"将被"job"的"call"方法修改,实现了和文档相同的效果,使用很简单
from somewhere import rq
@rq
def count_words_at_url(x):
return len(x)
调用函数时和普通调用相同,如果需要将其加入消息队列则使用其"delay"方法
from func import count_words_at_url
count_words_at_url.delay('hello')
需要注意的是,无论使用哪种方法加入消息队列,函数的定义和调用不能在一个文件里,不然会报错 "Functions from the main module cannot be processed by workers." 我猜是因为rq调用函数时需要import 函数所在文件,而import时又会运行函数调用,造成死循环。
作者: Lerry
文章标题:rq (redis queue)实践
发表时间:2013-05-22
版权说明:CC BY-NC-ND 4.0 DEED