2025年celery异步与定时任务

celery异步与定时任务celery 简介 Celery 是一个简单 灵活且可靠的 处理大量消息的分布式系统 专注于实时处理的异步任务队列 同时也支持任务调度 Celery 的架构由三部分组成 消息中间件 message broker 任务执行单元 worker

大家好,我是讯享网,很高兴认识大家。

celery简介

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
在这里插入图片描述
讯享网

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

  • 任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  • 任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

celery安装

pip install celery pip install redis 

讯享网

快速上手

项目文件结构

![image.png](https://img-blog.csdnimg.cn/img_convert/9c74bcb530e2fa279c30122d6e72cf14.png#clientId=ud0b9e244-ff1a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=172&id=u5d2c0338&margin=[object Object]&name=image.png&originHeight=172&originWidth=280&originalType=binary&ratio=1&rotation=0&showTitle=false&size=12242&status=done&style=none&taskId=u1a0c6d3c-1137-406b-8bce-6a79f&title=&width=280)

定义workers(消费者)

  • celery_task.py
讯享网import celery import time # 异步执行结果存储 backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1' # 消息中间件 broker = 'redis://:CuiLiang@0302@127.0.0.1:6379/2' cel = celery.Celery('test', backend=backend, broker=broker) @cel.task def send_email(name): print("向%s发送邮件..." % name) time.sleep(5) print("向%s发送邮件完成" % name) return "ok" @cel.task def send_sms(name): print("向%s发送短信..." % name) time.sleep(10) print("向%s发送短信完成" % name) return "ok" 

启动celery

celery -A celery_task worker -l info -------------- celery@cuiliangdeAir v5.2.3 (dawn-chorus) --- * ----- -- * ---- macOS-12.2.1-x86_64-i386-64bit 2022-03-13 12:41:53 - * --- * --- - ---------- [config] - ---------- .> app: test:0x10bb43730 - ---------- .> transport: redis://:@127.0.0.1:6379/2 - ---------- .> results: redis://:@127.0.0.1:6379/1 - * --- * --- .> concurrency: 4 (prefork) -- * ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- * ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . celery_task.send_email . celery_task.send_sms [2022-03-13 12:41:53,699: INFO/MainProcess] Connected to redis://:@127.0.0.1:6379/2 [2022-03-13 12:41:53,710: INFO/MainProcess] mingle: searching for neighbors [2022-03-13 12:41:54,752: INFO/MainProcess] mingle: all alone [2022-03-13 12:41:54,810: INFO/MainProcess] celery@cuiliangdeAir ready. 

定义broker(生产者)

讯享网from celery_task import send_email, send_sms result = send_email.delay("one") print(result.id) result2 = send_sms.delay("two") print(result2.id) 
  • 直接运行broker后,在celery中可看到日志信息
[2022-03-13 12:42:03,125: INFO/MainProcess] Task celery_task.send_email[bfdfce02-79d3-4fa7-92b7-36e02a6a824a] received [2022-03-13 12:42:03,130: WARNING/ForkPoolWorker-2] 向one发送邮件... [2022-03-13 12:42:03,139: INFO/MainProcess] Task celery_task.send_sms[-88a3-4721-b50e-00da5e5690d0] received [2022-03-13 12:42:03,142: WARNING/ForkPoolWorker-4] 向two发送短信... [2022-03-13 12:42:08,137: WARNING/ForkPoolWorker-2] 向one发送邮件完成 [2022-03-13 12:42:08,182: INFO/ForkPoolWorker-2] Task celery_task.send_email[bfdfce02-79d3-4fa7-92b7-36e02a6a824a] succeeded in 5.0101s: 'ok' [2022-03-13 12:42:13,145: WARNING/ForkPoolWorker-4] 向two发送短信完成 [2022-03-13 12:42:13,175: INFO/ForkPoolWorker-4] Task celery_task.send_sms[-88a3-4721-b50e-00da5e5690d0] succeeded in 10.087s: 'ok' 

定义result(获取任务结果)

讯享网from celery.result import AsyncResult from celery_task import cel def get_result(task): async_result = AsyncResult(id=task, app=cel) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行') if __name__ == '__main__': task_id = 'bfdfce02-79d3-4fa7-92b7-36e02a6a824a' get_result(task_id) 

多任务结构

项目文件结构

在这里插入图片描述

定义workers

  • celery_tasks/celery.py(celery初始化)
from celery import Celery cel = Celery('celery_demo', # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类 include=['celery_tasks.task01', 'celery_tasks.task02' ]) # 通过celery实例加载配置模块 cel.config_from_object('celery_tasks.celery_config') 
  • celery_tasks/celery_config.py(celery配置)
讯享网# 官方配置文档:查询每个配置项的含义。 # http://docs.celeryproject.org/en/latest/userguide/configuration.html # broker(消息中间件来接收和发送任务消息) broker_url = 'redis://:CuiLiang@0302@127.0.0.1:6379/2' # backend(存储worker执行的结果) result_backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1' # 设置时间参照,不设置默认使用的UTC时间 timezone = 'Asia/Shanghai' # 是否使用UTC enable_utc = False 
  • celery_tasks/task01.py
import time from celery_tasks.celery import cel @cel.task def send_email(name): print("向%s发送邮件..." % name) time.sleep(5) print("向%s发送邮件完成" % name) return "邮件发送成功" 
  • celery_tasks/task02.py
讯享网import time from celery_tasks.celery import cel @cel.task def send_sms(name): print("向%s发送短信..." % name) time.sleep(10) print("向%s发送短信完成" % name) return "短信发送成功" 

启动celery

 celery -A celery_task worker -l info 

定义broker

讯享网from celery_tasks.task01 import send_email from celery_tasks.task02 import send_sms result = send_email.delay("张三") print(result.id) result2 = send_sms.delay("李四") print(result2.id) 

定义result

from celery.result import AsyncResult from celery_tasks.celery import cel def get_result(task): async_result = AsyncResult(id=task, app=cel) if async_result.successful(): result = async_result.get() print(result) # result.forget() # 将结果删除 elif async_result.failed(): print('执行失败') elif async_result.status == 'PENDING': print('任务等待中被执行') elif async_result.status == 'RETRY': print('任务异常后正在重试') elif async_result.status == 'STARTED': print('任务已经开始被执行') if __name__ == '__main__': task_id = 'f008d807-7fd6-4094-b2e2-97c2d154dc1d' get_result(task_id) 

定时任务

broker方式调用

  • produce_task.py
讯享网from datetime import datetime, timedelta from zoneinfo import ZoneInfo from celery_tasks.task01 import send_email from celery_tasks.task02 import send_sms # 指定时间运行 run_time1 = datetime(2022, 3, 13, 14, 7, 00, tzinfo=ZoneInfo("Asia/Shanghai")) print(run_time1) result1 = send_email.apply_async(args=["Alex", ], eta=run_time1) print(result1.id) # 10秒钟后运行 run_time2 = datetime.utcnow() + timedelta(seconds=10) print(run_time2) result2 = send_sms.apply_async(args=["Alisa", ], eta=run_time2) print(result2.id) 
  • 查看celery执行结果
[2022-03-13 14:06:35,931: INFO/MainProcess] Task celery_tasks.task01.send_email[506d6747-4b7b-49cd-8e1d-56ae05a31a3b] received [2022-03-13 14:06:35,933: WARNING/ForkPoolWorker-2] 向Alex发送邮件... [2022-03-13 14:06:35,939: INFO/MainProcess] Task celery_tasks.task02.send_sms[73ad47fd-ef1b-45a7-9ef9-6ea121ea9254] received [2022-03-13 14:06:40,936: WARNING/ForkPoolWorker-2] 向Alex发送邮件完成 [2022-03-13 14:06:40,946: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[506d6747-4b7b-49cd-8e1d-56ae05a31a3b] succeeded in 5.04196s: '邮件发送成功' [2022-03-13 14:06:46,292: WARNING/ForkPoolWorker-2] 向Alisa发送短信... [2022-03-13 14:06:56,295: WARNING/ForkPoolWorker-2] 向Alisa发送短信完成 [2022-03-13 14:06:56,303: INFO/ForkPoolWorker-2] Task celery_tasks.task02.send_sms[73ad47fd-ef1b-45a7-9ef9-6ea121ea9254] succeeded in 10.057s: '短信发送成功' 

workers方式调用

  • celery_tasks/celery_config.py(新增定时任务配置)
讯享网from datetime import timedelta from celery.schedules import crontab # 官方配置文档:查询每个配置项的含义。 # http://docs.celeryproject.org/en/latest/userguide/configuration.html # broker(消息中间件来接收和发送任务消息) broker_url = 'redis://:CuiLiang@0302@127.0.0.1:6379/2' # backend(存储worker执行的结果) result_backend = 'redis://:CuiLiang@0302@127.0.0.1:6379/1' # 设置时间参照,不设置默认使用的UTC时间 timezone = 'Asia/Shanghai' # 是否使用UTC enable_utc = False # 设置定时任务 beat_schedule = { 
    'task1': { 
    'task': 'celery_tasks.task01.send_email', # 'schedule': crontab(minute="*/1"), # 每分钟发送一次 'schedule': timedelta(seconds=10), # 每10秒发送一次任务消息 'args': ('张三',) }, 'task2': { 
    'task': 'celery_tasks.task02.send_sms', 'schedule': crontab(hour=20, minute=46), # 在每天的晚上10点24分发送一次任务消息 'args': ('李四',) } } 
  • 启动celery workers消费任务
 celery -A celery_task worker -l info 
  • 启动Celery Beat进程,读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
讯享网 celery -A celery_tasks beat 
  • 观察celery控制台打印
2022-03-13 20:45:45,497: INFO/MainProcess] Task celery_tasks.task01.send_email[54ce4bf2-f746-4c46-83b7-7842e105f449] received [2022-03-13 20:45:45,498: WARNING/ForkPoolWorker-2] 向张三发送邮件... [2022-03-13 20:45:50,501: WARNING/ForkPoolWorker-2] 向张三发送邮件完成 [2022-03-13 20:45:50,513: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[54ce4bf2-f746-4c46-83b7-7842e105f449] succeeded in 5.0468s: '邮件发送成功' [2022-03-13 20:45:55,491: INFO/MainProcess] Task celery_tasks.task01.send_email[-fca7-4391-9d66-1eef4a265a17] received [2022-03-13 20:45:55,493: WARNING/ForkPoolWorker-2] 向张三发送邮件... [2022-03-13 20:46:00,009: INFO/MainProcess] Task celery_tasks.task02.send_sms[cb5c22b5-cd17-42d1-b35d-cdf1ffef2054] received [2022-03-13 20:46:00,012: WARNING/ForkPoolWorker-4] 向李四发送短信... [2022-03-13 20:46:00,494: WARNING/ForkPoolWorker-2] 向张三发送邮件完成 [2022-03-13 20:46:00,503: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[-fca7-4391-9d66-1eef4a265a17] succeeded in 5.0066s: '邮件发送成功' [2022-03-13 20:46:05,492: INFO/MainProcess] Task celery_tasks.task01.send_email[f018aebb-32ca-4f47-9a32-81bc24af8b17] received [2022-03-13 20:46:05,494: WARNING/ForkPoolWorker-2] 向张三发送邮件... [2022-03-13 20:46:10,016: WARNING/ForkPoolWorker-4] 向李四发送短信完成 [2022-03-13 20:46:10,043: INFO/ForkPoolWorker-4] Task celery_tasks.task02.send_sms[cb5c22b5-cd17-42d1-b35d-cdf1ffef2054] succeeded in 10.0006s: '短信发送成功' [2022-03-13 20:46:10,496: WARNING/ForkPoolWorker-2] 向张三发送邮件完成 [2022-03-13 20:46:10,508: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[f018aebb-32ca-4f47-9a32-81bc24af8b17] succeeded in 5.09s: '邮件发送成功' [2022-03-13 20:46:15,496: INFO/MainProcess] Task celery_tasks.task01.send_email[5748ef00-ccfb-4d15-9499-4cfe6d] received [2022-03-13 20:46:15,498: WARNING/ForkPoolWorker-2] 向张三发送邮件... ^C worker: Hitting Ctrl+C again will terminate all running tasks! worker: Warm shutdown (MainProcess) [2022-03-13 20:46:20,499: WARNING/ForkPoolWorker-2] 向张三发送邮件完成 [2022-03-13 20:46:20,511: INFO/ForkPoolWorker-2] Task celery_tasks.task01.send_email[5748ef00-ccfb-4d15-9499-4cfe6d] succeeded in 5.0816s: '邮件发送成功' 

需要注意的是先启动消费者,再启动生产者。否则当生产者启动后,会产生大量任务在队列中,启动消费者后会造成任务大量囤积消费。

DRF使用celery

项目文件结构

在这里插入图片描述

settings中添加配置

  • DrfTest/settings.py
讯享网# celery配置 # broker(消息中间件来接收和发送任务消息) CELERY_BROKER_URL = 'redis://:CuiLiang@0302@127.0.0.1:6379/3' # backend(存储worker执行的结果) CELERY_RESULT_BACKEND = 'redis://:CuiLiang@0302@127.0.0.1:6379/4' # 设置时间参照,不设置默认使用的UTC时间 CELERY_TIMEZONE = 'Asia/Shanghai' # 是否使用UTC CELERY_ENABLE_TUC = False 

创建celery入口文件

  • DrfTest/celery.py
# 主程序 import os from celery import Celery # 把celery和django进行组合,识别和加载django的配置文件 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DrfTest.settings') # 创建celery实例对象 app = Celery("mycelery") # 通过app对象加载配置 app.config_from_object('django.conf:settings', namespace='CELERY') # 自动扫描并加载任务(所有celery任务必须在app的tasks.py下) app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: { 
     self.request!r}') # 启动Celery的命令 # 在项目根目录下启动 # celery -A DrfTest worker -l INFO 
  • DrfTest/init.py(在模块中导入此应用程序。这确保了在Django启动时加载应用程序,以便装饰器(稍后提到)将使用它)
讯享网from .celery import app as celery_app __all__ = ('celery_app',) 

App中创建workers

  • public/tasks.py(celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!)
# celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!! from celery import shared_task import time import logging log = logging.getLogger("django") @shared_task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名 def send_sms(mobile): """发送短信""" print("向%s发送短信.." % mobile) time.sleep(10) print("向%s发送短信完成" % mobile) return "send_email OK" @shared_task def send_email(email): print("向%s发送邮件..." % email) time.sleep(5) print("向%s发送邮件完成" % email) return "send_sms ok" 

启动celery workers

讯享网celery -A mycelery.main worker --loglevel=info -------------- celery@cuiliangdeAir v5.2.3 (dawn-chorus) --- * ----- -- * ---- macOS-12.2.1-x86_64-i386-64bit 2022-03-14 11:01:48 - * --- * --- -  ---------- [config] -  ---------- .> app: mycelery:0x103b30760 -  ---------- .> transport: redis://:@127.0.0.1:6379/3 -  ---------- .> results: redis://:@127.0.0.1:6379/4 - * --- * --- .> concurrency: 4 (prefork) -- * ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- * ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . DrfTest.celery.debug_task . public.tasks.send_email . public.tasks.send_sms [2022-03-14 11:01:49,103: INFO/MainProcess] Connected to redis://:@127.0.0.1:6379/3 [2022-03-14 11:01:49,118: INFO/MainProcess] mingle: searching for neighbors [2022-03-14 11:01:50,162: INFO/MainProcess] mingle: all alone [2022-03-14 11:01:50,251: INFO/MainProcess] celery@cuiliangdeAir ready. 
  • 从控制台打印可知,celery已扫描到public下的异步任务,并自动注册了。

创建DRF broker

  • public/urls.py(路由)
from rest_framework import routers from public import views from django.urls import path app_name = "public" urlpatterns = [ path('sendSms/', views.SendSmsAPIView.as_view()), # 异步发送短信接口 path('sendEmail/', views.SendEmailAPIView.as_view()), # 定时发送邮件接口 path('getTaskResult/<str:task_id>/', views.GetTaskResultAPIView.as_view()), # 获取异步任务执行结果 ] router = routers.DefaultRouter() urlpatterns += router.urls 
  • public/views.py(视图)
讯享网import time from datetime import datetime import zoneinfo from celery.result import AsyncResult from django.shortcuts import render from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.views import APIView from public.tasks import send_sms, send_email from django.conf import settings tz = zoneinfo.ZoneInfo(settings.TIME_ZONE) def apiDoc(request): """ API 接口文档 """ return render(request, 'doc.html') class SendSmsAPIView(APIView): """ celery异步发送短信 """ @staticmethod def post(request): phone = request.data.get('phone') print(phone) result = send_sms.delay(phone) return Response({ 
   'msg': 'success', 'task_id': result.id}, status=status.HTTP_200_OK) class SendEmailAPIView(APIView): """ celery定时发送邮件 """ @staticmethod def post(request): time_str = request.data.get('time') email = request.data.get('email') date_time = time.strptime(time_str, "%Y-%m-%d %H:%M:%S") run_time = datetime(date_time.tm_year, date_time.tm_mon, date_time.tm_mday, date_time.tm_hour, date_time.tm_min, date_time.tm_sec).astimezone(tz) result = send_email.apply_async(args=[email, ], eta=run_time) return Response({ 
   'msg': 'success', 'task_id': result.id}, status=status.HTTP_200_OK) class GetTaskResultAPIView(APIView): """ celery获取任务执行结果 """ @staticmethod def get(request, task_id): async_result = AsyncResult(id=task_id) msg = '' if async_result.successful(): result = async_result.get() print(result) msg = '任务执行成功!' # result.forget() # 将结果删除 elif async_result.failed(): msg = '执行失败' elif async_result.status == 'PENDING': msg = '任务等待被执行' elif async_result.status == 'RETRY': msg = '任务异常后正在重试' elif async_result.status == 'STARTED': msg = '任务已经开始被执行' return Response({ 
   'msg': msg}, status=status.HTTP_200_OK) 

请求测试

  • 执行发送短信异步任务

在这里插入图片描述

  • 获取异步任务执行结果

在这里插入图片描述

  • 定时任务执行

在这里插入图片描述

  • 查询定时任务执行结果(未到设定时间)

在这里插入图片描述

  • 查询定时任务执行结果(已到设定时间)

在这里插入图片描述

  • celery控制台查看日志
[2022-03-14 12:14:51,076: INFO/MainProcess] Task public.tasks.send_sms[09d390f2-1447-4395-aca9-3f6a4ceb0fb3] received [2022-03-14 12:14:51,077: WARNING/ForkPoolWorker-2] 向110发送短信.. [2022-03-14 12:15:01,079: WARNING/ForkPoolWorker-2] 向110发送短信完成 [2022-03-14 12:15:01,097: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[09d390f2-1447-4395-aca9-3f6a4ceb0fb3] succeeded in 10.071s: 'send_email OK' [2022-03-14 12:18:06,084: INFO/MainProcess] Task public.tasks.send_email[f33b2a22-2b3f-4409-ae3c-692ccea78bc7] received [2022-03-14 12:20:00,203: WARNING/ForkPoolWorker-2] 向发送邮件... [2022-03-14 12:20:05,206: WARNING/ForkPoolWorker-2] 向发送邮件完成 [2022-03-14 12:20:05,224: INFO/ForkPoolWorker-2] Task public.tasks.send_email[f33b2a22-2b3f-4409-ae3c-692ccea78bc7] succeeded in 5.0568s: 'send_sms ok' 

django-celery-results

安装与配置

  • 安装软件包
讯享网pip install django-celery-results 
  • settings.py中注册app
INSTALLED_APPS = ( ..., 'django_celery_results', ) # 使用django数据库作为worker执行结果存储 CELERY_RESULT_BACKEND = 'django-db' 
  • 执行数据库迁移建表
讯享网python manage.py migrate 

启动与验证

  • 启动celery worker
 celery -A DrfTest worker -l INFO 
  • 查看admin异步任务记录

在这里插入图片描述

django-celery-results扩展

既然使用django-celery-results可以将所有异步任务执行结果存储起来,那么就可以根据这一特点做一写扩展功能开发。例如通过API接口获取当前所有异步任务执行结果

  • 源码分析

查看django-celery-results库的模型定义(venv/lib/python3.10/site-packages/django_celery_results/models.py)

讯享网class TaskResult(models.Model): """Task result/status.""" task_id = models.CharField( max_length=getattr( settings, 'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH', 255 ), unique=True, verbose_name=_('Task ID'), help_text=_('Celery ID for the Task that was run')) periodic_task_name = models.CharField( null=True, max_length=255, verbose_name=_('Periodic Task Name'), help_text=_('Name of the Periodic Task which was run')) task_name = models.CharField( null=True, max_length=getattr( settings, 'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH', 255 ), verbose_name=_('Task Name'), help_text=_('Name of the Task which was run')) task_args = models.TextField( null=True, verbose_name=_('Task Positional Arguments'), help_text=_('JSON representation of the positional arguments ' 'used with the task')) task_kwargs = models.TextField( null=True, verbose_name=_('Task Named Arguments'), help_text=_('JSON representation of the named arguments ' 'used with the task')) status = models.CharField( max_length=50, default=states.PENDING, choices=TASK_STATE_CHOICES, verbose_name=_('Task State'), help_text=_('Current state of the task being run')) worker = models.CharField( max_length=100, default=None, null=True, verbose_name=_('Worker'), help_text=_('Worker that executes the task') ) content_type = models.CharField( max_length=128, verbose_name=_('Result Content Type'), help_text=_('Content type of the result data')) content_encoding = models.CharField( max_length=64, verbose_name=_('Result Encoding'), help_text=_('The encoding used to save the task result data')) result = models.TextField( null=True, default=None, editable=False, verbose_name=_('Result Data'), help_text=_('The data returned by the task. ' 'Use content_encoding and content_type fields to read.')) date_created = models.DateTimeField( auto_now_add=True, verbose_name=_('Created DateTime'), help_text=_('Datetime field when the task result was created in UTC')) date_done = models.DateTimeField( auto_now=True, verbose_name=_('Completed DateTime'), help_text=_('Datetime field when the task was completed in UTC')) traceback = models.TextField( blank=True, null=True, verbose_name=_('Traceback'), help_text=_('Text of the traceback if the task generated one')) meta = models.TextField( null=True, default=None, editable=False, verbose_name=_('Task Meta Information'), help_text=_('JSON meta information about the task, ' 'such as information on child tasks')) objects = managers.TaskResultManager() class Meta: """Table information.""" ordering = ['-date_done'] verbose_name = _('task result') verbose_name_plural = _('task results') # Explicit names to solve https://code.djangoproject.com/ticket/33483 indexes = [ models.Index(fields=['task_name'], name='django_cele_task_na_08aec9_idx'), models.Index(fields=['status'], name='django_cele_status_9b6201_idx'), models.Index(fields=['worker'], name='django_cele_worker_d54dd8_idx'), models.Index(fields=['date_created'], name='django_cele_date_cr_f04a50_idx'), models.Index(fields=['date_done'], name='django_cele_date_do_f59aad_idx'), ] def as_dict(self): return { 
    'task_id': self.task_id, 'task_name': self.task_name, 'task_args': self.task_args, 'task_kwargs': self.task_kwargs, 'status': self.status, 'result': self.result, 'date_done': self.date_done, 'traceback': self.traceback, 'meta': self.meta, 'worker': self.worker } def __str__(self): return '<Task: {0.task_id} ({0.status})>'.format(self) 

分析发现该模型字段存储了异步任务执行的所有结果详细信息,但是模型字段过多,我们只需要提取其中关键的几个字段信息即可

  • public/serializers.py(自定义序列化器,只提取几个关键字段信息)
from rest_framework import serializers from django_celery_results.models import TaskResult class CeleryTaskSerializer(serializers.ModelSerializer): class Meta: model = TaskResult fields = ['task_id', 'task_name', 'date_done', 'status', 'worker'] 
  • public/urls.py(定义路由,由于是对整个模型做只读操作,所以只读视图集即可)
讯享网from rest_framework import routers from public import views from django.urls import path from public.views import MyTokenObtainPairView app_name = "public" urlpatterns = [ path('getTaskResultAll/', views.GetTaskResultReadOnlyModelViewSet.as_view({ 
   'get': 'list'})) # 获取所有异步任务执行结果 ] router = routers.DefaultRouter() urlpatterns += router.urls 
  • public/views.py(定义视图,使用ReadOnlyModelViewSet)
from public.serializers import CeleryTaskSerializer from django_celery_results.models import TaskResult class GetTaskResultReadOnlyModelViewSet(viewsets.ReadOnlyModelViewSet): """ celery获取全部任务执行结果 """ queryset = TaskResult.objects.all() serializer_class = CeleryTaskSerializer 
  • 访问验证

在这里插入图片描述

django-celery-beat

安装与配置

  • 安装软件包
讯享网pip install django-celery-beat 
  • 修改settings配置
INSTALLED_APPS = [ 'django_celery_beat', ] # 使用django_celery_beat插件用来动态配置任务 CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' 
  • 数据库迁移建表
讯享网python manage.py migrate 

启动与验证

  • 启动 Celery worker 服务
celery -A DrfTest worker -l INFO 
  • 启动beat 调度器
讯享网celery -A DrfTest beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler 
  • django admin生成的模型用途如下
clocked # 指定时间运行的计划任务 Crontabs # 与linux crontab计划任务时间格式一致。 Intervals # 以特定间隔(例如,每5秒)运行的计划。 Periodic tasks # 当前所有计划任务模型列表 solar event # 按照日出日落设置定时任务 
  • django admin创建crontab格式定时任务

在这里插入图片描述

在这里插入图片描述

  • 控制台查看日志
讯享网[2022-03-14 17:40:00,103: INFO/MainProcess] Task public.tasks.send_email[a2a98f8f-2fb9-4620-a660-a106e863f106] received [2022-03-14 17:40:00,122: WARNING/ForkPoolWorker-2]123发送邮件... [2022-03-14 17:40:05,127: WARNING/ForkPoolWorker-2]123发送邮件完成 [2022-03-14 17:40:05,223: INFO/ForkPoolWorker-2] Task public.tasks.send_email[a2a98f8f-2fb9-4620-a660-a106e863f106] succeeded in 5.9804s: 'send_sms ok' 
  • 创建Interval时间间隔类型定时任务

在这里插入图片描述

  • 控制台查看日志
[2022-03-14 21:19:06,010: INFO/MainProcess] Task public.tasks.send_sms[-0a5a-4ed9-a499-fc1e706cbcee] received [2022-03-14 21:19:06,034: WARNING/ForkPoolWorker-2] 向王麻子发送短信.. [2022-03-14 21:19:15,898: INFO/MainProcess] Task public.tasks.send_sms[c6346cec-c464-4e5d-ae07-db3ac83fea78] received [2022-03-14 21:19:15,911: WARNING/ForkPoolWorker-4] 向王麻子发送短信.. [2022-03-14 21:19:16,040: WARNING/ForkPoolWorker-2] 向王麻子发送短信完成 [2022-03-14 21:19:16,093: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[-0a5a-4ed9-a499-fc1e706cbcee] succeeded in 10.0304s: 'send_email OK' [2022-03-14 21:19:25,898: INFO/MainProcess] Task public.tasks.send_sms[0-b7ca-4575-9000-9a2bde] received [2022-03-14 21:19:25,899: WARNING/ForkPoolWorker-2] 向王麻子发送短信.. [2022-03-14 21:19:25,917: WARNING/ForkPoolWorker-4] 向王麻子发送短信完成 
  • django-celery-results查看执行记录

在这里插入图片描述

django-celery-beat扩展

django-celery-beat库的特点除了使用django admin可以创建修改定时任务外,还具备热加载的能力。当我们更新定时任务的配置后,无需重启celery workers,即可实现对定时任务的动态管理功能。
通过查看django-celery-beat的源码发现,我们只需要对PeriodicTask(定时任务列表)、IntervalSchedule(时间间隔列表)、CrontabSchedule(crontab定时任务)三个模型做管理,即可实现定时任务动态管理功能(模型文件路径venv/lib/python3.10/site-packages/django_celery_beat/models.py)
整个项目的开发思路是后端提供interval和crontab表达式的查询和新增接口,还有task定时任务的增删改查接口。当用户新增定时任务时,先填写定时任务表达式,获取到id后在前端显示,然后填写定时任务信息,创建一条定时任务记录。当用户需要暂定、修改定时任务时,直接调用put接口修改即可。删除定时任务也是一样的,调用delete接口即可完成操作。

基于crontab的定时任务

crontab定时任务的动态管理思路是,当新增定时任务时,先新增crontab表达式,获取到crontab表达式的id。然后再创建定时任务,使用crontab表达式id外键关联即可。对于CrontabSchedule,我们开发新增和查询接口使用ViewSet和APIView均可,此处为了方便演示,直接使用ViewSet。对于PeriodicTask,需要提供增删改查接口,我们使用ModelViewSet。

  • public/urls.py(路由,一条是新增crontab的路由,一条是新增task任务的路由)
讯享网from rest_framework import routers from public import views from django.urls import path app_name = "public" urlpatterns = [ path('crontab/', views.CrontabAPIView.as_view()), # crontab表达式 ] router = routers.DefaultRouter() router.register('task', views.TaskModelViewSet, 'userInfo') # 定时任务 urlpatterns += router.urls 
  • public/serializers.py(模型序列化器,主要是对PeriodicTask模型进行字段过滤)
class PeriodicTaskSerializer(serializers.ModelSerializer): """ 定时任务序列化器 """ last_run_at = serializers.DateTimeField(read_only=True) class Meta: model = PeriodicTask fields = ['id', 'name', 'task', 'args', 'last_run_at', 'enabled', 'crontab', 'interval'] 
  • public/views.py(视图,Crontab使用APIView,提供查询和新增接口。Task使用ModelViewSet,提供增删改查接口)
讯享网import time from datetime import datetime from zoneinfo import ZoneInfo from celery.result import AsyncResult from django.db import transaction from django.shortcuts import render from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.views import APIView from rest_framework_simplejwt.views import TokenObtainPairView from public.models import UserDemo from public.serializers import UserDemoSerializer, MyTokenObtainPairSerializer, CeleryTaskSerializer, \ PeriodicTaskSerializer from public.tasks import send_sms, send_email from django_celery_results.models import TaskResult def apiDoc(request): """ API 接口文档 """ return render(request, 'doc.html') class CrontabAPIView(APIView): """ crontab表达式 """ @staticmethod def get(request): # 查询crontab表达式 crontab_id = request.query_params.get('crontab_id') expression = CrontabSchedule.objects.get(id=crontab_id).__str__().split(' ')[:5] return Response({ 
   'id': crontab_id, 'expression': expression}, status=status.HTTP_200_OK) @staticmethod def post(request): # 新增crontab表达式 expression = request.data.get('expression') expression.split(' ') schedule, _ = CrontabSchedule.objects.get_or_create( minute=expression.split(' ')[0], hour=expression.split(' ')[1], day_of_month=expression.split(' ')[2], month_of_year=expression.split(' ')[3], day_of_week=expression.split(' ')[4], timezone=ZoneInfo("Asia/Shanghai") ) return Response({ 
   'crontab_id': schedule.id}, status=status.HTTP_200_OK) class TaskModelViewSet(viewsets.ModelViewSet): """ 定时任务增删改查 """ queryset = PeriodicTask.objects.all() serializer_class = PeriodicTaskSerializer 
  • 启动celery
celery -A DrfTest worker -l INFO celery -A DrfTest beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler 
  • 新增crontab表达式测试(使用linux crontab格式,每天9点30分执行定时任务)

在这里插入图片描述

  • 根据id查询crontab表达式

在这里插入图片描述

  • 新增定时任务

在这里插入图片描述

  • admin查看新增的定时任务

在这里插入图片描述

  • 到了9点30分,我们查看控制台日志
讯享网[2022-03-15 09:30:00,071: INFO/MainProcess] Task public.tasks.send_sms[a4e35e52-d763-46b8-b3fe-b44501d6a63a] received [2022-03-15 09:30:00,084: WARNING/ForkPoolWorker-2] 向adc发送短信.. [2022-03-15 09:30:10,088: WARNING/ForkPoolWorker-2] 向adc发送短信完成 [2022-03-15 09:30:10,125: INFO/ForkPoolWorker-2] Task public.tasks.send_sms[a4e35e52-d763-46b8-b3fe-b44501d6a63a] succeeded in 10.0907s: 'send_email OK' 
  • api接口查询任务详情

在这里插入图片描述

至此,通过api接口新增和查询任务接口演示验证完成,修改和删除接口由于篇幅有限,就不做演示了。

基于interval的定时任务

  • public/urls.py(路由,一条是新增crontab的路由,一条是新增task任务的路由)
from rest_framework import routers from public import views from django.urls import path app_name = "public" urlpatterns = [ path('crontab/', views.CrontabAPIView.as_view()), # crontab表达式 path('interval/', views.IntervalAPIView.as_view()) # interval表达式 ] router = routers.DefaultRouter() router.register('task', views.TaskModelViewSet, 'userInfo') # 定时任务 urlpatterns += router.urls 
  • public/views.py(视图,Crontab使用APIView,提供查询和新增接口。Task保持不变)
讯享网import time from datetime import datetime from zoneinfo import ZoneInfo from celery.result import AsyncResult from django.db import transaction from django.shortcuts import render from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.views import APIView from rest_framework_simplejwt.views import TokenObtainPairView from public.models import UserDemo from public.serializers import UserDemoSerializer, MyTokenObtainPairSerializer, CeleryTaskSerializer, \ PeriodicTaskSerializer from public.tasks import send_sms, send_email from django_celery_results.models import TaskResult def apiDoc(request): """ API 接口文档 """ return render(request, 'doc.html') class IntervalAPIView(APIView): """ Interval表达式 """ @staticmethod def get(request): # 查询Interval表达式 interval_id = request.query_params.get('interval_id') expression = IntervalSchedule.objects.get(id=interval_id).__str__() return Response({ 
   'id': interval_id, 'expression': expression}, status=status.HTTP_200_OK) @staticmethod def post(request): # 新增Interval表达式 every = request.data.get('every') kind = request.data.get('kind') period = None if kind == 'DAYS': # 固定间隔天数 period = IntervalSchedule.DAYS elif kind == 'HOURS': # 固定间隔小时数 period = IntervalSchedule.HOURS elif kind == 'MINUTES': # 固定间隔分钟数 period = IntervalSchedule.MINUTES elif kind == 'SECONDS': # 固定间隔秒数 period = IntervalSchedule.SECONDS else: # 固定间隔微秒 period = IntervalSchedule.MICROSECONDS schedule, created = IntervalSchedule.objects.get_or_create(every=every, period=period) return Response({ 
   'interval_id': schedule.id}, status=status.HTTP_200_OK) class CrontabAPIView(APIView): …… class TaskModelViewSet(viewsets.ModelViewSet): …… 
  • 新增interval表达式

在这里插入图片描述

  • 查询interval表达式

在这里插入图片描述

  • 新增interval类型定时任务

在这里插入图片描述

  • admin查看定时任务信息

在这里插入图片描述

  • celery查看定时任务日志
[2022-03-15 09:39:05,918: INFO/MainProcess] Task public.tasks.send_email[bc042f63-81c0-42a5-959c-b0c80] received [2022-03-15 09:39:05,919: WARNING/ForkPoolWorker-2] 向mvp发送邮件... [2022-03-15 09:39:10,923: WARNING/ForkPoolWorker-2] 向mvp发送邮件完成 [2022-03-15 09:39:10,931: INFO/ForkPoolWorker-2] Task public.tasks.send_email[bc042f63-81c0-42a5-959c-b0c80] succeeded in 5.078s: 'send_sms ok' [2022-03-15 09:39:25,921: INFO/MainProcess] Task public.tasks.send_email[4d20b737-60d9-4a19-8de2-234f1425e7b1] received [2022-03-15 09:39:25,923: WARNING/ForkPoolWorker-2] 向mvp发送邮件... [2022-03-15 09:39:30,929: WARNING/ForkPoolWorker-2] 向mvp发送邮件完成 [2022-03-15 09:39:30,960: INFO/ForkPoolWorker-2] Task public.tasks.send_email[4d20b737-60d9-4a19-8de2-234f1425e7b1] succeeded in 5.0825s: 'send_sms ok' 

基于interval的定时任务创建完成,其他修改,删除接口大家可以自行测试,演示验证完成。

RabbitMQ使用

在大规模应用场景下,推荐使用RabbitMQ作为代理,因为它功能完整、稳定,也是Celery推荐的消息队列。

安装

讯享网docker pull rabbitmq:management docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=CuiLiang@0302 rabbitmq:management 
  • –hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);

celery使用rabbitmq

  • 安装pika
pip install pika 
  • 指定rabbitmq作为celery的队列
讯享网broker_url ="amqp://guest:guest@127.0.0.1:5672" 

参考文档

  • celery文档

https://docs.celeryproject.org/en/stable/index.html

  • django-celery-results文档:

https://github.com/celery/django-celery-results

  • django-celery-beat文档:

https://github.com/celery/django-celery-beat

查看更多

微信公众号

微信公众号同步更新,欢迎关注微信公众号第一时间获取最近文章。在这里插入图片描述

博客网站

崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn

小讯
上一篇 2025-03-20 23:24
下一篇 2025-01-11 19:14

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/12753.html