Celery介绍与使用

Celery介绍与使用一 Celery 介绍 Celery 是一个功能完备即插即用的任务队列 它使得我们不需要考虑复杂的问题 使用非常简单 celery 看起来似乎很庞大 本章节我们先对其进行简单的了解 然后再去学习其他一些高级特性 celery 适用异步处理问题 当发送邮件 或者文件上传 图像处理等等一些比较耗时的操作

大家好,我是讯享网,很高兴认识大家。

一.Celery介绍

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,
使用非常简单。celery看起来似乎很庞大,
本章节我们先对其进行简单的了解,然后再去学习其他一些高级特性。
celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,
我们可将其异步执行,这样用户不需要等待很久,提高用户体验。 celery的特点是:
* 简单,易于使用和维护,有丰富的文档。
* 高效,单个celery进程每分钟可以处理数百万个任务。
* 灵活,celery中几乎每个部分都可以自定义扩展。
celery非常易于集成到一些web开发框架中

二.Celery库和schedule库和APScheduler库对比

(Github Star21.2K)

  • celery 是经过生产级考量,但遇到问题,排查时候,比较坑,它的优势重在异步队列,虽也可用在定时任务。通常如果项目中已有Celery在使用,而且不需要动态添加定时任务时可以考虑使用;
    文档(http://www.celeryproject.org/)

(Github Star5.1K)

  • apscheduler 专注于定时任务,功能丰定,文档写得还算全,但没有对各个场景的详细说明。一些具体用法,还得去源代码,怎么调用,比如redis作为store。在实际使用过程中拥有最大的灵活性,可以满足我们的大部分定时任务的相关需求;
    文档(https://apscheduler.readthedocs.io/en/latest/)

(Github Star10.8K)

  • schedule 类似于linux的cron,简单好用。schedule非常轻量级,使用简单,但是不支持任务的持久化,也无法动态添加删除任务,所以主要用于简单的小型应用。
    文档(https://github.com/dbader/schedule)

三.安装Celery

pip install celery==5.3.0 

讯享网
celery不支持在windows下运行任务,需要借助eventlet来完成
讯享网pip install eventlet==0.33.3 
Celery的默认broker(消息中间件)是RabbitMQ, 仅需配置一行就可以
使用Redis做broker(消息中间件)也可以 本地安装redis数据库(推荐使用 RabbitMQ)
pip install redis==4.5.5 

四.基本使用

4.1创建应用

新建tasks.py文件

讯享网from celery import Celery import time # 我们这里案例使用redis作为broker app = Celery('demo', backend='redis://@127.0.0.1:6379/2', broker='redis://@127.0.0.1:6379/1') # 创建任务函数 @app.task def my_task(a,b): print("任务函数正在执行....100s") time.sleep(100) return a+b @app.task def my_task2(a,b): print("任务函数2正在执行....100s") time.sleep(100) return a+b 
#启动Celery celery -A tasks worker --loglevel=info -P eventlet -c 10 # -c是协程的数量,生产环境可以用1000 

在这里插入图片描述
讯享网

这时候出现一个报警,添加参数即可
讯享网app = Celery('demo', backend='redis://@127.0.0.1:6379/2', broker='redis://@127.0.0.1:6379/1', broker_connection_retry_on_startup=True) 

官方文档这里会讲

https://docs.celeryq.dev/en/latest/_modules/celery/worker/consumer/consumer.html

在这里插入图片描述

启动成功

在这里插入图片描述

4.2调用任务

讯享网from tasks import my_task,my_task2 ret=my_task.delay(1,2) ret2=my_task2.delay(1,2) #返回任务id print(ret.id) #b6782be6-2e58-49b1-baca-d3 print(ret2.id) #9032fdad-a0af-40ce-8918-38bc28b4c882 

在这里插入图片描述

4.3存储结果

如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。

创建result.py

from celery.result import AsyncResult from tasks import app import time #任务id与4.2对应的 async_result=AsyncResult(id='b6782be6-2e58-49b1-baca-d3',app=app) while True: if async_result.successful(): result=async_result.get() print("result",result) elif async_result.failed(): print("执行失败") elif async_result.status=='PENDING': print("任务等待中被执行") elif async_result.status == 'RETRY': print("任务异常后正在重试") elif async_result.status == 'STARTED': print("任务已经开始被执行") time.sleep(1) 

更多关于result对象信息,请参阅下列网址:
http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result

五.Celery的配置文件(优化版)

由于Celery的配置信息比较多,通常情况下,我们会创建一个Celery的配置文件, 这里命名为 celery_config.py
在这里插入图片描述
step1:新增文件celery添加如下代码:

讯享网from celery import Celery # include:导入指定的任务模块 # 这一次创建 app,并没有直接指定 broker(消息中间件来接收和发送任务消息) 和 backend(存储结果)。而是在配置文件中。 app = Celery( 'demo', include=[ 'celery_tasks.task1', 'celery_tasks.task2', ] ) # 通过Celery 实例加载配置模块 app.config_from_object( 'celery_tasks.celery_config', ) 

step2:在celery_tasks文件夹下新建一个celery_config.py文件(Celery的配置文件) 添加如下代码:

# 官方配置文档:查询每个配置项的含义。 # <http://docs.celeryproject.org/en/latest/userguide/configuration.html> # broker(消息中间件来接收和发送任务消息) broker_url = 'redis://localhost:6379/1' # backend(存储worker执行的结果) result_backend = 'redis://localhost:6379/2' # 设置时间参照,不设置默认使用的UTC时间 timezone = 'Asia/Shanghai' # 指定任务的序列化 task_serializer='json' # 指定执行结果的序列化 result_serializer='json' broker_connection_retry_on_startup=True 

启动

讯享网celery -A celery_tasks worker --loglevel=info -P eventlet -c 10 

step3:在celery_tasks 文件夹下新建task1.py和task2.py文件

import time from celery_tasks.celery import app @app.task def send_msg(name): print("完成向%s发生短信任务"%name) time.sleep(5) return "短信完成!" 
讯享网import time from celery_tasks.celery import app @app.task def send_email(name): print("完成向%s发生邮件任务"%name) time.sleep(5) return "邮件完成!" 

step4:增加produce_task.py文件,代码如下,分别发送执行任务消息到broker

from celery_tasks.task1 import send_msg from celery_tasks.task2 import send_email result=send_msg.delay('yuan') print(result.id) result=send_email.delay('yuan') print(result.id)#30ba1b0d-5d0f-46b0-89a9-f 

step5:增加check_result.py文件,代码如下

讯享网from celery.result import AsyncResult from celery_tasks.celery import app async_result=AsyncResult(id='30ba1b0d-5d0f-46b0-89a9-f',app=app) if async_result.successful(): result=async_result.get() print("result",result) elif async_result.failed(): print("执行失败") elif async_result.status=='PENDING': print("任务等待中被执行") elif async_result.status == 'RETRY': print("任务异常后正在重试") elif async_result.status == 'STARTED': print("任务已经开始被执行") 

六.定时任务

七.监控

7.1下载

pip install flower 

7.2运行flower命令将启动一个web服务器

默认端口是http://localhost:5555,可以使用-port参数来指定其他端口

讯享网celery -A proj flower --port=8000 

未完待续…

小讯
上一篇 2025-01-29 17:28
下一篇 2025-02-19 08:32

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/52937.html