File backport_run_in_executor.patch of Package python-flower
From 5741cbcbc5c2a75c2552326018ee97b8fe5f257f Mon Sep 17 00:00:00 2001
From: John Vandenberg <jayvdb@gmail.com>
Date: Fri, 22 Mar 2019 08:28:34 +0700
Subject: [PATCH] Backport run_in_executor
---
flower/api/tasks.py | 21 +++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
diff --git a/flower/api/tasks.py b/flower/api/tasks.py
index 1f172422..f0395dea 100644
--- a/flower/api/tasks.py
+++ b/flower/api/tasks.py
@@ -78,6 +78,24 @@ def safe_result(self, result):
return result
+def inline_run_in_executor(func, *args):
+ from tornado.concurrent import Future, chain_future
+
+ io_loop = IOLoop.current()
+ if not hasattr(io_loop, "_executor"):
+ import concurrent.futures
+ from tornado.process import cpu_count
+
+ io_loop._executor = concurrent.futures.ThreadPoolExecutor(
+ max_workers=(cpu_count() * 5)
+ )
+ executor = io_loop._executor
+ c_future = executor.submit(func, *args)
+ t_future = Future()
+ io_loop.add_future(c_future, lambda f: chain_future(f, t_future))
+ return t_future
+
+
class TaskApply(BaseTaskHandler):
@web.authenticated
@gen.coroutine
@@ -138,8 +156,7 @@ def post(self, taskname):
result = task.apply_async(args=args, kwargs=kwargs, **options)
response = {'task-id': result.task_id}
- response = yield IOLoop.current().run_in_executor(
- None, self.wait_results, result, response)
+ response = yield inline_run_in_executor(self.wait_results, result, response)
self.write(response)
def wait_results(self, result, response):