文章详情页 您现在的位置是:网站首页>文章详情
Django应用celery,实现多worker,多队列
Amir 发表于:2019年6月15日 16:24 分类:【Python】 11342次阅读
一、原理
celery是一个分布式的任务调度模块,那么怎么实现它的分布式功能呢,celery可以支持多台不同的计算机执行不同的任务或者相同的任务。
简单理解:
可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,
而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。
exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker。
二、实例目录结构
三、案例实现
1. 简单介绍目录结构
首先创建一个django项目,创建一个app测试用。
在app的同级目录下创建关于celery的一个目录(celery_tasks),在celery_tasks创建一个config.py文件用来放配置文件,创建一个main.py文件,即为入口、配置加载文件。并在同级目录下创建一个/多个app(比如放送邮件,发送短信等模块,),然后在此app下创建存放异步任务代码的tasks.py文件,目录机构如上图。
2. main.py文件介绍如注释
from celery import Celery
import os
# 为celery使用django配置文件进行设置
if not os.getenv('DJANGO_SETTINGS_MODULE'):
os.environ['DJANGO_SETTINGS_MODULE'] = 'amirtestcelery.settings'
# 创建celery应用
app = Celery('amirtestcelery')
# 导入celery配置文件
app.config_from_object('celery_tasks.config')
# 自动注册celery任务,需要启动哪个模块下,则加入列表中
app.autodiscover_tasks(['celery_tasks.appone', 'celery_tasks.apptwo'])
3. 分别在tasks.py文件下创建异步任务
这里提醒的是 @app.task(name='apptwo_mult') 这里的name是为这个方法提供一个命名空间(唯一),后面会用到
4. config.py文件介绍(关键)
import datetime
from celery.schedules import crontab
from kombu import Exchange, Queue
BROKER_URL = 'redis://localhost:6379/10'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/11'
CELERY_TIMEZONE = 'Asia/Shanghai'
# 队列
CELERY_QUEUES = (
Queue("default", Exchange("default"), routing_key="default"),
Queue("appone_add_queue", Exchange("appone_add_queue"), routing_key="appone_add_router"),
Queue("apptwo_mult_queue", Exchange("apptwo_mult_queue"), routing_key="apptwo_mult_router")
)
# 路由
CELERY_ROUTES = {
'appone_add': {"queue": "appone_add_queue", "routing_key": "appone_add_router"},
'apptwo_mult': {"queue": "apptwo_mult_queue", "routing_key": "apptwo_mult_router"}
}
# 定时任务配置如下
CELERYBEAT_SCHEDULE = {
'beat_task1': {
'task': 'appthree_comment',
'schedule': datetime.timedelta(seconds=2),
'args': (2, 8)
},
'beat_task2': {
'task': 'appthree_comment',
'schedule': crontab(hour=16, minute=32),
'args': (4, 5)
}
}
# 单独对一个任务启动命令 会启动指定queue
# celery -A celery_tasks.main worker -l info -n workerA.%h -Q appone_add_queue
# celery -A celery_tasks.main worker -l info -n workerB.%h -Q apptwo_mult_queue
# 当任务没有指定queue 则任务会加入default 队列 beat(定时任务也会加入default队列)
# celery -A celery_tasks.main worker -l info -n worker -Q celery
# 启动定时任务
# celery beat -A celery_tasks.main -l INFO
# -A 表示 应用目录 这里是celery_tasks.main
# -B 表示 定时任务
# -l 表示日志级别
# -n woker名
# -Q 队列名
# .%h 对应不同主机ip 如果默认localhost,所以可以省略.%h
BROKER_URL 表示生产者存放任务的数据库,我这用的是redis
CELERY_RESULT_BACKEND 表示消费者存放的结果
CELERY_TIMEZONE 设置时区呗
CELERY_QUEUES 创建队列
5. 启动命令(代码都写好了,是时候启动了)
# 单独对一个任务启动命令 会启动指定queue
# celery -A celery_tasks.main worker -l info -n workerA.%h -Q appone_add_queue
# celery -A celery_tasks.main worker -l info -n workerB.%h -Q apptwo_mult_queue
# 当任务没有指定queue 则任务会加入default 队列 beat(定时任务也会加入default队列)
# celery -A celery_tasks.main worker -l info -n worker -Q celery
# 启动定时任务
# celery beat -A celery_tasks.main -l INFO
# -A 表示 应用目录 这里是celery_tasks.main
# -B 表示 定时任务
# -l 表示日志级别 这是英文小写l不是数字1
# -n woker名 自定义
# -Q 队列名
# .%h 对应不同主机ip 如果默认localhost,所以可以省略.%h
6. 随笔
Celery是典型的生产者与消费者模式,生产者(broker)add任务到队列中,消费者(worker)去队列中BROKER_URL拿任务,执行完之后,结果放CELERY_RESULT_BACKEND中。创建多个队列之后,启动多个worker,每个worker启动一个主进程,每个主进程还可以创建多个子进程,当然可以通过CELERY_CONCURRENCY设置子进程个数,也就是并发数。
我是Amir,有兴趣探讨的可以联系我QQ:429771087
Git:git@github.com:AmirHuang/amirtestcelery.git
版权声明 本文属于本站 原创作品,文章版权归本站及作者所有,请尊重作者的创作成果,转载、引用自觉附上本文永久地址: http://blog.lujianxin.com/x/art/27s487xprms4
上一篇:schema-让数据验证更优雅
下一篇:关于tornado框架
猜你喜欢
文章评论区
作者名片
- 作者昵称:Amir
- 原创文章:1篇
- 转载文章:0篇
- 加入本站:2043天
作者其他文章
站长推荐
友情链接
站点信息
- 运行天数:2047天
- 累计访问:164169人次
- 今日访问:0人次
- 原创文章:69篇
- 转载文章:4篇
- 微信公众号:第一时间获取更新信息