rocketmq/patch011-backport-optimize-config.patch
2023-10-01 22:17:15 +08:00

1391 lines
70 KiB
Diff
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

From 50d1050437ed8748f86ee50261b50a1e1f63162e Mon Sep 17 00:00:00 2001
From: Jixiang Jin <lollipop@apache.org>
Date: Wed, 16 Aug 2023 21:15:00 +0800
Subject: [PATCH 1/7] To config the cardinalityLimit for openTelemetry metrics
exporting and fix logging config for metrics (#7196)
---
WORKSPACE | 14 +++---
.../broker/metrics/BrokerMetricsManager.java | 47 ++++++++++++++-----
.../broker/metrics/PopMetricsManager.java | 11 +++--
.../src/main/resources/rmq.broker.logback.xml | 17 ++++---
.../apache/rocketmq/common/BrokerConfig.java | 9 ++++
.../metrics/ControllerMetricsManager.java | 6 +--
pom.xml | 4 +-
.../metrics/RemotingMetricsManager.java | 10 ++--
.../rocketmq/store/DefaultMessageStore.java | 24 +++++-----
.../apache/rocketmq/store/MessageStore.java | 6 +--
.../metrics/DefaultStoreMetricsManager.java | 4 +-
.../plugin/AbstractPluginMessageStore.java | 6 +--
.../tieredstore/TieredMessageStore.java | 6 +--
.../metrics/TieredStoreMetricsManager.java | 23 +++++----
14 files changed, 110 insertions(+), 77 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index a8a0aafe9..3126f2d1d 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -88,14 +88,14 @@ maven_install(
"io.grpc:grpc-api:1.47.0",
"io.grpc:grpc-testing:1.47.0",
"org.springframework:spring-core:5.3.26",
- "io.opentelemetry:opentelemetry-exporter-otlp:1.19.0",
- "io.opentelemetry:opentelemetry-exporter-prometheus:1.19.0-alpha",
- "io.opentelemetry:opentelemetry-exporter-logging:1.19.0",
- "io.opentelemetry:opentelemetry-sdk:1.19.0",
+ "io.opentelemetry:opentelemetry-exporter-otlp:1.29.0",
+ "io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha",
+ "io.opentelemetry:opentelemetry-exporter-logging:1.29.0",
+ "io.opentelemetry:opentelemetry-sdk:1.29.0",
"com.squareup.okio:okio-jvm:3.0.0",
- "io.opentelemetry:opentelemetry-api:1.19.0",
- "io.opentelemetry:opentelemetry-sdk-metrics:1.19.0",
- "io.opentelemetry:opentelemetry-sdk-common:1.19.0",
+ "io.opentelemetry:opentelemetry-api:1.29.0",
+ "io.opentelemetry:opentelemetry-sdk-metrics:1.29.0",
+ "io.opentelemetry:opentelemetry-sdk-common:1.29.0",
"io.github.aliyunmq:rocketmq-slf4j-api:1.0.0",
"io.github.aliyunmq:rocketmq-logback-classic:1.0.0",
"org.slf4j:jul-to-slf4j:2.0.6",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index f0b76107e..6af5afc14 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -34,8 +34,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -361,22 +363,45 @@ public class BrokerMetricsManager {
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_MESSAGE_SIZE)
.build();
- View messageSizeView = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
- .build();
- providerBuilder.registerView(messageSizeSelector, messageSizeView);
-
- for (Pair<InstrumentSelector, View> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ ViewBuilder messageSizeViewBuilder = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets));
+ // To config the cardinalityLimit for openTelemetry metrics exporting.
+ SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build());
+
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
}
- for (Pair<InstrumentSelector, View> selectorViewPair : messageStore.getMetricsView()) {
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : messageStore.getMetricsView()) {
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
}
- for (Pair<InstrumentSelector, View> selectorViewPair : PopMetricsManager.getMetricsView()) {
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : PopMetricsManager.getMetricsView()) {
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
}
+
+ // default view builder for all counter.
+ InstrumentSelector defaultCounterSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.COUNTER)
+ .build();
+ ViewBuilder defaultCounterViewBuilder = View.builder().setDescription("default view for counter.");
+ SdkMeterProviderUtil.setCardinalityLimit(defaultCounterViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(defaultCounterSelector, defaultCounterViewBuilder.build());
+
+ //default view builder for all observable gauge.
+ InstrumentSelector defaultGaugeSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.OBSERVABLE_GAUGE)
+ .build();
+ ViewBuilder defaultGaugeViewBuilder = View.builder().setDescription("default view for gauge.");
+ SdkMeterProviderUtil.setCardinalityLimit(defaultGaugeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
+ providerBuilder.registerView(defaultGaugeSelector, defaultGaugeViewBuilder.build());
}
private void initStatsMetrics() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
index 463371d7e..2de220da1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -63,7 +64,7 @@ public class PopMetricsManager {
private static LongCounter popReviveGetTotal = new NopLongCounter();
private static LongCounter popReviveRetryMessageTotal = new NopLongCounter();
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
List<Double> rpcCostTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(1).toMillis(),
(double) Duration.ofMillis(10).toMillis(),
@@ -76,10 +77,10 @@ public class PopMetricsManager {
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
.build();
- View popBufferScanTimeConsumeView = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
- .build();
- return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView));
+ ViewBuilder popBufferScanTimeConsumeViewBuilder = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
+
+ return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeViewBuilder));
}
public static void initMetrics(Meter meter, BrokerController brokerController,
diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml
index 7d49f6664..3c51e59d4 100644
--- a/broker/src/main/resources/rmq.broker.logback.xml
+++ b/broker/src/main/resources/rmq.broker.logback.xml
@@ -559,27 +559,27 @@
</sift>
</appender>
- <appender name="RocketmqBrokerMetricsAppender" class="ch.qos.logback.classic.sift.SiftingAppender">
+ <appender name="RocketmqBrokerMetricsSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender">
<discriminator>
<key>brokerContainerLogDir</key>
<defaultValue>${file.separator}</defaultValue>
</discriminator>
<sift>
- <appender name="RocketmqCommercialAppender"
+ <appender name="RocketmqBrokerMetricsAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>
- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metric.log
+ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metrics.log
</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>
- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metric.%i.log.gz
+ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metrics.%i.log.gz
</fileNamePattern>
<minIndex>1</minIndex>
- <maxIndex>10</maxIndex>
+ <maxIndex>3</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>500MB</maxFileSize>
+ <maxFileSize>512MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
@@ -588,6 +588,9 @@
</appender>
</sift>
</appender>
+ <appender name="RocketmqBrokerMetricsSiftingAppender" class="ch.qos.logback.classic.AsyncAppender">
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender_inner"/>
+ </appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
@@ -670,7 +673,7 @@
</logger>
<logger name="io.opentelemetry.exporter.logging.LoggingMetricExporter" additivity="false" level="INFO">
- <appender-ref ref="RocketmqBrokerMetricsAppender"/>
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender"/>
</logger>
<root level="INFO">
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 99a5db5ad..45d26b29c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -350,6 +350,7 @@ public class BrokerConfig extends BrokerIdentity {
private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+ private int metricsOtelCardinalityLimit = 50 * 1000;
private String metricsGrpcExporterTarget = "";
private String metricsGrpcExporterHeader = "";
private long metricGrpcExporterTimeOutInMills = 3 * 1000;
@@ -1531,6 +1532,14 @@ public class BrokerConfig extends BrokerIdentity {
this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
}
+ public int getMetricsOtelCardinalityLimit() {
+ return metricsOtelCardinalityLimit;
+ }
+
+ public void setMetricsOtelCardinalityLimit(int metricsOtelCardinalityLimit) {
+ this.metricsOtelCardinalityLimit = metricsOtelCardinalityLimit;
+ }
+
public String getMetricsGrpcExporterTarget() {
return metricsGrpcExporterTarget;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
index 9b30a3b43..650740bcc 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
@@ -203,7 +203,7 @@ public class ControllerMetricsManager {
10 * s
);
- View latecyView = View.builder()
+ View latencyView = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets))
.build();
@@ -217,8 +217,8 @@ public class ControllerMetricsManager {
.setName(HISTOGRAM_DLEDGER_OP_LATENCY)
.build();
- providerBuilder.registerView(requestLatencySelector, latecyView);
- providerBuilder.registerView(dLedgerOpLatencySelector, latecyView);
+ providerBuilder.registerView(requestLatencySelector, latencyView);
+ providerBuilder.registerView(dLedgerOpLatencySelector, latencyView);
}
private void initMetric(Meter meter) {
diff --git a/pom.xml b/pom.xml
index 3a08d75f2..9f0b3eb96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,8 +133,8 @@
<caffeine.version>2.9.3</caffeine.version>
<spring.version>5.3.27</spring.version>
<okio-jvm.version>3.0.0</okio-jvm.version>
- <opentelemetry.version>1.26.0</opentelemetry.version>
- <opentelemetry-exporter-prometheus.version>1.26.0-alpha</opentelemetry-exporter-prometheus.version>
+ <opentelemetry.version>1.29.0</opentelemetry.version>
+ <opentelemetry-exporter-prometheus.version>1.29.0-alpha</opentelemetry-exporter-prometheus.version>
<jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
<s3.version>2.20.29</s3.version>
<rocksdb.version>1.0.3</rocksdb.version>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
index 34136f94f..2e0d70856 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
@@ -26,6 +26,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -61,7 +62,7 @@ public class RemotingMetricsManager {
.build();
}
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
List<Double> rpcCostTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(1).toMillis(),
(double) Duration.ofMillis(3).toMillis(),
@@ -77,10 +78,9 @@ public class RemotingMetricsManager {
.setType(InstrumentType.HISTOGRAM)
.setName(HISTOGRAM_RPC_LATENCY)
.build();
- View view = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
- .build();
- return Lists.newArrayList(new Pair<>(selector, view));
+ ViewBuilder viewBuilder = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
+ return Lists.newArrayList(new Pair<>(selector, viewBuilder));
}
public static String getWriteAndFlushResult(Future<?> future) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 25e4a166f..6115ead59 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -22,7 +22,7 @@ import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -42,23 +42,24 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.BrokerIdentity;
import org.apache.rocketmq.common.MixAll;
@@ -82,7 +83,6 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
-import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -3268,7 +3268,7 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
return DefaultStoreMetricsManager.getMetricsView();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 31bbb907f..989cbbe31 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -19,8 +19,7 @@ package org.apache.rocketmq.store;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
-
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
@@ -28,7 +27,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
@@ -964,7 +962,7 @@ public interface MessageStore {
*
* @return List of metrics selector and view pair
*/
- List<Pair<InstrumentSelector, View>> getMetricsView();
+ List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();
/**
* Init store metrics
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index ff87f6369..45a6bbc68 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -23,7 +23,7 @@ import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.io.File;
import java.util.List;
import java.util.function.Supplier;
@@ -69,7 +69,7 @@ public class DefaultStoreMetricsManager {
public static LongCounter timerDequeueTotal = new NopLongCounter();
public static LongCounter timerEnqueueTotal = new NopLongCounter();
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
return Lists.newArrayList();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 25e947512..ab9fc6da7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -20,8 +20,7 @@ package org.apache.rocketmq.store.plugin;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
-
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
@@ -29,7 +28,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.message.MessageExt;
@@ -643,7 +641,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
return next.getMetricsView();
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index ced1fb818..5240ac8e9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -21,7 +21,7 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
-import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -352,8 +352,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
}
@Override
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
- List<Pair<InstrumentSelector, View>> res = super.getMetricsView();
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
+ List<Pair<InstrumentSelector, ViewBuilder>> res = super.getMetricsView();
res.addAll(TieredStoreMetricsManager.getMetricsView());
return res;
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 3ca0fb614..d8a07f0a7 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.ViewBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -101,8 +102,8 @@ public class TieredStoreMetricsManager {
public static ObservableLongGauge storageSize = new NopObservableLongGauge();
public static ObservableLongGauge storageMessageReserveTime = new NopObservableLongGauge();
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
- ArrayList<Pair<InstrumentSelector, View>> res = new ArrayList<>();
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
+ ArrayList<Pair<InstrumentSelector, ViewBuilder>> res = new ArrayList<>();
InstrumentSelector providerRpcLatencySelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
@@ -114,10 +115,9 @@ public class TieredStoreMetricsManager {
.setName(HISTOGRAM_API_LATENCY)
.build();
- View rpcLatencyView = View.builder()
+ ViewBuilder rpcLatencyViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 3d, 5d, 7d, 10d, 100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000)))
- .setDescription("tiered_store_rpc_latency_view")
- .build();
+ .setDescription("tiered_store_rpc_latency_view");
InstrumentSelector uploadBufferSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
@@ -129,15 +129,14 @@ public class TieredStoreMetricsManager {
.setName(HISTOGRAM_DOWNLOAD_BYTES)
.build();
- View bufferSizeView = View.builder()
+ ViewBuilder bufferSizeViewBuilder = View.builder()
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d * TieredStoreUtil.KB, 10d * TieredStoreUtil.KB, 100d * TieredStoreUtil.KB, 1d * TieredStoreUtil.MB, 10d * TieredStoreUtil.MB, 32d * TieredStoreUtil.MB, 50d * TieredStoreUtil.MB, 100d * TieredStoreUtil.MB)))
- .setDescription("tiered_store_buffer_size_view")
- .build();
+ .setDescription("tiered_store_buffer_size_view");
- res.add(new Pair<>(rpcLatencySelector, rpcLatencyView));
- res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyView));
- res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeView));
- res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeView));
+ res.add(new Pair<>(rpcLatencySelector, rpcLatencyViewBuilder));
+ res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyViewBuilder));
+ res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeViewBuilder));
+ res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeViewBuilder));
return res;
}
--
2.32.0.windows.2
From a4bcc2a74d8bec9c9d34565536e87df06e0b11c1 Mon Sep 17 00:00:00 2001
From: Ziyi Tan <tanziyi0925@gmail.com>
Date: Thu, 17 Aug 2023 13:53:48 +0800
Subject: [PATCH 2/7] [ISSUE #7178] refresh metadata after broker startup
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
---
.../rocketmq/broker/BrokerController.java | 24 +++++++++----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 30b1d2299..13f9d002b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -663,7 +663,7 @@ public class BrokerController {
BrokerController.this.getSlaveSynchronize().syncAll();
lastSyncTimeMs = System.currentTimeMillis();
}
-
+
//timer checkpoint, latency-sensitive, so sync it more frequently
if (messageStoreConfig.isTimerWheelEnable()) {
BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint();
@@ -698,17 +698,6 @@ public class BrokerController {
initializeBrokerScheduledTasks();
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.brokerOuterAPI.refreshMetadata();
- } catch (Exception e) {
- LOG.error("ScheduledTask refresh metadata exception", e);
- }
- }
- }, 10, 5, TimeUnit.SECONDS);
-
if (this.brokerConfig.getNamesrvAddr() != null) {
this.updateNamesrvAddr();
LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
@@ -1682,6 +1671,17 @@ public class BrokerController {
if (brokerConfig.isSkipPreOnline()) {
startServiceWithoutCondition();
}
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ BrokerController.this.brokerOuterAPI.refreshMetadata();
+ } catch (Exception e) {
+ LOG.error("ScheduledTask refresh metadata exception", e);
+ }
+ }
+ }, 10, 5, TimeUnit.SECONDS);
}
protected void scheduleSendHeartbeat() {
--
2.32.0.windows.2
From 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Thu, 17 Aug 2023 13:59:04 +0800
Subject: [PATCH 3/7] [ISSUE #7201] Remove the DefaultMessageStore.class
dependency in TransientStorePool
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
.../rocketmq/store/AllocateMappedFileService.java | 6 +++---
.../apache/rocketmq/store/DefaultMessageStore.java | 7 +++++--
.../apache/rocketmq/store/TransientStorePool.java | 13 ++++---------
3 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index dca7d5325..c8420fea1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (this.messageStore.isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
- canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
+ canSubmitRequests = this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size();
}
}
@@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
@@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread {
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6115ead59..f2a54ddf6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore {
this.reputMessageService = new ConcurrentReputMessageService();
}
- this.transientStorePool = new TransientStorePool(this);
+ this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore {
}
public int remainTransientStoreBufferNumbs() {
- return this.transientStorePool.availableBufferNums();
+ if (this.isTransientStorePoolEnable()) {
+ return this.transientStorePool.availableBufferNums();
+ }
+ return Integer.MAX_VALUE;
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index 8c1a5338b..0d42ee69e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -33,13 +33,11 @@ public class TransientStorePool {
private final int poolSize;
private final int fileSize;
private final Deque<ByteBuffer> availableBuffers;
- private final DefaultMessageStore messageStore;
private volatile boolean isRealCommit = true;
- public TransientStorePool(final DefaultMessageStore messageStore) {
- this.messageStore = messageStore;
- this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
- this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+ public TransientStorePool(final int poolSize, final int fileSize) {
+ this.poolSize = poolSize;
+ this.fileSize = fileSize;
this.availableBuffers = new ConcurrentLinkedDeque<>();
}
@@ -81,10 +79,7 @@ public class TransientStorePool {
}
public int availableBufferNums() {
- if (messageStore.isTransientStorePoolEnable()) {
- return availableBuffers.size();
- }
- return Integer.MAX_VALUE;
+ return availableBuffers.size();
}
public boolean isRealCommit() {
--
2.32.0.windows.2
From 2b93e1e32fd458d9df2091e89ea259ddd4d54061 Mon Sep 17 00:00:00 2001
From: iamgd67 <iamgd67@sina.com>
Date: Thu, 17 Aug 2023 15:31:14 +0800
Subject: [PATCH 4/7] Update mqbroker to use runbroker.sh instead of
runserver.sh when use --enable-proxy (#7150)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Update mqbroker to use runbroker.sh instead of runserver.sh when enabling `--enable-proxy`
this allow JVM `heap` and `gc` configuration using broker's settings instead of other common serverices'(proxy,namenode, etc).
our main purpose, like the filename `mqbroker` suggest, is to start broker (which embeds a proxy), so use broker's config is reasonable
chinese version
mqbroker的--enable-proxy选项是启动内嵌了proxy的broker而不是内嵌broker的proxy而且broker的工作量和重要程度大于proxy所以使用broker的gc和heap配置更合适
---
distribution/bin/mqbroker | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker
index 3758ed597..35eb93c44 100644
--- a/distribution/bin/mqbroker
+++ b/distribution/bin/mqbroker
@@ -68,11 +68,11 @@ if [ "$enable_proxy" = true ]; then
if [ "$broker_config" != "" ]; then
args_for_proxy=${args_for_proxy}" -bc "${broker_config}
fi
- sh ${ROCKETMQ_HOME}/bin/runserver.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
+ sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
else
args_for_broker=$other_args
if [ "$broker_config" != "" ]; then
args_for_broker=${args_for_broker}" -c "${broker_config}
fi
sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup ${args_for_broker}
-fi
\ No newline at end of file
+fi
--
2.32.0.windows.2
From 05e7cde610255ed9410fffb0f153efe7c2c8a326 Mon Sep 17 00:00:00 2001
From: yao-wenbin <ywb992134@163.com>
Date: Fri, 18 Aug 2023 09:49:59 +0800
Subject: [PATCH 5/7] [ISSUE #7042] maven-compile job failed, Because TlsTest's
serverRejectsSSLClient test case will throw TooLongFrameException (#7179)
---
.../remoting/netty/NettyRemotingServer.java | 2 +-
.../java/org/apache/rocketmq/remoting/TlsTest.java | 14 ++++++++++++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 90e358ce3..17f138f86 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -502,7 +502,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
case DISABLED:
ctx.close();
log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
- break;
+ throw new UnsupportedOperationException("The NettyRemotingServer in SSL disabled mode doesn't support ssl client");
case PERMISSIVE:
case ENFORCING:
if (null != sslContext) {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index de7edbbfb..a4890d73d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -144,8 +144,13 @@ public class TlsTest {
tlsClientKeyPath = "";
tlsClientCertPath = "";
clientConfig.setUseTLS(false);
- } else if ("serverRejectsSSLClient".equals(name.getMethodName())) {
+ } else if ("disabledServerRejectsSSLClient".equals(name.getMethodName())) {
tlsMode = TlsMode.DISABLED;
+ } else if ("disabledServerAcceptUnAuthClient".equals(name.getMethodName())) {
+ tlsMode = TlsMode.DISABLED;
+ tlsClientKeyPath = "";
+ tlsClientCertPath = "";
+ clientConfig.setUseTLS(false);
} else if ("reloadSslContextForServer".equals(name.getMethodName())) {
tlsClientAuthServer = false;
tlsServerNeedClientAuth = "none";
@@ -211,7 +216,7 @@ public class TlsTest {
}
@Test
- public void serverRejectsSSLClient() throws Exception {
+ public void disabledServerRejectsSSLClient() throws Exception {
try {
RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 1000 * 5);
failBecauseExceptionWasNotThrown(RemotingSendRequestException.class);
@@ -219,6 +224,11 @@ public class TlsTest {
}
}
+ @Test
+ public void disabledServerAcceptUnAuthClient() throws Exception {
+ requestThenAssertResponse();
+ }
+
/**
* Tests that a server configured to require client authentication refuses to accept connections
* from a client that has an untrusted certificate.
--
2.32.0.windows.2
From 72d796f2b20b3ec6aebca8c004d9275d7c749a95 Mon Sep 17 00:00:00 2001
From: lk <xdkxlk@outlook.com>
Date: Fri, 18 Aug 2023 11:55:39 +0800
Subject: [PATCH 6/7] [ISSUE #7205] support batch ack for pop orderly (#7206)
---
.../broker/processor/AckMessageProcessor.java | 99 ++++++-----
.../rocketmq/client/impl/MQClientAPIImpl.java | 91 ++++++++--
.../test/client/rmq/RMQPopClient.java | 22 +++
.../client/consumer/pop/BasePopNormally.java | 6 +
.../test/client/consumer/pop/BatchAckIT.java | 159 ++++++++++++++++++
5 files changed, 322 insertions(+), 55 deletions(-)
create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 687811409..244b459d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.BitSet;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
@@ -186,46 +187,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
- // order
- String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
- long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
- if (ackOffset < oldOffset) {
- return;
- }
- while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
- }
- try {
- oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
- if (ackOffset < oldOffset) {
- return;
- }
- long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
- topic, consumeGroup,
- qId, ackOffset,
- popTime);
- if (nextOffset > -1) {
- if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
- topic, consumeGroup, qId)) {
- this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
- consumeGroup, topic, qId, nextOffset);
- }
- if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
- consumeGroup, qId, invisibleTime)) {
- this.brokerController.getPopMessageProcessor().notifyMessageArriving(
- topic, consumeGroup, qId);
- }
- } else if (nextOffset == -1) {
- String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
- lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
- POP_LOGGER.warn(errorInfo);
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(errorInfo);
- return;
- }
- } finally {
- this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
- }
- brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
+ ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}
@@ -250,17 +212,22 @@ public class AckMessageProcessor implements NettyRequestProcessor {
}
BatchAckMsg batchAckMsg = new BatchAckMsg();
- for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) {
- if (!batchAck.getBitSet().get(i)) {
- continue;
+ BitSet bitSet = batchAck.getBitSet();
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
+ if (i == Integer.MAX_VALUE) {
+ break;
}
long offset = startOffset + i;
if (offset < minOffset || offset > maxOffset) {
continue;
}
- batchAckMsg.getAckOffsetList().add(offset);
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
+ ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response);
+ } else {
+ batchAckMsg.getAckOffsetList().add(offset);
+ }
}
- if (batchAckMsg.getAckOffsetList().isEmpty()) {
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
@@ -311,4 +278,46 @@ public class AckMessageProcessor implements NettyRequestProcessor {
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
}
+
+ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) {
+ String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
+ long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
+ }
+ try {
+ oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
+ topic, consumeGroup,
+ qId, ackOffset,
+ popTime);
+ if (nextOffset > -1) {
+ if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+ topic, consumeGroup, qId)) {
+ this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+ consumeGroup, topic, qId, nextOffset);
+ }
+ if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
+ consumeGroup, qId, invisibleTime)) {
+ this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+ topic, consumeGroup, qId);
+ }
+ } else if (nextOffset == -1) {
+ String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
+ lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
+ POP_LOGGER.warn(errorInfo);
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(errorInfo);
+ return;
+ }
+ } finally {
+ this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
+ }
+ brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1);
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 5101ffc8e..213c26fd6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
@@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
@@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH
import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
@@ -885,9 +888,77 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
final String addr,
final long timeOut,
final AckCallback ackCallback,
- final AckMessageRequestHeader requestHeader //
+ final AckMessageRequestHeader requestHeader
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
+ }
+
+ public void batchAckMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final String topic,
+ final String consumerGroup,
+ final List<String> extraInfoList
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ String brokerName = null;
+ Map<String, BatchAck> batchAckMap = new HashMap<>();
+ for (String extraInfo : extraInfoList) {
+ String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+ if (brokerName == null) {
+ brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
+ }
+ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
+ ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+ ExtraInfoUtil.getPopTime(extraInfoData);
+ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+ BatchAck newBatchAck = new BatchAck();
+ newBatchAck.setConsumerGroup(consumerGroup);
+ newBatchAck.setTopic(topic);
+ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+ newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+ newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+ newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+ newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+ newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+ newBatchAck.setBitSet(new BitSet());
+ return newBatchAck;
+ });
+ bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
+ }
+
+ BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
+ requestBody.setBrokerName(brokerName);
+ requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+ batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
+ }
+
+ public void batchAckMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final BatchAckMessageRequestBody requestBody
) throws RemotingException, MQBrokerException, InterruptedException {
- final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
+ }
+
+ protected void ackMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final AckMessageRequestHeader requestHeader,
+ final BatchAckMessageRequestBody requestBody
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ RemotingCommand request;
+ if (requestHeader != null) {
+ request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ } else {
+ request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ if (requestBody != null) {
+ request.setBody(requestBody.encode());
+ }
+ }
this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
@Override
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 496bd6da4..09c60c0b4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.client.rmq;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.AckCallback;
@@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer {
return future;
}
+ public CompletableFuture<AckResult> batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup,
+ List<String> extraInfoList) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ try {
+ this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ future.complete(ackResult);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ future.completeExceptionally(e);
+ }
+ }, topic, consumerGroup, extraInfoList);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
public CompletableFuture<AckResult> changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic,
String consumerGroup, String extraInfo, long invisibleTime) {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
index 952fbe3f5..2e29b95a5 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
@@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop {
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true,
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
}
+
+ protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, int maxNums) {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
new file mode 100644
index 000000000..ec9153ccc
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class BatchAckIT extends BasePop {
+
+ protected String topic;
+ protected String group;
+ protected RMQNormalProducer producer = null;
+ protected RMQPopClient client = null;
+ protected String brokerAddr;
+ protected MessageQueue messageQueue;
+
+ @Before
+ public void setUp() {
+ brokerAddr = brokerController1.getBrokerAddr();
+ topic = MQRandomUtils.getRandomTopic();
+ group = initConsumerGroup();
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL);
+ producer = getProducer(NAMESRV_ADDR, topic);
+ client = getRMQPopClient();
+ messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
+ }
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ @Test
+ public void testBatchAckNormallyWithPopBuffer() throws Throwable {
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
+
+ testBatchAck(() -> {
+ try {
+ return popMessageAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
+
+ testBatchAck(() -> {
+ try {
+ return popMessageAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBatchAckOrderly() throws Throwable {
+ testBatchAck(() -> {
+ try {
+ return popMessageOrderlyAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public void testBatchAck(Supplier<PopResult> popResultSupplier) throws Throwable {
+ // Send 10 messages but do not ack, let them enter the retry topic
+ producer.send(10);
+ AtomicInteger firstMsgRcvNum = new AtomicInteger();
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+ PopResult popResult = popResultSupplier.get();
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+ firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size());
+ }
+ assertEquals(10, firstMsgRcvNum.get());
+ });
+ // sleep 6s, expect messages to enter the retry topic
+ TimeUnit.SECONDS.sleep(6);
+
+ producer.send(20);
+ List<String> extraInfoList = new ArrayList<>();
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+ PopResult popResult = popResultSupplier.get();
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
+ extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
+ }
+ }
+ assertEquals(30, extraInfoList.size());
+ });
+
+ AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, group, extraInfoList).get();
+ assertEquals(AckStatus.OK, ackResult.getStatus());
+
+ // sleep 6s, expected that messages that have been acked will not be re-consumed
+ TimeUnit.SECONDS.sleep(6);
+ PopResult popResult = popResultSupplier.get();
+ assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus());
+ }
+
+ private CompletableFuture<PopResult> popMessageAsync() {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+ }
+
+ private CompletableFuture<PopResult> popMessageOrderlyAsync() {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false,
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null);
+ }
+}
--
2.32.0.windows.2
From cc16a1b51216e1e80c22011b8b01e060bb4af8b3 Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Tue, 22 Aug 2023 10:42:25 +0800
Subject: [PATCH 7/7] Set table reference the same object for
setSubscriptionGroupTable method (#7204)
---
.../broker/subscription/SubscriptionGroupManager.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 74e39c0fe..e63b93058 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -341,10 +341,7 @@ public class SubscriptionGroupManager extends ConfigManager {
public void setSubscriptionGroupTable(ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
- this.subscriptionGroupTable.clear();
- for (String key : subscriptionGroupTable.keySet()) {
- putSubscriptionGroupConfig(subscriptionGroupTable.get(key));
- }
+ this.subscriptionGroupTable = subscriptionGroupTable;
}
public boolean containsSubscriptionGroup(String group) {
--
2.32.0.windows.2