diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index d81f6949f4..8bf8a73ff1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -523,6 +523,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // load management decisions may be made. private void updateBundleData() { final Map bundleData = loadData.getBundleData(); + final Set activeBundles = new HashSet<>(); // Iterate over the broker data. for (Map.Entry brokerEntry : loadData.getBrokerData().entrySet()) { final String broker = brokerEntry.getKey(); @@ -534,6 +535,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { for (Map.Entry entry : statsMap.entrySet()) { final String bundle = entry.getKey(); final NamespaceBundleStats stats = entry.getValue(); + activeBundles.add(bundle); if (bundleData.containsKey(bundle)) { // If we recognize the bundle, add these stats as a new sample. bundleData.get(bundle).update(stats); @@ -545,6 +547,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager { bundleData.put(bundle, currentBundleData); } } + //Remove not active bundle from loadData + for (String bundle : bundleData.keySet()) { + if (!activeBundles.contains(bundle)){ + bundleData.remove(bundle); + if (pulsar.getLeaderElectionService().isLeader()){ + deleteBundleDataFromMetadataStore(bundle); + } + } + } + // Remove all loaded bundles from the preallocated maps. final Map preallocatedBundleData = brokerData.getPreallocatedBundleData();