File 879af6341974c3778077d8212d78f093b2d77a4f.patch of Package python-celery
From 879af6341974c3778077d8212d78f093b2d77a4f Mon Sep 17 00:00:00 2001
From: Tomer Nosrati <tomer.nosrati@kcg.tech>
Date: Tue, 4 Oct 2022 02:06:50 +0300
Subject: [PATCH] Fixed error handling bugs due to upgrade to a newer version
of billiard
---
celery/app/task.py | 4 +++-
celery/worker/request.py | 19 ++++++++++++++-----
t/unit/utils/test_collections.py | 4 ++--
t/unit/worker/test_request.py | 8 ++++----
4 files changed, 23 insertions(+), 12 deletions(-)
diff --git a/celery/app/task.py b/celery/app/task.py
index 212bc772e01..d6108fbef8c 100644
--- a/celery/app/task.py
+++ b/celery/app/task.py
@@ -1,7 +1,7 @@
"""Task implementation: request context and the task base class."""
import sys
-from billiard.einfo import ExceptionInfo
+from billiard.einfo import ExceptionInfo, ExceptionWithTraceback
from kombu import serialization
from kombu.exceptions import OperationalError
from kombu.utils.uuid import uuid
@@ -813,6 +813,8 @@ def apply(self, args=None, kwargs=None,
retval = ret.retval
if isinstance(retval, ExceptionInfo):
retval, tb = retval.exception, retval.traceback
+ if isinstance(retval, ExceptionWithTraceback):
+ retval = retval.exc
if isinstance(retval, Retry) and retval.sig is not None:
return retval.sig.apply(retries=retries + 1)
state = states.SUCCESS if ret.info is None else ret.info.state
diff --git a/celery/worker/request.py b/celery/worker/request.py
index d89971468c6..d0004a19ccc 100644
--- a/celery/worker/request.py
+++ b/celery/worker/request.py
@@ -10,6 +10,7 @@
from weakref import ref
from billiard.common import TERM_SIGNAME
+from billiard.einfo import ExceptionWithTraceback
from kombu.utils.encoding import safe_repr, safe_str
from kombu.utils.objects import cached_property
@@ -511,8 +512,11 @@ def on_success(self, failed__retval__runtime, **kwargs):
"""Handler called if the task was successfully processed."""
failed, retval, runtime = failed__retval__runtime
if failed:
- if isinstance(retval.exception, (SystemExit, KeyboardInterrupt)):
- raise retval.exception
+ exc = retval.exception
+ if isinstance(exc, ExceptionWithTraceback):
+ exc = exc.exc
+ if isinstance(exc, (SystemExit, KeyboardInterrupt)):
+ raise exc
return self.on_failure(retval, return_ok=True)
task_ready(self, successful=True)
@@ -535,6 +539,9 @@ def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
task_ready(self)
exc = exc_info.exception
+ if isinstance(exc, ExceptionWithTraceback):
+ exc = exc.exc
+
is_terminated = isinstance(exc, Terminated)
if is_terminated:
# If the task was terminated and the task was not cancelled due
@@ -735,9 +742,11 @@ def execute_using_pool(self, pool, **kwargs):
def on_success(self, failed__retval__runtime, **kwargs):
failed, retval, runtime = failed__retval__runtime
if failed:
- if isinstance(retval.exception, (
- SystemExit, KeyboardInterrupt)):
- raise retval.exception
+ exc = retval.exception
+ if isinstance(exc, ExceptionWithTraceback):
+ exc = exc.exc
+ if isinstance(exc, (SystemExit, KeyboardInterrupt)):
+ raise exc
return self.on_failure(retval, return_ok=True)
task_ready(self)
diff --git a/t/unit/utils/test_collections.py b/t/unit/utils/test_collections.py
index ce776cebf1a..aae685ebc7c 100644
--- a/t/unit/utils/test_collections.py
+++ b/t/unit/utils/test_collections.py
@@ -145,8 +145,8 @@ def test_exception_info(self):
except Exception:
einfo = ExceptionInfo()
assert str(einfo) == einfo.traceback
- assert isinstance(einfo.exception, LookupError)
- assert einfo.exception.args == ('The quick brown fox jumps...',)
+ assert isinstance(einfo.exception.exc, LookupError)
+ assert einfo.exception.exc.args == ('The quick brown fox jumps...',)
assert einfo.traceback
assert repr(einfo)
diff --git a/t/unit/worker/test_request.py b/t/unit/worker/test_request.py
index a34f70dc80d..b818f2837cc 100644
--- a/t/unit/worker/test_request.py
+++ b/t/unit/worker/test_request.py
@@ -155,7 +155,7 @@ def test_execute_jail_failure(self):
self.app, uuid(), self.mytask_raising.name, {}, [4], {},
)
assert isinstance(ret, ExceptionInfo)
- assert ret.exception.args == (4,)
+ assert ret.exception.exc.args == (4,)
def test_execute_task_ignore_result(self):
@self.app.task(shared=False, ignore_result=True)
@@ -385,7 +385,7 @@ def test_on_failure_WorkerLostError_redelivered_True(self):
task_failure,
sender=req.task,
task_id=req.id,
- exception=einfo.exception,
+ exception=einfo.exception.exc,
args=req.args,
kwargs=req.kwargs,
traceback=einfo.traceback,
@@ -394,7 +394,7 @@ def test_on_failure_WorkerLostError_redelivered_True(self):
req.on_failure(einfo)
req.task.backend.mark_as_failure.assert_called_once_with(req.id,
- einfo.exception,
+ einfo.exception.exc,
request=req._context,
store_result=True)
@@ -807,7 +807,7 @@ def test_from_message_invalid_kwargs(self):
m = self.TaskMessage(self.mytask.name, args=(), kwargs='foo')
req = Request(m, app=self.app)
with pytest.raises(InvalidTaskError):
- raise req.execute().exception
+ raise req.execute().exception.exc
def test_on_hard_timeout_acks_late(self, patching):
error = patching('celery.worker.request.error')