场景:在公司通过celery写了几个定时的任务,但是过了一段时间我发现这些定时任务的日志里面竟然出现的重复的结果,定时任务大概就是定期的去统计数据什么的,类似报表那样,可是今天我发现竟然有两条一模一样的报表,于是去查了一下celery的运行日志,发现同一时间内(1s内)竟然同时发送了两次的任务,也就是同时产生了两个worker。然后就出现了两条一毛一样的结果,还写到了我的报表里面。
查了一下官方celery的文档(地址:http://docs.celeryproject.org/en/latest/index.html)
终于发现了问题:原来是我任务的时间间隔不够长
打个比方:A定时任务5分钟执行一次统计报表,但是A任务遇到数据量大的报表时,统计时间超过了5分钟,这时celery又接收到了新的下一个周期的A任务,于是celery又去统计之前的数据,这就造成了数据重复。
于是我想把时间给他改长一点,但是这不是解决问题的方法,万一哪一天来了个超大的数据,又超时了咋办?
于是我就去google一次,发现git上面有一个库可以解决这个问题(celery_one的git网址)
点进去看了一下,简单易操作,于是就去测试了一下,发现真的可以解决重复执行的问题!!!
大概流程就是:
先安装celery_one(注意celery的版本最好是大于4.0的)
pip install -U celery_once
ps:-U 是–upgrade 的简写。就是把所有包升级到最新版
拿例子来说明
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
###一般之前的配置没有这个,需要添加上
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
###在原本没有参数的里面加上base
@celery.task(base=QueueOnce)
def slow_task():
sleep(30)
return "Done!"
但是这样其实会报错,报的错意思就是无法执行xxx的celery任务,为了忽略这个错误(为什么当时看到这个错误的时候超开心[滑稽])
改成如下
###新添加
from celery_once import AlreadyQueued
from celery import Celery
from celery_once import QueueOnce
from time import sleep
celery = Celery('tasks', broker='amqp://guest@localhost//')
celery.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
###在原本的参数里面加上了once这个参数
@celery.task(base=QueueOnce, once={'graceful': True})
def slow_task():
sleep(30)
return "Done!"
就可以避免出现报错啦(▽)