文章详情页 您现在的位置是:网站首页>文章详情

异步任务组件Celery的使用(一)

图片丢失 Jeyrce.Lu 发表于:2020年4月18日 16:11 分类:【Python 2874次阅读

业务系统中我们通常都需要用到异步任务。举一个我们现在最常用的例子,当你注册或者登录微信的时候,会让你输入自己的手机号,然后点一个发送验证码按钮,按钮立马开始倒计时(防止你一通猛点),过了一会你收到一条短信,将短信中的验证码填写提交就完成了注册或者登录,在此过程中从你点击发送按钮到接收到短信的这个阶段就是一次典型的异步任务应用。在这个过程中,假设不使用异步任务,由于发送短信大约需要2-3秒的阻塞时间,在你点击发送按钮后会出现页面卡主不动,你也不知道发生了什么事,总之就是页面动不了,直到收到短信后页面又好了,如果是这样那你肯定忍不住要吐槽了,什么垃圾软件,我还以为手机坏了。这样的体验当然是致命的,多来几次也就没人用了。

异步任务介绍

当然上面的例子不仅仅是是为了提升用户体验,将发送短信这个任务放入到任务队列中也可起到缓冲的作用,不然同一时间n多人注册服务器是会直接崩溃的。那么,由这个例子引出,当有这些业务场景时,我们就应该考虑使用异步任务了:

1、不涉及共享资源,或对共享资源只读,即非互斥操作

2、没有时序上的严格关系

3、不需要原子操作,或可以通过其他方式控制原子性

4、常用于耗时操作,因为比较影响客户体验和使用性能

5、不影响主线程逻辑

除以上所列之外的场景,我们使用同步任务更为合适,因为同步任务相比也有自己的优点:

1、通常情况程序语言默认就是同步执行的阻塞过程,编写异步代码通常比同步代码复杂。

2、同步任务的资源始终处于同一上下文,更便于管理和使用资源。

3、同步代码中能更好的捕捉和处理异常,而异步任务需要存储执行过程和结果,并采用一些比如回调或客户端多次查询的方式获取任务结果。

Celery是一个python编写的异步任务组件,也提供了php在内的其他语言接口,许多python语言的web框架也集成了相关组件,本文接受Celery的初步使用方法,后续篇章进一步深入细节。

快速开始

一、下载安装

pip install -U celery

以上只是最简单的下载方式仅会下载celery,通常情况我们还需要celery配合其他组件使用,例如我们需要存储序列化数据,需要调度,需要存储结果,因此官方推荐了几个下载组合。

$ pip install "celery[librabbitmq]"
$ pip install "celery[librabbitmq,redis,auth,msgpack]"

# 还有其他很多种组合下载

celery[auth]:用于安全认证的序列化.    
celery[msgpack]:基于msgpack的序列化.    
celery[yaml]:基于yaml的序列化.    

celery[eventlet]: 基于eventlet的线程池  
celery[gevent]:基于gevent的线程池  

# 还有一些其他backend的组合
celery[librabbitmq]: 
celery[redis]:for using Redis as a message transport or as a result backend.    
celery[sqs]:for using Amazon SQS as a message transport (experimental).    
celery[tblib]:for using the task_remote_tracebacks feature.    
celery[memcache]:for using Memcached as a result backend (using pylibmc)    
celery[pymemcache]:for using Memcached as a result backend (pure-Python implementation).    
celery[cassandra]:for using Apache Cassandra as a result backend with DataStax driver.    
celery[couchbase]:for using Couchbase as a result backend.    
celery[arangodb]:for using ArangoDB as a result backend.    
celery[elasticsearch]:for using Elasticsearch as a result backend.    
celery[riak]:for using Riak as a result backend.    
celery[dynamodb]:for using AWS DynamoDB as a result backend.    
celery[zookeeper]:for using Zookeeper as a message transport.    
celery[sqlalchemy]:for using SQLAlchemy as a result backend (supported).    
celery[pyro]:for using the Pyro4 message transport (experimental).    
celery[slmq]:for using the SoftLayer Message Queue transport (experimental).    
celery[consul]:for using the Consul.io Key/Value store as a message transport or result backend (experimental).    
celery[django]:specifies the lowest version possible for Django support.

二、实现异步任务

(ishare) jeeyshe@jeeyshe-PC:~/Code/python/ishare$ tree tasks/
tasks/
├── cron.py
├── __init__.py
├── mail.py
└── settings.py

0 directories, 4 files

其中__init__.py实例化了celery应用,settings.py中定义了一些常见的必要配置项,mail.py和cron.py实现了几个典型的异步任务。

# __init__.py

import celery


# 实例化celery应用
app = celery.Celery()


# 读取配置
app.config_from_object("tasks.settings")


# 自动发现注册任务
app.autodiscover_tasks([
    "tasks.mail",
    "tasks.cron",
])

配置文件如下,可以看到我们此处使用的是redis来作为消息队列存储序列化数据和执行结果的。

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# Celery 配置

import os
import datetime

from kombu import Exchange, Queue
import django
from celery.schedules import crontab
from celery import platforms

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ishare.settings")
django.setup()
platforms.C_FORCE_ROOT = True   # 允许root用户启动worker

# 记录日志
CELERYD_HIJACK_ROOT_LOGGER = True
CELERY_TIMEZONE = 'Asia/Shanghai'

# 注册Celery任务, 或者使用celery.autodiscover_tasks也可
# CELERY_IMPORTS = (
#     "tasks.mail",
# )

# 序列化方法
CELERY_TASK_SERIALIZER = "pickle"
# 指定任务接受的序列化类型.
CELERY_ACCEPT_CONTENT = ["msgpack", "pickle", "json", "yaml", ]
# 结果序列化方法
CELERY_RESULT_SERIALIZER = "pickle"
# 结果保存
CELERY_RESULT_BACKEND = 'redis://:{}@{}:{}/2'.format("lujianxin.com", "127.0.0.1", 6379)
# Broker使用redis
BROKER_URL = 'redis://:{}@{}:{}/3'.format("lujianxin.com", "127.0.0.1", 6379)

default_exchange = Exchange('default', type='direct')
topic_exchange = Exchange('topic', type='topic')
fanout_exchange = Exchange('fanout', type='fanout')

CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('topic', topic_exchange, routing_key='topic'),
    Queue('fanout', fanout_exchange, routing_key='fanout'),
)

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

# 定时任务配置如下
CELERYBEAT_SCHEDULE = {
    # 更新网站访问量
    'loop_task_1': {
        'task': 'cron.update_visit_count',
        'schedule': datetime.timedelta(hours=1),  # 周期任务: 每隔time执行一次
        'args': (),
    },
    # 刷新文章点赞到库中
    'loop_task_2': {
        'task': 'cron.update_like_click',
        'schedule': datetime.timedelta(hours=1),
        'args': (),
    },
    # 提醒站长新的友链申请
    'fixed_task_3': {
        'task': 'cron.notify_new_link',
        'schedule': crontab(hour='21', minute='00'),  # 定时任务: 固定的时间点执行
        'args': (),
    },
    # 提醒站长新增的待审核文章
    'fixed_task_4': {
        'task': 'cron.notify_new_article',
        'schedule': crontab(hour='21', minute='30'),
        'args': (),
    },
    # 每月向友链推荐阅读
    'fixed_task_5': {
        'task': 'cron.recommend_month',
        'schedule': crontab(day_of_month='1', hour='10', minute='30'),
        'args': (),
    },
}

实现的具体任务

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
 邮件异步任务
"""
import os
import logging

from django.core.mail import send_mail, send_mass_mail
from django.core.mail import EmailMultiAlternatives
from django.template import loader
from django.contrib.auth import get_user_model

from ishare.settings import EMAIL_SUBJECT_PREFIX, SERVER_EMAIL
from blog.models import Link
from tasks import app

UserAccount = get_user_model()
logger = logging.getLogger(__name__)
add_prefix = lambda txt: '{}{}'.format(EMAIL_SUBJECT_PREFIX, txt) if not txt.startswith(EMAIL_SUBJECT_PREFIX) else txt


def supervisor_receivers():
    """
    超管接收组
    """
    receivers = UserAccount.objects.filter(is_superuser=True, is_staff=True, is_active=True)
    receivers = [getattr(user, UserAccount.EMAIL_FIELD) for user in receivers]
    return list(set([mail for mail in receivers if mail]))


def webmaster_receivers():
    """
    站长接收组
    """
    links = Link.objects.filter(is_active=True)
    receivers = [link.email for link in links]
    return list(set([mail for mail in receivers if mail]))


@app.task(name='mail.test_mail')
def test_mail():
    subject = '{}这是一封测试邮件,邀你共赏美文《青春》'.format(EMAIL_SUBJECT_PREFIX)
    text_content = """
    青春不是年华,而是心境;青春不是桃面、丹唇、柔膝,而是深沉的意志、恢宏的想像、炽热的感情;青春是生命的深泉涌流。

    青春气贯长虹,勇锐盖过怯弱,进取压倒苟安。如此锐气,二十后生有之,六旬男子则更多见。年岁有加,并非垂老;理想丢弃,方堕暮年。

    岁月悠悠,衰微只及肌肤;热忱抛却,颓唐必致灵魂。忧烦、惶恐、丧失自信,定使心灵扭曲,意气如灰。

    无论年届花甲,抑或二八芳龄,心中皆有生命之欢乐,奇迹之诱惑,孩童般天真久盛不衰。人的心灵应如浩淼瀚海,只有不断接纳美好、希望、欢乐、勇气和力量的百川,才能青春永驻、风华长存。

    一旦心海枯竭,锐气便被冰雪覆盖,玩世不恭、自暴自弃油然而生,即使年方二十,实已垂垂老矣;然则只要虚怀若谷,让喜悦、达观、仁爱充盈其间,你就有望在八十高龄告别尘寰时仍觉年轻。
    """
    # html_content = '<p>这是一封<strong>重要的</strong>邮件.</p>'
    msg = EmailMultiAlternatives(subject, text_content, SERVER_EMAIL, ['support@lujianxin.com'])
    # msg.attach_alternative(html_content, "text/html")
    msg.send()


@app.task(name="mail.send_one")
def send_one(subject, message, recipient_list, html=None):
    """
    发送一条消息: 自动添加主题前缀和签名
    """
    send_mail(
        subject=add_prefix(subject),
        message=message,
        from_email=SERVER_EMAIL,
        recipient_list=recipient_list,
        html_message=html,
    )


@app.task(name="mail.send_many_text")
def send_many_text(datatuple):
    """
    一次性发送多条消息: 自动添加前缀和主题签名
    datatuple:
    (
        (subject0, message0, sender, recipient),
        (subject1, message1, sender, recipient),
        (subject2, message2, sender, recipient),
    )
    """
    datatuple = ((add_prefix(d[0]), d[1], d[2], d[3]) for d in datatuple)
    send_mass_mail(datatuple)


@app.task(name="mail.send_password_reset_link")
def send_password_rest_link(subject_template_name, email_template_name,
                            context, from_email, to_email, html_email_template_name=None):
    subject = loader.render_to_string(subject_template_name, context)
    subject = ''.join(subject.splitlines())
    subject = '{}{}'.format(EMAIL_SUBJECT_PREFIX, subject)
    body = loader.render_to_string(email_template_name, context)
    email_message = EmailMultiAlternatives(subject, body, from_email, [to_email])
    if html_email_template_name is not None:
        html_email = loader.render_to_string(html_email_template_name, context)
        email_message.attach_alternative(html_email, 'text/html')
    email_message.send()

三、开启异步任务

此时我们已经实现好了代码,是时候让任务跑起来了,进入到tasks统计目录下启动celery,当前前提是先正常开启redis并且配置好帐密,运行worker。

celery -A tasks worker -l debug

此时我们能看到使用的队列信息和我们所有注册的任务,如果backend(也就是我们使用的redis)出了问题,他会一直尝试重连。

celery-start.png

四、业务逻辑中调用异步任务

现在我们已经成功注册并开启了异步任务,那么我们该如何在业务中使用呢,下面以一个简单的例子进行说明。

from django.contrib.auth.forms import PasswordResetForm

from tasks.mail import send_password_rest_link

class SyncMailPasswordResetForm(PasswordResetForm):
    """
    覆盖父类同步发送邮件,此处使用异步任务
    """

    def send_mail(self, subject_template_name, email_template_name, context, from_email, to_email,
                  html_email_template_name=None):
        send_password_rest_link.delay(subject_template_name, email_template_name, context, from_email, to_email,
                                      html_email_template_name=None)

我们只需要从tasks中导入这个function,使用function.delay()即可实现异步发送邮件,此时在页面点击发送邮件时,可以看到页面立马显示成功,查看后台日志发现约2s之后邮件正常发出,这就实现了我们文章开头所说的功能。

已经注册的任务,并不是一定要使用这种调用方式,通常我们还有以下这几种用法满足不同需求。

(1)同步使用,和不开启异步任务直接调用相同

func_name()

func_name.run()

(2)延迟触发:例如倒计时60秒后执行,实际上delay()调用方式就是基于apply_saync中计时参数=0实现

func_name.apply_async(60, *args)

五、定时任务

同linux中的crontab一样,celery还提供了定时任务,定时任务的注册方式和异步任务完全相同,但是需要在配置中声明且开启celery的另一个定时进程beat。

# 定时任务配置如下
CELERYBEAT_SCHEDULE = {
    # 更新网站访问量
    'loop_task_1': {
        'task': 'cron.update_visit_count',
        'schedule': datetime.timedelta(hours=1),  # 周期任务: 每隔time执行一次
        'args': (),
    },
    # 刷新文章点赞到库中
    'loop_task_2': {
        'task': 'cron.update_like_click',
        'schedule': datetime.timedelta(hours=1),
        'args': (),
    },
    # 提醒站长新的友链申请
    'fixed_task_3': {
        'task': 'cron.notify_new_link',
        'schedule': crontab(hour='21', minute='00'),  # 定时任务: 固定的时间点执行
        'args': (),
    },
    # 提醒站长新增的待审核文章
    'fixed_task_4': {
        'task': 'cron.notify_new_article',
        'schedule': crontab(hour='21', minute='30'),
        'args': (),
    },
    # 每月向友链推荐阅读
    'fixed_task_5': {
        'task': 'cron.recommend_month',
        'schedule': crontab(day_of_month='1', hour='10', minute='30'),
        'args': (),
    },
}

开启celery.beat之后会在tasks统计目录生成一个celerybeat-schedule的临时文件用于存储调度信息,观察定时任务,实际上beat会向worker按照定时规则发送一个执行信号,也就是所定时任务实际上也是由worker执行。

celery -A tasks beat -l debug

celery-brater-start.png

到这里celery的异步任务和定时任务两个主要功能演示已经完了,下一节我们将梳理Celery的主要架构和作用。

版权声明 本文属于本站  原创作品,文章版权归本站及作者所有,请尊重作者的创作成果,转载、引用自觉附上本文永久地址: http://blog.lujianxin.com/x/art/7zb1psn7alm8

文章评论区

作者名片

图片丢失
  • 作者昵称:Jeyrce.Lu
  • 原创文章:61篇
  • 转载文章:3篇
  • 加入本站:2046天

站点信息

  • 运行天数:2047天
  • 累计访问:164169人次
  • 今日访问:0人次
  • 原创文章:69篇
  • 转载文章:4篇
  • 微信公众号:第一时间获取更新信息