一.Celery介绍
Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,
使用非常简单。celery看起来似乎很庞大,
本章节我们先对其进行简单的了解,然后再去学习其他一些高级特性。
celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,
我们可将其异步执行,这样用户不需要等待很久,提高用户体验。 celery的特点是:
* 简单,易于使用和维护,有丰富的文档。
* 高效,单个celery进程每分钟可以处理数百万个任务。
* 灵活,celery中几乎每个部分都可以自定义扩展。
celery非常易于集成到一些web开发框架中
二.Celery库和schedule库和APScheduler库对比
(Github Star21.2K)
- celery 是经过生产级考量,但遇到问题,排查时候,比较坑,它的优势重在异步队列,虽也可用在定时任务。通常如果项目中已有Celery在使用,而且不需要动态添加定时任务时可以考虑使用;
文档(http://www.celeryproject.org/)
(Github Star5.1K)
- apscheduler 专注于定时任务,功能丰定,文档写得还算全,但没有对各个场景的详细说明。一些具体用法,还得去源代码,怎么调用,比如redis作为store。在实际使用过程中拥有最大的灵活性,可以满足我们的大部分定时任务的相关需求;
文档(https://apscheduler.readthedocs.io/en/latest/)
(Github Star10.8K)
- schedule 类似于linux的cron,简单好用。schedule非常轻量级,使用简单,但是不支持任务的持久化,也无法动态添加删除任务,所以主要用于简单的小型应用。
文档(https://github.com/dbader/schedule)
三.安装Celery
pip install celery==5.3.0
讯享网
celery不支持在windows下运行任务,需要借助eventlet来完成
讯享网pip install eventlet==0.33.3
Celery的默认broker(消息中间件)是RabbitMQ, 仅需配置一行就可以
使用Redis做broker(消息中间件)也可以 本地安装redis数据库(推荐使用 RabbitMQ)
pip install redis==4.5.5
四.基本使用
4.1创建应用
新建tasks.py文件
讯享网from celery import Celery import time # 我们这里案例使用redis作为broker app = Celery('demo', backend='redis://@127.0.0.1:6379/2', broker='redis://@127.0.0.1:6379/1') # 创建任务函数 @app.task def my_task(a,b): print("任务函数正在执行....100s") time.sleep(100) return a+b @app.task def my_task2(a,b): print("任务函数2正在执行....100s") time.sleep(100) return a+b
#启动Celery celery -A tasks worker --loglevel=info -P eventlet -c 10 # -c是协程的数量,生产环境可以用1000
这时候出现一个报警,添加参数即可
讯享网app = Celery('demo', backend='redis://@127.0.0.1:6379/2', broker='redis://@127.0.0.1:6379/1', broker_connection_retry_on_startup=True)
官方文档这里会讲
https://docs.celeryq.dev/en/latest/_modules/celery/worker/consumer/consumer.html

启动成功

4.2调用任务
讯享网from tasks import my_task,my_task2 ret=my_task.delay(1,2) ret2=my_task2.delay(1,2) #返回任务id print(ret.id) #b6782be6-2e58-49b1-baca-d3 print(ret2.id) #9032fdad-a0af-40ce-8918-38bc28b4c882

4.3存储结果
如果我们想跟踪任务的状态,Celery需要将结果保存到某个地方。有几种保存的方案可选:SQLAlchemy、Django ORM、Memcached、 Redis、RPC (RabbitMQ/AMQP)。
创建result.py

from celery.result import AsyncResult from tasks import app import time #任务id与4.2对应的 async_result=AsyncResult(id='b6782be6-2e58-49b1-baca-d3',app=app) while True: if async_result.successful(): result=async_result.get() print("result",result) elif async_result.failed(): print("执行失败") elif async_result.status=='PENDING': print("任务等待中被执行") elif async_result.status == 'RETRY': print("任务异常后正在重试") elif async_result.status == 'STARTED': print("任务已经开始被执行") time.sleep(1)
更多关于result对象信息,请参阅下列网址:
http://docs.celeryproject.org/en/latest/reference/celery.result.html#module-celery.result
五.Celery的配置文件(优化版)
由于Celery的配置信息比较多,通常情况下,我们会创建一个Celery的配置文件, 这里命名为 celery_config.py

step1:新增文件celery添加如下代码:
讯享网from celery import Celery # include:导入指定的任务模块 # 这一次创建 app,并没有直接指定 broker(消息中间件来接收和发送任务消息) 和 backend(存储结果)。而是在配置文件中。 app = Celery( 'demo', include=[ 'celery_tasks.task1', 'celery_tasks.task2', ] ) # 通过Celery 实例加载配置模块 app.config_from_object( 'celery_tasks.celery_config', )
step2:在celery_tasks文件夹下新建一个celery_config.py文件(Celery的配置文件) 添加如下代码:
# 官方配置文档:查询每个配置项的含义。 # <http://docs.celeryproject.org/en/latest/userguide/configuration.html> # broker(消息中间件来接收和发送任务消息) broker_url = 'redis://localhost:6379/1' # backend(存储worker执行的结果) result_backend = 'redis://localhost:6379/2' # 设置时间参照,不设置默认使用的UTC时间 timezone = 'Asia/Shanghai' # 指定任务的序列化 task_serializer='json' # 指定执行结果的序列化 result_serializer='json' broker_connection_retry_on_startup=True
启动
讯享网celery -A celery_tasks worker --loglevel=info -P eventlet -c 10
step3:在celery_tasks 文件夹下新建task1.py和task2.py文件
import time from celery_tasks.celery import app @app.task def send_msg(name): print("完成向%s发生短信任务"%name) time.sleep(5) return "短信完成!"
讯享网import time from celery_tasks.celery import app @app.task def send_email(name): print("完成向%s发生邮件任务"%name) time.sleep(5) return "邮件完成!"
step4:增加produce_task.py文件,代码如下,分别发送执行任务消息到broker
from celery_tasks.task1 import send_msg from celery_tasks.task2 import send_email result=send_msg.delay('yuan') print(result.id) result=send_email.delay('yuan') print(result.id)#30ba1b0d-5d0f-46b0-89a9-f
step5:增加check_result.py文件,代码如下
讯享网from celery.result import AsyncResult from celery_tasks.celery import app async_result=AsyncResult(id='30ba1b0d-5d0f-46b0-89a9-f',app=app) if async_result.successful(): result=async_result.get() print("result",result) elif async_result.failed(): print("执行失败") elif async_result.status=='PENDING': print("任务等待中被执行") elif async_result.status == 'RETRY': print("任务异常后正在重试") elif async_result.status == 'STARTED': print("任务已经开始被执行")
六.定时任务
七.监控
7.1下载
pip install flower
7.2运行flower命令将启动一个web服务器
默认端口是http://localhost:5555,可以使用-port参数来指定其他端口
讯享网celery -A proj flower --port=8000
未完待续…

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/52937.html