Energy and Persistence conquers

Celery 如何手动路由任务

2013-12-20

什么是Celery

Celery是一款开源的分布式异步任务队列,主要用于完成实时任务,也可以执行计划任务。
它的执行单元叫task,可以在一个或多个任务节点(Worker)上执行。这些Task支持使用multiprocessing,eventlet以及gevent运行。
详细的介绍请参阅Celery官网:http://www.celeryproject.org/

Celery怎么用

Celery在生产环境中一般都是搭配消息队列使用。官方推荐的消息队列为RabbitMQ,同时也支持使用Redis、SQLAlchemy、Amazon SQS、MongoDB以及IronMQ等。
celery可以将任务分发到消息队列(Broker)中,Broker再将任务发送到各个Worker,Worker可以根据各个服务器的负载部署。
关于RabbitMQ的使用,可以看这里:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq

RabbitMQ基础之Exchange和Queue

了解下面的内容需要对RabbitMQ中的Exchange和Queue有基础的了解,这里稍微解释一下。
RabbitMQ中的Exchange和Queue简单来说是一种包含的关系,一个Exchange下可以有多个Queue。
Celery的Worker一般都是连到Exchange下,Topic类型的Exchange(也有其他类型的Exchange,这里不做解释)收到消息之后可以根据消息的路由值(routing key)不同将消息转发的不同的Queue上。
我们这里说的手动路由任务就是利用了RabbitMQ的Exchange,Queue以及Routing Key。

celery路由

每一个Celery的Worker都要连到一个RabbitMQ的Exchange,一般来说Celery的配置文件里都有这样一句:

1
BROKER_URL = 'amqp://username:passwd@10.40.146.104:5672/rabbitmq_vhost'

这其实是指定Celery连的RabbitMQ的Broker,如果不指定Exchange,Queue和Routing Key,RabbitMQ会默认生成。如果要指定,可以这么写:

1
2
3
4
CELERY_DEFAULT_EXCHANGE = 'exchange_name'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_QUEUE = 'queue_name'
CELERY_DEFAULT_ROUTING_KEY = 'routing_key_name'

这样配置了之后,Celery就会用这些配置来路由任务。这其实就是自动路由了。

Celery手动路由

可以看到,Celery的自动路由还是很简单的,但是,在项目中,我们经常会有这样的需求:当有多种Task的时候,我们希望把一种Task给一Worker,而把另外一种Task给另外一个Worker。在这样的需求下,上面的自动路由明显就不能胜任了。
而且目前Celery也不支持Task的优先级设置,所以很明显当Task有优先级时,自动路由简单地排队功能就不适用了。

假设你在两个服务器上分别有Worker X和Worker Y,这两个处理普通的Task;在另一个服务器上有Worker Z,用来处理优先级更高的Task。你可以用这样的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from kombu import Queue
CELERY_DEFAULT_EXCHANGE = ‘exchange_name’
CELERY_DEFAULT_EXCHANGE_TYPE = ‘topic’
CELERY_DEFAULT_QUEUE = ‘default’
CELERY_DEFAULT_ROUTING_KEY = ‘task.default’
CELERY_QUEUES = (
Queue(‘default’, routing_key=‘task.#’),
Queue(‘high_priority’, routing_key=‘high_priority.#’),
)

CELERY_ROUTES = {
‘high_priority_task’: {
‘queue’: ‘high_priority’,
‘routing_key’: ‘high_priority.#’,
},
}

这里配置了2个Queue,default绑定的routing_key是task.#(#是任意匹配),high_priority绑定的routing_key是high_priority.#。默认情况下,Task会发到default Queue上,routing_key是task.#;当你发送Task时指定了Queue和routing_key之后,RabbitMQ会根据CELERY_ROUTES里的配置将Task发送到配置好的Queue上。
这里要注意的是在启Worker的时候也要指定Queue。所以X和Y启动的时候要这样写:

1
celery worker -Q default

Z启动时要这样写:

1
celery worker -Q high_priority

都配置好之后,如果要将Task发送给X和Y(这里省略了一些参数,意思是使用默认的):

1
send_task(‘task_name’, args=[args_list])

发送Task给Z:

1
send_task(‘task_name’, args=[args_list], queue=‘high_priority’, routing_key=‘high_priority.name’)

“自动化”的手动路由

上面的例子是只有2个Queue的情况,在项目中有时候会因为性能或者其他一些方面的考虑而需求让每个Worker处理特定的Task(其实Celery是不希望用户这么做的)。当你有100+台服务器时,你会发现上面的配置会写的很长很长;而且当你有新的服务器时,又要去改配置,非常的不方便。
这里的思路还是比较简单的,一般来说这些服务器都是在同一个网段的,可以默认以每一台机器的IP作为Queue的名字。那么每个Worker的配置可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
import socket
from kombu import Queue
LOCAL_IP = socket.gethostbyname(socket.gethostname()).replace(‘.’, ‘_’)
CELERY_QUEUES = (
Queue(‘%s’ % LOCAL_IP, routing_key=‘routing_key_%s’ % LOCAL_IP),
)
CELERY_ROUTES = {
‘route_%s’ % LOCAL_IP: {
‘queue’: ‘%s’ % LOCAL_IP,
‘routing_key’: ‘routing_key_%s’ % LOCAL_IP,
},
}

这里我用下划线代替了IP地址中的点号。这样写了之后每一个Worker启动时的Queue应该是本机的IP地址,假设我在192.168.10.4上:

1
celery worker -Q 192_168_10_4

发送Task模块的配置有一些不同,需要遍历当前网段的机器IP:

1
2
3
4
5
6
7
8
9
10
11
12
13
from kombu import Queue
SERVERS = []
for address in range(0, 256):
SERVERS.append(‘192_168_10_%d’ % address)
CELERY_QUEUES = []
CELERY_ROUTES = {}
for ser in SERVERS:
CELERY_QUEUES.append(Queue(‘%s’ % ser, routing_key=‘routing_key_%s’ % ser))
CELERY_ROUTES[‘route_%s’ % ser] = {
‘queue’: ‘%s’ % ser,
‘routing_key’: ‘routing_key_%s’ % ser,
}
CELERY_QUEUES = tuple(CELERY_QUEUES)

配置好了之后要发送Task给某个服务器时,Queue就是IP,routing_key就是”routing_key_IP”。

总结

虽然这样的功能可以实现,但是作为一款分布式的任务队列,非特殊情况下最好不要这样使用。毕竟这样其实是失去了分布式的意义。