File 2585-Batch-updates-to-runq-lengths-during-task-stealing.patch of Package erlang
From 7b8a6110242b10abe58ec373cfb2462271cadc22 Mon Sep 17 00:00:00 2001
From: Robin Morisset <rmorisset@meta.com>
Date: Fri, 27 Sep 2024 02:20:28 -0700
Subject: [PATCH 05/15] Batch updates to runq lengths during task stealing
We now steal up to 100 tasks at once. But we were updating the victim
runqueue length by repeated decrements, and updating the beneficiary
runqueue length by repeated increments. This is not just extra work: it
also increased the length of the critical sections (since these updates
were done while holding the relevant locks).
This change batches the increments/decrements, doing a single
addition/subtraction.
---
erts/emulator/beam/erl_process.c | 63 +++++++++++++++++++++++---------
erts/emulator/beam/erl_process.h | 25 ++++++++++---
2 files changed, 64 insertions(+), 24 deletions(-)
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index b88116f384..6b458aa144 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -3940,6 +3940,16 @@ erts_sched_notify_check_cpu_bind(void)
}
}
+static ERTS_INLINE void
+enqueue_process_internal(ErtsRunPrioQueue *rpq, Process *p)
+{
+ p->next = NULL;
+ if (rpq->last)
+ rpq->last->next = p;
+ else
+ rpq->first = p;
+ rpq->last = p;
+}
static ERTS_INLINE void
enqueue_process(ErtsRunQueue *runq, int prio, Process *p)
@@ -3958,26 +3968,14 @@ enqueue_process(ErtsRunQueue *runq, int prio, Process *p)
p->schedule_count = 1;
rpq = &runq->procs.prio[prio];
}
-
- p->next = NULL;
- if (rpq->last)
- rpq->last->next = p;
- else
- rpq->first = p;
- rpq->last = p;
+ enqueue_process_internal(rpq, p);
}
-
static ERTS_INLINE void
-unqueue_process(ErtsRunQueue *runq,
- ErtsRunPrioQueue *rpq,
- ErtsRunQueueInfo *rqi,
- int prio,
+unqueue_process_no_update_lengths(ErtsRunPrioQueue *rpq,
Process *prev_proc,
Process *proc)
{
- ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
-
if (prev_proc)
prev_proc->next = proc->next;
else
@@ -3987,11 +3985,21 @@ unqueue_process(ErtsRunQueue *runq,
if (!rpq->first)
rpq->last = NULL;
+}
+static ERTS_INLINE void
+unqueue_process(ErtsRunQueue *runq,
+ ErtsRunPrioQueue *rpq,
+ ErtsRunQueueInfo *rqi,
+ int prio,
+ Process *prev_proc,
+ Process *proc)
+{
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
+ unqueue_process_no_update_lengths(rpq, prev_proc, proc);
erts_dec_runq_len(runq, rqi, prio);
}
-
static ERTS_INLINE Process *
dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep)
{
@@ -4489,6 +4497,7 @@ try_steal_task_from_victim(ErtsRunQueue *rq, ErtsRunQueue *vrq, Uint32 flags, Pr
Process *prev_proc;
Process *proc;
unsigned max_processes_to_steal;
+ unsigned n_procs_stolen[ERTS_NO_PROC_PRIO_LEVELS];
max_prio_bit = procs_qmask & -procs_qmask;
switch (max_prio_bit) {
@@ -4511,6 +4520,9 @@ try_steal_task_from_victim(ErtsRunQueue *rq, ErtsRunQueue *vrq, Uint32 flags, Pr
}
max_processes_to_steal = 100;
+ for (int i = 0; i < ERTS_NO_PROC_PRIO_LEVELS; ++i) {
+ n_procs_stolen[i] = 0;
+ }
prev_proc = NULL;
proc = rpq->first;
while (proc) {
@@ -4521,11 +4533,11 @@ try_steal_task_from_victim(ErtsRunQueue *rq, ErtsRunQueue *vrq, Uint32 flags, Pr
} else if (erts_try_change_runq_proc(proc, rq)) {
erts_aint32_t state = erts_atomic32_read_acqb(&proc->state);
int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state);
- ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio];
ErtsStolenProcess *sp = PSTACK_PUSH(stolen_processes);
sp->proc = proc;
sp->prio = prio;
- unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc);
+ n_procs_stolen[prio]++;
+ unqueue_process_no_update_lengths(rpq, prev_proc, proc);
if (--max_processes_to_steal == 0) {
break;
}
@@ -4537,15 +4549,30 @@ try_steal_task_from_victim(ErtsRunQueue *rq, ErtsRunQueue *vrq, Uint32 flags, Pr
}
if (!PSTACK_IS_EMPTY(stolen_processes)) {
ErtsStolenProcess *sp = (ErtsStolenProcess *) stolen_processes.pstart;
+ for (int i = 0; i < ERTS_NO_PROC_PRIO_LEVELS; ++i) {
+ if (n_procs_stolen[i] > 0) {
+ ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[i];
+ erts_sub_runq_len(vrq, rqi, i, n_procs_stolen[i]);
+ }
+ }
erts_runq_unlock(vrq);
*result_proc = sp->proc;
+ ASSERT(n_procs_stolen[sp->prio] > 0);
+ n_procs_stolen[sp->prio]--; // We're not going to requeue this one, as we're returning it
++sp;
erts_runq_lock(rq);
+ for (int i = 0; i < ERTS_NO_PROC_PRIO_LEVELS; ++i) {
+ if (n_procs_stolen[i] > 0) {
+ ErtsRunQueueInfo *rqi = &rq->procs.prio_info[i];
+ erts_add_runq_len(rq, rqi, i, n_procs_stolen[i]);
+ }
+ }
// We're not using a loop of PSTACK_POP to keep the right (LIFO) order of elements
// "<=" rather than "<" because of the insanity that is PSTACK (offs = 0 means that there is one element)
for (;(byte *) sp <= stolen_processes.pstart + stolen_processes.offs; ++sp) {
- enqueue_process(rq, sp->prio, sp->proc);
+ unsigned prio_q = sp->prio == PRIORITY_LOW ? PRIORITY_NORMAL : sp->prio;
+ enqueue_process_internal(&rq->procs.prio[prio_q], sp->proc);
}
PSTACK_DESTROY(stolen_processes);
return !0;
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 95e344f3f5..de96e35a6a 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -788,11 +788,13 @@ void erts_non_empty_runq(ErtsRunQueue *rq);
ERTS_GLB_INLINE void erts_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio);
ERTS_GLB_INLINE void erts_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio);
ERTS_GLB_INLINE void erts_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi);
+ERTS_GLB_INLINE void erts_add_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio, unsigned n);
+ERTS_GLB_INLINE void erts_sub_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio, unsigned n);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
-erts_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
+erts_add_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio, unsigned n)
{
erts_aint32_t len;
@@ -802,7 +804,7 @@ erts_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
if (len == 0)
erts_non_empty_runq(rq);
- len++;
+ len += n;
if (rq->max_len < len)
rq->max_len = len;
ASSERT(len > 0);
@@ -816,7 +818,7 @@ erts_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
erts_atomic32_read_bor_nob(&rq->flags,
(erts_aint32_t) (1 << prio));
}
- len++;
+ len += n;
if (rqi->max_len < len)
rqi->max_len = len;
@@ -824,19 +826,25 @@ erts_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
}
ERTS_GLB_INLINE void
-erts_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
+erts_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
+{
+ erts_add_runq_len(rq, rqi, prio, 1);
+}
+
+ERTS_GLB_INLINE void
+erts_sub_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio, unsigned n)
{
erts_aint32_t len;
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
len = erts_atomic32_read_dirty(&rq->len);
- len--;
+ len -= n;
ASSERT(len >= 0);
erts_atomic32_set_nob(&rq->len, len);
len = erts_atomic32_read_dirty(&rqi->len);
- len--;
+ len -= n;
ASSERT(len >= 0);
if (len == 0) {
ASSERT((erts_atomic32_read_nob(&rq->flags)
@@ -845,7 +853,12 @@ erts_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
~((erts_aint32_t) (1 << prio)));
}
erts_atomic32_set_relb(&rqi->len, len);
+}
+ERTS_GLB_INLINE void
+erts_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
+{
+ erts_sub_runq_len(rq, rqi, prio, 1);
}
ERTS_GLB_INLINE void
--
2.43.0