File 0028-68050-tentacle-stretched-cluster.patch of Package ceph-ceph-20.2.0+20260319.5bb32787

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index dc272546b93..5514f5381f5 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1001,6 +1001,7 @@ if(WITH_NVMEOF_GATEWAY_MONITOR_CLIENT)
     ${nvmeof_monitor_grpc_hdrs}
     ceph_nvmeof_monitor_client.cc
     nvmeof/NVMeofGwClient.cc
+    nvmeof/NVMeofGwUtils.cc
     nvmeof/NVMeofGwMonitorGroupClient.cc
     nvmeof/NVMeofGwMonitorClient.cc)
   add_executable(ceph-nvmeof-monitor-client ${ceph_nvmeof_monitor_client_srcs})
diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h
index 5d8b38fc0a1..5a607ef4b5a 100644
--- a/src/include/ceph_features.h
+++ b/src/include/ceph_features.h
@@ -161,7 +161,7 @@ DEFINE_CEPH_FEATURE_RETIRED(49, 1, OSD_PROXY_FEATURES, JEWEL, LUMINOUS) // overl
 DEFINE_CEPH_FEATURE(49, 2, SERVER_SQUID);
 DEFINE_CEPH_FEATURE_RETIRED(50, 1, MON_METADATA, MIMIC, OCTOPUS)
 DEFINE_CEPH_FEATURE(50, 2, SERVER_TENTACLE);
-DEFINE_CEPH_FEATURE_RETIRED(51, 1, OSD_BITWISE_HOBJ_SORT, MIMIC, OCTOPUS)
+DEFINE_CEPH_FEATURE(51, 2, NVMEOF_BEACON_DIFF)
 // available
 DEFINE_CEPH_FEATURE_RETIRED(52, 1, OSD_PROXY_WRITE_FEATURES, MIMIC, OCTOPUS)
 // available
@@ -258,6 +258,7 @@ DEFINE_CEPH_FEATURE_RETIRED(63, 1, RESERVED_BROKEN, LUMINOUS, QUINCY) // client-
 	 CEPH_FEATUREMASK_SERVER_REEF | \
 	 CEPH_FEATUREMASK_SERVER_SQUID | \
 	 CEPH_FEATUREMASK_SERVER_TENTACLE | \
+	 CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF | \
 	 0ULL)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
diff --git a/src/messages/MNVMeofGwBeacon.h b/src/messages/MNVMeofGwBeacon.h
index 26fc8dcf3ac..b39c10576d2 100644
--- a/src/messages/MNVMeofGwBeacon.h
+++ b/src/messages/MNVMeofGwBeacon.h
@@ -21,11 +21,9 @@
 #include "mon/MonCommand.h"
 #include "mon/NVMeofGwMap.h"
 #include "include/types.h"
+#include "mon/NVMeofGwBeaconConstants.h"
 
 class MNVMeofGwBeacon final : public PaxosServiceMessage {
-private:
-  static constexpr int HEAD_VERSION = 1;
-  static constexpr int COMPAT_VERSION = 1;
 
 protected:
     std::string       gw_id;
@@ -35,10 +33,13 @@ protected:
     gw_availability_t availability;                         // in absence of  beacon  heartbeat messages it becomes inavailable
     epoch_t           last_osd_epoch;
     epoch_t           last_gwmap_epoch;
+    uint64_t          sequence = 0;                         // sequence number for each beacon message
+    uint64_t          affected_features = 0;
 
 public:
   MNVMeofGwBeacon()
-    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION}
+    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, BEACON_VERSION_ENHANCED,
+	  BEACON_VERSION_ENHANCED}, sequence(0)
   {
     set_priority(CEPH_MSG_PRIO_HIGH);
   }
@@ -49,11 +50,17 @@ public:
         const BeaconSubsystems& subsystems_,
         const gw_availability_t& availability_,
         const epoch_t& last_osd_epoch_,
-        const epoch_t& last_gwmap_epoch_
-  )
-    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
+        const epoch_t& last_gwmap_epoch_,
+        uint64_t sequence_ = 0,  // default sequence for backward compatibility
+        uint64_t features = 0)
+    : PaxosServiceMessage{MSG_MNVMEOF_GW_BEACON,
+            0,
+            features ? BEACON_VERSION_ENHANCED : BEACON_VERSION_LEGACY,
+            features ? BEACON_VERSION_ENHANCED : BEACON_VERSION_LEGACY},
       gw_id(gw_id_), gw_pool(gw_pool_), gw_group(gw_group_), subsystems(subsystems_),
-      availability(availability_), last_osd_epoch(last_osd_epoch_), last_gwmap_epoch(last_gwmap_epoch_)
+      availability(availability_), last_osd_epoch(last_osd_epoch_),
+      last_gwmap_epoch(last_gwmap_epoch_), sequence(sequence_),
+      affected_features(features)
   {
     set_priority(CEPH_MSG_PRIO_HIGH);
   }
@@ -77,6 +84,7 @@ public:
   const epoch_t&           get_last_osd_epoch() const   { return last_osd_epoch; }
   const epoch_t&           get_last_gwmap_epoch() const { return last_gwmap_epoch; }
   const BeaconSubsystems&  get_subsystems()     const   { return subsystems; };
+  uint64_t get_sequence() const { return sequence; }
 
 private:
   ~MNVMeofGwBeacon() final {}
@@ -91,10 +99,14 @@ public:
     encode(gw_id, payload);
     encode(gw_pool, payload);
     encode(gw_group, payload);
-    encode(subsystems, payload);
+    encode(subsystems, payload, affected_features);
     encode((uint32_t)availability, payload);
     encode(last_osd_epoch, payload);
     encode(last_gwmap_epoch, payload);
+    // Only encode sequence for enhanced beacons (version >= 2)
+    if (get_header().version >= 2) {
+      encode(sequence, payload);
+    }
   }
 
   void decode_payload() override {
@@ -111,6 +123,12 @@ public:
     availability = static_cast<gw_availability_t>(tmp);
     decode(last_osd_epoch, p);
     decode(last_gwmap_epoch, p);
+    // Only decode sequence for enhanced beacons (version >= 2)
+    if (get_header().version >= 2 && !p.end()) {
+      decode(sequence, p);
+    } else {
+      sequence = 0;  // Legacy beacons don't have sequence field
+    }
   }
 
 private:
diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h
index b2ee4836794..21daba955f3 100644
--- a/src/mon/MonCommands.h
+++ b/src/mon/MonCommands.h
@@ -1457,6 +1457,43 @@ COMMAND("nvme-gw listeners"
 	" show all nvmeof gateways listeners within (pool, group)",
 	"mon", "r")
 
+COMMAND("nvme-gw enable"
+   " name=id,type=CephString"
+   " name=pool,type=CephString"
+   " name=group,type=CephString",
+   "administratively enables nvmeof gateway id for (pool, group)",
+   "mgr", "rw")
+
+COMMAND("nvme-gw disable"
+   " name=id,type=CephString"
+   " name=pool,type=CephString"
+   " name=group,type=CephString",
+   "administratively disables nvmeof gateway id for (pool, group)",
+   "mgr", "rw")
+
+COMMAND("nvme-gw set-location"
+   " name=id,type=CephString"
+   " name=pool,type=CephString"
+   " name=group,type=CephString"
+   " name=location,type=CephString",
+   "set location for nvmeof gateway id for (pool, group)",
+   "mgr", "rw")
+
+COMMAND("nvme-gw disaster-set"
+     " name=pool,type=CephString"
+     " name=group,type=CephString"
+     " name=location,type=CephString",
+     " set location to Disaster state",
+     "mgr", "rw")
+
+COMMAND("nvme-gw disaster-clear"
+    " name=pool,type=CephString"
+    " name=group,type=CephString"
+    " name=location,type=CephString",
+    " set location to clear Disaster state - failbacks allowed for recovered location",
+    "mgr", "rw")
+
+
 // these are tell commands that were implemented as CLI commands in
 // the broken pre-octopus way that we want to allow to work when a
 // monitor has upgraded to octopus+ but the monmap min_mon_release is
diff --git a/src/mon/NVMeofGwBeaconConstants.h b/src/mon/NVMeofGwBeaconConstants.h
new file mode 100644
index 00000000000..7453905012f
--- /dev/null
+++ b/src/mon/NVMeofGwBeaconConstants.h
@@ -0,0 +1,24 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef CEPH_NVMEOFGWBEACONCONSTANTS_H
+#define CEPH_NVMEOFGWBEACONCONSTANTS_H
+
+// This header contains version constants used across multiple files
+// to avoid duplication and maintain consistency.
+
+// Beacon version constants
+#define BEACON_VERSION_LEGACY 1             // Legacy beacon format (no diff support)
+#define BEACON_VERSION_ENHANCED 2           // Enhanced beacon format (with diff support)
+
+#endif /* CEPH_NVMEOFGWBEACONCONSTANTS_H */
diff --git a/src/mon/NVMeofGwMap.cc b/src/mon/NVMeofGwMap.cc
index 4b3fb2c676b..e1586483f5d 100755
--- a/src/mon/NVMeofGwMap.cc
+++ b/src/mon/NVMeofGwMap.cc
@@ -47,7 +47,8 @@ void NVMeofGwMap::to_gmap(
       }
 
       auto gw_state = NvmeGwClientState(
-	gw_created.ana_grp_id, epoch, availability);
+	gw_created.ana_grp_id, epoch, availability, gw_created.beacon_sequence,
+	gw_created.beacon_sequence_ooo);
       for (const auto& sub: gw_created.subsystems) {
 	gw_state.subsystems.insert({
 	    sub.nqn,
@@ -80,10 +81,10 @@ void NVMeofGwMap::remove_grp_id(
 }
 
 int NVMeofGwMap::cfg_add_gw(
-  const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
+  const NvmeGwId &gw_id, const NvmeGroupKey& group_key, uint64_t features)
 {
   std::set<NvmeAnaGrpId> allocated;
-  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOFHAMAP)) {
+  if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
     auto gw_epoch_it = gw_epoch.find(group_key);
     if (gw_epoch_it == gw_epoch.end()) {
       gw_epoch[group_key] = epoch;
@@ -174,11 +175,16 @@ int NVMeofGwMap::cfg_delete_gw(
     for (auto& gws_states: created_gws[group_key]) {
       if (gws_states.first == gw_id) {
         auto& state = gws_states.second;
+        if (state.availability == gw_availability_t::GW_AVAILABLE) {
+		   /* prevent failover because blocklisting right now causes IO errors */
+		   dout(4) << "Delete GW: set skip-failovers for group " << gw_id
+				   << " group " << group_key << dendl;
+		   skip_failovers_for_group(group_key, 5);
+		}
         state.availability = gw_availability_t::GW_DELETING;
         dout(4) << " Deleting  GW :"<< gw_id  << " in state  "
             << state.availability <<  " Resulting GW availability: "
             << state.availability  << dendl;
-        state.subsystems.clear();//ignore subsystems of this GW
         utime_t now = ceph_clock_now();
         mon->nvmegwmon()->gws_deleting_time[group_key][gw_id] = now;
         return 0;
@@ -219,6 +225,10 @@ int NVMeofGwMap::do_delete_gw(
       }
       dout(10) << " Delete GW :"<< gw_id  << " ANA grpid: "
         << state.ana_grp_id  << dendl;
+      if(is_last_gw_in_location(gw_id, group_key, state.location)) {
+        disaster_map_remove_location(group_key, state.location);
+      }
+
       for (auto& itr: created_gws[group_key]) {
         // Update state map and other maps
 	remove_grp_id(itr.first, group_key, state.ana_grp_id);
@@ -231,6 +241,245 @@ int NVMeofGwMap::do_delete_gw(
   return -EINVAL;
 }
 
+int NVMeofGwMap::cfg_admin_state_change(const NvmeGwId &gw_id,
+        const NvmeGroupKey& group_key,
+        gw_admin_state_t state, bool &propose_pending)
+{
+  auto& gws_states = created_gws[group_key];
+  auto  gw_state = gws_states.find(gw_id);
+  if (gw_state != gws_states.end()) {
+    auto& st = gw_state->second;
+    if (state == gw_admin_state_t::GW_ADMIN_DISABLED) {
+      if (st.gw_admin_state == gw_admin_state_t::GW_ADMIN_ENABLED) {
+        dout(4) << "GW-id set admin Disabled " << group_key
+                << " " << gw_id << dendl;
+        if (st.availability == gw_availability_t::GW_AVAILABLE) {
+          skip_failovers_for_group(group_key, 5);
+          process_gw_map_gw_down(gw_id, group_key, propose_pending);
+        }
+        st.gw_admin_state = state;
+        propose_pending = true;
+      }
+    } else if (state == gw_admin_state_t::GW_ADMIN_ENABLED) {
+      if (st.gw_admin_state == gw_admin_state_t::GW_ADMIN_DISABLED) {
+        dout(4) << "GW-id set admin Enabled " << group_key
+                << " " << gw_id << dendl;
+        st.gw_admin_state = state;
+        propose_pending = true;
+      }
+    }
+  } else {
+     dout(4) << "GW-id not created yet " << group_key << " " << gw_id << dendl;
+     return -EINVAL;
+  }
+  return 0;
+}
+
+bool NVMeofGwMap::validate_number_locations(int num_gws, int num_locations)
+{
+  return true; // TODO: add validation in the separate PR
+}
+
+int NVMeofGwMap::cfg_set_location(const NvmeGwId &gw_id,
+    const NvmeGroupKey& group_key,
+    std::string &location, bool &propose_pending) {
+
+  if (!HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF)) {
+    dout(4) << "Command is not allowed - feature is not installed "
+    << group_key << " " << gw_id << dendl;
+    return -EINVAL;
+  }
+  // validate that location differs from gw location
+  auto& gws_states = created_gws[group_key];
+  auto  gw_state = gws_states.find(gw_id);
+  std::set<std::string>  locations;
+  int num_gws = gws_states.size();
+  if (gw_state != gws_states.end()) {
+    auto& st = gw_state->second;
+    if (st.location == location) {
+      dout(4) << "GW-id same location is set " << group_key
+              << " " << gw_id << " " << location << dendl;
+      return -EEXIST;
+    } else { // check whether gw_id is the last gw in its current location
+      bool last_gw = is_last_gw_in_location(gw_id, group_key, st.location);
+      for (auto& states: created_gws[group_key]) {
+        locations.insert(states.second.location);
+      }
+      if (last_gw) { // this location would be removed so erase from set
+        locations.erase(st.location);
+      }
+      locations.insert(location);
+      dout(10) << "num GWs " << num_gws << " num set locations "
+               << locations.size() << dendl;
+      bool rc = validate_number_locations(num_gws, locations.size());
+      if (rc == false) {
+        dout(4) << "defined invalid number of locations "
+        << locations.size() << dendl;
+        return -EINVAL;
+      }
+      if (last_gw) {
+       dout(4) << "remove location:last gw-id " << gw_id << " location "
+               << st.location << dendl;
+       disaster_map_remove_location(group_key, st.location);
+      }
+      st.location = location;
+      dout(10) << "GW-id  location is set " << group_key
+        << " " << gw_id << " " << location << dendl;
+      propose_pending = true;
+      return 0;
+    }
+  } else {
+    dout(4) << "GW-id not created yet " << group_key << " " << gw_id << dendl;
+    return -EINVAL;
+  }
+}
+
+bool NVMeofGwMap::is_last_gw_in_location(const NvmeGwId &gw_id,
+            const NvmeGroupKey& group_key, NvmeLocation& location) {
+  for (auto& states: created_gws[group_key]) {
+    auto &state = states.second;
+    if (state.location == location && states.first != gw_id) {
+      return false;
+    }
+  }
+  return true;
+}
+
+bool NVMeofGwMap::is_location_in_disaster(const NvmeGroupKey& group_key,
+           NvmeLocation& location, bool &cleanup_in_process) {
+  auto grp_it = disaster_locations.find(group_key);
+  cleanup_in_process = false;
+  if (grp_it != disaster_locations.end()) {
+    auto &loc_states = grp_it->second;
+    if (loc_states.find(location) != loc_states.end()) {
+      cleanup_in_process =
+        disaster_locations[group_key][location].failbacks_in_process;
+      return true;
+    }
+  }
+  return false;
+}
+
+bool NVMeofGwMap::get_location_in_disaster_cleanup(const NvmeGroupKey& group_key,
+           NvmeLocation& returned_location) {
+  auto grp_it = disaster_locations.find(group_key);
+  if (grp_it != disaster_locations.end()) {
+    for (auto &entry : grp_it->second) {
+      const NvmeLocation &location = entry.first;
+      if (entry.second.failbacks_in_process) { // find first location in recovering state and return
+        returned_location = location;
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
+void NVMeofGwMap::disaster_map_remove_location(const NvmeGroupKey& group_key,
+           NvmeLocation& location) {
+  // this function is called when the last GW in a location is removed from the group
+  // or when the location is removed from the disaster_locations map
+  auto grp_it = disaster_locations.find(group_key);
+  if (grp_it != disaster_locations.end()) {
+    auto& locs = grp_it->second;
+    locs.erase(location);
+    if (locs.empty()) {
+      disaster_locations.erase(grp_it);
+    }
+  }
+}
+
+int NVMeofGwMap::cfg_location_disaster_set(
+         const NvmeGroupKey& group_key,
+         std::string &location, bool &propose_pending) {
+
+  if (!HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF)) {
+    dout(4) << "Command is not allowed - feature is not installed "
+            << group_key << dendl;
+    return -EINVAL;
+  }
+  bool cleanup_in_process = false;
+  bool location_exists = false;
+  auto& gws_states = created_gws[group_key];
+  if (is_location_in_disaster(group_key, location, cleanup_in_process)) {
+      dout(4) << "command cannot be accepted since location already in disaster "
+              << location << dendl;
+      return -EEXIST;
+  }
+  // validate: check that all gws in location are not available
+  for (auto& found_gw_state: gws_states) {
+    auto st = found_gw_state.second;
+    if (st.location == location &&
+        st.availability == gw_availability_t::GW_AVAILABLE) {
+      dout(4) << "command cannot be accepted since gw " << found_gw_state.first
+              <<" in location " << location << " is available" << dendl;
+      return -EINVAL;
+    }
+    if (st.location == location) {
+      location_exists = true;
+    }
+  }
+  if (!location_exists) {
+      dout(4) << "command cannot be accepted since in group " << group_key
+             <<" location " << location << " was not configured yet" << dendl;
+      return -EINVAL;
+  }
+  dout(10) << " set disaster location " << location << " in group "
+           << group_key << dendl;
+  disaster_locations[group_key][location];
+  propose_pending = true;
+  return 0;
+}
+
+int NVMeofGwMap::cfg_location_disaster_clear(
+         const NvmeGroupKey& group_key,
+         std::string &location, bool &propose_pending) {
+
+  if (!HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF)) {
+    dout(4) << "Command is not allowed - feature is not installed "
+            << group_key << dendl;
+       return -EINVAL;
+  }
+  auto& gws_states = created_gws[group_key];
+  bool accept = false;
+  bool cleanup_in_process = false;
+  // for all the gateways of the subsystem
+  if (!is_location_in_disaster(group_key, location, cleanup_in_process)) {
+      dout(4) << "command cannot be accepted: in a group " << group_key
+              << " disaster location " << location << " was not found" << dendl;
+      return -EINVAL;
+  } else if (cleanup_in_process) {
+    dout(4)
+    << "command cannot be accepted since recovering already started for group "
+    << group_key << "and location " << location << dendl;
+    return -EEXIST;
+  }
+  for (auto& found_gw_state: gws_states) {
+    auto st = found_gw_state.second;
+    if (st.location == location) {
+      if (st.availability == gw_availability_t::GW_AVAILABLE &&
+        st.sm_state[st.ana_grp_id] == gw_states_per_group_t::GW_STANDBY_STATE) {
+        dout(10) << "command  accepted found gw in state " << st.availability
+                 << " ana grp state " << st.sm_state[st.ana_grp_id] << dendl;
+        accept = true;
+        break;
+      }
+    }
+  }
+  if (accept) { // conditions for a cleanup
+    disaster_locations[group_key][location].failbacks_in_process = true;
+    propose_pending = true;
+    return 0;
+  } else {
+    dout(10) << "command accepted: but not found AVAILABLE GW"
+          " with ANA grp in standby state so disaster just removed"
+           << dendl;
+    disaster_map_remove_location(group_key, location);
+    propose_pending = true;
+    return 0;
+  }
+}
+
 void  NVMeofGwMap::gw_performed_startup(const NvmeGwId &gw_id,
       const NvmeGroupKey& group_key, bool &propose_pending)
 {
@@ -338,10 +587,16 @@ void NVMeofGwMap::track_deleting_gws(const NvmeGroupKey& group_key,
   }
 }
 
-void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key)
+void NVMeofGwMap::skip_failovers_for_group(const NvmeGroupKey& group_key,
+   int interval_sec)
 {
-  const auto skip_failovers = g_conf().get_val<std::chrono::seconds>
-    ("mon_nvmeofgw_skip_failovers_interval");
+  std::chrono::seconds skip_failovers;
+  if (interval_sec == 0) {
+    skip_failovers = g_conf().get_val<std::chrono::seconds>
+	      ("mon_nvmeofgw_skip_failovers_interval");
+	} else {
+	skip_failovers = std::chrono::seconds(interval_sec);
+  }
   for (auto& gw_created: created_gws[group_key]) {
     gw_created.second.allow_failovers_ts = std::chrono::system_clock::now()
         + skip_failovers;
@@ -386,6 +641,7 @@ int NVMeofGwMap::process_gw_map_gw_down(
     auto& st = gw_state->second;
     st.set_unavailable_state();
     st.set_last_gw_down_ts();
+    st.reset_beacon_sequence();
     for (auto& state_itr: created_gws[group_key][gw_id].sm_state) {
       fsm_handle_gw_down(
 	gw_id, group_key, state_itr.second,
@@ -451,26 +707,29 @@ void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
     for (auto& gw_state : gws_states) { // loop for GWs inside nqn group
       auto& gw_id = gw_state.first;
       NvmeGwMonState& state = gw_state.second;
-
+      bool disaster_cleanup = false;
+      bool in_disaster =
+        is_location_in_disaster(group_key, state.location, disaster_cleanup);
       // 1. Failover missed : is there is a GW in unavailable state?
       // if yes, is its ANA group handled by some other GW?
-      if ((state.availability == gw_availability_t::GW_UNAVAILABLE ||
+      if ( (state.availability == gw_availability_t::GW_UNAVAILABLE ||
           state.availability == gw_availability_t::GW_DELETING ||
-          state.availability == gw_availability_t::GW_CREATED) &&
-	  state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) {
+          state.availability == gw_availability_t::GW_CREATED ||
+	      (in_disaster && !disaster_cleanup)) &&
+ 	      state.ana_grp_id != REDUNDANT_GW_ANA_GROUP_ID) {
 	auto found_gw_for_ana_group = false;
 	for (auto& gw_state2 : gws_states) {
 	  NvmeGwMonState& state2 = gw_state2.second;
 	  if (state2.availability == gw_availability_t::GW_AVAILABLE &&
 	      (state2.sm_state[state.ana_grp_id] ==
 	       gw_states_per_group_t::GW_ACTIVE_STATE)) {
-	    found_gw_for_ana_group = true;
+	    found_gw_for_ana_group = true; // found GW that currently handles the ana group
 	    break;
 	  }
 	}
         // choose the GW for handle ana group
 	if (found_gw_for_ana_group == false) {
-	  dout(20) << "Was not found the GW " << " that handles ANA grp "
+	  dout(10) << "Not found GW  that handles ANA grp "
 		   << (int)state.ana_grp_id << " find candidate "<< dendl;
 	  for (auto& state_itr: created_gws[group_key][gw_id].sm_state) {
 	    find_failover_candidate(gw_id, group_key, state_itr.first, propose);
@@ -485,6 +744,7 @@ void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
 	find_failback_gw(gw_id, group_key, propose);
       }
     }
+    check_relocate_ana_groups(group_key, propose);
     if (propose) {
       validate_gw_map(group_key);
       increment_gw_epoch(group_key);
@@ -492,13 +752,135 @@ void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
   }
 }
 
+void NVMeofGwMap::check_relocate_ana_groups(const NvmeGroupKey& group_key,
+         bool &propose) {
+  /* if a location in disaster_locations is found in recovering state - find all gws in location.
+   * add ana-grp of not Available gws to the list.
+   * if ana-grp is already active on some gw in location skip it
+   * for ana-grp in list make relocation.
+   * if all ana-grps in location active remove location from the map disaster_locations.group
+  */
+  if (!HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF)) {
+    dout(4) << "relocate is not allowed - feature is not installed "
+            << group_key << dendl;
+       return ;
+  }
+  std::list<NvmeAnaGrpId>  reloc_list;
+  auto& gws_states = created_gws[group_key];
+  FailbackLocation location;
+  if (get_location_in_disaster_cleanup(group_key, location)) {
+    uint32_t num_gw_in_location = 0;
+    uint32_t num_active_ana_in_location = 0;
+    for (auto& gw_state : gws_states) { // loop for GWs inside group-key
+      NvmeGwMonState& state = gw_state.second;
+      if (state.location == location) {
+        num_gw_in_location ++;
+        if (state.availability != gw_availability_t::GW_AVAILABLE) {
+          reloc_list.push_back(state.ana_grp_id);
+        } else { // in parallel check condition to complete failback-in-process
+          for (auto& state_it: state.sm_state) {
+            if (state_it.second == gw_states_per_group_t::GW_ACTIVE_STATE) {
+              num_active_ana_in_location ++;
+            }
+          }
+        }
+      }
+    }
+    if (num_gw_in_location == num_active_ana_in_location) {// All ana groups of disaster location are in Active
+      disaster_map_remove_location(group_key, location);
+      dout(4) <<  "the location entry is erased "<< location
+          << " from disaster-locations num_ana_groups in location "
+          << num_gw_in_location
+          << " from the failbacks-in-progress of group " << group_key <<dendl;
+      propose = true;
+      return;
+    }
+    // for all ana groups in the list do relocate
+    for (auto& anagrp : reloc_list) {
+      for (auto& gw_state : gws_states) { // loop for GWs inside group-key
+        NvmeGwMonState& state = gw_state.second;
+        if (state.sm_state[anagrp] == gw_states_per_group_t::GW_ACTIVE_STATE) {
+          if (state.location == location) { // already relocated to the location
+            dout(10) << "ana " << anagrp << " already in " << location << dendl;
+            break;
+          } else { // try to relocate
+              dout(10) << "ana " << anagrp
+                  << " to relocate to " << location << dendl;
+              relocate_ana_grp(gw_state.first, group_key, anagrp,
+                    location, propose);
+          }
+        }
+      }
+    }
+  }
+}
+
+int NVMeofGwMap::relocate_ana_grp(const NvmeGwId &src_gw_id,
+  const NvmeGroupKey& group_key, NvmeAnaGrpId grpid, NvmeLocation& location,
+  bool &propose){
+  /* grpid should be in Active state on src_gw_id as precondition
+   * function tries to found the minimum loaded gw in location
+   * if found, relocate ana grp to it using regular failback FSM
+  */
+  constexpr uint32_t   MAX_NUM_ANA_GROUPS_FOR_RELOCATE = 0xFFF;
+  uint32_t min_num_ana_groups_in_gw =  MAX_NUM_ANA_GROUPS_FOR_RELOCATE;
+  NvmeGwId min_gw_id;
+  auto& gws_states = created_gws[group_key];
+
+  for (auto& gw_state : gws_states) { // loop for GWs inside group-key
+    NvmeGwMonState& state = gw_state.second;
+    uint32_t current_ana_groups_in_gw = 0;
+    if (state.location == location && state.availability ==
+        gw_availability_t::GW_AVAILABLE) {
+     //find minimum loaded gw in location for relocate anagr to it
+      for (auto& state_itr: state.sm_state) {
+        NvmeAnaGrpId anagrp = state_itr.first;
+        // if found some gw in intermediate state - exit the process
+        // need to calculate the correct loading of Active groups
+        if ( (state.sm_state[anagrp] !=
+           gw_states_per_group_t::GW_ACTIVE_STATE) &&
+            (state.sm_state[anagrp] !=
+                gw_states_per_group_t::GW_STANDBY_STATE)
+           ) {
+          dout(10) << "relocate: found gw in intermediate state "
+           << gw_state.first << " state " << state.sm_state[anagrp] << dendl;
+          return 0;
+        }
+        if (state.sm_state[anagrp] ==
+          gw_states_per_group_t::GW_ACTIVE_STATE) {
+          // how many ANA groups are handled by this GW
+          current_ana_groups_in_gw ++;
+          if (current_ana_groups_in_gw < min_num_ana_groups_in_gw ) {
+            min_num_ana_groups_in_gw = current_ana_groups_in_gw;
+            min_gw_id = gw_state.first;
+          }
+        }
+      }
+    }
+  }
+  dout(10) << "found min loaded gw for relocate " << min_gw_id << " location "
+      << location << "min load " << min_num_ana_groups_in_gw << dendl;
+
+  if (min_num_ana_groups_in_gw <  MAX_NUM_ANA_GROUPS_FOR_RELOCATE) {
+    dout(4) << "relocate starts " << grpid << " location " << location << dendl;
+    gws_states[src_gw_id].sm_state[grpid] =
+       gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED;
+    // Add timestamp of start Failback preparation
+    start_timer(src_gw_id, group_key, grpid, 3);
+    gws_states[min_gw_id].sm_state[grpid] =
+       gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED;
+    propose = true;
+  }
+  return 0;
+}
+
 void NVMeofGwMap::set_failover_gw_for_ANA_group(
   const NvmeGwId &failed_gw_id, const NvmeGroupKey& group_key,
   const NvmeGwId &gw_id, NvmeAnaGrpId ANA_groupid)
 {
   NvmeGwMonState& gw_state = created_gws[group_key][gw_id];
   NvmeGwMonState& failed_gw_state = created_gws[group_key][failed_gw_id];
-  epoch_t epoch;
+  epoch_t epoch = 0;
   dout(10) << "Found failover GW " << gw_id
 	   << " for ANA group " << (int)ANA_groupid << dendl;
   if (failed_gw_state.availability == gw_availability_t::GW_CREATED) {
@@ -529,7 +911,15 @@ void NVMeofGwMap::find_failback_gw(
   auto& gws_states = created_gws[group_key];
   auto& gw_state = created_gws[group_key][gw_id];
   bool do_failback = false;
-  dout(10) << "Find failback GW for GW " << gw_id << dendl;
+  bool allow_inter_location = true;
+  bool cleanup_in_process = false;
+  if (is_location_in_disaster(group_key, gw_state.location, cleanup_in_process)) {
+    if (!cleanup_in_process) {
+      allow_inter_location = false;
+    }
+  }
+  dout(10) << "Find failback GW for GW " << gw_id << "location "
+           << gw_state.location << dendl;
   for (auto& gw_state_it: gws_states) {
     auto& st = gw_state_it.second;
     // some other gw owns or owned the desired ana-group
@@ -547,6 +937,12 @@ void NVMeofGwMap::find_failback_gw(
   if (do_failback == false) {
     // No other gw currently performs some activity with desired ana
     // group of coming-up GW - so it just takes over on the group
+    if(allow_inter_location == false) {
+      dout (10) << "Failback GW candidate was not found but "
+                << "location of the gw " << gw_id
+                << "is currently in disaster state" <<dendl;
+      return;
+    }
     dout(10) << "Failback GW candidate was not found, "
 	     << "just set Optimized to group " << gw_state.ana_grp_id
 	     << " to GW " << gw_id << dendl;
@@ -564,7 +960,13 @@ void NVMeofGwMap::find_failback_gw(
       dout(10)  << "Found Failback GW " << failback_gw_id
 		<< " that previously took over the ANAGRP "
 		<< gw_state.ana_grp_id << " of the available GW "
-		<< gw_id << dendl;
+		<< gw_id << "location " << st.location << dendl;
+      if (st.location != gw_state.location && !allow_inter_location ) {
+        //not allowed inter-location failbacks
+        dout(10) << "not allowed interlocation failbacks. GW "
+        << gw_id << dendl;
+        return;
+      }
       st.sm_state[gw_state.ana_grp_id] =
 	gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED;
 
@@ -578,24 +980,81 @@ void NVMeofGwMap::find_failback_gw(
   }
 }
 
+
+int  NVMeofGwMap::find_failover_gw_logic(const NvmeGroupKey& group_key,
+                  NvmeGwMonStates& gws_states, NvmeLocation& location,
+                   NvmeGwId& min_loaded_gw_id, bool ignore_locations)
+{
+#define ILLEGAL_GW_ID " "
+#define MIN_NUM_ANA_GROUPS 0xFFF
+    int min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS;
+    min_loaded_gw_id = ILLEGAL_GW_ID;
+    int active_ana_groups_in_gw = 0;
+    int num_busy = 0, num_gws = 0;
+    // for all the gateways of the subsystem
+    // find the gws  related to the same location as in anagrp
+    for (auto& found_gw_state: gws_states) {
+      auto st = found_gw_state.second;
+      if ((st.availability == gw_availability_t::GW_AVAILABLE) &&
+          (ignore_locations || st.location == location)) {
+	num_gws ++;
+	active_ana_groups_in_gw = 0;
+	bool cleanup_in_process = false;
+	if (is_location_in_disaster(group_key, st.location, cleanup_in_process)) {
+	  continue;
+	}
+	for (auto& state_itr: st.sm_state) {
+	  NvmeAnaGrpId anagrp = state_itr.first;
+	  if ((st.sm_state[anagrp] ==
+	       gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED) ||
+	      (st.sm_state[anagrp] ==
+	       gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED) ||
+	      (st.sm_state[anagrp] ==
+	       gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL)) {
+	    active_ana_groups_in_gw = 0xFFFF;
+	    num_busy ++;
+	    break; // don't take into account GWs in the transitive state
+	  } else if (st.sm_state[anagrp] ==
+		     gw_states_per_group_t::GW_ACTIVE_STATE) {
+            // how many ANA groups are handled by this GW
+	    active_ana_groups_in_gw++;
+	  }
+	}
+	if (min_num_ana_groups_in_gw > active_ana_groups_in_gw) {
+	  min_num_ana_groups_in_gw = active_ana_groups_in_gw;
+	  min_loaded_gw_id = found_gw_state.first;
+	  dout(10) << "choose: gw-id  min_ana_groups " << min_loaded_gw_id
+		   << active_ana_groups_in_gw << " min "
+		   << min_num_ana_groups_in_gw << dendl;
+	}
+      }
+    }
+    if (min_loaded_gw_id !=ILLEGAL_GW_ID) { // some GW choosen
+      return 0;
+    } else if (num_busy) {
+      dout(4) << "some GWs are busy " << num_busy
+              << "num Available " << num_gws << dendl;
+      return -EBUSY;
+    } else {
+      dout(4) << "no GWs in Active state. num Available " << num_gws << dendl;
+      return -ENOENT;
+    }
+}
+
 void  NVMeofGwMap::find_failover_candidate(
   const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
   NvmeAnaGrpId grpid, bool &propose_pending)
 {
   dout(10) << __func__<< " " << gw_id << dendl;
-#define ILLEGAL_GW_ID " "
-#define MIN_NUM_ANA_GROUPS 0xFFF
-  int min_num_ana_groups_in_gw = 0;
-  int current_ana_groups_in_gw = 0;
   std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
   NvmeGwId min_loaded_gw_id = ILLEGAL_GW_ID;
   auto& gws_states = created_gws[group_key];
   auto gw_state = gws_states.find(gw_id);
+  NvmeLocation ana_location = gw_state->second.location;
   if (gw_state->second.allow_failovers_ts > now) {
     dout(4) << "gw " << gw_id << " skip-failovers is set " << dendl;
     return;
   }
-
   // this GW may handle several ANA groups and  for each
   // of them need to found the candidate GW
   if ((gw_state->second.sm_state[grpid] ==
@@ -615,38 +1074,14 @@ void  NVMeofGwMap::find_failover_candidate(
       }
     }
     // Find a GW that takes over the ANA group(s)
-    min_num_ana_groups_in_gw = MIN_NUM_ANA_GROUPS;
-    min_loaded_gw_id = ILLEGAL_GW_ID;
-
-    // for all the gateways of the subsystem
-    for (auto& found_gw_state: gws_states) {
-      auto st = found_gw_state.second;
-      if (st.availability == gw_availability_t::GW_AVAILABLE) {
-	current_ana_groups_in_gw = 0;
-	for (auto& state_itr: created_gws[group_key][gw_id].sm_state) {
-	  NvmeAnaGrpId anagrp = state_itr.first;
-	  if ((st.sm_state[anagrp] ==
-	       gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED) ||
-	      (st.sm_state[anagrp] ==
-	       gw_states_per_group_t::GW_WAIT_FAILBACK_PREPARED) ||
-	      (st.sm_state[anagrp] ==
-	       gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL)) {
-	    current_ana_groups_in_gw = 0xFFFF;
-	    break; // dont take into account   GWs in the transitive state
-	  } else if (st.sm_state[anagrp] ==
-		     gw_states_per_group_t::GW_ACTIVE_STATE) {
-            // how many ANA groups are handled by this GW
-	    current_ana_groups_in_gw++;
-	  }
-	}
-	if (min_num_ana_groups_in_gw > current_ana_groups_in_gw) {
-	  min_num_ana_groups_in_gw = current_ana_groups_in_gw;
-	  min_loaded_gw_id = found_gw_state.first;
-	  dout(10) << "choose: gw-id  min_ana_groups " << min_loaded_gw_id
-		   << current_ana_groups_in_gw << " min "
-		   << min_num_ana_groups_in_gw << dendl;
-	}
-      }
+    //Find GW among the GWs belong to the same location
+    int rc = find_failover_gw_logic(group_key, gws_states, ana_location,
+               min_loaded_gw_id, false);
+    if (rc == -ENOENT) {
+      // looks at all GWs
+      dout(10) << "Find Failover GW -look at all Gateways in the pool/group" << dendl;
+      rc = find_failover_gw_logic(group_key, gws_states, ana_location,
+                 min_loaded_gw_id, true);
     }
     if (min_loaded_gw_id != ILLEGAL_GW_ID) {
       propose_pending = true;
@@ -875,7 +1310,8 @@ void NVMeofGwMap::fsm_handle_to_expired(
     for (auto& gw_state: created_gws[group_key]) {
       auto& st = gw_state.second;
       // group owner
-      if (st.ana_grp_id == grpid) {
+      if (st.sm_state[grpid] ==
+          gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED) {
 	grp_owner_found = true;
 	if (st.availability == gw_availability_t::GW_AVAILABLE) {
 	  if (!(fbp_gw_state.last_gw_map_epoch_valid &&
@@ -889,34 +1325,29 @@ void NVMeofGwMap::fsm_handle_to_expired(
 	}
 	cancel_timer(gw_id, group_key, grpid);
 	map_modified = true;
-	if ((st.sm_state[grpid] ==
-	     gw_states_per_group_t::GW_OWNER_WAIT_FAILBACK_PREPARED) &&
-	    (st.availability == gw_availability_t::GW_AVAILABLE)) {
-          // Previous failover GW  set to standby
-	  fbp_gw_state.standby_state(grpid);
+	// unconditionally set the previous failover GW to standby
+	fbp_gw_state.standby_state(grpid);
+	if (st.availability == gw_availability_t::GW_AVAILABLE) {
 	  st.active_state(grpid);
 	  dout(10)  << "Expired Failback-preparation timer from GW "
-		    << gw_id << " ANA groupId "<< grpid << dendl;
-	  map_modified = true;
+	            << gw_id << " ANA groupId "<< grpid << dendl;
 	  break;
-	} else if ((st.sm_state[grpid] ==
-		    gw_states_per_group_t::GW_STANDBY_STATE) &&
-		   (st.availability == gw_availability_t::GW_AVAILABLE)) {
-          // GW failed during the persistency interval
-	  st.standby_state(grpid);
-	  dout(10)  << "Failback unsuccessfull. GW: " << gw_state.first
-		    << " becomes Standby for the ANA groupId " << grpid << dendl;
 	}
-	fbp_gw_state.standby_state(grpid);
-	dout(10) << "Failback unsuccessfull GW: " << gw_id
-		 << " becomes Standby for the ANA groupId " << grpid  << dendl;
-	map_modified = true;
-	break;
+	else {
+      st.standby_state(grpid);
+      dout(10) << "GW failed during failback/relocation persistency interval"
+               << gw_state.first << dendl;
+    }
       }
     }
     if (grp_owner_found == false) {
-      // when  GW group owner is deleted the fbk gw is put to standby
-      dout(4) << "group owner not found " << grpid << " GW: " << gw_id << dendl;
+      cancel_timer(gw_id, group_key, grpid);
+      fbp_gw_state.standby_state(grpid);
+      dout(4) << "group owner/relocation target not found "
+              << grpid << " GW: " << gw_id << dendl;
+      dout(10) << "Failback unsuccessfull GW: " << gw_id
+               << " becomes Standby for the ANA groupId " << grpid  << dendl;
+      map_modified = true;
     }
   } else if (fbp_gw_state.sm_state[grpid] ==
 	     gw_states_per_group_t::GW_WAIT_BLOCKLIST_CMPL) {
@@ -1041,14 +1472,23 @@ int NVMeofGwMap::blocklist_gw(
   // find_already_created_gw(gw_id, group_key);
   NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
   NvmeNonceVector nonces;
+
+  NvmeAnaNonceMap nonce_map;
+  for (const auto& sub: gw_map.subsystems) { // recreate nonce map from subsystems
+    for (const auto& ns: sub.namespaces) {
+	  auto& nonce_vec = nonce_map[ns.anagrpid-1]; //Converting  ana groups to offsets
+	  if (std::find(nonce_vec.begin(), nonce_vec.end(), ns.nonce) == nonce_vec.end())
+	    nonce_vec.push_back(ns.nonce);
+    }
+  }
   for (auto& state_itr: gw_map.sm_state) {
     // to make blocklist on all clusters of the failing GW
-    nonces.insert(nonces.end(), gw_map.nonce_map[state_itr.first].begin(),
-        gw_map.nonce_map[state_itr.first].end());
+    nonces.insert(nonces.end(), nonce_map[state_itr.first].begin(),
+        nonce_map[state_itr.first].end());
   }
-
+  gw_map.subsystems.clear();
   if (nonces.size() > 0) {
-    NvmeNonceVector &nonce_vector = gw_map.nonce_map[grpid];;
+    NvmeNonceVector &nonce_vector = nonces;
     std::string str = "[";
     entity_addrvec_t addr_vect;
 
@@ -1122,6 +1562,45 @@ void  NVMeofGwMap::validate_gw_map(const NvmeGroupKey& group_key)
   }
 }
 
+bool NVMeofGwMap::put_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+	  int gw_version, const NvmeGroupKey& group_key,
+	  uint64_t beacon_sequence, uint64_t& old_sequence)
+{
+  bool rc = true;
+  NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+		  (gw_version > 0) ) {
+    uint64_t seq_number = gw_map.beacon_sequence;
+    if ((beacon_sequence != seq_number+1) &&
+        !(beacon_sequence == 0 && seq_number == 0 )) {// new GW startup
+        rc = false;
+        old_sequence = seq_number;
+        dout(4) << "Warning: GW " << gw_id
+                << " sent beacon sequence out of order, expected "
+                << seq_number +1 << " received " << beacon_sequence << dendl;
+        gw_map.beacon_sequence_ooo = true;
+    } else {
+      gw_map.beacon_sequence = beacon_sequence;
+    }
+  }
+  return rc;
+}
+
+bool NVMeofGwMap::set_gw_beacon_sequence_number(const NvmeGwId &gw_id,
+	 int gw_version, const NvmeGroupKey& group_key, uint64_t beacon_sequence)
+{
+  NvmeGwMonState& gw_map = created_gws[group_key][gw_id];
+  if (HAVE_FEATURE(mon->get_quorum_con_features(), NVMEOF_BEACON_DIFF) ||
+		  (gw_version > 0)) {
+      gw_map.beacon_sequence = beacon_sequence;
+      gw_map.beacon_sequence_ooo = false;
+      dout(10) << gw_id << " set beacon_sequence " << beacon_sequence << dendl;
+  }
+  return true;
+}
+
+
 void NVMeofGwMap::update_active_timers(bool &propose_pending)
 {
   const auto now = std::chrono::system_clock::now();
diff --git a/src/mon/NVMeofGwMap.h b/src/mon/NVMeofGwMap.h
index 015577f248a..f5af47d8ac5 100755
--- a/src/mon/NVMeofGwMap.h
+++ b/src/mon/NVMeofGwMap.h
@@ -27,7 +27,11 @@
 #include "NVMeofGwTypes.h"
 
 using ceph::coarse_mono_clock;
-
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define MODULE_PREFFIX "nvmeofgw "
+#define dout_prefix *_dout << MODULE_PREFFIX << __PRETTY_FUNCTION__ << " "
 class health_check_map_t;
 
 class Monitor;
@@ -61,11 +65,34 @@ public:
    */
   std::map<NvmeGroupKey, epoch_t> gw_epoch;
 
+  /* In a stretched cluster configuration:
+   * this map stores the disaster state per location - several locations can be in disaster state.
+   * While a location is in disaster:
+   *   failbacks/failovers to the disaster location are not allowed.
+   * While in disaster recovery state:
+   *  a periodic process relocates ana groups; failbacks between locations are allowed.
+   * The disaster_locations map should be updated when the last location is removed.
+   * */
+  std::map<NvmeGroupKey, LocationStates> disaster_locations;
+
   void to_gmap(std::map<NvmeGroupKey, NvmeGwMonClientStates>& Gmap) const;
   void track_deleting_gws(const NvmeGroupKey& group_key,
     const BeaconSubsystems&  subs, bool &propose_pending);
-  int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
+  void check_all_gws_in_deleting_state(const NvmeGwId &gw_id,
+    const NvmeGroupKey& group_key);
+  int cfg_add_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+    uint64_t features);
   int cfg_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
+  int cfg_admin_state_change(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+		  gw_admin_state_t state, bool &propose_pending);
+  int cfg_set_location(const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
+		  std::string &location, bool &propose_pending);
+  bool is_last_gw_in_location(const NvmeGwId &gw_id,
+              const NvmeGroupKey& group_key, NvmeLocation& location);
+  int cfg_location_disaster_set(const NvmeGroupKey& group_key,
+          std::string &location, bool &propose_pending);
+  int cfg_location_disaster_clear(const NvmeGroupKey& group_key,
+          std::string &location, bool &propose_pending);
   void process_gw_map_ka(
     const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
     epoch_t& last_osd_epoch,  bool &propose_pending);
@@ -89,7 +116,15 @@ public:
        const NvmeGroupKey& group_key, bool &propose_pending);
   void set_addr_vect(const NvmeGwId &gw_id,
       const NvmeGroupKey& group_key, const entity_addr_t &addr_vect);
-  void skip_failovers_for_group(const NvmeGroupKey& group_key);
+  void skip_failovers_for_group(const NvmeGroupKey& group_key,
+      int interval_sec = 0);
+  bool put_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+      const NvmeGroupKey& group_key, uint64_t beacon_sequence,
+      uint64_t& old_sequence);
+  bool set_gw_beacon_sequence_number(const NvmeGwId &gw_id, int gw_version,
+         const NvmeGroupKey& group_key, uint64_t beacon_sequence);
+  bool is_location_in_disaster(const NvmeGroupKey& group_key,
+             NvmeLocation& location, bool &cleanup_in_process);
 private:
   int  do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
   int  do_erase_gw_id(const NvmeGwId &gw_id,
@@ -136,17 +171,33 @@ private:
   void validate_gw_map(
     const NvmeGroupKey& group_key);
   void increment_gw_epoch(const NvmeGroupKey& group_key);
+  int find_failover_gw_logic(const NvmeGroupKey& group_key,
+    NvmeGwMonStates& gws_states, NvmeLocation& location,
+    NvmeGwId& min_loaded_gw_id, bool ignore_locations);
+  bool get_location_in_disaster_cleanup(const NvmeGroupKey& group_key,
+             NvmeLocation& returned_location);
+  void disaster_map_remove_location(const NvmeGroupKey& group_key,
+             NvmeLocation& location);
+  bool validate_number_locations(int num_gws, int num_locations);
+  void check_relocate_ana_groups(const NvmeGroupKey& group_key,
+           bool &propose);
+  int relocate_ana_grp(const NvmeGwId &src_gw_id,
+   const NvmeGroupKey& group_key, NvmeAnaGrpId grpid,
+   NvmeLocation& location, bool &propose);
 
 public:
   int blocklist_gw(
     const NvmeGwId &gw_id, const NvmeGroupKey& group_key,
     NvmeAnaGrpId ANA_groupid, epoch_t &epoch, bool failover);
 
-  void encode(ceph::buffer::list &bl, uint64_t features) const {
+  void encode(ceph::buffer::list &bl, uint64_t features) const  {
     using ceph::encode;
     uint8_t version = 1;
     if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
-       version = 2;
+      version = 2;
+    }
+    if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+      version = 3;
     }
     ENCODE_START(version, version, bl);
     encode(epoch, bl);// global map epoch
@@ -156,18 +207,29 @@ public:
     if (version >= 2) {
       encode(gw_epoch, bl);
     }
+    if (version >=3) {
+      encode(disaster_locations, bl);
+    }
     ENCODE_FINISH(bl);
   }
 
-  void decode(ceph::buffer::list::const_iterator &bl) {
+  void  decode(ceph::buffer::list::const_iterator &bl) {
     using ceph::decode;
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
 
     decode(epoch, bl);
+    dout(20)  << "decode epoch "  << dendl;
     decode(created_gws, bl);
+    dout(20)  << "decode created gws "  << dendl;
     decode(fsm_timers, bl);
+    dout(20)  << "decode fsm timers "  << dendl;
     if (struct_v >= 2) {
       decode(gw_epoch, bl);
+      dout(20)  << "decode gw epoch "  << dendl;
+    }
+    if (struct_v >=3) {
+      decode(disaster_locations, bl);
+      dout(20)  << "decode  disaster location "  << dendl;
     }
     DECODE_FINISH(bl);
   }
diff --git a/src/mon/NVMeofGwMon.cc b/src/mon/NVMeofGwMon.cc
index f34479a7333..71181a9d2a0 100644
--- a/src/mon/NVMeofGwMon.cc
+++ b/src/mon/NVMeofGwMon.cc
@@ -68,6 +68,9 @@ void NVMeofGwMon::synchronize_last_beacon()
       // force send ack after nearest beacon after leader re-election
       gw_created_pair.second.beacon_index =
           g_conf().get_val<uint64_t>("mon_nvmeofgw_beacons_till_ack");
+     // force send full beacon after leader election
+      gw_created_pair.second.beacon_sequence = 0;
+      gw_created_pair.second.beacon_sequence_ooo = true;
     }
   }
 }
@@ -144,7 +147,9 @@ void NVMeofGwMon::tick()
   for (auto &[group_key, gws_states]: pending_map.created_gws) {
     BeaconSubsystems *subsystems = &empty_subsystems;
     for (auto& gw_state : gws_states) { // loop for GWs inside nqn group
-      subsystems = &gw_state.second.subsystems;
+      if (gw_state.second.availability == gw_availability_t::GW_AVAILABLE) {
+        subsystems = &gw_state.second.subsystems;
+      }
       if (subsystems->size()) { // Set subsystems to the valid value
         break;
       }
@@ -178,7 +183,7 @@ version_t NVMeofGwMon::get_trim_to() const
  * function called to restore in pending map all data that is not serialized
  * to paxos peons. Othervise it would be overriden in "pending_map = map"
  * currently "allow_failovers_ts", "last_gw_down_ts",
- * "last_gw_map_epoch_valid" variables are restored
+ * "last_gw_map_epoch_valid", "beacon_sequence", "beacon_index" variables are restored
  */
 void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) {
   std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
@@ -202,6 +207,12 @@ void NVMeofGwMon::restore_pending_map_info(NVMeofGwMap & tmp_map) {
           gw_created_pair.second.last_gw_down_ts;
       pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
 	  gw_created_pair.second.last_gw_map_epoch_valid;
+      pending_map.created_gws[group_key][gw_id].beacon_index =
+            gw_created_pair.second.beacon_index;
+      pending_map.created_gws[group_key][gw_id].beacon_sequence =
+            gw_created_pair.second.beacon_sequence;
+      pending_map.created_gws[group_key][gw_id].beacon_sequence_ooo =
+            gw_created_pair.second.beacon_sequence_ooo;
     }
   }
 }
@@ -240,7 +251,8 @@ void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t)
   }
   pending_map.encode(bl, features);
   dout(10) << " has NVMEOFHA: " << HAVE_FEATURE(features, NVMEOFHA)
-       << " has NVMEOFHAMAP: " << HAVE_FEATURE(features, NVMEOFHAMAP) << dendl;
+       << " has NVMEOFHAMAP: " <<  HAVE_FEATURE(features, NVMEOFHAMAP)
+       << " has BEACON_DIFF: " <<  HAVE_FEATURE(features, NVMEOF_BEACON_DIFF) << dendl;
   put_version(t, pending_map.epoch, bl);
   put_last_committed(t, pending_map.epoch);
 
@@ -253,7 +265,9 @@ void NVMeofGwMon::encode_pending(MonitorDBStore::TransactionRef t)
 void NVMeofGwMon::update_from_paxos(bool *need_bootstrap)
 {
   version_t version = get_last_committed();
-
+  uint64_t features = mon.get_quorum_con_features();
+  dout(20) << " has BEACON_DIFF: " <<  HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)
+		   << dendl;
   if (version != map.epoch) {
     dout(10) << " NVMeGW loading version " << version
 	     << " " << map.epoch << dendl;
@@ -502,13 +516,26 @@ bool NVMeofGwMon::preprocess_command(MonOpRequestRef op)
 	f->open_object_section("stat");
 	f->dump_string("gw-id", gw_id);
 	f->dump_unsigned("anagrp-id",state.ana_grp_id+1);
+	f->dump_string("location", state.location);
+	bool cleanup_in_process;
+	bool is_disaster = map.is_location_in_disaster
+	             (group_key, state.location, cleanup_in_process);
+	if (is_disaster) {
+	  std::string disaster_state = (cleanup_in_process) ? "Disaster-cleanup":
+	                               "Disaster";
+	  f->dump_string("disaster state", disaster_state);
+	}
+	std::string admin_state = (state.gw_admin_state ==
+	    gw_admin_state_t::GW_ADMIN_ENABLED) ? "ENABLED" : "DISABLED";
+	f->dump_string("admin state", admin_state);
 	f->dump_unsigned("num-namespaces", num_ns[state.ana_grp_id+1]);
 	f->dump_unsigned("performed-full-startup", state.performed_full_startup);
 	std::stringstream  sstrm1;
 	sstrm1 << state.availability;
 	f->dump_string("Availability", sstrm1.str());
 	uint32_t num_listeners = 0;
-	if (state.availability == gw_availability_t::GW_AVAILABLE) {
+	if ((state.availability == gw_availability_t::GW_AVAILABLE) ||
+	    (state.availability == gw_availability_t::GW_CREATED)) {
 	  for (auto &subs: state.subsystems) {
 	    num_listeners += subs.listeners.size();
 	  }
@@ -633,7 +660,8 @@ bool NVMeofGwMon::prepare_command(MonOpRequestRef op)
     auto group_key = std::make_pair(pool, group);
     dout(10) << " id "<< id <<" pool "<< pool << " group "<< group << dendl;
     if (prefix == "nvme-gw create") {
-      rc = pending_map.cfg_add_gw(id, group_key);
+      rc = pending_map.cfg_add_gw(id, group_key,
+		   mon.get_quorum_con_features());
       if (rc == -EINVAL) {
 	err = rc;
 	dout (4) << "Error: GW cannot be created " << id
@@ -659,18 +687,115 @@ bool NVMeofGwMon::prepare_command(MonOpRequestRef op)
       response = true;
     }
   }
+  else if (prefix == "nvme-gw enable" || prefix == "nvme-gw disable") {
 
+    std::string id, pool, group;
+    cmd_getval(cmdmap, "id", id);
+    cmd_getval(cmdmap, "pool", pool);
+    cmd_getval(cmdmap, "group", group);
+    auto group_key = std::make_pair(pool, group);
+    dout(10) << " id "<< id <<" pool "<< pool << " group "<< group
+             << " " << prefix << dendl;
+    gw_admin_state_t set =  (prefix == "nvme-gw enable") ?
+             gw_admin_state_t::GW_ADMIN_ENABLED :
+             gw_admin_state_t::GW_ADMIN_DISABLED;
+    bool propose = false;
+    rc = pending_map.cfg_admin_state_change(id, group_key, set, propose);
+    if (rc == -EINVAL) {
+      err = rc;
+      dout (4) << "Error: GW cannot be set to admin state " << id
+          << " " << pool << " " << group << "  rc " << rc << dendl;
+      sstrm.str("");
+    }
+    // propose pending would be generated by the PaxosService
+    if (rc == 0 && propose == true) {
+      response = true;
+    }
+  } else if (prefix == "nvme-gw set-location") {
+
+    std::string id, pool, group, location;
+    cmd_getval(cmdmap, "id", id);
+    cmd_getval(cmdmap, "pool", pool);
+    cmd_getval(cmdmap, "group", group);
+    cmd_getval(cmdmap, "location", location);
+    auto group_key = std::make_pair(pool, group);
+    dout(10) << " id "<< id <<" pool "<< pool << " group "<< group
+             <<" location "<< location << dendl;
+    bool propose = false;
+    rc = pending_map.cfg_set_location(id, group_key, location, propose);
+    if (rc == -EINVAL || rc == -EEXIST) {
+      err = rc;
+      dout (4) << "Error: GW cannot  set location " << id
+           << " " << pool << " " << group << "  rc " << rc << dendl;
+      sstrm.str("");
+      if (rc == -EEXIST) {
+        sstrm.str("The location is already set");
+      }
+    }
+    // propose pending would be generated by the PaxosService
+    if (rc == 0 && propose == true) {
+      response = true;
+    }
+  } else if (prefix == "nvme-gw disaster-set") {
+    std::string id, pool, group, location;
+    bool propose = false;
+    cmd_getval(cmdmap, "pool", pool);
+    cmd_getval(cmdmap, "group", group);
+    cmd_getval(cmdmap, "location", location);
+    auto group_key = std::make_pair(pool, group);
+    dout(10) << id <<" pool "<< pool << " group "<< group
+             <<" location "<< location << dendl;
+    rc = pending_map.cfg_location_disaster_set(group_key,
+                     location, propose);
+    if (rc == -EINVAL || rc == -EEXIST) {
+      err = rc;
+      sstrm.str("");
+      if (rc == -EEXIST) {
+        sstrm.str("command already set please wait until completed");
+      }
+      if (rc == -EINVAL) {
+        sstrm.str("command cannot be executed");
+      }
+    }
+    if (rc == 0 && propose == true) {
+      response = true;
+    }
+  } else if (prefix == "nvme-gw disaster-clear") {
+      std::string pool, group, location;
+      bool propose = false;
+      cmd_getval(cmdmap, "pool", pool);
+      cmd_getval(cmdmap, "group", group);
+      cmd_getval(cmdmap, "location", location);
+      auto group_key = std::make_pair(pool, group);
+      dout(10) << " pool "<< pool << " group "<< group
+               <<" location "<< location << dendl;
+      rc = pending_map.cfg_location_disaster_clear(group_key,
+                       location, propose);
+      if (rc == -EINVAL || rc == -EEXIST) {
+        err = rc;
+        sstrm.str("");
+        if (rc == -EEXIST) {
+          sstrm.str("command already set please wait until completed");
+        }
+        if (rc == -EINVAL) {
+          sstrm.str("command cannot be executed");
+        }
+      }
+      if (rc == 0 && propose == true) {
+        response = true;
+      }
+    }
   getline(sstrm, rs);
   if (response == false) {
     if (err < 0 && rs.length() == 0) {
       rs = cpp_strerror(err);
       dout(10) << "Error command  err : "<< err  << " rs-len: "
-	       << rs.length() <<  dendl;
+               << rs.length() <<  dendl;
     }
     mon.reply_command(op, err, rs, rdata, get_last_committed());
   } else {
     wait_for_commit(op, new Monitor::C_Command(mon, op, 0, rs,
-					       get_last_committed() + 1));
+       get_last_committed() + 1));
   }
   return response;
 }
@@ -717,23 +842,179 @@ epoch_t NVMeofGwMon::get_ack_map_epoch(bool gw_created,
   return rc;
 }
 
+void NVMeofGwMon::do_send_map_ack(MonOpRequestRef op,
+	bool gw_created, bool gw_propose,
+    uint64_t stored_sequence, bool is_correct_sequence,
+    const NvmeGroupKey& group_key, const NvmeGwId &gw_id) {
+  /* always send a beacon ack to a gw in Created state,
+   * since it should be a temporary state;
+   * if the epoch-filter bit is set: ack the beacon when there is no propose,
+   * or when something changed that is not relevant to the gw-epoch
+  */
+  NVMeofGwMap ack_map;
+  if (gw_created) {
+	NvmeGwMonState& pending_gw_map = pending_map.created_gws[group_key][gw_id];
+    // respond with a map slice correspondent to the same GW
+    ack_map.created_gws[group_key][gw_id] = (gw_propose) ? //avail = CREATED
+      pending_gw_map : map.created_gws[group_key][gw_id];
+    ack_map.created_gws[group_key][gw_id].beacon_sequence =
+      pending_gw_map.beacon_sequence;
+    if (!is_correct_sequence) {
+      dout(4) << " GW " << gw_id <<
+      " sending ACK due to receiving beacon_sequence out of order" << dendl;
+      ack_map.created_gws[group_key][gw_id].beacon_sequence = stored_sequence;
+      ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = true;
+    } else {
+        ack_map.created_gws[group_key][gw_id].beacon_sequence_ooo = false;
+    }
+    if (gw_propose) {
+     dout(10) << "GW in Created " << gw_id << " ack map " << ack_map << dendl;
+    }
+  }
+  ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
+  if (!gw_created)
+    dout(10) << "gw not created, ack map "
+             << ack_map << " epoch " << ack_map.epoch << dendl;
+  dout(20) << "ack_map " << ack_map <<dendl;
+  auto msg = make_message<MNVMeofGwMap>(ack_map);
+  mon.send_reply(op, msg.detach());
+}
+
+/*
+ * A subsystem is added to the beacon only if it (or its encapsulated fields)
+ * was changed, added or deleted. If nothing changed, the beacon does not
+ * encode any subsystem.
+ * A new descriptor was added under the subsystem to describe
+ * the change: ADDED, DELETED, CHANGED.
+ * Rules for applying the subsystems to the map:
+ * Pass over all subsystems in the beacon->sub list.
+ * If the descriptor is ADDED or CHANGED do the following:
+ * look for the subsystem's nqn in the gw.subs list and if found - substitute,
+ * if not found - add.
+ * If the descriptor is DELETED do the following:
+ * look for the subsystem's nqn in the gw.subs list and if found - delete.
+ */
+int NVMeofGwMon::apply_beacon(const NvmeGwId &gw_id, int gw_version,
+        const NvmeGroupKey& group_key, void *msg,
+        const BeaconSubsystems& sub, gw_availability_t &avail,
+        bool &propose_pending)
+{
+  bool found = false;
+  bool changed = false;
+  BeaconSubsystems &gw_subs =
+                     pending_map.created_gws[group_key][gw_id].subsystems;
+  auto &state = pending_map.created_gws[group_key][gw_id];
+
+  if (!HAVE_FEATURE(mon.get_quorum_con_features(), NVMEOF_BEACON_DIFF) &&
+		  gw_version == 0) {
+    if (gw_subs != sub) {
+      dout(10) << "BEACON_DIFF logic not applied."
+          "subsystems of GW changed, propose pending " << gw_id << dendl;
+      gw_subs = sub;
+      //rebuild  nonce map.
+      state.nonce_map = ((MNVMeofGwBeacon *)msg)->get_nonce_map();
+      changed = true;
+    }
+  } else {
+    if (state.beacon_sequence_ooo) {
+      dout(10) << "Good sequence after out of order detection "
+         "sequence "<< state.beacon_sequence << " " << gw_id << dendl;
+      // need to clear subsystems for correct calculation of difference
+      state.subsystems.clear();
+      state.beacon_sequence_ooo = false;
+      propose_pending = true;
+    }
+
+    for (auto &subs_it: sub) {
+      if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED ||
+          subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED) {
+        found = false;
+        for (auto &gw_subs_it: gw_subs) {
+          if (gw_subs_it.nqn == subs_it.nqn) {
+            gw_subs_it = subs_it;
+            dout(10) << "subsystem changed " << subs_it.nqn << " change descr "
+                     << (uint32_t)subs_it.change_descriptor << dendl;
+            found = true;
+            changed = true;
+            break;
+          }
+        }
+        if (!found) {
+          gw_subs.push_back(subs_it);
+          changed = true;
+          dout(10) << "subsystem added " << subs_it.nqn <<  " change descr "
+                 << (uint32_t)subs_it.change_descriptor << dendl;
+        }
+      }
+      else
+        if (subs_it.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED)
+        {
+           auto it = std::find_if(gw_subs.begin(), gw_subs.end(),
+           [&](const auto& gw_subs_it) {
+             return gw_subs_it.nqn == subs_it.nqn;
+           });
+           if (it != gw_subs.end()) {
+             gw_subs.erase(it);
+             dout(10) << "subsystem deleted " << subs_it.nqn << " change descr "
+                      << (uint32_t)subs_it.change_descriptor << dendl;
+             changed = true;
+           }
+        }
+    }
+  }
+  if (changed) {
+    avail = gw_availability_t::GW_AVAILABLE;
+  }
+  if (state.gw_admin_state ==gw_admin_state_t::GW_ADMIN_DISABLED) {
+    avail = gw_availability_t::GW_CREATED;
+  }
+  if (gw_subs.size() == 0) {
+      avail = gw_availability_t::GW_CREATED;
+      dout(10) << "No-subsystems condition detected for GW " << gw_id <<dendl;
+    } else {
+      bool listener_found = false;
+      for (auto &subs: gw_subs) {
+        if (subs.listeners.size()) {
+          listener_found = true;
+          break;
+        }
+      }
+      if (!listener_found) {
+       dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
+       avail = gw_availability_t::GW_CREATED;
+      }
+    }// for HA no-subsystems and no-listeners are same usecases
+  if (avail == gw_availability_t::GW_UNAVAILABLE) {
+	  dout(4) << "Warning: UNAVAILABLE gw " << gw_id << dendl;
+  }
+  return (changed == true ? 1:0);
+}
+
 bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
 {
   auto m = op->get_req<MNVMeofGwBeacon>();
-
-  dout(20) << "availability " <<  m->get_availability()
+  uint64_t sequence = m->get_sequence();
+  int version = m->version;
+  dout(10) << "availability " << m->get_availability()
+       << " sequence " << sequence << " version " << version
 	   << " GW : " << m->get_gw_id()
 	   << " osdmap_epoch " << m->get_last_osd_epoch()
-	   << " subsystems " << m->get_subsystems() << dendl;
+	   << " subsystems " << m->get_subsystems().size() << dendl;
+
   ConnectionRef con = op->get_connection();
   NvmeGwId gw_id = m->get_gw_id();
   NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(),  m->get_gw_group());
+  // "avail" variable may be changed inside the function -
+  // it becomes CREATED when, for one of several reasons, the GW's
+  // load balance group is serviced by another GW
   gw_availability_t  avail = m->get_availability();
   bool propose = false;
   bool nonce_propose = false;
   bool timer_propose = false;
   bool gw_propose    = false;
   bool gw_created = true;
+  bool correct_sequence = true;
+  uint64_t stored_sequence;
   NVMeofGwMap ack_map;
   bool epoch_filter_enabled = HAVE_FEATURE(mon.get_quorum_con_features(),
                               NVMEOFHAMAP);
@@ -755,6 +1036,9 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
 	       << map.created_gws << dendl;
       goto set_propose;
     } else {
+      pending_map.created_gws[group_key][gw_id].subsystems.clear();
+      pending_map.set_gw_beacon_sequence_number(gw_id, version,
+            group_key, sequence);
       dout(4) << "GW beacon: Created state - full startup done " << gw_id
 	       << " GW state in monitor data-base : "
 	       << pending_map.created_gws[group_key][gw_id].availability
@@ -783,6 +1067,8 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   } else { // first GW beacon should come with avail = Created
     // if GW reports Avail/Unavail but in monitor's database it is Unavailable
     if (gw != group_gws.end()) {
+      correct_sequence = pending_map.put_gw_beacon_sequence_number
+           (gw_id, version, group_key, sequence, stored_sequence);
       // it means it did not perform "exit" after failover was set by
       // NVMeofGWMon
       if ((pending_map.created_gws[group_key][gw_id].availability ==
@@ -800,6 +1086,18 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
 	mon.send_reply(op, msg.detach());
 	goto false_return;
       }
+      if (!correct_sequence) {
+        if (avail == gw_availability_t::GW_AVAILABLE) {
+          /*prevent failover - give GW a chance to send the expected sequence */
+          dout(4) << "sequence ooo: set skip-failovers for group " << gw_id
+                  << " group " << group_key << dendl;
+          pending_map.skip_failovers_for_group(group_key, 7);
+        }
+        avail = gw_availability_t::GW_CREATED;
+        // availability will be set to Active and the GW will receive the
+        // full map once it sends the expected beacon-sequence
+        goto check_availability;
+      }
     }
   }
   // Beacon from GW in !Created state but it does not appear in the map
@@ -825,44 +1123,12 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
     pending_map.set_addr_vect(gw_id, group_key, con->get_peer_addr());
     gw_propose = true;
   }
-  // deep copy the whole nonce map of this GW
-  if (m->get_nonce_map().size()) {
-    if (pending_map.created_gws[group_key][gw_id].nonce_map !=
-	m->get_nonce_map()) {
-      dout(10) << "nonce map of GW  changed , propose pending "
-	       << gw_id << dendl;
-      pending_map.created_gws[group_key][gw_id].nonce_map = m->get_nonce_map();
-      dout(10) << "nonce map of GW " << gw_id << " "
-	       << pending_map.created_gws[group_key][gw_id].nonce_map  << dendl;
-      nonce_propose = true;
-    }
-  } else {
-    dout(10) << "Warning: received empty nonce map in the beacon of GW "
-	     << gw_id << " avail " << (int)avail << dendl;
-  }
-
-  if (sub.size() == 0) {
-    avail = gw_availability_t::GW_CREATED;
-    dout(20) << "No-subsystems condition detected for GW " << gw_id <<dendl;
-  } else {
-    bool listener_found = false;
-    for (auto &subs: sub) {
-      if (subs.listeners.size()) {
-        listener_found = true;
-        break;
-      }
-    }
-    if (!listener_found) {
-     dout(10) << "No-listeners condition detected for GW " << gw_id << dendl;
-     avail = gw_availability_t::GW_CREATED;
-    }
-  }// for HA no-subsystems and no-listeners are same usecases
-  if (pending_map.created_gws[group_key][gw_id].subsystems != sub) {
-    dout(10) << "subsystems of GW changed, propose pending " << gw_id << dendl;
-    pending_map.created_gws[group_key][gw_id].subsystems =  sub;
-    dout(20) << "subsystems of GW " << gw_id << " "
-	     << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
+  if (apply_beacon(gw_id, version, group_key, (void *)m, sub, avail, propose) !=0) {
     nonce_propose = true;
+    dout(10) << "subsystem(subs/listener/nonce/NM) of GW changed, propose pending "
+             << gw_id << " available " << avail <<  dendl;
+    dout(20) << "subsystems of GW " << gw_id << " "
+             << pending_map.created_gws[group_key][gw_id].subsystems << dendl;
   }
   pending_map.created_gws[group_key][gw_id].last_gw_map_epoch_valid =
     (get_ack_map_epoch(true, group_key) == m->get_last_gwmap_epoch());
@@ -872,10 +1138,11 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
 	     << " epoch " << get_ack_map_epoch(true, group_key)
 	     << " beacon_epoch " << m->get_last_gwmap_epoch() <<  dendl;
   }
+
+check_availability:
   if (avail == gw_availability_t::GW_AVAILABLE) {
     // check pending_map.epoch vs m->get_version() -
     // if different - drop the beacon
-
     LastBeacon lb = {gw_id, group_key};
     last_beacon[lb] = now;
     epoch_t last_osd_epoch = m->get_last_osd_epoch();
@@ -888,9 +1155,10 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   // Periodic: check active FSM timers
   pending_map.update_active_timers(timer_propose);
 
- set_propose:
+set_propose:
   propose |= (timer_propose | gw_propose | nonce_propose);
-  apply_ack_logic = (avail == gw_availability_t::GW_AVAILABLE) ? true : false;
+  apply_ack_logic = ((avail == gw_availability_t::GW_AVAILABLE)
+                      && correct_sequence) ? true : false;
   if ( (apply_ack_logic &&
       ((pending_map.created_gws[group_key][gw_id].beacon_index++
           % beacons_till_ack) == 0))|| (!apply_ack_logic) ) {
@@ -905,22 +1173,8 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
   if (send_ack && ((!gw_propose && epoch_filter_enabled) ||
                     (!propose && !epoch_filter_enabled) ||
                     (avail == gw_availability_t::GW_CREATED)) ) {
-          /* always send beacon ack to gw in Created state,
-           * it should be temporary state
-           * if epoch-filter-bit: send ack to beacon in case no propose
-           * or if changed something not relevant to gw-epoch
-          */
-    if (gw_created) {
-      // respond with a map slice correspondent to the same GW
-      ack_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
-    }
-    ack_map.epoch = get_ack_map_epoch(gw_created, group_key);
-    if (!gw_created)
-      dout(10) << "gw not created, ack map "
-                        << ack_map << " epoch " << ack_map.epoch << dendl;
-    dout(20) << "ack_map " << ack_map <<dendl;
-    auto msg = make_message<MNVMeofGwMap>(ack_map);
-    mon.send_reply(op, msg.detach());
+    do_send_map_ack(op, gw_created, gw_propose, stored_sequence,
+			        correct_sequence, group_key, gw_id);
   } else {
     mon.no_reply(op);
   }
diff --git a/src/mon/NVMeofGwMon.h b/src/mon/NVMeofGwMon.h
index 08abf98a8df..16ca9e568fc 100644
--- a/src/mon/NVMeofGwMon.h
+++ b/src/mon/NVMeofGwMon.h
@@ -101,6 +101,12 @@ private:
   void restore_pending_map_info(NVMeofGwMap & tmp_map);
   void cleanup_pending_map();
   void get_gw_listeners(ceph::Formatter *f, std::pair<std::string, std::string>& group_key);
+  int apply_beacon(const NvmeGwId &gw_id, int gw_version,
+             const NvmeGroupKey& group_key, void *msg,
+			 const BeaconSubsystems& sub, gw_availability_t &avail, bool &propose_pending);
+  void do_send_map_ack(MonOpRequestRef op, bool gw_created, bool gw_propose,
+       uint64_t stored_sequence, bool is_correct_sequence,
+       const NvmeGroupKey& group_key, const NvmeGwId &gw_id);
   void check_beacon_timeout(ceph::coarse_mono_clock::time_point now,
        bool &propose_pending);
 };
diff --git a/src/mon/NVMeofGwSerialize.h b/src/mon/NVMeofGwSerialize.h
index 9d43f242ae8..24dee67d70b 100755
--- a/src/mon/NVMeofGwSerialize.h
+++ b/src/mon/NVMeofGwSerialize.h
@@ -12,6 +12,7 @@
  */
 #ifndef MON_NVMEOFGWSERIALIZE_H_
 #define MON_NVMEOFGWSERIALIZE_H_
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mon
 #undef dout_prefix
@@ -83,6 +84,22 @@ inline std::ostream& operator<<(
   return os;
 }
 
+inline std::ostream& operator<<(
+  std::ostream& os, const  gw_admin_state_t value) {
+  switch (value) {
+
+  case gw_admin_state_t:: GW_ADMIN_ENABLED:
+    os << "ADMIN_ENABLED";
+    break;
+  case gw_admin_state_t:: GW_ADMIN_DISABLED:
+    os << "ADMIN_DISABLED";
+    break;
+  default:
+    os << "Invalid " << (int)value << " ";
+  }
+  return os;
+}
+
 inline std::ostream& operator<<(std::ostream& os, const SmState value) {
   os << "SM_STATE [ ";
   for (auto& state_itr: value ) {
@@ -106,7 +123,8 @@ inline std::ostream& operator<<(std::ostream& os, const BeaconListener value) {
 }
 
 inline std::ostream& operator<<(std::ostream& os, const BeaconSubsystem value) {
-  os << "BeaconSubsystem( nqn:" << value.nqn << ", listeners [ ";
+  os << "BeaconSubsystem( nqn:" << value.nqn << " descr "
+     << (uint32_t)value.change_descriptor << ", listeners [ ";
   for (const auto& list: value.listeners) os << list << " ";
   os << "] namespaces [ ";
   for (const auto& ns: value.namespaces) os << ns << " ";
@@ -123,7 +141,9 @@ inline std::ostream& operator<<(
   std::ostream& os, const NvmeGwClientState value) {
   os <<  "NvmeGwState { group id: " << value.group_id
      << " gw_map_epoch " <<  value.gw_map_epoch
-     << " availablilty "<< value.availability
+     << " availablilty " << value.availability
+     << " sequence " << value.last_beacon_seq_number
+     << " sequence-ooo " << value.last_beacon_seq_ooo
      << " GwSubsystems: [ ";
   for (const auto& sub: value.subsystems) {
     os << sub.second << " ";
@@ -183,7 +203,8 @@ inline std::ostream& print_gw_created_t(
     os << " " << state_itr.first <<": " << state_itr.second << ",";
   }
   os << "]\n"<< MODULE_PREFFIX << " entity-addr : " << value.addr_vect
-     << " availability " << value.availability
+     << " availability " << value.availability << " location " << value.location
+     << " admin state " << value.gw_admin_state
      << " full-startup " << value.performed_full_startup  << " ]";
 
   return os;
@@ -223,6 +244,17 @@ inline std::ostream& operator<<(std::ostream& os, const NvmeGwMonStates value) {
   return os;
 }
 
+inline std::ostream& operator<<(std::ostream& os, const LocationStates value) {
+  if(value.size()) os << "\n" << MODULE_PREFFIX;;
+
+  for (auto &locations : value) {
+    os  <<  "location " << locations.first  << " recovering state "
+        << locations.second.failbacks_in_process;
+    os  << "\n"<< MODULE_PREFFIX;
+  }
+  return os;
+}
+
 inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) {
   os <<  "\n" <<  MODULE_PREFFIX << "== NVMeofGwMap [ Created_gws: epoch "
      << value.epoch;
@@ -230,6 +262,10 @@ inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) {
     os <<  "\n" <<  MODULE_PREFFIX  << "{ " << group_gws.first
        << " } -> GW epoch: " << group_gws.second << " }";
   }
+  for (auto& group_gws: value.disaster_locations) {
+    os <<  "\n" <<  MODULE_PREFFIX  << "{ " << group_gws.first
+       << " } -> disaster-locations: " << group_gws.second << " }";
+  }
   for (auto& group_gws: value.created_gws) {
    os <<  "\n" <<  MODULE_PREFFIX  << "{ " << group_gws.first
       << " } -> { " << group_gws.second << " }";
@@ -273,7 +309,7 @@ inline void encode(
   for (const auto& sub: subsystems) {
     encode(sub.second.nqn, bl);
     if (version == 1) {
-      dout(20) << "encode ana_state vector version1 = " << version << dendl;
+      dout(20) << "encode ana_state vector version1 = " << (int)version << dendl;
       /* Version 1 requires exactly 16 entries */
       ana_state_t filled(sub.second.ana_state);
       filled.resize(
@@ -283,7 +319,7 @@ inline void encode(
 	  0));
       encode(filled, bl);
     } else {
-      dout(20) << "encode ana_state vector version2 = " << version << dendl;
+      dout(20) << "encode ana_state vector version2 = " << (int)version << dendl;
       encode(sub.second.ana_state, bl);
     }
   }
@@ -307,23 +343,37 @@ inline  void decode(
 }
 
 inline void encode(const NvmeGwClientState& state,  ceph::bufferlist &bl, uint64_t features) {
-  ENCODE_START(1, 1, bl);
+  uint8_t version = 1;
+  if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+     version = 2;
+  }
+  ENCODE_START(version, version, bl);
   encode(state.group_id, bl);
   encode(state.gw_map_epoch, bl);
   encode (state.subsystems, bl, features);
   encode((uint32_t)state.availability, bl);
+  if (version >= 2) {
+    encode((uint64_t)state.last_beacon_seq_number, bl);
+    encode(state.last_beacon_seq_ooo, bl);
+  }
   ENCODE_FINISH(bl);
 }
 
 inline  void decode(
   NvmeGwClientState& state,  ceph::bufferlist::const_iterator& bl) {
-  DECODE_START(1, bl);
+  DECODE_START(2, bl);
   decode(state.group_id, bl);
   decode(state.gw_map_epoch, bl);
   decode(state.subsystems, bl);
   uint32_t avail;
+  uint64_t last_beacon_seq_number;
   decode(avail, bl);
   state.availability = (gw_availability_t)avail;
+  if (struct_v >= 2) {
+    decode(last_beacon_seq_number, bl);
+    state.last_beacon_seq_number = last_beacon_seq_number;
+    decode(state.last_beacon_seq_ooo, bl);
+  }
   DECODE_FINISH(bl);
 }
 
@@ -417,6 +467,7 @@ inline void encode(const NvmeAnaNonceMap& nonce_map,  ceph::bufferlist &bl,
   uint64_t features) {
   ENCODE_START(1, 1, bl);
   encode((uint32_t)nonce_map.size(), bl);
+  dout(20) << "encode nonce map  size " << nonce_map.size() << dendl;
   for (auto& ana_group_nonces : nonce_map) {
     // ana group id
     encode(ana_group_nonces.first, bl);
@@ -456,11 +507,16 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
   if (HAVE_FEATURE(features, NVMEOFHAMAP)) {
     version = 3;
   }
+  if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+    version = 4;
+  }
   ENCODE_START(version, version, bl);
+  dout(20) << "encode NvmeGwMonStates. struct_v: " << (int)version << dendl;
   encode ((uint32_t)gws.size(), bl); // number of gws in the group
   for (auto& gw : gws) {
     encode(gw.first, bl);// GW_id
     encode(gw.second.ana_grp_id, bl); // GW owns this group-id
+    dout(20) << "encode gw-id " << gw.first << dendl;
     if (version >= 2) {
       encode((uint32_t)gw.second.sm_state.size(), bl);
       for (auto &state_it:gw.second.sm_state) {
@@ -470,7 +526,9 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
       encode((uint32_t)gw.second.availability, bl);
       encode((uint16_t)gw.second.performed_full_startup, bl);
       encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
-      encode(gw.second.subsystems, bl);
+      dout(20) << "encode availability " << gw.second.availability
+               << " startup " << (int)gw.second.performed_full_startup << dendl;
+      encode(gw.second.subsystems, bl, features);
 
       encode((uint32_t)gw.second.blocklist_data.size(), bl);
       for (auto &blklst_itr: gw.second.blocklist_data) {
@@ -487,7 +545,7 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
       encode((uint32_t)gw.second.availability, bl);
       encode((uint16_t)gw.second.performed_full_startup, bl);
       encode((uint16_t)gw.second.last_gw_map_epoch_valid, bl);
-      encode(gw.second.subsystems, bl); // TODO reuse but put features - encode version
+      encode(gw.second.subsystems, bl, features);
       Blocklist_data bl_data[MAX_SUPPORTED_ANA_GROUPS];
       for (auto &blklst_itr: gw.second.blocklist_data) {
         bl_data[blklst_itr.first].osd_epoch   = blklst_itr.second.osd_epoch;
@@ -504,6 +562,11 @@ inline void encode(const NvmeGwMonStates& gws,  ceph::bufferlist &bl,
       gw.second.addr_vect.encode(bl, features);
       encode(gw.second.beacon_index, bl);
     }
+    if (version >= 4) {
+      encode((int)gw.second.gw_admin_state, bl);
+      dout(10) << "encode location " << gw.second.location << dendl;
+      encode(gw.second.location, bl);
+    }
   }
   ENCODE_FINISH(bl);
 }
@@ -512,7 +575,7 @@ inline void decode(
   NvmeGwMonStates& gws, ceph::buffer::list::const_iterator &bl) {
   gws.clear();
   uint32_t num_created_gws;
-  DECODE_START(3, bl);
+  DECODE_START(4, bl);
   dout(20) << "decode NvmeGwMonStates. struct_v: " << struct_v << dendl;
   decode(num_created_gws, bl);
   dout(20) << "decode NvmeGwMonStates. num gws  " << num_created_gws << dendl;
@@ -589,6 +652,15 @@ inline void decode(
       dout(20) << "decode addr_vect and beacon_index" << dendl;
       gw_created.addr_vect.decode(bl);
       decode(gw_created.beacon_index, bl);
+      dout(20) << "decoded beacon_index " << gw_created.beacon_index << dendl;
+    }
+    if (struct_v >= 4) {
+      dout(20) << "decode admin state and location" << dendl;
+      int admin_state;
+      decode(admin_state, bl);
+      gw_created.gw_admin_state = (gw_admin_state_t)admin_state;
+      decode(gw_created.location, bl);
+      dout(20) << "decoded location " << gw_created.location << dendl;
     }
 
     gws[gw_name] = gw_created;
@@ -638,6 +710,15 @@ inline void decode(std::map<NvmeGroupKey, epoch_t>& gw_epoch,
   DECODE_FINISH(bl);
 }
 
+inline void encode(const LocationState &locationstate, ceph::bufferlist &bl) {
+  encode(locationstate.failbacks_in_process, bl);
+}
+
+inline void decode(LocationState &locationstate,
+                  ceph::buffer::list::const_iterator &bl) {
+  decode(locationstate.failbacks_in_process, bl);
+}
+
 inline void encode(
   const std::map<NvmeGroupKey, NvmeGwMonStates>& created_gws,
   ceph::bufferlist &bl, uint64_t features) {
@@ -823,25 +904,37 @@ inline void decode(BeaconListener& ls, ceph::buffer::list::const_iterator &bl) {
   DECODE_FINISH(bl);
 }
 
-inline void encode(const BeaconSubsystem& sub,  ceph::bufferlist &bl) {
-  ENCODE_START(1, 1, bl);
+inline void encode(const BeaconSubsystem& sub,  ceph::bufferlist &bl, uint64_t features) {
+  uint8_t version = 1;
+  if (HAVE_FEATURE(features, NVMEOF_BEACON_DIFF)) {
+    version = 2;
+  }
+
+  ENCODE_START(version, version, bl);
   encode(sub.nqn, bl);
+  dout(20) << "encode BeaconSubsystems " << sub.nqn <<" features " <<  features
+           << " version " << (int)version << dendl;
   encode((uint32_t)sub.listeners.size(), bl);
   for (const auto& ls: sub.listeners)
     encode(ls, bl);
   encode((uint32_t)sub.namespaces.size(), bl);
   for (const auto& ns: sub.namespaces)
     encode(ns, bl);
+  if (version >= 2) {
+    encode((uint32_t)sub.change_descriptor, bl);
+    dout(20) << "encode BeaconSubsystems change-descr: " << (uint32_t)sub.change_descriptor << dendl;
+  }
   ENCODE_FINISH(bl);
 }
 
 inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl) {
-  DECODE_START(1, bl);
-  dout(20) << "decode BeaconSubsystems " << dendl;
+  DECODE_START(2, bl);
   decode(sub.nqn, bl);
+  dout(20) << "decode BeaconSubsystems " << sub.nqn << dendl;
   uint32_t s;
   sub.listeners.clear();
   decode(s, bl);
+  dout(20) << "decode Nlisteners " << s << dendl;
   for (uint32_t i = 0; i < s; i++) {
     BeaconListener ls;
     decode(ls, bl);
@@ -855,6 +948,12 @@ inline void decode(BeaconSubsystem& sub, ceph::buffer::list::const_iterator &bl)
     decode(ns, bl);
     sub.namespaces.push_back(ns);
   }
+  if (struct_v >= 2) {
+    uint32_t change_desc;
+    decode(change_desc, bl);
+    sub.change_descriptor = static_cast<subsystem_change_t>(change_desc);
+    dout(20) << "decode BeaconSubsystems version >= " << 2 << dendl;
+  }
   DECODE_FINISH(bl);
 }
 
diff --git a/src/mon/NVMeofGwTypes.h b/src/mon/NVMeofGwTypes.h
index 097397795a9..5c01e5b6bb1 100755
--- a/src/mon/NVMeofGwTypes.h
+++ b/src/mon/NVMeofGwTypes.h
@@ -17,8 +17,16 @@
 #include <iomanip>
 #include <map>
 #include <iostream>
+#include <list>
+#include <vector>
+#include <cstdint>
+#include <chrono>
+#include "include/types.h"
+#include "msg/msg_types.h"
 
 using NvmeGwId = std::string;
+using FailbackLocation = std::string;
+using NvmeLocation = std::string;
 using NvmeGroupKey = std::pair<std::string, std::string>;
 using NvmeNqnId = std::string;
 using NvmeAnaGrpId = uint32_t;
@@ -46,6 +54,17 @@ enum class gw_availability_t {
   GW_DELETED
 };
 
+enum class gw_admin_state_t {
+  GW_ADMIN_ENABLED = 0,
+  GW_ADMIN_DISABLED,
+};
+
+enum class subsystem_change_t {
+  SUBSYSTEM_ADDED,
+  SUBSYSTEM_CHANGED,
+  SUBSYSTEM_DELETED
+};
+
 #define REDUNDANT_GW_ANA_GROUP_ID 0xFF
 using SmState = std::map < NvmeAnaGrpId, gw_states_per_group_t>;
 
@@ -85,6 +104,7 @@ struct BeaconSubsystem {
   NvmeNqnId nqn;
   std::list<BeaconListener>  listeners;
   std::list<BeaconNamespace> namespaces;
+  subsystem_change_t change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
 
   // Define the equality operator
   bool operator==(const BeaconSubsystem& other) const {
@@ -130,7 +150,9 @@ struct NvmeGwMonState {
   BlocklistData blocklist_data;
   //ceph entity address allocated for the GW-client that represents this GW-id
   entity_addrvec_t addr_vect;
-  uint16_t beacon_index = 0;
+  uint64_t beacon_sequence = 0;// sequence number of last beacon copied to GW state
+  bool beacon_sequence_ooo = false; // last beacon sequence was out of order;
+  uint16_t beacon_index = 0; // used to filter acks sent to the client in response to beacons
   /**
    * during redeploy action and maybe other emergency use-cases gw performs scenario
    * that we call fast-reboot. It quickly reboots(due to redeploy f.e) and sends the
@@ -151,6 +173,8 @@ struct NvmeGwMonState {
    * it from being overriden by new epochs in monitor's function create_pending -
    * function restore_pending_map_info is called for this purpose
   */
+  gw_admin_state_t gw_admin_state = gw_admin_state_t::GW_ADMIN_ENABLED;
+  std::string location = "";
   std::chrono::system_clock::time_point allow_failovers_ts =
              std::chrono::system_clock::now();
   std::chrono::system_clock::time_point last_gw_down_ts =
@@ -169,6 +193,9 @@ struct NvmeGwMonState {
      // it expects it performed the full startup
     performed_full_startup = false;
   }
+  void reset_beacon_sequence(){
+    beacon_sequence = 0;
+  }
   void standby_state(NvmeAnaGrpId grpid) {
     sm_state[grpid]       = gw_states_per_group_t::GW_STANDBY_STATE;
   }
@@ -230,12 +257,16 @@ struct NvmeGwClientState {
   epoch_t gw_map_epoch;
   GwSubsystems subsystems;
   gw_availability_t availability;
-  NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available)
-    : group_id(id), gw_map_epoch(epoch), availability(available) {}
+  uint64_t last_beacon_seq_number;
+  bool last_beacon_seq_ooo; //out of order sequence
+  NvmeGwClientState(NvmeAnaGrpId id, epoch_t epoch, gw_availability_t available,
+     uint64_t sequence, bool sequence_ooo)
+    : group_id(id), gw_map_epoch(epoch), availability(available),
+      last_beacon_seq_number(sequence), last_beacon_seq_ooo(sequence_ooo) {}
 
   NvmeGwClientState()
     : NvmeGwClientState(
-      REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE) {}
+      REDUNDANT_GW_ANA_GROUP_ID, 0, gw_availability_t::GW_UNAVAILABLE, 0, 0) {}
 };
 
 struct Tmdata {
@@ -255,8 +286,16 @@ struct NvmeGwTimerState {
   NvmeGwTimerState() {};
 };
 
+struct LocationState {
+  bool failbacks_in_process; //failbacks allowed in recovering state
+  LocationState() {
+    failbacks_in_process = 0;
+  }
+};
+
 using NvmeGwMonClientStates = std::map<NvmeGwId, NvmeGwClientState>;
 using NvmeGwTimers = std::map<NvmeGwId, NvmeGwTimerState>;
 using NvmeGwMonStates = std::map<NvmeGwId, NvmeGwMonState>;
+using LocationStates = std::map<NvmeLocation, LocationState>;
 
 #endif /* SRC_MON_NVMEOFGWTYPES_H_ */
diff --git a/src/nvmeof/NVMeofGwMonitorClient.cc b/src/nvmeof/NVMeofGwMonitorClient.cc
index f632d11ba34..6d80d2d1936 100644
--- a/src/nvmeof/NVMeofGwMonitorClient.cc
+++ b/src/nvmeof/NVMeofGwMonitorClient.cc
@@ -12,6 +12,7 @@
  */
 
 #include <boost/algorithm/string/replace.hpp>
+#include <fmt/format.h>
 
 #include "common/errno.h"
 #include "common/signal.h"
@@ -19,6 +20,7 @@
 #include "include/compat.h"
 
 #include "include/stringify.h"
+#include "include/ceph_features.h"
 #include "global/global_context.h"
 #include "global/signal_handler.h"
 
@@ -28,6 +30,7 @@
 #include "NVMeofGwMonitorClient.h"
 #include "NVMeofGwClient.h"
 #include "NVMeofGwMonitorGroupClient.h"
+#include "nvmeof/NVMeofGwUtils.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mon
@@ -41,6 +44,8 @@ NVMeofGwMonitorClient::NVMeofGwMonitorClient(int argc, const char **argv) :
   last_map_time(std::chrono::steady_clock::now()),
   reset_timestamp(std::chrono::steady_clock::now()),
   start_time(last_map_time),
+  cluster_beacon_diff_included(0),
+  poolctx(),
   monc{g_ceph_context, poolctx},
   client_messenger(Messenger::create(g_ceph_context, "async", entity_name_t::CLIENT(-1), "client", getpid())),
   objecter{g_ceph_context, client_messenger.get(), &monc, poolctx},
@@ -214,7 +219,7 @@ void NVMeofGwMonitorClient::send_beacon()
 {
   ceph_assert(ceph_mutex_is_locked_by_me(beacon_lock));
   gw_availability_t gw_availability = gw_availability_t::GW_CREATED;
-  BeaconSubsystems subs;
+  BeaconSubsystems current_subsystems;
   NVMeofGwClient gw_client(
      grpc::CreateChannel(gateway_address, gw_creds()));
   subsystems_info gw_subsystems;
@@ -234,26 +239,43 @@ void NVMeofGwMonitorClient::send_beacon()
         BeaconListener bls = { ls.adrfam(), ls.traddr(), ls.trsvcid() };
         bsub.listeners.push_back(bls);
       }
-      subs.push_back(bsub);
+      current_subsystems.push_back(bsub);
     }
   }
 
+  // Determine change descriptors by comparing with previous beacon's subsystems
+  BeaconSubsystems subsystem_diff = current_subsystems;
+  determine_subsystem_changes(prev_beacon_subsystems, subsystem_diff);
+
   auto group_key = std::make_pair(pool, group);
   NvmeGwClientState old_gw_state;
   // if already got gateway state in the map
   if (first_beacon == false && get_gw_state("old map", map, group_key, name, old_gw_state))
     gw_availability = ok ? gw_availability_t::GW_AVAILABLE : gw_availability_t::GW_UNAVAILABLE;
-  dout(10) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
+  dout(1) << "sending beacon as gid " << monc.get_global_id() << " availability " << (int)gw_availability <<
     " osdmap_epoch " << osdmap_epoch << " gwmap_epoch " << gwmap_epoch << dendl;
+
+  // Check if NVMEOF_BEACON_DIFF feature is supported by the cluster
+  dout(10) << fmt::format("NVMEOF_BEACON_DIFF supported: {}",  cluster_beacon_diff_included ? "yes" : "no") << dendl;
+
+  // Send beacon with appropriate version based on cluster features
   auto m = ceph::make_message<MNVMeofGwBeacon>(
       name,
       pool,
       group,
-      subs,
+      cluster_beacon_diff_included ? subsystem_diff : current_subsystems,
       gw_availability,
       osdmap_epoch,
-      gwmap_epoch);
+      gwmap_epoch,
+      beacon_sequence,
+      // Pass affected features to the constructor
+      cluster_beacon_diff_included ? CEPH_FEATUREMASK_NVMEOF_BEACON_DIFF : 0
+  );
+  dout(10) << "sending beacon with diff support: " << (cluster_beacon_diff_included ? "enabled" : "disabled") << dendl;
+  
   monc.send_mon_message(std::move(m));
+  ++beacon_sequence;
+  prev_beacon_subsystems = std::move(current_subsystems);
 }
 
 void NVMeofGwMonitorClient::disconnect_panic()
@@ -391,6 +413,22 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
   // ensure that the gateway state has not vanished
   ceph_assert(got_new_gw_state || !got_old_gw_state);
 
+  // Check if the last_beacon_seq_number in the received map doesn't match our last sent beacon_sequence
+  // beacon_sequence is incremented after sending, so we compare with (beacon_sequence - 1)
+  {
+    std::lock_guard bl(beacon_lock);
+    if (got_new_gw_state && new_gw_state.last_beacon_seq_ooo) {
+        //new_gw_state.last_beacon_seq_number != (beacon_sequence - 1)) {
+      dout(4) << "Beacon sequence mismatch detected. Expected: " << (beacon_sequence - 1)
+               << ", received: " << new_gw_state.last_beacon_seq_number
+               << ". Truncating previous subsystems list." << dendl;
+      beacon_sequence = new_gw_state.last_beacon_seq_number + 1;
+      prev_beacon_subsystems.clear();
+      dout(4) << "OOO map received, Ignore it" << dendl;
+      return;
+    }
+  }
+
   if (!got_old_gw_state) {
     if (!got_new_gw_state) {
       dout(10) << "Can not find new gw state" << dendl;
@@ -425,10 +463,29 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
     }
   }
 
+  // Combined subsystems
+  const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
+  GwSubsystems combined_subsystems = new_gw_state.subsystems;
+  for (const auto& nqn_state_pair: old_gw_state.subsystems) {
+    const auto& nqn = nqn_state_pair.first;
+    auto& old_nqn_state = nqn_state_pair.second;
+
+    // The monitor might remove active subsystems from the new distributed GwSubsystems.
+    // In such cases, ensure an INACCESSIBLE state is generated for subsystems
+    // that were present in the old state but are now missing.
+    if (new_gw_state.subsystems.find(nqn) == new_gw_state.subsystems.end()) {
+      ana_state_t all_disabled(old_nqn_state.ana_state.size(), initial_ana_state);
+      dout(4) << "set all groups to Inacccessible stat for " << nqn << dendl;
+      NqnState    nqn_state(nqn, all_disabled);
+
+      combined_subsystems.insert({nqn, nqn_state});
+    }
+  }
+
   // Gather all state changes
   ana_info ai;
   epoch_t max_blocklist_epoch = 0;
-  for (const auto& nqn_state_pair: new_gw_state.subsystems) {
+  for (const auto& nqn_state_pair: combined_subsystems) {
     auto& sub = nqn_state_pair.second;
     const auto& nqn = nqn_state_pair.first;
     nqn_ana_states nas;
@@ -442,7 +499,6 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
        sub.ana_state.size();
 
     for (NvmeAnaGrpId  ana_grp_index = 0; ana_grp_index < ana_state_size; ana_grp_index++) {
-      const auto initial_ana_state = std::make_pair(gw_exported_states_per_group_t::GW_EXPORTED_INACCESSIBLE_STATE, (epoch_t)0);
       auto new_group_state = (ana_grp_index < sub.ana_state.size()) ?
 	sub.ana_state[ana_grp_index] :
 	initial_ana_state;
@@ -496,6 +552,12 @@ Dispatcher::dispatch_result_t NVMeofGwMonitorClient::ms_dispatch2(const ref_t<Me
   std::lock_guard l(lock);
   dout(10) << "got map type " << m->get_type() << dendl;
 
+  // print connection features for all incoming messages and update cluster features
+  if (m->get_connection()) {
+    cluster_beacon_diff_included = monc.get_monmap_required_features().contains_all(ceph::features::mon::FEATURE_NVMEOF_BEACON_DIFF);
+    dout(10) << fmt::format("Updated cluster features: 0x{:x}", cluster_beacon_diff_included) << dendl;
+  }
+
   if (m->get_type() == MSG_MNVMEOF_GW_MAP) {
     handle_nvmeof_gw_map(ref_cast<MNVMeofGwMap>(m));
     return Dispatcher::HANDLED();
diff --git a/src/nvmeof/NVMeofGwMonitorClient.h b/src/nvmeof/NVMeofGwMonitorClient.h
index 434f160203f..0ffe8169ad6 100644
--- a/src/nvmeof/NVMeofGwMonitorClient.h
+++ b/src/nvmeof/NVMeofGwMonitorClient.h
@@ -53,7 +53,9 @@ private:
 
   bool first_beacon = true;
   bool set_group_id = false;
-
+  uint64_t beacon_sequence = 0;
+  BeaconSubsystems prev_beacon_subsystems;
+  bool cluster_beacon_diff_included = 0;  // track cluster features for beacon encoding
   // init gw ssl opts
   void init_gw_ssl_opts();
 
diff --git a/src/nvmeof/NVMeofGwUtils.cc b/src/nvmeof/NVMeofGwUtils.cc
new file mode 100644
index 00000000000..9beccc8668c
--- /dev/null
+++ b/src/nvmeof/NVMeofGwUtils.cc
@@ -0,0 +1,53 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+                                BeaconSubsystems& new_subsystems) {
+  BeaconSubsystems result;
+
+  // for each subsystem in new_subsystems, check if it's added or changed
+  for (const auto& new_sub : new_subsystems) {
+    auto old_it = std::find_if(old_subsystems.begin(), old_subsystems.end(),
+                              [&](const BeaconSubsystem& s) { return s.nqn == new_sub.nqn; });
+    if (old_it == old_subsystems.end()) {
+      // Subsystem not found in old list - it's new
+      BeaconSubsystem added = new_sub;
+      added.change_descriptor = subsystem_change_t::SUBSYSTEM_ADDED;
+      result.push_back(std::move(added));
+    } else {
+      // subsystem exists - check if it changed
+      if (!(*old_it == new_sub)) {
+        BeaconSubsystem changed = new_sub;
+        changed.change_descriptor = subsystem_change_t::SUBSYSTEM_CHANGED;
+        result.push_back(std::move(changed));
+      }
+      // else: unchanged, do not add
+    }
+  }
+
+  // for any subsystem in old_subsystems not present in new_subsystems, add as deleted
+  for (const auto& old_sub : old_subsystems) {
+    auto found = std::find_if(new_subsystems.begin(), new_subsystems.end(),
+                             [&](const BeaconSubsystem& s) { return s.nqn == old_sub.nqn; });
+    if (found == new_subsystems.end()) {
+      BeaconSubsystem deleted_sub = old_sub;
+      deleted_sub.change_descriptor = subsystem_change_t::SUBSYSTEM_DELETED;
+      result.push_back(std::move(deleted_sub));
+    }
+  }
+
+  new_subsystems = std::move(result);
+}
+
diff --git a/src/nvmeof/NVMeofGwUtils.h b/src/nvmeof/NVMeofGwUtils.h
new file mode 100644
index 00000000000..a9c25317d81
--- /dev/null
+++ b/src/nvmeof/NVMeofGwUtils.h
@@ -0,0 +1,23 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#ifndef NVMEOFGWUTILS_H
+#define NVMEOFGWUTILS_H
+#include "mon/NVMeofGwTypes.h"
+#include <list>
+
+// utility for diffing nvmeof subsystem changes
+void determine_subsystem_changes(const BeaconSubsystems& old_subsystems,
+                                BeaconSubsystems& new_subsystems);
+
+#endif
diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
index 45d628717dd..847b89d42af 100644
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -1033,3 +1033,12 @@ add_executable(unittest_ceph_assert
 add_ceph_unittest(unittest_ceph_assert)
 target_link_libraries(unittest_ceph_assert ceph-common global)
 endif()
+
+add_executable(test_nvmeof_gw_utils
+  test_nvmeof_gw_utils.cc
+  ../nvmeof/NVMeofGwUtils.cc
+  )
+target_link_libraries(test_nvmeof_gw_utils
+  mon ceph-common global-static
+  )
+
diff --git a/src/test/test_nvmeof_gw_utils.cc b/src/test/test_nvmeof_gw_utils.cc
new file mode 100644
index 00000000000..c7d2ccfa561
--- /dev/null
+++ b/src/test/test_nvmeof_gw_utils.cc
@@ -0,0 +1,69 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "nvmeof/NVMeofGwUtils.h"
+#include "mon/NVMeofGwTypes.h"
+#include <iostream>
+#include <algorithm>
+#include "include/ceph_assert.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_mon
+#undef dout_prefix
+#define dout_prefix *_dout
+
+void test_determine_subsystem_changes() {
+  std::cout << __func__ << "\n\n" << std::endl;
+  // Prepare old and new subsystems
+  BeaconSubsystem sub1_old = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub2_old = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub3_old = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystems old_subs = { sub1_old, sub2_old, sub3_old };
+
+  // sub1 unchanged, sub2 changed, sub4 added, sub3 deleted
+  BeaconSubsystem sub1_new = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub2_new = { "nqn2", { {"IPv4", "1.2.3.4", "4420"} }, {}, subsystem_change_t::SUBSYSTEM_ADDED }; // changed listeners
+  BeaconSubsystem sub4_new = { "nqn4", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystems new_subs = { sub1_new, sub2_new, sub4_new };
+
+  determine_subsystem_changes(old_subs, new_subs);
+
+  // After call, new_subs should only contain changed, added, and deleted subsystems
+  // sub1 (unchanged) should be removed
+  // sub2 (changed) should be present with SUBSYSTEM_CHANGED
+  // sub4 (added) should be present with SUBSYSTEM_ADDED
+  // sub3 (deleted) should be present with SUBSYSTEM_DELETED
+  bool found_sub2 = false, found_sub3 = false, found_sub4 = false;
+  for (const auto& s : new_subs) {
+    if (s.nqn == "nqn2") {
+      found_sub2 = true;
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+    } else if (s.nqn == "nqn3") {
+      found_sub3 = true;
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_DELETED);
+    } else if (s.nqn == "nqn4") {
+      found_sub4 = true;
+      ceph_assert(s.change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+    } else {
+      ceph_assert(false && "Unexpected subsystem in result");
+    }
+  }
+  ceph_assert(found_sub2 && found_sub3 && found_sub4);
+  std::cout << "determine_subsystem_changes test passed" << std::endl;
+}
+
+int main(int argc, const char **argv) {
+  test_determine_subsystem_changes();
+  return 0;
+}
diff --git a/src/test/test_nvmeof_mon_encoding.cc b/src/test/test_nvmeof_mon_encoding.cc
index 704342c4c16..ca7f01d3eb1 100644
--- a/src/test/test_nvmeof_mon_encoding.cc
+++ b/src/test/test_nvmeof_mon_encoding.cc
@@ -35,12 +35,17 @@ void test_NVMeofGwMap() {
   std::string pool = "pool1";
   std::string group = "grp1";
   auto group_key = std::make_pair(pool, group);
-  pending_map.cfg_add_gw("GW1" ,group_key);
-  pending_map.cfg_add_gw("GW2" ,group_key);
-  pending_map.cfg_add_gw("GW3" ,group_key);
+  std::string nqn = "nqn-nqn";
+  BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+  BeaconSubsystems subs = {sub};
+
+  pending_map.cfg_add_gw("GW1" ,group_key, CEPH_FEATURES_ALL);
+  pending_map.cfg_add_gw("GW2" ,group_key, CEPH_FEATURES_ALL);
+  pending_map.cfg_add_gw("GW3" ,group_key, CEPH_FEATURES_ALL);
   NvmeNonceVector new_nonces = {"abc", "def","hij"};
   pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
   pending_map.created_gws[group_key]["GW1"].performed_full_startup = true;
+  pending_map.created_gws[group_key]["GW1"].subsystems = subs;
   int i = 0;
   for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
     blklst_itr.second.osd_epoch = 2*(i++);
@@ -52,21 +57,25 @@ void test_NVMeofGwMap() {
   dout(0) << pending_map << dendl;
 
   ceph::buffer::list bl;
+  dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
+
   pending_map.encode(bl, CEPH_FEATURES_ALL);
   auto p = bl.cbegin();
   pending_map.decode(p);
   dout(0) << " == Dump map after Decode: == " <<dendl;
   dout(0) << pending_map << dendl;
+  dout(0) << pending_map.created_gws[group_key]["GW1"].subsystems << dendl;
 }
 
 void test_MNVMeofGwMap() {
+  //test message to the Mon Client
   dout(0) << __func__ << "\n\n" << dendl;
   std::map<NvmeGroupKey, NvmeGwMonClientStates> map;
 
   std::string pool = "pool1";
   std::string group = "grp1";
   std::string gw_id = "GW1";
-  NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE);
+  NvmeGwClientState state(1, 32, gw_availability_t::GW_UNAVAILABLE, 0, false);
   std::string nqn = "nqn";
   ana_state_t ana_state;
   NqnState nqn_state(nqn, ana_state);
@@ -81,9 +90,9 @@ void test_MNVMeofGwMap() {
   encode(map, bl, CEPH_FEATURES_ALL);
   dout(0) << "encoded: " << map << dendl;
   decode(map, bl);
-  dout(0) << "decode: " << map << dendl;
+  dout(0) << "decoded: " << map << dendl;
 
-  BeaconSubsystem sub = { nqn, {}, {} };
+  BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
   NVMeofGwMap pending_map;
   pending_map.epoch = 2;
   auto msg1 = make_message<MNVMeofGwMap>(pending_map);
@@ -94,12 +103,13 @@ void test_MNVMeofGwMap() {
   int epoch = msg1->get_gwmap_epoch();
   dout(0) << "after decode empty msg: " << *msg1 << " epoch " << epoch <<  dendl;
 
-  pending_map.cfg_add_gw("GW1" ,group_key);
-  pending_map.cfg_add_gw("GW2" ,group_key);
-  pending_map.cfg_add_gw("GW3" ,group_key);
+  pending_map.cfg_add_gw("GW1" ,group_key, CEPH_FEATURES_ALL);
+  pending_map.cfg_add_gw("GW2" ,group_key, CEPH_FEATURES_ALL);
+  pending_map.cfg_add_gw("GW3" ,group_key, CEPH_FEATURES_ALL);
   NvmeNonceVector new_nonces = {"abc", "def","hij"};
   pending_map.created_gws[group_key]["GW1"].nonce_map[1] = new_nonces;
   pending_map.created_gws[group_key]["GW1"].subsystems.push_back(sub);
+
   int i = 0;
   for (auto & blklst_itr : pending_map.created_gws[group_key]["GW1"].blocklist_data){
      blklst_itr.second.osd_epoch = 2*(i++);
@@ -115,7 +125,7 @@ void test_MNVMeofGwMap() {
   dout(0) << "after encode msg: " << *msg << dendl;
   msg->decode_payload();
   dout(0) << "after decode msg: " << *msg << dendl;
-
+
   //dout(0)   << "\n == Test GW Delete ==" << dendl;
   //pending_map.cfg_delete_gw("GW1" ,group_key);
   //dout(0) << "deleted GW1 " << pending_map << dendl;
@@ -127,10 +137,10 @@ void test_MNVMeofGwMap() {
   //dout(0) << "deleted GW2 " << pending_map << dendl;
 
   //dout(0) << "delete of wrong gw id" << dendl;
-  //pending_map.cfg_delete_gw("wow" ,group_key);
+  //pending_map.cfg_delete_gw("wow" ,group_key, true);
 
-  pending_map.cfg_delete_gw("GW3" ,group_key);
-  dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
+  //pending_map.cfg_delete_gw("GW3" ,group_key);
+  //dout(0) << "deleted GW3 . we should see the empty map " << pending_map << dendl;
 
 
 }
@@ -141,11 +151,13 @@ void test_MNVMeofGwBeacon() {
   std::string gw_group = "group";
   gw_availability_t availability = gw_availability_t::GW_AVAILABLE;
   std::string nqn = "nqn";
-  BeaconSubsystem sub = { nqn, {}, {} };
+  BeaconSubsystem sub = { nqn, {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
   BeaconSubsystems subs = { sub };
   epoch_t osd_epoch = 17;
   epoch_t gwmap_epoch = 42;
-
+  uint64_t sequence = 12345;
+
+  // Test legacy beacon (without diff support)
   auto msg = make_message<MNVMeofGwBeacon>(
       gw_id,
       gw_pool,
@@ -153,22 +165,87 @@ void test_MNVMeofGwBeacon() {
       subs,
       availability,
       osd_epoch,
-      gwmap_epoch);
-  msg->encode_payload(0);
+      gwmap_epoch
+      // sequence defaults to 0
+      // enable_diff defaults to false
+  );
+  msg->encode_payload(CEPH_FEATURES_ALL);
   msg->decode_payload();
-  dout(0) << "decode msg: " << *msg << dendl;
+  dout(0) << "decode msg (revision 1): " << *msg << dendl;
   ceph_assert(msg->get_gw_id() == gw_id);
   ceph_assert(msg->get_gw_pool() == gw_pool);
   ceph_assert(msg->get_gw_group() == gw_group);
   ceph_assert(msg->get_availability() == availability);
   ceph_assert(msg->get_last_osd_epoch() == osd_epoch);
   ceph_assert(msg->get_last_gwmap_epoch() == gwmap_epoch);
+  // Legacy beacons don't preserve sequence field - it gets reset to 0
+  ceph_assert(msg->get_sequence() == 0);
   const auto& dsubs = msg->get_subsystems();
   auto it = std::find_if(dsubs.begin(), dsubs.end(),
                            [&nqn](const auto& element) {
                                return element.nqn == nqn;
                            });
   ceph_assert(it != dsubs.end());
+  ceph_assert(it->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+
+  // Test enhanced beacon (with diff support)
+  auto msg2 = make_message<MNVMeofGwBeacon>(
+      gw_id,
+      gw_pool,
+      gw_group,
+      subs,
+      availability,
+      osd_epoch,
+      gwmap_epoch,
+      sequence,
+      true  // enable_diff = true
+  );
+  msg2->encode_payload(CEPH_FEATURES_ALL);
+  msg2->decode_payload();
+  dout(0) << "decode msg (revision 2): " << *msg2 << dendl;
+  ceph_assert(msg2->get_gw_id() == gw_id);
+  ceph_assert(msg2->get_gw_pool() == gw_pool);
+  ceph_assert(msg2->get_gw_group() == gw_group);
+  ceph_assert(msg2->get_availability() == availability);
+  ceph_assert(msg2->get_last_osd_epoch() == osd_epoch);
+  ceph_assert(msg2->get_last_gwmap_epoch() == gwmap_epoch);
+  ceph_assert(msg2->get_sequence() == sequence);
+  const auto& dsubs2 = msg2->get_subsystems();
+  auto it2 = std::find_if(dsubs2.begin(), dsubs2.end(),
+                           [&nqn](const auto& element) {
+                               return element.nqn == nqn;
+                           });
+  ceph_assert(it2 != dsubs2.end());
+  ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+}
+
+void test_subsystem_change_descriptors() {
+  dout(0) << __func__ << "\n\n" << dendl;
+  // Test different change descriptors
+  BeaconSubsystem sub1 = { "nqn1", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystem sub2 = { "nqn2", {}, {}, subsystem_change_t::SUBSYSTEM_CHANGED };
+  BeaconSubsystem sub3 = { "nqn3", {}, {}, subsystem_change_t::SUBSYSTEM_ADDED };
+  BeaconSubsystems subs = { sub1, sub2, sub3 };
+  // Encode and decode
+  ceph::buffer::list bl;
+  encode(subs, bl, CEPH_FEATURES_ALL);
+  auto p = bl.cbegin();
+  BeaconSubsystems decoded_subs;
+  decode(decoded_subs, p);
+  // Verify change descriptors are preserved
+  auto it1 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                          [](const auto& s) { return s.nqn == "nqn1"; });
+  auto it2 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                          [](const auto& s) { return s.nqn == "nqn2"; });
+  auto it3 = std::find_if(decoded_subs.begin(), decoded_subs.end(),
+                          [](const auto& s) { return s.nqn == "nqn3"; });
+  ceph_assert(it1 != decoded_subs.end());
+  ceph_assert(it2 != decoded_subs.end());
+  ceph_assert(it3 != decoded_subs.end());
+  ceph_assert(it1->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+  ceph_assert(it2->change_descriptor == subsystem_change_t::SUBSYSTEM_CHANGED);
+  ceph_assert(it3->change_descriptor == subsystem_change_t::SUBSYSTEM_ADDED);
+  dout(0) << "Subsystem change descriptors test passed" << dendl;
 }
 
 void test_NVMeofGwTimers()
@@ -206,6 +283,7 @@ int main(int argc, const char **argv)
   test_NVMeofGwMap();
   test_MNVMeofGwMap();
   test_MNVMeofGwBeacon();
+  test_subsystem_change_descriptors();
   test_NVMeofGwTimers();
 }
 
openSUSE Build Service is sponsored by