rocketmq/patch011-backport-optimize-config.patch

1391 lines
70 KiB
Diff
Raw Normal View History

2023-10-01 22:17:15 +08:00
From 50d1050437ed8748f86ee50261b50a1e1f63162e Mon Sep 17 00:00:00 2001
From: Jixiang Jin <lollipop@apache.org>
Date: Wed, 16 Aug 2023 21:15:00 +0800
Subject: [PATCH 1/7] To config the cardinalityLimit for openTelemetry metrics
exporting and fix logging config for metrics (#7196)
---
WORKSPACE | 14 +++---
.../broker/metrics/BrokerMetricsManager.java | 47 ++++++++++++++-----
.../broker/metrics/PopMetricsManager.java | 11 +++--
.../src/main/resources/rmq.broker.logback.xml | 17 ++++---
.../apache/rocketmq/common/BrokerConfig.java | 9 ++++
.../metrics/ControllerMetricsManager.java | 6 +--
pom.xml | 4 +-
.../metrics/RemotingMetricsManager.java | 10 ++--
.../rocketmq/store/DefaultMessageStore.java | 24 +++++-----
.../apache/rocketmq/store/MessageStore.java | 6 +--
.../metrics/DefaultStoreMetricsManager.java | 4 +-
.../plugin/AbstractPluginMessageStore.java | 6 +--
.../tieredstore/TieredMessageStore.java | 6 +--
.../metrics/TieredStoreMetricsManager.java | 23 +++++----
14 files changed, 110 insertions(+), 77 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index a8a0aafe9..3126f2d1d 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -88,14 +88,14 @@ maven_install(
"io.grpc:grpc-api:1.47.0",
"io.grpc:grpc-testing:1.47.0",
"org.springframework:spring-core:5.3.26",
- "io.opentelemetry:opentelemetry-exporter-otlp:1.19.0",
- "io.opentelemetry:opentelemetry-exporter-prometheus:1.19.0-alpha",
- "io.opentelemetry:opentelemetry-exporter-logging:1.19.0",
- "io.opentelemetry:opentelemetry-sdk:1.19.0",
+ "io.opentelemetry:opentelemetry-exporter-otlp:1.29.0",
+ "io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha",
+ "io.opentelemetry:opentelemetry-exporter-logging:1.29.0",
+ "io.opentelemetry:opentelemetry-sdk:1.29.0",
"com.squareup.okio:okio-jvm:3.0.0",
- "io.opentelemetry:opentelemetry-api:1.19.0",
- "io.opentelemetry:opentelemetry-sdk-metrics:1.19.0",
- "io.opentelemetry:opentelemetry-sdk-common:1.19.0",
+ "io.opentelemetry:opentelemetry-api:1.29.0",
+ "io.opentelemetry:opentelemetry-sdk-metrics:1.29.0",
+ "io.opentelemetry:opentelemetry-sdk-common:1.29.0",
"io.github.aliyunmq:rocketmq-slf4j-api:1.0.0",
"io.github.aliyunmq:rocketmq-logback-classic:1.0.0",
"org.slf4j:jul-to-slf4j:2.0.6",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index f0b76107e..6af5afc14 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -34,8 +34,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -361,22 +363,45 @@ public class BrokerMetricsManager {
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
.build();
- View messageSizeView = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
- .build();
- providerBuilder.registerView(messageSizeSelector, messageSizeView);
-
- for (Pair<InstrumentSelector, View> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ ViewBuilder messageSizeViewBuilder = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets));
+ // To config the cardinalityLimit for openTelemetry metrics exporting.
+ SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build());
+
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
}
- for (Pair<InstrumentSelector, View> selectorViewPair : messageStore.getMetricsView()) {
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : messageStore.getMetricsView()) {
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
}
- for (Pair<InstrumentSelector, View> selectorViewPair : PopMetricsManager.getMetricsView()) {
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : PopMetricsManager.getMetricsView()) {
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
}
+
+ // default view builder for all counter.
+ InstrumentSelector defaultCounterSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.COUNTER)
+ .build();
+ ViewBuilder defaultCounterViewBuilder = View.builder().setDescription("default view for counter.");
+ SdkMeterProviderUtil.setCardinalityLimit(defaultCounterViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(defaultCounterSelector, defaultCounterViewBuilder.build());
+
+ //default view builder for all observable gauge.
+ InstrumentSelector defaultGaugeSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.OBSERVABLE_GAUGE)
+ .build();
+ ViewBuilder defaultGaugeViewBuilder = View.builder().setDescription("default view for gauge.");
+ SdkMeterProviderUtil.setCardinalityLimit(defaultGaugeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(defaultGaugeSelector, defaultGaugeViewBuilder.build());
}
private void initStatsMetrics() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
index 463371d7e..2de220da1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -63,7 +64,7 @@ public class PopMetricsManager {
private static LongCounter popReviveGetTotal = new NopLongCounter();
private static LongCounter popReviveRetryMessageTotal = new NopLongCounter();
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
List<Double> rpcCostTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(1).toMillis(),
(double) Duration.ofMillis(10).toMillis(),
@@ -76,10 +77,10 @@ public class PopMetricsManager {
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
.build();
- View popBufferScanTimeConsumeView = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
- .build();
- return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView));
+ ViewBuilder popBufferScanTimeConsumeViewBuilder = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
+
+ return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeViewBuilder));
}
public static void initMetrics(Meter meter, BrokerController brokerController,
diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml
index 7d49f6664..3c51e59d4 100644
--- a/broker/src/main/resources/rmq.broker.logback.xml
+++ b/broker/src/main/resources/rmq.broker.logback.xml
@@ -559,27 +559,27 @@
</sift>
</appender>
- <appender name="RocketmqBrokerMetricsAppender" class="ch.qos.logback.classic.sift.SiftingAppender">
+ <appender name="RocketmqBrokerMetricsSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender">
<discriminator>
<key>brokerContainerLogDir</key>
<defaultValue>${file.separator}</defaultValue>
</discriminator>
<sift>
- <appender name="RocketmqCommercialAppender"
+ <appender name="RocketmqBrokerMetricsAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metric.log
+ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metrics.log
</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>
- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metric.%i.log.gz
+ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metrics.%i.log.gz
</fileNamePattern>
<minIndex>1</minIndex>
- <maxIndex>10</maxIndex>
+ <maxIndex>3</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>500MB</maxFileSize>
+ <maxFileSize>512MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
@@ -588,6 +588,9 @@
</appender>
</sift>
</appender>
+ <appender name="RocketmqBrokerMetricsSiftingAppender" class="ch.qos.logback.classic.AsyncAppender">
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender_inner"/>
+ </appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
@@ -670,7 +673,7 @@
</logger>
<logger name="io.opentelemetry.exporter.logging.LoggingMetricExporter" additivity="false" level="INFO">
- <appender-ref ref="RocketmqBrokerMetricsAppender"/>
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender"/>
</logger>
<root level="INFO">
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 99a5db5ad..45d26b29c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -350,6 +350,7 @@ public class BrokerConfig extends BrokerIdentity {
private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+ private int metricsOtelCardinalityLimit = 50 * 1000;
private String metricsGrpcExporterTarget = "";
private String metricsGrpcExporterHeader = "";
private long metricGrpcExporterTimeOutInMills = 3 * 1000;
@@ -1531,6 +1532,14 @@ public class BrokerConfig extends BrokerIdentity {
this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
}
+ public int getMetricsOtelCardinalityLimit() {
+ return metricsOtelCardinalityLimit;
+ }
+
+ public void setMetricsOtelCardinalityLimit(int metricsOtelCardinalityLimit) {
+ this.metricsOtelCardinalityLimit = metricsOtelCardinalityLimit;
+ }
+
public String getMetricsGrpcExporterTarget() {
return metricsGrpcExporterTarget;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
index 9b30a3b43..650740bcc 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
@@ -203,7 +203,7 @@ public class ControllerMetricsManager {
10 * s
);
- View latecyView = View.builder()
+ View latencyView = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets))
.build();
@@ -217,8 +217,8 @@ public class ControllerMetricsManager {
.setName(HISTOGRAM_DLEDGER_OP_LATENCY)
.build();
- providerBuilder.registerView(requestLatencySelector, latecyView);
- providerBuilder.registerView(dLedgerOpLatencySelector, latecyView);
+ providerBuilder.registerView(requestLatencySelector, latencyView);
+ providerBuilder.registerView(dLedgerOpLatencySelector, latencyView);
}
private void initMetric(Meter meter) {
diff --git a/pom.xml b/pom.xml
index 3a08d75f2..9f0b3eb96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,8 +133,8 @@
<caffeine.version>2.9.3</caffeine.version>
<spring.version>5.3.27</spring.version>
<okio-jvm.version>3.0.0</okio-jvm.version>
- <opentelemetry.version>1.26.0</opentelemetry.version>
- <opentelemetry-exporter-prometheus.version>1.26.0-alpha</opentelemetry-exporter-prometheus.version>
+ <opentelemetry.version>1.29.0</opentelemetry.version>
+ <opentelemetry-exporter-prometheus.version>1.29.0-alpha</opentelemetry-exporter-prometheus.version>
<jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
<s3.version>2.20.29</s3.version>
<rocksdb.version>1.0.3</rocksdb.version>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
index 34136f94f..2e0d70856 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
@@ -26,6 +26,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -61,7 +62,7 @@ public class RemotingMetricsManager {
.build();
}
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
List<Double> rpcCostTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(1).toMillis(),
(double) Duration.ofMillis(3).toMillis(),
@@ -77,10 +78,9 @@ public class RemotingMetricsManager {
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_RPC_LATENCY)
.build();
- View view = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
- .build();
- return Lists.newArrayList(new Pair<>(selector, view));
+ ViewBuilder viewBuilder = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
+ return Lists.newArrayList(new Pair<>(selector, viewBuilder));
}
public static String getWriteAndFlushResult(Future<?> future) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 25e4a166f..6115ead59 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -22,7 +22,7 @@ import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -42,23 +42,24 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
@@ -82,7 +83,6 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
-import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -3268,7 +3268,7 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
return DefaultStoreMetricsManager.getMetricsView();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 31bbb907f..989cbbe31 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -19,8 +19,7 @@ package org.apache.rocketmq.store;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
-
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
@@ -28,7 +27,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
@@ -964,7 +962,7 @@ public interface MessageStore {
*
* @return List of metrics selector and view pair
*/
- List<Pair<InstrumentSelector, View>> getMetricsView();
+ List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();
/**
* Init store metrics
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index ff87f6369..45a6bbc68 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -23,7 +23,7 @@ import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.io.File;
import java.util.List;
import java.util.function.Supplier;
@@ -69,7 +69,7 @@ public class DefaultStoreMetricsManager {
public static LongCounter timerDequeueTotal = new NopLongCounter();
public static LongCounter timerEnqueueTotal = new NopLongCounter();
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
return Lists.newArrayList();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 25e947512..ab9fc6da7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -20,8 +20,7 @@ package org.apache.rocketmq.store.plugin;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
-
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
@@ -29,7 +28,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.message.MessageExt;
@@ -643,7 +641,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
return next.getMetricsView();
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index ced1fb818..5240ac8e9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -21,7 +21,7 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -352,8 +352,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
}
@Override
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
- List<Pair<InstrumentSelector, View>> res = super.getMetricsView();
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
+ List<Pair<InstrumentSelector, ViewBuilder>> res = super.getMetricsView();
res.addAll(TieredStoreMetricsManager.getMetricsView());
return res;
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 3ca0fb614..d8a07f0a7 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -101,8 +102,8 @@ public class TieredStoreMetricsManager {
public static ObservableLongGauge storageSize = new NopObservableLongGauge();
public static ObservableLongGauge storageMessageReserveTime = new NopObservableLongGauge();
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
- ArrayList<Pair<InstrumentSelector, View>> res = new ArrayList<>();
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
+ ArrayList<Pair<InstrumentSelector, ViewBuilder>> res = new ArrayList<>();
InstrumentSelector providerRpcLatencySelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
@@ -114,10 +115,9 @@ public class TieredStoreMetricsManager {
.setName(HISTOGRAM_API_LATENCY)
.build();
- View rpcLatencyView = View.builder()
+ ViewBuilder rpcLatencyViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 3d, 5d, 7d, 10d, 100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000)))
- .setDescription("tiered_store_rpc_latency_view")
- .build();
+ .setDescription("tiered_store_rpc_latency_view");
InstrumentSelector uploadBufferSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
@@ -129,15 +129,14 @@ public class TieredStoreMetricsManager {
.setName(HISTOGRAM_DOWNLOAD_BYTES)
.build();
- View bufferSizeView = View.builder()
+ ViewBuilder bufferSizeViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d * TieredStoreUtil.KB, 10d * TieredStoreUtil.KB, 100d * TieredStoreUtil.KB, 1d * TieredStoreUtil.MB, 10d * TieredStoreUtil.MB, 32d * TieredStoreUtil.MB, 50d * TieredStoreUtil.MB, 100d * TieredStoreUtil.MB)))
- .setDescription("tiered_store_buffer_size_view")
- .build();
+ .setDescription("tiered_store_buffer_size_view");
- res.add(new Pair<>(rpcLatencySelector, rpcLatencyView));
- res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyView));
- res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeView));
- res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeView));
+ res.add(new Pair<>(rpcLatencySelector, rpcLatencyViewBuilder));
+ res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyViewBuilder));
+ res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeViewBuilder));
+ res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeViewBuilder));
return res;
}
--
2.32.0.windows.2
From a4bcc2a74d8bec9c9d34565536e87df06e0b11c1 Mon Sep 17 00:00:00 2001
From: Ziyi Tan <tanziyi0925@gmail.com>
Date: Thu, 17 Aug 2023 13:53:48 +0800
Subject: [PATCH 2/7] [ISSUE #7178] refresh metadata after broker startup
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
---
.../rocketmq/broker/BrokerController.java | 24 +++++++++----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 30b1d2299..13f9d002b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -663,7 +663,7 @@ public class BrokerController {
BrokerController.this.getSlaveSynchronize().syncAll();
lastSyncTimeMs = System.currentTimeMillis();
}
-
+
//timer checkpoint, latency-sensitive, so sync it more frequently
if (messageStoreConfig.isTimerWheelEnable()) {
BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint();
@@ -698,17 +698,6 @@ public class BrokerController {
initializeBrokerScheduledTasks();
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.brokerOuterAPI.refreshMetadata();
- } catch (Exception e) {
- LOG.error("ScheduledTask refresh metadata exception", e);
- }
- }
- }, 10, 5, TimeUnit.SECONDS);
-
if (this.brokerConfig.getNamesrvAddr() != null) {
this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
@@ -1682,6 +1671,17 @@ public class BrokerController {
if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ BrokerController.this.brokerOuterAPI.refreshMetadata();
+ } catch (Exception e) {
+ LOG.error("ScheduledTask refresh metadata exception", e);
+ }
+ }
+ }, 10, 5, TimeUnit.SECONDS);
}
protected void scheduleSendHeartbeat() {
--
2.32.0.windows.2
From 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Thu, 17 Aug 2023 13:59:04 +0800
Subject: [PATCH 3/7] [ISSUE #7201] Remove the DefaultMessageStore.class
dependency in TransientStorePool
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
.../rocketmq/store/AllocateMappedFileService.java | 6 +++---
.../apache/rocketmq/store/DefaultMessageStore.java | 7 +++++--
.../apache/rocketmq/store/TransientStorePool.java | 13 ++++---------
3 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index dca7d5325..c8420fea1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (this.messageStore.isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
- canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
+ canSubmitRequests = this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size();
}
}
@@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
@@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6115ead59..f2a54ddf6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore {
this.reputMessageService = new ConcurrentReputMessageService();
}
- this.transientStorePool = new TransientStorePool(this);
+ this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore {
}
public int remainTransientStoreBufferNumbs() {
- return this.transientStorePool.availableBufferNums();
+ if (this.isTransientStorePoolEnable()) {
+ return this.transientStorePool.availableBufferNums();
+ }
+ return Integer.MAX_VALUE;
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index 8c1a5338b..0d42ee69e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -33,13 +33,11 @@ public class TransientStorePool {
private final int poolSize;
private final int fileSize;
private final Deque<ByteBuffer> availableBuffers;
- private final DefaultMessageStore messageStore;
private volatile boolean isRealCommit = true;
- public TransientStorePool(final DefaultMessageStore messageStore) {
- this.messageStore = messageStore;
- this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
- this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+ public TransientStorePool(final int poolSize, final int fileSize) {
+ this.poolSize = poolSize;
+ this.fileSize = fileSize;
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
@@ -81,10 +79,7 @@ public class TransientStorePool {
}
public int availableBufferNums() {
- if (messageStore.isTransientStorePoolEnable()) {
- return availableBuffers.size();
- }
- return Integer.MAX_VALUE;
+ return availableBuffers.size();
}
public boolean isRealCommit() {
--
2.32.0.windows.2
From 2b93e1e32fd458d9df2091e89ea259ddd4d54061 Mon Sep 17 00:00:00 2001
From: iamgd67 <iamgd67@sina.com>
Date: Thu, 17 Aug 2023 15:31:14 +0800
Subject: [PATCH 4/7] Update mqbroker to use runbroker.sh instead of
runserver.sh when use --enable-proxy (#7150)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Update mqbroker to use runbroker.sh instead of runserver.sh when enabling `--enable-proxy`
this allow JVM `heap` and `gc` configuration using broker's settings instead of other common serverices'(proxy,namenode, etc).
our main purpose, like the filename `mqbroker` suggest, is to start broker (which embeds a proxy), so use broker's config is reasonable
chinese version
mqbroker的--enable-proxy选项是启动内嵌了proxy的broker而不是内嵌broker的proxy而且broker的工作量和重要程度大于proxy所以使用broker的gc和heap配置更合适
---
distribution/bin/mqbroker | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker
index 3758ed597..35eb93c44 100644
--- a/distribution/bin/mqbroker
+++ b/distribution/bin/mqbroker
@@ -68,11 +68,11 @@ if [ "$enable_proxy" = true ]; then
if [ "$broker_config" != "" ]; then
args_for_proxy=${args_for_proxy}" -bc "${broker_config}
fi
- sh ${ROCKETMQ_HOME}/bin/runserver.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
+ sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
else
args_for_broker=$other_args
if [ "$broker_config" != "" ]; then
args_for_broker=${args_for_broker}" -c "${broker_config}
fi
sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup ${args_for_broker}
-fi
\ No newline at end of file
+fi
--
2.32.0.windows.2
From 05e7cde610255ed9410fffb0f153efe7c2c8a326 Mon Sep 17 00:00:00 2001
From: yao-wenbin <ywb992134@163.com>
Date: Fri, 18 Aug 2023 09:49:59 +0800
Subject: [PATCH 5/7] [ISSUE #7042] maven-compile job failed, Because TlsTest's
serverRejectsSSLClient test case will throw TooLongFrameException (#7179)
---
.../remoting/netty/NettyRemotingServer.java | 2 +-
.../java/org/apache/rocketmq/remoting/TlsTest.java | 14 ++++++++++++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 90e358ce3..17f138f86 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -502,7 +502,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
case DISABLED:
ctx.close();
log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
- break;
+ throw new UnsupportedOperationException("The NettyRemotingServer in SSL disabled mode doesn't support ssl client");
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index de7edbbfb..a4890d73d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -144,8 +144,13 @@ public class TlsTest {
tlsClientKeyPath = "";
tlsClientCertPath = "";
clientConfig.setUseTLS(false);
- } else if ("serverRejectsSSLClient".equals(name.getMethodName())) {
+ } else if ("disabledServerRejectsSSLClient".equals(name.getMethodName())) {
tlsMode = TlsMode.DISABLED;
+ } else if ("disabledServerAcceptUnAuthClient".equals(name.getMethodName())) {
+ tlsMode = TlsMode.DISABLED;
+ tlsClientKeyPath = "";
+ tlsClientCertPath = "";
+ clientConfig.setUseTLS(false);
} else if ("reloadSslContextForServer".equals(name.getMethodName())) {
tlsClientAuthServer = false;
tlsServerNeedClientAuth = "none";
@@ -211,7 +216,7 @@ public class TlsTest {
}
@Test
- public void serverRejectsSSLClient() throws Exception {
+ public void disabledServerRejectsSSLClient() throws Exception {
try {
RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 1000 * 5);
failBecauseExceptionWasNotThrown(RemotingSendRequestException.class);
@@ -219,6 +224,11 @@ public class TlsTest {
}
}
+ @Test
+ public void disabledServerAcceptUnAuthClient() throws Exception {
+ requestThenAssertResponse();
+ }
+
/**
* Tests that a server configured to require client authentication refuses to accept connections
* from a client that has an untrusted certificate.
--
2.32.0.windows.2
From 72d796f2b20b3ec6aebca8c004d9275d7c749a95 Mon Sep 17 00:00:00 2001
From: lk <xdkxlk@outlook.com>
Date: Fri, 18 Aug 2023 11:55:39 +0800
Subject: [PATCH 6/7] [ISSUE #7205] support batch ack for pop orderly (#7206)
---
.../broker/processor/AckMessageProcessor.java | 99 ++++++-----
.../rocketmq/client/impl/MQClientAPIImpl.java | 91 ++++++++--
.../test/client/rmq/RMQPopClient.java | 22 +++
.../client/consumer/pop/BasePopNormally.java | 6 +
.../test/client/consumer/pop/BatchAckIT.java | 159 ++++++++++++++++++
5 files changed, 322 insertions(+), 55 deletions(-)
create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 687811409..244b459d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.BitSet;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
@@ -186,46 +187,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
- // order
- String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
- long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
- if (ackOffset < oldOffset) {
- return;
- }
- while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
- }
- try {
- oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
- if (ackOffset < oldOffset) {
- return;
- }
- long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
- topic, consumeGroup,
- qId, ackOffset,
- popTime);
- if (nextOffset > -1) {
- if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
- topic, consumeGroup, qId)) {
- this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
- consumeGroup, topic, qId, nextOffset);
- }
- if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
- consumeGroup, qId, invisibleTime)) {
- this.brokerController.getPopMessageProcessor().notifyMessageArriving(
- topic, consumeGroup, qId);
- }
- } else if (nextOffset == -1) {
- String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
- lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
- POP_LOGGER.warn(errorInfo);
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(errorInfo);
- return;
- }
- } finally {
- this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
- }
- brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
+ ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}
@@ -250,17 +212,22 @@ public class AckMessageProcessor implements NettyRequestProcessor {
}
BatchAckMsg batchAckMsg = new BatchAckMsg();
- for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) {
- if (!batchAck.getBitSet().get(i)) {
- continue;
+ BitSet bitSet = batchAck.getBitSet();
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+ if (i == Integer.MAX_VALUE) {
+ break;
}
long offset = startOffset + i;
if (offset < minOffset || offset > maxOffset) {
continue;
}
- batchAckMsg.getAckOffsetList().add(offset);
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
+ ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response);
+ } else {
+ batchAckMsg.getAckOffsetList().add(offset);
+ }
}
- if (batchAckMsg.getAckOffsetList().isEmpty()) {
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
@@ -311,4 +278,46 @@ public class AckMessageProcessor implements NettyRequestProcessor {
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
}
+
+ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) {
+ String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
+ long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
+ }
+ try {
+ oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
+ topic, consumeGroup,
+ qId, ackOffset,
+ popTime);
+ if (nextOffset > -1) {
+ if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+ topic, consumeGroup, qId)) {
+ this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+ consumeGroup, topic, qId, nextOffset);
+ }
+ if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
+ consumeGroup, qId, invisibleTime)) {
+ this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+ topic, consumeGroup, qId);
+ }
+ } else if (nextOffset == -1) {
+ String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
+ lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
+ POP_LOGGER.warn(errorInfo);
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(errorInfo);
+ return;
+ }
+ } finally {
+ this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
+ }
+ brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1);
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 5101ffc8e..213c26fd6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
@@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
@@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH
import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
@@ -885,9 +888,77 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
final String addr,
final long timeOut,
final AckCallback ackCallback,
- final AckMessageRequestHeader requestHeader //
+ final AckMessageRequestHeader requestHeader
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
+ }
+
+ public void batchAckMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final String topic,
+ final String consumerGroup,
+ final List<String> extraInfoList
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ String brokerName = null;
+ Map<String, BatchAck> batchAckMap = new HashMap<>();
+ for (String extraInfo : extraInfoList) {
+ String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+ if (brokerName == null) {
+ brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
+ }
+ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
+ ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+ ExtraInfoUtil.getPopTime(extraInfoData);
+ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+ BatchAck newBatchAck = new BatchAck();
+ newBatchAck.setConsumerGroup(consumerGroup);
+ newBatchAck.setTopic(topic);
+ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+ newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+ newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+ newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+ newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+ newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+ newBatchAck.setBitSet(new BitSet());
+ return newBatchAck;
+ });
+ bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
+ }
+
+ BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
+ requestBody.setBrokerName(brokerName);
+ requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+ batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
+ }
+
+ public void batchAckMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final BatchAckMessageRequestBody requestBody
) throws RemotingException, MQBrokerException, InterruptedException {
- final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
+ }
+
+ protected void ackMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final AckMessageRequestHeader requestHeader,
+ final BatchAckMessageRequestBody requestBody
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ RemotingCommand request;
+ if (requestHeader != null) {
+ request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ } else {
+ request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ if (requestBody != null) {
+ request.setBody(requestBody.encode());
+ }
+ }
this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
@Override
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 496bd6da4..09c60c0b4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.client.rmq;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.AckCallback;
@@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer {
return future;
}
+ public CompletableFuture<AckResult> batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup,
+ List<String> extraInfoList) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ try {
+ this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ future.complete(ackResult);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ future.completeExceptionally(e);
+ }
+ }, topic, consumerGroup, extraInfoList);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
public CompletableFuture<AckResult> changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic,
String consumerGroup, String extraInfo, long invisibleTime) {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
index 952fbe3f5..2e29b95a5 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
@@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop {
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true,
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
}
+
+ protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, int maxNums) {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
new file mode 100644
index 000000000..ec9153ccc
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class BatchAckIT extends BasePop {
+
+ protected String topic;
+ protected String group;
+ protected RMQNormalProducer producer = null;
+ protected RMQPopClient client = null;
+ protected String brokerAddr;
+ protected MessageQueue messageQueue;
+
+ @Before
+ public void setUp() {
+ brokerAddr = brokerController1.getBrokerAddr();
+ topic = MQRandomUtils.getRandomTopic();
+ group = initConsumerGroup();
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL);
+ producer = getProducer(NAMESRV_ADDR, topic);
+ client = getRMQPopClient();
+ messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
+ }
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ @Test
+ public void testBatchAckNormallyWithPopBuffer() throws Throwable {
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
+
+ testBatchAck(() -> {
+ try {
+ return popMessageAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
+
+ testBatchAck(() -> {
+ try {
+ return popMessageAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBatchAckOrderly() throws Throwable {
+ testBatchAck(() -> {
+ try {
+ return popMessageOrderlyAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public void testBatchAck(Supplier<PopResult> popResultSupplier) throws Throwable {
+ // Send 10 messages but do not ack, let them enter the retry topic
+ producer.send(10);
+ AtomicInteger firstMsgRcvNum = new AtomicInteger();
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+ PopResult popResult = popResultSupplier.get();
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+ firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size());
+ }
+ assertEquals(10, firstMsgRcvNum.get());
+ });
+ // sleep 6s, expect messages to enter the retry topic
+ TimeUnit.SECONDS.sleep(6);
+
+ producer.send(20);
+ List<String> extraInfoList = new ArrayList<>();
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+ PopResult popResult = popResultSupplier.get();
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
+ extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
+ }
+ }
+ assertEquals(30, extraInfoList.size());
+ });
+
+ AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, group, extraInfoList).get();
+ assertEquals(AckStatus.OK, ackResult.getStatus());
+
+ // sleep 6s, expected that messages that have been acked will not be re-consumed
+ TimeUnit.SECONDS.sleep(6);
+ PopResult popResult = popResultSupplier.get();
+ assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus());
+ }
+
+ private CompletableFuture<PopResult> popMessageAsync() {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+ }
+
+ private CompletableFuture<PopResult> popMessageOrderlyAsync() {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false,
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null);
+ }
+}
--
2.32.0.windows.2
From cc16a1b51216e1e80c22011b8b01e060bb4af8b3 Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Tue, 22 Aug 2023 10:42:25 +0800
Subject: [PATCH 7/7] Set table reference the same object for
setSubscriptionGroupTable method (#7204)
---
.../broker/subscription/SubscriptionGroupManager.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 74e39c0fe..e63b93058 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -341,10 +341,7 @@ public class SubscriptionGroupManager extends ConfigManager {
public void setSubscriptionGroupTable(ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
- this.subscriptionGroupTable.clear();
- for (String key : subscriptionGroupTable.keySet()) {
- putSubscriptionGroupConfig(subscriptionGroupTable.get(key));
- }
+ this.subscriptionGroupTable = subscriptionGroupTable;
}
public boolean containsSubscriptionGroup(String group) {
--
2.32.0.windows.2