File tests.patch of Package python-celery
From 9e324caaa6b175d8e51d3582378b78757e66a12d Mon Sep 17 00:00:00 2001
From: dobosevych <dobosevych@users.noreply.github.com>
Date: Thu, 14 Apr 2022 18:22:33 +0300
Subject: [PATCH] Integration test fix (#7460)
* Integration debugging
* Integration debugging
* Integration debugging
* Commented tasks that aren't working
* Fixed test_inspect.py
* Fixed serialization test_canvas.py
* Request fixes
* Setup full pipeline
* Setup full pipeline
* Setup full pipeline
* Setup python-package.yml
* Setup python-package.yml
* Added 3.10 to integration tests
* test_task.py fixed
* test_generator fixed
* Added parametrization to test_generation
* fixed test_generator
* Reverted encoding in test_canvas.py
* Rollback codecov
* Retries now respect additional options.
Previously, expires and other options were not merged with
the current task's options. This commit fixes the issue.
Co-authored-by: Omer Katz <omer.katz@kcg.tech>
---
celery/app/task.py | 2 +-
celery/canvas.py | 13 +++++---
celery/contrib/pytest.py | 2 +-
celery/worker/request.py | 2 +-
requirements/test-integration.txt | 1 +
t/integration/tasks.py | 7 +++--
t/integration/test_canvas.py | 19 ++++++------
t/integration/test_tasks.py | 11 +++++--
9 files changed, 79 insertions(+), 24 deletions(-)
Index: celery-5.2.6/celery/app/task.py
===================================================================
--- celery-5.2.6.orig/celery/app/task.py
+++ celery-5.2.6/celery/app/task.py
@@ -605,7 +605,7 @@ class Task:
request = self.request if request is None else request
args = request.args if args is None else args
kwargs = request.kwargs if kwargs is None else kwargs
- options = request.as_execution_options()
+ options = {**request.as_execution_options(), **extra_options}
delivery_info = request.delivery_info or {}
priority = delivery_info.get('priority')
if priority is not None:
Index: celery-5.2.6/celery/canvas.py
===================================================================
--- celery-5.2.6.orig/celery/canvas.py
+++ celery-5.2.6/celery/canvas.py
@@ -26,7 +26,7 @@ from celery.utils import abstract
from celery.utils.collections import ChainMap
from celery.utils.functional import _regen
from celery.utils.functional import chunks as _chunks
-from celery.utils.functional import (is_list, lookahead, maybe_list, regen,
+from celery.utils.functional import (is_list, maybe_list, regen,
seq_concat_item, seq_concat_seq)
from celery.utils.objects import getitem_property
from celery.utils.text import remove_repeating_from_task, truncate
@@ -1184,9 +1184,11 @@ class group(Signature):
# next_task is None. This enables us to set the chord size
# without burning through the entire generator. See #3021.
chord_size = 0
- for task_index, (current_task, next_task) in enumerate(
- lookahead(tasks)
- ):
+ tasks_shifted, tasks = itertools.tee(tasks)
+ next(tasks_shifted, None)
+ next_task = next(tasks_shifted, None)
+
+ for task_index, current_task in enumerate(tasks):
# We expect that each task must be part of the same group which
# seems sensible enough. If that's somehow not the case we'll
# end up messing up chord counts and there are all sorts of
@@ -1212,6 +1214,7 @@ class group(Signature):
if p and not p.cancelled and not p.ready:
p.size += 1
res.then(p, weak=True)
+ next_task = next(tasks_shifted, None)
yield res # <-- r.parent, etc set in the frozen result.
def _freeze_gid(self, options):
@@ -1249,7 +1252,7 @@ class group(Signature):
# we freeze all tasks in the clone tasks1, and then zip the results
# with the IDs of tasks in the second clone, tasks2. and then, we build
# a generator that takes only the task IDs from tasks2.
- self.tasks = regen(x[0] for x in zip(tasks2, results))
+ self.tasks = regen(tasks2)
else:
new_tasks = []
# Need to unroll subgroups early so that chord gets the
Index: celery-5.2.6/celery/contrib/pytest.py
===================================================================
--- celery-5.2.6.orig/celery/contrib/pytest.py
+++ celery-5.2.6/celery/contrib/pytest.py
@@ -88,7 +88,7 @@ def celery_session_worker(
for module in celery_includes:
celery_session_app.loader.import_task_module(module)
for class_task in celery_class_tasks:
- celery_session_app.tasks.register(class_task)
+ celery_session_app.register_task(class_task)
with worker.start_worker(celery_session_app,
pool=celery_worker_pool,
**celery_worker_parameters) as w:
Index: celery-5.2.6/celery/worker/request.py
===================================================================
--- celery-5.2.6.orig/celery/worker/request.py
+++ celery-5.2.6/celery/worker/request.py
@@ -155,7 +155,7 @@ class Request:
'exchange': delivery_info.get('exchange'),
'routing_key': delivery_info.get('routing_key'),
'priority': properties.get('priority'),
- 'redelivered': delivery_info.get('redelivered'),
+ 'redelivered': delivery_info.get('redelivered', False),
}
self._request_dict.update({
'properties': properties,
Index: celery-5.2.6/requirements/test-integration.txt
===================================================================
--- celery-5.2.6.orig/requirements/test-integration.txt
+++ celery-5.2.6/requirements/test-integration.txt
@@ -3,3 +3,4 @@
-r extras/auth.txt
-r extras/memcache.txt
pytest-rerunfailures>=6.0
+git+https://github.com/celery/kombu.git
Index: celery-5.2.6/t/integration/tasks.py
===================================================================
--- celery-5.2.6.orig/t/integration/tasks.py
+++ celery-5.2.6/t/integration/tasks.py
@@ -197,16 +197,17 @@ def retry(self, return_value=None):
raise self.retry(exc=ExpectedException(), countdown=5)
-@shared_task(bind=True, expires=60.0, max_retries=1)
-def retry_once(self, *args, expires=60.0, max_retries=1, countdown=0.1):
+@shared_task(bind=True, expires=120.0, max_retries=1)
+def retry_once(self, *args, expires=None, max_retries=1, countdown=0.1):
"""Task that fails and is retried. Returns the number of retries."""
if self.request.retries:
return self.request.retries
raise self.retry(countdown=countdown,
+ expires=expires,
max_retries=max_retries)
-@shared_task(bind=True, expires=60.0, max_retries=1)
+@shared_task(bind=True, max_retries=1)
def retry_once_priority(self, *args, expires=60.0, max_retries=1,
countdown=0.1):
"""Task that fails and is retried. Returns the priority."""
Index: celery-5.2.6/t/integration/test_canvas.py
===================================================================
--- celery-5.2.6.orig/t/integration/test_canvas.py
+++ celery-5.2.6/t/integration/test_canvas.py
@@ -124,7 +124,7 @@ class test_link_error:
)
assert result.get(timeout=TIMEOUT, propagate=False) == exception
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout instead of returning exception")
def test_link_error_callback_retries(self):
exception = ExpectedException("Task expected to fail", "test")
result = fail.apply_async(
@@ -144,7 +144,7 @@ class test_link_error:
assert (fail.apply().get(timeout=TIMEOUT, propagate=False), True) == (
exception, True)
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout instead of returning exception")
def test_link_error_using_signature(self):
fail = signature('t.integration.tasks.fail', args=("test",))
retrun_exception = signature('t.integration.tasks.return_exception')
@@ -179,7 +179,7 @@ class test_chain:
res = c()
assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
def test_group_results_in_chain(self, manager):
# This adds in an explicit test for the special case added in commit
# 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6
@@ -477,7 +477,7 @@ class test_chain:
res = c()
assert res.get(timeout=TIMEOUT) == [8, 8]
- @flaky
+ @pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
def test_nested_chain_group_lone(self, manager):
"""
Test that a lone group in a chain completes.
@@ -1233,7 +1233,7 @@ class test_chord:
result = c()
assert result.get(timeout=TIMEOUT) == 4
- @flaky
+ @pytest.mark.xfail(reason="async_results aren't performed in async way")
def test_redis_subscribed_channels_leak(self, manager):
if not manager.app.conf.result_backend.startswith('redis'):
raise pytest.skip('Requires redis result backend.')
@@ -1566,11 +1566,12 @@ class test_chord:
) == 1
@flaky
- def test_generator(self, manager):
+ @pytest.mark.parametrize('size', [3, 4, 5, 6, 7, 8, 9])
+ def test_generator(self, manager, size):
def assert_generator(file_name):
- for i in range(3):
+ for i in range(size):
sleep(1)
- if i == 2:
+ if i == size - 1:
with open(file_name) as file_handle:
# ensures chord header generators tasks are processed incrementally #3021
assert file_handle.readline() == '0\n', "Chord header was unrolled too early"
@@ -1579,7 +1580,7 @@ class test_chord:
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_file:
file_name = tmp_file.name
c = chord(assert_generator(file_name), tsum.s())
- assert c().get(timeout=TIMEOUT) == 3
+ assert c().get(timeout=TIMEOUT) == size * (size - 1) // 2
@flaky
def test_parallel_chords(self, manager):
Index: celery-5.2.6/t/integration/test_tasks.py
===================================================================
--- celery-5.2.6.orig/t/integration/test_tasks.py
+++ celery-5.2.6/t/integration/test_tasks.py
@@ -29,7 +29,7 @@ class test_class_based_tasks:
def test_class_based_task_retried(self, celery_session_app,
celery_session_worker):
task = ClassBasedAutoRetryTask()
- celery_session_app.tasks.register(task)
+ celery_session_app.register_task(task)
res = task.delay()
assert res.get(timeout=TIMEOUT) == 1
@@ -255,12 +255,17 @@ class test_tasks:
manager.assert_accepted([r1.id])
@flaky
- def test_task_retried(self):
+ def test_task_retried_once(self, manager):
res = retry_once.delay()
assert res.get(timeout=TIMEOUT) == 1 # retried once
@flaky
- def test_task_retried_priority(self):
+ def test_task_retried_once_with_expires(self, manager):
+ res = retry_once.delay(expires=60)
+ assert res.get(timeout=TIMEOUT) == 1 # retried once
+
+ @flaky
+ def test_task_retried_priority(self, manager):
res = retry_once_priority.apply_async(priority=7)
assert res.get(timeout=TIMEOUT) == 7 # retried once with priority 7