Celery ETA任务重复提交的问题解决

2023年2月3日12:29:27

2018.09.28更新

pypi上有模块celery_once可解决重复提交和队列管理的问题。暂未实践。


问题描述

因为业务需求,使用了celery配置eta(estimated time of arrival)的功能,传入datetime类型的值,让任务在具体的某个时间执行。
在Django根目录执行查看任务计划,看到了许多重复提交到worker的eta任务。
查看语句

celery -A <celery.py文件所在文件夹> inspect scheduled

task_id均相同,排除是因为代码逻辑的问题重复提交.

官网文档

This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

简单来说, celery对ETA/countdown/retry等要求具体时间执行的任务支持并不完整. 指定执行时间,与celery自身的失效重传机制有所冲突.
celery在没有收到任务被worker正常执行的时候就会发起重传.我项目中的ETA任务往往是在24小时之后才执行, celery的默认重传timeout是1个小时(Visibility timeout).因此理论上在ETA时间没有到之前,celery每过一个小时便重复提交一个任务给worker

文档原文

Visibility timeout
If a task isn’t acknowledged within the Visibility Timeout the task will be redelivered to another worker and executed.

This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.

So you have to increase the visibility timeout to match the time of the longest ETA you’re planning to use.

Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.

Periodic tasks won’t be affected by the visibility timeout, as this is a concept separate from ETA/countdown.

You can increase this timeout by configuring a transport option with the same name:

app.conf.broker_transport_options = {‘visibility_timeout’: 43200}
The value must be an int describing the number of seconds.

解决方法

在django的setting文件当中手动配置visibility_timeout时间, 值为ETA时间的最大间隔.

CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 172800}

##结果
再次查看celery schedule,确实重复任务消失了. 但理论上,这种设置会让celery的重传机制失效, 可能在进程间通讯发生问题时,重传检测机制不能及时发现.

发现问题(2018.08.28更新)

第二天查看celery计划状态, 发现重复任务依然存在,说明在完成配置之后,依然被重复提交了.

当前解决方案

为tasks运行时加锁

  1. 使用redis作为django的cache.

    # setting.py
    ....
    CACHES = {
        "default": {
            "BACKEND": "django_redis.cache.RedisCache",
            "LOCATION": "redis://127.0.0.1:6379/1",
            'TIMEOUT': 7 * 24 * 60 * 60,
            "OPTIONS": {
                "CLIENT_CLASS": "django_redis.client.DefaultClient",
            }
        }
    }
    ....
    
  2. 在出现重复提交的任务中加锁.
    2.1 使用唯一标识为key(如task+操作对象object_id),配合redis的原子操作SETNX(SET IF NOT EXIST)执行前判断是否在cache中存在,已存在则tasks直接返回,不执行业务逻辑.
    2.2 在Django-redis中使用方法为cache.set(key, value, timeout, nx=True).
    2.3 若不存在,上述操作完成key:value的写入并返回True, 说明tasks第一次执行.
    大致代码如下:

# tasks.py
def example_task(object_id):
	flag = 'example_task' + str(object_id)
	nx_lock = cache.set(flag, 1, 60, nx=True) #过期时间为60秒
	if not nx_lock:
		print("task has been locked")
		return
	....
	....

这样可以使业务不受重复任务的影响,但仍然没有从根本上解决问题.有更好的解决方法希望分享, 谢谢.

  • 作者:chrispink_yang
  • 原文链接:https://blog.csdn.net/m0_37422289/article/details/82113317
    更新时间:2023年2月3日12:29:27 ,共 2403 字。