File 6003-core-don-t-process-dbus-unit-and-job-queue-when-ther.patch of Package systemd.39177
From bac0326d26ce3309c1f0e78cf55754b0550b2da9 Mon Sep 17 00:00:00 2001
From: Lennart Poettering <lennart@poettering.net>
Date: Tue, 13 Feb 2018 18:30:34 +0100
Subject: [PATCH 2/2] core: don't process dbus unit and job queue when there
are already too many messages pending
We maintain a queue of units and jobs that we are supposed to generate
change/new notifications for because they were either just created or
some of their property has changed. Let's throttle processing of this
queue a bit: as soon as > 1K of bus messages are queued for writing
let's skip processing the queue, and then recheck on the next
iteration again.
Moreover, never process more than 100 units in one go, return to the
event loop after that. Both limits together should put effective limits
on both space and time usage of the function, delaying further
operations until a later moment, when the queue is empty or the
event loop is sufficiently idle again.
This should keep the number of generated messages much lower than
before on busy systems or where some client is hanging.
Note that this also means a bad client can slow down message dispatching
substantially for up to 90s if it likes to, for all clients. But that
should be acceptable as we only allow trusted bus clients, anyway.
Fixes: #8166
(cherry picked from commit e0a085811de1d1976c019afcfc2e4e74f590cc9f)
[fbui: fixes bsc#1233509]
[fbui: fixes bsc#1231211 (To be confirmed)]
---
src/core/dbus.c | 31 ++++++++++++++++++++++++++++
src/core/dbus.h | 2 ++
src/core/manager.c | 49 ++++++++++++++++++++++++++++++++++++--------
src/systemd/sd-bus.h | 3 +++
4 files changed, 76 insertions(+), 9 deletions(-)
diff --git a/src/core/dbus.c b/src/core/dbus.c
index 0b9b2d6d4e..ac412f7aa1 100644
--- a/src/core/dbus.c
+++ b/src/core/dbus.c
@@ -1255,3 +1255,34 @@ int bus_verify_reload_daemon_async(Manager *m, sd_bus_message *call, sd_bus_erro
int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error) {
return bus_verify_polkit_async(call, CAP_SYS_ADMIN, "org.freedesktop.systemd1.set-environment", NULL, false, UID_INVALID, &m->polkit_registry, error);
}
+
+uint64_t manager_bus_n_queued_write(Manager *m) {
+ uint64_t c = 0;
+ Iterator i;
+ sd_bus *b;
+ int r;
+
+ /* Returns the total number of messages queued for writing on all our direct and API busses. */
+
+ SET_FOREACH(b, m->private_buses, i) {
+ uint64_t k;
+
+ r = sd_bus_get_n_queued_write(b, &k);
+ if (r < 0)
+ log_debug_errno(r, "Failed to query queued messages for private bus: %m");
+ else
+ c += k;
+ }
+
+ if (m->api_bus) {
+ uint64_t k;
+
+ r = sd_bus_get_n_queued_write(m->api_bus, &k);
+ if (r < 0)
+ log_debug_errno(r, "Failed to query queued messages for API bus: %m");
+ else
+ c += k;
+ }
+
+ return c;
+}
diff --git a/src/core/dbus.h b/src/core/dbus.h
index 1d4526ece1..14d4424a6d 100644
--- a/src/core/dbus.h
+++ b/src/core/dbus.h
@@ -44,3 +44,5 @@ int bus_verify_reload_daemon_async(Manager *m, sd_bus_message *call, sd_bus_erro
int bus_verify_set_environment_async(Manager *m, sd_bus_message *call, sd_bus_error *error);
int bus_forward_agent_released(Manager *m, const char *path);
+
+uint64_t manager_bus_n_queued_write(Manager *m);
diff --git a/src/core/manager.c b/src/core/manager.c
index c38364e8a8..b6681a80ad 100644
--- a/src/core/manager.c
+++ b/src/core/manager.c
@@ -94,6 +94,13 @@
#define JOBS_IN_PROGRESS_PERIOD_USEC (USEC_PER_SEC / 3)
#define JOBS_IN_PROGRESS_PERIOD_DIVISOR 3
+/* If there are more than 1K bus messages queue across our API and direct busses, then let's not add more on top until
+ * the queue gets more empty. */
+#define MANAGER_BUS_BUSY_THRESHOLD 1024LU
+
+/* How many units and jobs to process of the bus queue before returning to the event loop. */
+#define MANAGER_BUS_MESSAGE_BUDGET 100U
+
static int manager_dispatch_notify_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
static int manager_dispatch_cgroups_agent_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
static int manager_dispatch_signal_fd(sd_event_source *source, int fd, uint32_t revents, void *userdata);
@@ -1547,41 +1554,65 @@ static int manager_dispatch_run_queue(sd_event_source *source, void *userdata) {
}
static unsigned manager_dispatch_dbus_queue(Manager *m) {
- Job *j;
+ unsigned n = 0, budget;
Unit *u;
- unsigned n = 0;
+ Job *j;
assert(m);
if (m->dispatching_dbus_queue)
return 0;
+ /* Anything to do at all? */
+ if (!m->dbus_unit_queue && !m->dbus_job_queue && !m->send_reloading_done && !m->queued_message)
+ return 0;
+
+ /* Do we have overly many messages queued at the moment? If so, let's not enqueue more on top, let's sit this
+ * cycle out, and process things in a later cycle when the queues got a bit emptier. */
+ if (manager_bus_n_queued_write(m) > MANAGER_BUS_BUSY_THRESHOLD)
+ return 0;
+
+ /* Only process a certain number of units/jobs per event loop iteration. Even if the bus queue wasn't overly
+ * full before this call we shouldn't increase it in size too wildly in one step, and we shouldn't monopolize
+ * CPU time with generating these messages. Note the difference in counting of this "budget" and the
+ * "threshold" above: the "budget" is decreased only once per generated message, regardless how many
+ * busses/direct connections it is enqueued on, while the "threshold" is applied to each queued instance of bus
+ * message, i.e. if the same message is enqueued to five busses/direct connections it will be counted five
+ * times. This difference in counting ("references" vs. "instances") is primarily a result of the fact that
+ * it's easier to implement it this way, however it also reflects the thinking that the "threshold" should put
+ * a limit on used queue memory, i.e. space, while the "budget" should put a limit on time. Also note that
+ * the "threshold" is currently chosen much higher than the "budget". */
+ budget = MANAGER_BUS_MESSAGE_BUDGET;
+
m->dispatching_dbus_queue = true;
- while ((u = m->dbus_unit_queue)) {
+ while (budget > 0 && (u = m->dbus_unit_queue)) {
+
assert(u->in_dbus_queue);
bus_unit_send_change_signal(u);
- n++;
+ n++, budget--;
}
- while ((j = m->dbus_job_queue)) {
+ while (budget > 0 && (j = m->dbus_job_queue)) {
assert(j->in_dbus_queue);
bus_job_send_change_signal(j);
- n++;
+ n++, budget--;
}
m->dispatching_dbus_queue = false;
- if (m->send_reloading_done) {
+ if (budget > 0 && m->send_reloading_done) {
m->send_reloading_done = false;
-
bus_manager_send_reloading(m, false);
+ n++, budget--;
}
- if (m->queued_message)
+ if (budget > 0 && m->queued_message) {
bus_send_queued_message(m);
+ n++;
+ }
return n;
}
diff --git a/src/systemd/sd-bus.h b/src/systemd/sd-bus.h
index 71a47f6f8e..e6d7102354 100644
--- a/src/systemd/sd-bus.h
+++ b/src/systemd/sd-bus.h
@@ -190,6 +190,9 @@ int sd_bus_attach_event(sd_bus *bus, sd_event *e, int priority);
int sd_bus_detach_event(sd_bus *bus);
sd_event *sd_bus_get_event(sd_bus *bus);
+int sd_bus_get_n_queued_read(sd_bus *bus, uint64_t *ret);
+int sd_bus_get_n_queued_write(sd_bus *bus, uint64_t *ret);
+
int sd_bus_add_filter(sd_bus *bus, sd_bus_slot **slot, sd_bus_message_handler_t callback, void *userdata);
int sd_bus_add_match(sd_bus *bus, sd_bus_slot **slot, const char *match, sd_bus_message_handler_t callback, void *userdata);
int sd_bus_add_object(sd_bus *bus, sd_bus_slot **slot, const char *path, sd_bus_message_handler_t callback, void *userdata);
--
2.43.0