File 2111-Directly-execute-stolen-processes.patch of Package erlang
From 08186bd6c446e6ab971fe5d943dfff3fdf60e84e Mon Sep 17 00:00:00 2001
From: Robin Morisset <rmorisset@meta.com>
Date: Wed, 17 Jul 2024 07:06:54 -0700
Subject: [PATCH 01/15] Directly execute stolen processes
When a scheduler runqueue is empty, that scheduler attempts to steal
work from another (hopefully non-empty) runqueue. Currently, the
sequence of events looks like this:
- Notice that there is no work to do
- Set a flag saying "don't steal work from me, I'm busy stealing"
- Remove a process from some other runqueue
- Put it back on your own runqueue
- go to the beginning of the scheduler loop
- Notice that there is now work to do
- Take the process from your runqueue (that you just added it to)
- Unset the flag
- Execute that process
The new sequence looks like this:
- Notice that there is no work to do
- Remove a process from some other runqueue
- Execute that process
There is no more need for a flag to protect our runqueue from stealing,
since the process we stole never spends time in it (and so is never at
risk of being stolen back from us which could lead to some kind of
livelock)
---
erts/emulator/beam/erl_process.c | 57 ++++++++++++++------------------
erts/emulator/beam/erl_process.h | 10 +++---
2 files changed, 28 insertions(+), 39 deletions(-)
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 4e202e7a14..47e9a079ed 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -2978,16 +2978,14 @@ empty_runq_aux(ErtsRunQueue *rq, Uint32 old_flags)
static ERTS_INLINE void
empty_runq(ErtsRunQueue *rq)
{
- Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY);
empty_runq_aux(rq, old_flags);
}
static ERTS_INLINE Uint32
-empty_protected_runq(ErtsRunQueue *rq)
+empty_runq_get_old_flags(ErtsRunQueue *rq)
{
- Uint32 old_flags = ERTS_RUNQ_FLGS_BSET(rq,
- ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED,
- ERTS_RUNQ_FLG_PROTECTED);
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY);
empty_runq_aux(rq, old_flags);
return old_flags;
}
@@ -4036,6 +4034,7 @@ check_requeue_process(ErtsRunQueue *rq, int prio_q)
{
ErtsRunPrioQueue *rpq = &rq->procs.prio[prio_q];
Process *p = rpq->first;
+ ASSERT(p);
if (--p->schedule_count > 0 && p != rpq->last) {
/* reschedule */
rpq->first = p->next;
@@ -4081,8 +4080,6 @@ check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
#endif
f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq);
- if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED)
- return NULL;
if (ERTS_CHK_RUNQ_FLG_EVACUATE(f_flags, prio))
return f_rq;
@@ -4308,8 +4305,6 @@ evacuate_run_queue(ErtsRunQueue *rq,
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
-
ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
mps = erts_get_migration_paths_managed();
@@ -4463,7 +4458,7 @@ evacuate_run_queue(ErtsRunQueue *rq,
}
static int
-try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, Uint32 flags)
+try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, Uint32 flags, Process **result_proc)
{
Uint32 procs_qmask = flags & ERTS_RUNQ_FLGS_PROCS_QMASK;
int max_prio_bit;
@@ -4520,9 +4515,7 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq,
unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc);
erts_runq_unlock(vrq);
- erts_runq_lock(rq);
- *rq_lockedp = 1;
- enqueue_process(rq, prio, proc);
+ *result_proc = proc;
return !0;
}
prev_proc = proc;
@@ -4560,25 +4553,24 @@ no_procs:
static ERTS_INLINE int
-check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix)
+check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix, Process **result_proc)
{
ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix);
Uint32 flags = ERTS_RUNQ_FLGS_GET(vrq);
- if (runq_got_work_to_execute_flags(flags) & (!(flags & ERTS_RUNQ_FLG_PROTECTED)))
- return try_steal_task_from_victim(rq, rq_lockedp, vrq, flags);
+ if (runq_got_work_to_execute_flags(flags))
+ return try_steal_task_from_victim(rq, rq_lockedp, vrq, flags, result_proc);
else
return 0;
}
static int
-try_steal_task(ErtsRunQueue *rq)
+try_steal_task(ErtsRunQueue *rq, Process **result_proc)
{
int res, rq_locked, vix, active_rqs, blnc_rqs;
Uint32 flags;
- /* Protect jobs we steal from getting stolen from us... */
- flags = empty_protected_runq(rq);
+ flags = empty_runq_get_old_flags(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED)
return 0; /* go suspend instead... */
@@ -4599,7 +4591,7 @@ try_steal_task(ErtsRunQueue *rq)
int no = blnc_rqs - active_rqs;
int stop_ix = vix = active_rqs + rq->ix % no;
while (erts_atomic32_read_acqb(&no_empty_run_queues) < blnc_rqs) {
- res = check_possible_steal_victim(rq, &rq_locked, vix);
+ res = check_possible_steal_victim(rq, &rq_locked, vix, result_proc);
if (res)
goto done;
vix++;
@@ -4620,7 +4612,7 @@ try_steal_task(ErtsRunQueue *rq)
if (vix == rq->ix)
break;
- res = check_possible_steal_victim(rq, &rq_locked, vix);
+ res = check_possible_steal_victim(rq, &rq_locked, vix, result_proc);
if (res)
goto done;
}
@@ -5556,8 +5548,6 @@ wakeup_other_check(ErtsRunQueue *rq, Uint32 flags)
{
int empty_rqs =
erts_atomic32_read_acqb(&no_empty_run_queues);
- if (flags & ERTS_RUNQ_FLG_PROTECTED)
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
if (empty_rqs != 0)
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
@@ -5618,8 +5608,6 @@ wakeup_other_check_legacy(ErtsRunQueue *rq, Uint32 flags)
else if (rq->wakeup_other < wo_params->limit)
rq->wakeup_other += len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC_LEGACY;
else {
- if (flags & ERTS_RUNQ_FLG_PROTECTED)
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
if (erts_atomic32_read_acqb(&no_empty_run_queues) != 0) {
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
@@ -9834,11 +9822,20 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
/* Go suspend... */
goto continue_check_activities_to_run_known_flags;
}
+ empty_runq(rq);
}
else {
/* Normal scheduler */
- if (try_steal_task(rq))
+ p = NULL;
+ if (try_steal_task(rq, &p)) {
+ if (p) {
+ non_empty_runq(rq);
+ // TODO: avoid re-reading state, but it's ok for now
+ state = erts_atomic32_read_acqb(&p->state);
+ goto execute_process;
+ }
goto continue_check_activities_to_run;
+ }
/*
* Check for suspend has to be done after trying
* to steal a task...
@@ -9861,7 +9858,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
goto continue_check_activities_to_run_known_flags;
}
}
- empty_runq(rq);
}
(void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_EXEC);
@@ -9972,6 +9968,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
}
#endif
+execute_process:
if (is_normal_sched) {
psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state)
+ ERTS_PSFLGS_IN_PRQ_MASK_OFFSET));
@@ -10116,10 +10113,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_EMULATOR);
-
- if (flags & ERTS_RUNQ_FLG_PROTECTED)
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
-
ERTS_CHK_NO_PROC_LOCKS;
erts_proc_lock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
@@ -15076,8 +15069,6 @@ void erts_print_run_queue_info(fmtfn_t to, void *to_arg,
erts_print(to, to_arg, "INACTIVE"); break;
case ERTS_RUNQ_FLG_NONEMPTY:
erts_print(to, to_arg, "NONEMPTY"); break;
- case ERTS_RUNQ_FLG_PROTECTED:
- erts_print(to, to_arg, "PROTECTED"); break;
case ERTS_RUNQ_FLG_EXEC:
erts_print(to, to_arg, "EXEC"); break;
case ERTS_RUNQ_FLG_MSB_EXEC:
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 2c4b36a4d7..95e344f3f5 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -163,16 +163,14 @@ extern int erts_dio_sched_thread_suggested_stack_size;
(((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 4))
#define ERTS_RUNQ_FLG_NONEMPTY \
(((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 5))
-#define ERTS_RUNQ_FLG_PROTECTED \
- (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 6))
#define ERTS_RUNQ_FLG_EXEC \
- (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 7))
+ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 6))
#define ERTS_RUNQ_FLG_MSB_EXEC \
- (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 8))
+ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 7))
#define ERTS_RUNQ_FLG_MISC_OP \
- (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 9))
+ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 8))
#define ERTS_RUNQ_FLG_HALTING \
- (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 10))
+ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 9))
#define ERTS_RUNQ_FLG_MAX (ERTS_RUNQ_FLG_BASE2 + 12)
--
2.43.0