38 lines
2.2 KiB
Diff
38 lines
2.2 KiB
Diff
|
|
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
|
||
|
|
index d81f6949f4..8bf8a73ff1 100644
|
||
|
|
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
|
||
|
|
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
|
||
|
|
@@ -523,6 +523,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
|
||
|
|
// load management decisions may be made.
|
||
|
|
private void updateBundleData() {
|
||
|
|
final Map<String, BundleData> bundleData = loadData.getBundleData();
|
||
|
|
+ final Set<String> activeBundles = new HashSet<>();
|
||
|
|
// Iterate over the broker data.
|
||
|
|
for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
|
||
|
|
final String broker = brokerEntry.getKey();
|
||
|
|
@@ -534,6 +535,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
|
||
|
|
for (Map.Entry<String, NamespaceBundleStats> entry : statsMap.entrySet()) {
|
||
|
|
final String bundle = entry.getKey();
|
||
|
|
final NamespaceBundleStats stats = entry.getValue();
|
||
|
|
+ activeBundles.add(bundle);
|
||
|
|
if (bundleData.containsKey(bundle)) {
|
||
|
|
// If we recognize the bundle, add these stats as a new sample.
|
||
|
|
bundleData.get(bundle).update(stats);
|
||
|
|
@@ -545,6 +547,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
|
||
|
|
bundleData.put(bundle, currentBundleData);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
+ //Remove not active bundle from loadData
|
||
|
|
+ for (String bundle : bundleData.keySet()) {
|
||
|
|
+ if (!activeBundles.contains(bundle)){
|
||
|
|
+ bundleData.remove(bundle);
|
||
|
|
+ if (pulsar.getLeaderElectionService().isLeader()){
|
||
|
|
+ deleteBundleDataFromMetadataStore(bundle);
|
||
|
|
+ }
|
||
|
|
+ }
|
||
|
|
+ }
|
||
|
|
+
|
||
|
|
|
||
|
|
// Remove all loaded bundles from the preallocated maps.
|
||
|
|
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
|