1391 lines
70 KiB
Diff
1391 lines
70 KiB
Diff
From 50d1050437ed8748f86ee50261b50a1e1f63162e Mon Sep 17 00:00:00 2001
|
||
From: Jixiang Jin <lollipop@apache.org>
|
||
Date: Wed, 16 Aug 2023 21:15:00 +0800
|
||
Subject: [PATCH 1/7] To config the cardinalityLimit for openTelemetry metrics
|
||
exporting and fix logging config for metrics (#7196)
|
||
|
||
---
|
||
WORKSPACE | 14 +++---
|
||
.../broker/metrics/BrokerMetricsManager.java | 47 ++++++++++++++-----
|
||
.../broker/metrics/PopMetricsManager.java | 11 +++--
|
||
.../src/main/resources/rmq.broker.logback.xml | 17 ++++---
|
||
.../apache/rocketmq/common/BrokerConfig.java | 9 ++++
|
||
.../metrics/ControllerMetricsManager.java | 6 +--
|
||
pom.xml | 4 +-
|
||
.../metrics/RemotingMetricsManager.java | 10 ++--
|
||
.../rocketmq/store/DefaultMessageStore.java | 24 +++++-----
|
||
.../apache/rocketmq/store/MessageStore.java | 6 +--
|
||
.../metrics/DefaultStoreMetricsManager.java | 4 +-
|
||
.../plugin/AbstractPluginMessageStore.java | 6 +--
|
||
.../tieredstore/TieredMessageStore.java | 6 +--
|
||
.../metrics/TieredStoreMetricsManager.java | 23 +++++----
|
||
14 files changed, 110 insertions(+), 77 deletions(-)
|
||
|
||
diff --git a/WORKSPACE b/WORKSPACE
|
||
index a8a0aafe9..3126f2d1d 100644
|
||
--- a/WORKSPACE
|
||
+++ b/WORKSPACE
|
||
@@ -88,14 +88,14 @@ maven_install(
|
||
"io.grpc:grpc-api:1.47.0",
|
||
"io.grpc:grpc-testing:1.47.0",
|
||
"org.springframework:spring-core:5.3.26",
|
||
- "io.opentelemetry:opentelemetry-exporter-otlp:1.19.0",
|
||
- "io.opentelemetry:opentelemetry-exporter-prometheus:1.19.0-alpha",
|
||
- "io.opentelemetry:opentelemetry-exporter-logging:1.19.0",
|
||
- "io.opentelemetry:opentelemetry-sdk:1.19.0",
|
||
+ "io.opentelemetry:opentelemetry-exporter-otlp:1.29.0",
|
||
+ "io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha",
|
||
+ "io.opentelemetry:opentelemetry-exporter-logging:1.29.0",
|
||
+ "io.opentelemetry:opentelemetry-sdk:1.29.0",
|
||
"com.squareup.okio:okio-jvm:3.0.0",
|
||
- "io.opentelemetry:opentelemetry-api:1.19.0",
|
||
- "io.opentelemetry:opentelemetry-sdk-metrics:1.19.0",
|
||
- "io.opentelemetry:opentelemetry-sdk-common:1.19.0",
|
||
+ "io.opentelemetry:opentelemetry-api:1.29.0",
|
||
+ "io.opentelemetry:opentelemetry-sdk-metrics:1.29.0",
|
||
+ "io.opentelemetry:opentelemetry-sdk-common:1.29.0",
|
||
"io.github.aliyunmq:rocketmq-slf4j-api:1.0.0",
|
||
"io.github.aliyunmq:rocketmq-logback-classic:1.0.0",
|
||
"org.slf4j:jul-to-slf4j:2.0.6",
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
|
||
index f0b76107e..6af5afc14 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
|
||
@@ -34,8 +34,10 @@ import io.opentelemetry.sdk.metrics.InstrumentType;
|
||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
|
||
import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
|
||
+import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
|
||
import io.opentelemetry.sdk.resources.Resource;
|
||
import java.util.ArrayList;
|
||
import java.util.Arrays;
|
||
@@ -361,22 +363,45 @@ public class BrokerMetricsManager {
|
||
.setType(InstrumentType.HISTOGRAM)
|
||
.setName(HISTOGRAM_MESSAGE_SIZE)
|
||
.build();
|
||
- View messageSizeView = View.builder()
|
||
- .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
|
||
- .build();
|
||
- providerBuilder.registerView(messageSizeSelector, messageSizeView);
|
||
-
|
||
- for (Pair<InstrumentSelector, View> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
|
||
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
|
||
+ ViewBuilder messageSizeViewBuilder = View.builder()
|
||
+ .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets));
|
||
+ // Configure the cardinality limit for OpenTelemetry metrics export.
|
||
+ SdkMeterProviderUtil.setCardinalityLimit(messageSizeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
|
||
+ providerBuilder.registerView(messageSizeSelector, messageSizeViewBuilder.build());
|
||
+
|
||
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : RemotingMetricsManager.getMetricsView()) {
|
||
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
|
||
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
|
||
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
|
||
}
|
||
|
||
- for (Pair<InstrumentSelector, View> selectorViewPair : messageStore.getMetricsView()) {
|
||
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
|
||
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : messageStore.getMetricsView()) {
|
||
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
|
||
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
|
||
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
|
||
}
|
||
|
||
- for (Pair<InstrumentSelector, View> selectorViewPair : PopMetricsManager.getMetricsView()) {
|
||
- providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
|
||
+ for (Pair<InstrumentSelector, ViewBuilder> selectorViewPair : PopMetricsManager.getMetricsView()) {
|
||
+ ViewBuilder viewBuilder = selectorViewPair.getObject2();
|
||
+ SdkMeterProviderUtil.setCardinalityLimit(viewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
|
||
+ providerBuilder.registerView(selectorViewPair.getObject1(), viewBuilder.build());
|
||
}
|
||
+
|
||
+ // Default view builder for all counters.
|
||
+ InstrumentSelector defaultCounterSelector = InstrumentSelector.builder()
|
||
+ .setType(InstrumentType.COUNTER)
|
||
+ .build();
|
||
+ ViewBuilder defaultCounterViewBuilder = View.builder().setDescription("default view for counter.");
|
||
+ SdkMeterProviderUtil.setCardinalityLimit(defaultCounterViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
|
||
+ providerBuilder.registerView(defaultCounterSelector, defaultCounterViewBuilder.build());
|
||
+
|
||
+ // Default view builder for all observable gauges.
|
||
+ InstrumentSelector defaultGaugeSelector = InstrumentSelector.builder()
|
||
+ .setType(InstrumentType.OBSERVABLE_GAUGE)
|
||
+ .build();
|
||
+ ViewBuilder defaultGaugeViewBuilder = View.builder().setDescription("default view for gauge.");
|
||
+ SdkMeterProviderUtil.setCardinalityLimit(defaultGaugeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
|
||
+ providerBuilder.registerView(defaultGaugeSelector, defaultGaugeViewBuilder.build());
|
||
}
|
||
|
||
private void initStatsMetrics() {
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
|
||
index 463371d7e..2de220da1 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
|
||
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
import io.opentelemetry.sdk.metrics.InstrumentType;
|
||
import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.time.Duration;
|
||
import java.util.Arrays;
|
||
import java.util.List;
|
||
@@ -63,7 +64,7 @@ public class PopMetricsManager {
|
||
private static LongCounter popReviveGetTotal = new NopLongCounter();
|
||
private static LongCounter popReviveRetryMessageTotal = new NopLongCounter();
|
||
|
||
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
List<Double> rpcCostTimeBuckets = Arrays.asList(
|
||
(double) Duration.ofMillis(1).toMillis(),
|
||
(double) Duration.ofMillis(10).toMillis(),
|
||
@@ -76,10 +77,10 @@ public class PopMetricsManager {
|
||
.setType(InstrumentType.HISTOGRAM)
|
||
.setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
|
||
.build();
|
||
- View popBufferScanTimeConsumeView = View.builder()
|
||
- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
|
||
- .build();
|
||
- return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView));
|
||
+ ViewBuilder popBufferScanTimeConsumeViewBuilder = View.builder()
|
||
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
|
||
+
|
||
+ return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeViewBuilder));
|
||
}
|
||
|
||
public static void initMetrics(Meter meter, BrokerController brokerController,
|
||
diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml
|
||
index 7d49f6664..3c51e59d4 100644
|
||
--- a/broker/src/main/resources/rmq.broker.logback.xml
|
||
+++ b/broker/src/main/resources/rmq.broker.logback.xml
|
||
@@ -559,27 +559,27 @@
|
||
</sift>
|
||
</appender>
|
||
|
||
- <appender name="RocketmqBrokerMetricsAppender" class="ch.qos.logback.classic.sift.SiftingAppender">
|
||
+ <appender name="RocketmqBrokerMetricsSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender">
|
||
<discriminator>
|
||
<key>brokerContainerLogDir</key>
|
||
<defaultValue>${file.separator}</defaultValue>
|
||
</discriminator>
|
||
<sift>
|
||
- <appender name="RocketmqCommercialAppender"
|
||
+ <appender name="RocketmqBrokerMetricsAppender"
|
||
class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||
<file>
|
||
- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metric.log
|
||
+ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}broker_metrics.log
|
||
</file>
|
||
<append>true</append>
|
||
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
|
||
<fileNamePattern>
|
||
- ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metric.%i.log.gz
|
||
+ ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}broker_metrics.%i.log.gz
|
||
</fileNamePattern>
|
||
<minIndex>1</minIndex>
|
||
- <maxIndex>10</maxIndex>
|
||
+ <maxIndex>3</maxIndex>
|
||
</rollingPolicy>
|
||
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
|
||
- <maxFileSize>500MB</maxFileSize>
|
||
+ <maxFileSize>512MB</maxFileSize>
|
||
</triggeringPolicy>
|
||
<encoder>
|
||
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
|
||
@@ -588,6 +588,9 @@
|
||
</appender>
|
||
</sift>
|
||
</appender>
|
||
+ <appender name="RocketmqBrokerMetricsSiftingAppender" class="ch.qos.logback.classic.AsyncAppender">
|
||
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender_inner"/>
|
||
+ </appender>
|
||
|
||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||
<encoder>
|
||
@@ -670,7 +673,7 @@
|
||
</logger>
|
||
|
||
<logger name="io.opentelemetry.exporter.logging.LoggingMetricExporter" additivity="false" level="INFO">
|
||
- <appender-ref ref="RocketmqBrokerMetricsAppender"/>
|
||
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender"/>
|
||
</logger>
|
||
|
||
<root level="INFO">
|
||
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
index 99a5db5ad..45d26b29c 100644
|
||
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
||
@@ -350,6 +350,7 @@ public class BrokerConfig extends BrokerIdentity {
|
||
|
||
private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
|
||
|
||
+ private int metricsOtelCardinalityLimit = 50 * 1000;
|
||
private String metricsGrpcExporterTarget = "";
|
||
private String metricsGrpcExporterHeader = "";
|
||
private long metricGrpcExporterTimeOutInMills = 3 * 1000;
|
||
@@ -1531,6 +1532,14 @@ public class BrokerConfig extends BrokerIdentity {
|
||
this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
|
||
}
|
||
|
||
+ public int getMetricsOtelCardinalityLimit() {
|
||
+ return metricsOtelCardinalityLimit;
|
||
+ }
|
||
+
|
||
+ public void setMetricsOtelCardinalityLimit(int metricsOtelCardinalityLimit) {
|
||
+ this.metricsOtelCardinalityLimit = metricsOtelCardinalityLimit;
|
||
+ }
|
||
+
|
||
public String getMetricsGrpcExporterTarget() {
|
||
return metricsGrpcExporterTarget;
|
||
}
|
||
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
|
||
index 9b30a3b43..650740bcc 100644
|
||
--- a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
|
||
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
|
||
@@ -203,7 +203,7 @@ public class ControllerMetricsManager {
|
||
10 * s
|
||
);
|
||
|
||
- View latecyView = View.builder()
|
||
+ View latencyView = View.builder()
|
||
.setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets))
|
||
.build();
|
||
|
||
@@ -217,8 +217,8 @@ public class ControllerMetricsManager {
|
||
.setName(HISTOGRAM_DLEDGER_OP_LATENCY)
|
||
.build();
|
||
|
||
- providerBuilder.registerView(requestLatencySelector, latecyView);
|
||
- providerBuilder.registerView(dLedgerOpLatencySelector, latecyView);
|
||
+ providerBuilder.registerView(requestLatencySelector, latencyView);
|
||
+ providerBuilder.registerView(dLedgerOpLatencySelector, latencyView);
|
||
}
|
||
|
||
private void initMetric(Meter meter) {
|
||
diff --git a/pom.xml b/pom.xml
|
||
index 3a08d75f2..9f0b3eb96 100644
|
||
--- a/pom.xml
|
||
+++ b/pom.xml
|
||
@@ -133,8 +133,8 @@
|
||
<caffeine.version>2.9.3</caffeine.version>
|
||
<spring.version>5.3.27</spring.version>
|
||
<okio-jvm.version>3.0.0</okio-jvm.version>
|
||
- <opentelemetry.version>1.26.0</opentelemetry.version>
|
||
- <opentelemetry-exporter-prometheus.version>1.26.0-alpha</opentelemetry-exporter-prometheus.version>
|
||
+ <opentelemetry.version>1.29.0</opentelemetry.version>
|
||
+ <opentelemetry-exporter-prometheus.version>1.29.0-alpha</opentelemetry-exporter-prometheus.version>
|
||
<jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
|
||
<s3.version>2.20.29</s3.version>
|
||
<rocksdb.version>1.0.3</rocksdb.version>
|
||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
|
||
index 34136f94f..2e0d70856 100644
|
||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
|
||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
|
||
@@ -26,6 +26,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
import io.opentelemetry.sdk.metrics.InstrumentType;
|
||
import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.time.Duration;
|
||
import java.util.Arrays;
|
||
import java.util.List;
|
||
@@ -61,7 +62,7 @@ public class RemotingMetricsManager {
|
||
.build();
|
||
}
|
||
|
||
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
List<Double> rpcCostTimeBuckets = Arrays.asList(
|
||
(double) Duration.ofMillis(1).toMillis(),
|
||
(double) Duration.ofMillis(3).toMillis(),
|
||
@@ -77,10 +78,9 @@ public class RemotingMetricsManager {
|
||
.setType(InstrumentType.HISTOGRAM)
|
||
.setName(HISTOGRAM_RPC_LATENCY)
|
||
.build();
|
||
- View view = View.builder()
|
||
- .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
|
||
- .build();
|
||
- return Lists.newArrayList(new Pair<>(selector, view));
|
||
+ ViewBuilder viewBuilder = View.builder()
|
||
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets));
|
||
+ return Lists.newArrayList(new Pair<>(selector, viewBuilder));
|
||
}
|
||
|
||
public static String getWriteAndFlushResult(Future<?> future) {
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||
index 25e4a166f..6115ead59 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||
@@ -22,7 +22,7 @@ import io.openmessaging.storage.dledger.entry.DLedgerEntry;
|
||
import io.opentelemetry.api.common.AttributesBuilder;
|
||
import io.opentelemetry.api.metrics.Meter;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
-import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.io.File;
|
||
import java.io.IOException;
|
||
import java.io.RandomAccessFile;
|
||
@@ -42,23 +42,24 @@ import java.util.Map;
|
||
import java.util.Objects;
|
||
import java.util.Optional;
|
||
import java.util.Set;
|
||
-import java.util.concurrent.LinkedBlockingQueue;
|
||
-import java.util.concurrent.TimeUnit;
|
||
-import java.util.concurrent.TimeoutException;
|
||
-import java.util.concurrent.ExecutionException;
|
||
-import java.util.concurrent.Executors;
|
||
-import java.util.concurrent.ExecutorService;
|
||
-import java.util.concurrent.ThreadPoolExecutor;
|
||
-import java.util.concurrent.ScheduledExecutorService;
|
||
import java.util.concurrent.CompletableFuture;
|
||
-import java.util.concurrent.ConcurrentMap;
|
||
import java.util.concurrent.ConcurrentHashMap;
|
||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||
+import java.util.concurrent.ConcurrentMap;
|
||
+import java.util.concurrent.ExecutionException;
|
||
+import java.util.concurrent.ExecutorService;
|
||
+import java.util.concurrent.Executors;
|
||
+import java.util.concurrent.LinkedBlockingQueue;
|
||
+import java.util.concurrent.ScheduledExecutorService;
|
||
+import java.util.concurrent.ThreadPoolExecutor;
|
||
+import java.util.concurrent.TimeUnit;
|
||
+import java.util.concurrent.TimeoutException;
|
||
import java.util.concurrent.atomic.AtomicInteger;
|
||
import java.util.concurrent.atomic.AtomicLong;
|
||
import java.util.function.Supplier;
|
||
import org.apache.commons.lang3.StringUtils;
|
||
import org.apache.rocketmq.common.AbstractBrokerRunnable;
|
||
+import org.apache.rocketmq.common.BoundaryType;
|
||
import org.apache.rocketmq.common.BrokerConfig;
|
||
import org.apache.rocketmq.common.BrokerIdentity;
|
||
import org.apache.rocketmq.common.MixAll;
|
||
@@ -82,7 +83,6 @@ import org.apache.rocketmq.common.topic.TopicValidator;
|
||
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
|
||
import org.apache.rocketmq.common.utils.QueueTypeUtils;
|
||
import org.apache.rocketmq.common.utils.ServiceProvider;
|
||
-import org.apache.rocketmq.common.BoundaryType;
|
||
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
|
||
@@ -3268,7 +3268,7 @@ public class DefaultMessageStore implements MessageStore {
|
||
}
|
||
|
||
@Override
|
||
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
return DefaultStoreMetricsManager.getMetricsView();
|
||
}
|
||
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
|
||
index 31bbb907f..989cbbe31 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
|
||
@@ -19,8 +19,7 @@ package org.apache.rocketmq.store;
|
||
import io.opentelemetry.api.common.AttributesBuilder;
|
||
import io.opentelemetry.api.metrics.Meter;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
-import io.opentelemetry.sdk.metrics.View;
|
||
-
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.nio.ByteBuffer;
|
||
import java.util.HashMap;
|
||
import java.util.LinkedList;
|
||
@@ -28,7 +27,6 @@ import java.util.List;
|
||
import java.util.Set;
|
||
import java.util.concurrent.CompletableFuture;
|
||
import java.util.function.Supplier;
|
||
-
|
||
import org.apache.rocketmq.common.BoundaryType;
|
||
import org.apache.rocketmq.common.Pair;
|
||
import org.apache.rocketmq.common.SystemClock;
|
||
@@ -964,7 +962,7 @@ public interface MessageStore {
|
||
*
|
||
* @return List of metrics selector and view pair
|
||
*/
|
||
- List<Pair<InstrumentSelector, View>> getMetricsView();
|
||
+ List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView();
|
||
|
||
/**
|
||
* Init store metrics
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
|
||
index ff87f6369..45a6bbc68 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
|
||
@@ -23,7 +23,7 @@ import io.opentelemetry.api.metrics.LongCounter;
|
||
import io.opentelemetry.api.metrics.Meter;
|
||
import io.opentelemetry.api.metrics.ObservableLongGauge;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
-import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.io.File;
|
||
import java.util.List;
|
||
import java.util.function.Supplier;
|
||
@@ -69,7 +69,7 @@ public class DefaultStoreMetricsManager {
|
||
public static LongCounter timerDequeueTotal = new NopLongCounter();
|
||
public static LongCounter timerEnqueueTotal = new NopLongCounter();
|
||
|
||
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
return Lists.newArrayList();
|
||
}
|
||
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
|
||
index 25e947512..ab9fc6da7 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
|
||
@@ -20,8 +20,7 @@ package org.apache.rocketmq.store.plugin;
|
||
import io.opentelemetry.api.common.AttributesBuilder;
|
||
import io.opentelemetry.api.metrics.Meter;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
-import io.opentelemetry.sdk.metrics.View;
|
||
-
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.nio.ByteBuffer;
|
||
import java.util.HashMap;
|
||
import java.util.LinkedList;
|
||
@@ -29,7 +28,6 @@ import java.util.List;
|
||
import java.util.Set;
|
||
import java.util.concurrent.CompletableFuture;
|
||
import java.util.function.Supplier;
|
||
-
|
||
import org.apache.rocketmq.common.Pair;
|
||
import org.apache.rocketmq.common.SystemClock;
|
||
import org.apache.rocketmq.common.message.MessageExt;
|
||
@@ -643,7 +641,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
|
||
}
|
||
|
||
@Override
|
||
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
return next.getMetricsView();
|
||
}
|
||
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||
index ced1fb818..5240ac8e9 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
|
||
@@ -21,7 +21,7 @@ import io.opentelemetry.api.common.Attributes;
|
||
import io.opentelemetry.api.common.AttributesBuilder;
|
||
import io.opentelemetry.api.metrics.Meter;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
-import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.util.List;
|
||
import java.util.Set;
|
||
import java.util.concurrent.CompletableFuture;
|
||
@@ -352,8 +352,8 @@ public class TieredMessageStore extends AbstractPluginMessageStore {
|
||
}
|
||
|
||
@Override
|
||
- public List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
- List<Pair<InstrumentSelector, View>> res = super.getMetricsView();
|
||
+ public List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
+ List<Pair<InstrumentSelector, ViewBuilder>> res = super.getMetricsView();
|
||
res.addAll(TieredStoreMetricsManager.getMetricsView());
|
||
return res;
|
||
}
|
||
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
|
||
index 3ca0fb614..d8a07f0a7 100644
|
||
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
|
||
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
|
||
@@ -27,6 +27,7 @@ import io.opentelemetry.sdk.metrics.Aggregation;
|
||
import io.opentelemetry.sdk.metrics.InstrumentSelector;
|
||
import io.opentelemetry.sdk.metrics.InstrumentType;
|
||
import io.opentelemetry.sdk.metrics.View;
|
||
+import io.opentelemetry.sdk.metrics.ViewBuilder;
|
||
import java.util.ArrayList;
|
||
import java.util.Arrays;
|
||
import java.util.HashMap;
|
||
@@ -101,8 +102,8 @@ public class TieredStoreMetricsManager {
|
||
public static ObservableLongGauge storageSize = new NopObservableLongGauge();
|
||
public static ObservableLongGauge storageMessageReserveTime = new NopObservableLongGauge();
|
||
|
||
- public static List<Pair<InstrumentSelector, View>> getMetricsView() {
|
||
- ArrayList<Pair<InstrumentSelector, View>> res = new ArrayList<>();
|
||
+ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
|
||
+ ArrayList<Pair<InstrumentSelector, ViewBuilder>> res = new ArrayList<>();
|
||
|
||
InstrumentSelector providerRpcLatencySelector = InstrumentSelector.builder()
|
||
.setType(InstrumentType.HISTOGRAM)
|
||
@@ -114,10 +115,9 @@ public class TieredStoreMetricsManager {
|
||
.setName(HISTOGRAM_API_LATENCY)
|
||
.build();
|
||
|
||
- View rpcLatencyView = View.builder()
|
||
+ ViewBuilder rpcLatencyViewBuilder = View.builder()
|
||
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d, 3d, 5d, 7d, 10d, 100d, 200d, 400d, 600d, 800d, 1d * 1000, 1d * 1500, 1d * 3000)))
|
||
- .setDescription("tiered_store_rpc_latency_view")
|
||
- .build();
|
||
+ .setDescription("tiered_store_rpc_latency_view");
|
||
|
||
InstrumentSelector uploadBufferSizeSelector = InstrumentSelector.builder()
|
||
.setType(InstrumentType.HISTOGRAM)
|
||
@@ -129,15 +129,14 @@ public class TieredStoreMetricsManager {
|
||
.setName(HISTOGRAM_DOWNLOAD_BYTES)
|
||
.build();
|
||
|
||
- View bufferSizeView = View.builder()
|
||
+ ViewBuilder bufferSizeViewBuilder = View.builder()
|
||
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(1d * TieredStoreUtil.KB, 10d * TieredStoreUtil.KB, 100d * TieredStoreUtil.KB, 1d * TieredStoreUtil.MB, 10d * TieredStoreUtil.MB, 32d * TieredStoreUtil.MB, 50d * TieredStoreUtil.MB, 100d * TieredStoreUtil.MB)))
|
||
- .setDescription("tiered_store_buffer_size_view")
|
||
- .build();
|
||
+ .setDescription("tiered_store_buffer_size_view");
|
||
|
||
- res.add(new Pair<>(rpcLatencySelector, rpcLatencyView));
|
||
- res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyView));
|
||
- res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeView));
|
||
- res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeView));
|
||
+ res.add(new Pair<>(rpcLatencySelector, rpcLatencyViewBuilder));
|
||
+ res.add(new Pair<>(providerRpcLatencySelector, rpcLatencyViewBuilder));
|
||
+ res.add(new Pair<>(uploadBufferSizeSelector, bufferSizeViewBuilder));
|
||
+ res.add(new Pair<>(downloadBufferSizeSelector, bufferSizeViewBuilder));
|
||
return res;
|
||
}
|
||
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From a4bcc2a74d8bec9c9d34565536e87df06e0b11c1 Mon Sep 17 00:00:00 2001
|
||
From: Ziyi Tan <tanziyi0925@gmail.com>
|
||
Date: Thu, 17 Aug 2023 13:53:48 +0800
|
||
Subject: [PATCH 2/7] [ISSUE #7178] refresh metadata after broker startup
|
||
|
||
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
|
||
---
|
||
.../rocketmq/broker/BrokerController.java | 24 +++++++++----------
|
||
1 file changed, 12 insertions(+), 12 deletions(-)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||
index 30b1d2299..13f9d002b 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
||
@@ -663,7 +663,7 @@ public class BrokerController {
|
||
BrokerController.this.getSlaveSynchronize().syncAll();
|
||
lastSyncTimeMs = System.currentTimeMillis();
|
||
}
|
||
-
|
||
+
|
||
//timer checkpoint, latency-sensitive, so sync it more frequently
|
||
if (messageStoreConfig.isTimerWheelEnable()) {
|
||
BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint();
|
||
@@ -698,17 +698,6 @@ public class BrokerController {
|
||
|
||
initializeBrokerScheduledTasks();
|
||
|
||
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
|
||
- @Override
|
||
- public void run() {
|
||
- try {
|
||
- BrokerController.this.brokerOuterAPI.refreshMetadata();
|
||
- } catch (Exception e) {
|
||
- LOG.error("ScheduledTask refresh metadata exception", e);
|
||
- }
|
||
- }
|
||
- }, 10, 5, TimeUnit.SECONDS);
|
||
-
|
||
if (this.brokerConfig.getNamesrvAddr() != null) {
|
||
this.updateNamesrvAddr();
|
||
LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
|
||
@@ -1682,6 +1671,17 @@ public class BrokerController {
|
||
if (brokerConfig.isSkipPreOnline()) {
|
||
startServiceWithoutCondition();
|
||
}
|
||
+
|
||
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
|
||
+ @Override
|
||
+ public void run() {
|
||
+ try {
|
||
+ BrokerController.this.brokerOuterAPI.refreshMetadata();
|
||
+ } catch (Exception e) {
|
||
+ LOG.error("ScheduledTask refresh metadata exception", e);
|
||
+ }
|
||
+ }
|
||
+ }, 10, 5, TimeUnit.SECONDS);
|
||
}
|
||
|
||
protected void scheduleSendHeartbeat() {
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d Mon Sep 17 00:00:00 2001
|
||
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
||
Date: Thu, 17 Aug 2023 13:59:04 +0800
|
||
Subject: [PATCH 3/7] [ISSUE #7201] Remove the DefaultMessageStore.class
|
||
dependency in TransientStorePool
|
||
|
||
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
||
---
|
||
.../rocketmq/store/AllocateMappedFileService.java | 6 +++---
|
||
.../apache/rocketmq/store/DefaultMessageStore.java | 7 +++++--
|
||
.../apache/rocketmq/store/TransientStorePool.java | 13 ++++---------
|
||
3 files changed, 12 insertions(+), 14 deletions(-)
|
||
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
|
||
index dca7d5325..c8420fea1 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
|
||
@@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread {
|
||
if (this.messageStore.isTransientStorePoolEnable()) {
|
||
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
|
||
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
|
||
- canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
|
||
+ canSubmitRequests = this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size();
|
||
}
|
||
}
|
||
|
||
@@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread {
|
||
if (nextPutOK) {
|
||
if (canSubmitRequests <= 0) {
|
||
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
|
||
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
|
||
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
|
||
this.requestTable.remove(nextFilePath);
|
||
return null;
|
||
}
|
||
@@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread {
|
||
if (nextNextPutOK) {
|
||
if (canSubmitRequests <= 0) {
|
||
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
|
||
- "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
|
||
+ "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
|
||
this.requestTable.remove(nextNextFilePath);
|
||
} else {
|
||
boolean offerOK = this.requestQueue.offer(nextNextReq);
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||
index 6115ead59..f2a54ddf6 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
||
@@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore {
|
||
this.reputMessageService = new ConcurrentReputMessageService();
|
||
}
|
||
|
||
- this.transientStorePool = new TransientStorePool(this);
|
||
+ this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
|
||
|
||
this.scheduledExecutorService =
|
||
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
|
||
@@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore {
|
||
}
|
||
|
||
public int remainTransientStoreBufferNumbs() {
|
||
- return this.transientStorePool.availableBufferNums();
|
||
+ if (this.isTransientStorePoolEnable()) {
|
||
+ return this.transientStorePool.availableBufferNums();
|
||
+ }
|
||
+ return Integer.MAX_VALUE;
|
||
}
|
||
|
||
@Override
|
||
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
|
||
index 8c1a5338b..0d42ee69e 100644
|
||
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
|
||
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
|
||
@@ -33,13 +33,11 @@ public class TransientStorePool {
|
||
private final int poolSize;
|
||
private final int fileSize;
|
||
private final Deque<ByteBuffer> availableBuffers;
|
||
- private final DefaultMessageStore messageStore;
|
||
private volatile boolean isRealCommit = true;
|
||
|
||
- public TransientStorePool(final DefaultMessageStore messageStore) {
|
||
- this.messageStore = messageStore;
|
||
- this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
|
||
- this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
|
||
+ public TransientStorePool(final int poolSize, final int fileSize) {
|
||
+ this.poolSize = poolSize;
|
||
+ this.fileSize = fileSize;
|
||
this.availableBuffers = new ConcurrentLinkedDeque<>();
|
||
}
|
||
|
||
@@ -81,10 +79,7 @@ public class TransientStorePool {
|
||
}
|
||
|
||
public int availableBufferNums() {
|
||
- if (messageStore.isTransientStorePoolEnable()) {
|
||
- return availableBuffers.size();
|
||
- }
|
||
- return Integer.MAX_VALUE;
|
||
+ return availableBuffers.size();
|
||
}
|
||
|
||
public boolean isRealCommit() {
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 2b93e1e32fd458d9df2091e89ea259ddd4d54061 Mon Sep 17 00:00:00 2001
|
||
From: iamgd67 <iamgd67@sina.com>
|
||
Date: Thu, 17 Aug 2023 15:31:14 +0800
|
||
Subject: [PATCH 4/7] Update mqbroker to use runbroker.sh instead of
|
||
runserver.sh when use --enable-proxy (#7150)
|
||
MIME-Version: 1.0
|
||
Content-Type: text/plain; charset=UTF-8
|
||
Content-Transfer-Encoding: 8bit
|
||
|
||
Update mqbroker to use runbroker.sh instead of runserver.sh when enabling `--enable-proxy`
|
||
This allows the JVM `heap` and `gc` configuration to use the broker's settings instead of those of the other common services (proxy, namenode, etc.).
|
||
Our main purpose, as the filename `mqbroker` suggests, is to start a broker (which embeds a proxy), so using the broker's config is reasonable.
|
||
|
||
Chinese version:
|
||
mqbroker的--enable-proxy选项是启动内嵌了proxy的broker,而不是内嵌broker的proxy,而且broker的工作量和重要程度大于proxy,所以使用broker的gc和heap配置更合适
|
||
---
|
||
distribution/bin/mqbroker | 4 ++--
|
||
1 file changed, 2 insertions(+), 2 deletions(-)
|
||
|
||
diff --git a/distribution/bin/mqbroker b/distribution/bin/mqbroker
|
||
index 3758ed597..35eb93c44 100644
|
||
--- a/distribution/bin/mqbroker
|
||
+++ b/distribution/bin/mqbroker
|
||
@@ -68,11 +68,11 @@ if [ "$enable_proxy" = true ]; then
|
||
if [ "$broker_config" != "" ]; then
|
||
args_for_proxy=${args_for_proxy}" -bc "${broker_config}
|
||
fi
|
||
- sh ${ROCKETMQ_HOME}/bin/runserver.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
|
||
+ sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.proxy.logback.xml org.apache.rocketmq.proxy.ProxyStartup ${args_for_proxy}
|
||
else
|
||
args_for_broker=$other_args
|
||
if [ "$broker_config" != "" ]; then
|
||
args_for_broker=${args_for_broker}" -c "${broker_config}
|
||
fi
|
||
sh ${ROCKETMQ_HOME}/bin/runbroker.sh -Drmq.logback.configurationFile=$ROCKETMQ_HOME/conf/rmq.broker.logback.xml org.apache.rocketmq.broker.BrokerStartup ${args_for_broker}
|
||
-fi
|
||
\ No newline at end of file
|
||
+fi
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 05e7cde610255ed9410fffb0f153efe7c2c8a326 Mon Sep 17 00:00:00 2001
|
||
From: yao-wenbin <ywb992134@163.com>
|
||
Date: Fri, 18 Aug 2023 09:49:59 +0800
|
||
Subject: [PATCH 5/7] [ISSUE #7042] maven-compile job failed, Because TlsTest's
|
||
serverRejectsSSLClient test case will throw TooLongFrameException (#7179)
|
||
|
||
---
|
||
.../remoting/netty/NettyRemotingServer.java | 2 +-
|
||
.../java/org/apache/rocketmq/remoting/TlsTest.java | 14 ++++++++++++--
|
||
2 files changed, 13 insertions(+), 3 deletions(-)
|
||
|
||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
||
index 90e358ce3..17f138f86 100644
|
||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
||
@@ -502,7 +502,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
||
case DISABLED:
|
||
ctx.close();
|
||
log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");
|
||
- break;
|
||
+ throw new UnsupportedOperationException("The NettyRemotingServer in SSL disabled mode doesn't support ssl client");
|
||
case PERMISSIVE:
|
||
case ENFORCING:
|
||
if (null != sslContext) {
|
||
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
|
||
index de7edbbfb..a4890d73d 100644
|
||
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
|
||
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
|
||
@@ -144,8 +144,13 @@ public class TlsTest {
|
||
tlsClientKeyPath = "";
|
||
tlsClientCertPath = "";
|
||
clientConfig.setUseTLS(false);
|
||
- } else if ("serverRejectsSSLClient".equals(name.getMethodName())) {
|
||
+ } else if ("disabledServerRejectsSSLClient".equals(name.getMethodName())) {
|
||
tlsMode = TlsMode.DISABLED;
|
||
+ } else if ("disabledServerAcceptUnAuthClient".equals(name.getMethodName())) {
|
||
+ tlsMode = TlsMode.DISABLED;
|
||
+ tlsClientKeyPath = "";
|
||
+ tlsClientCertPath = "";
|
||
+ clientConfig.setUseTLS(false);
|
||
} else if ("reloadSslContextForServer".equals(name.getMethodName())) {
|
||
tlsClientAuthServer = false;
|
||
tlsServerNeedClientAuth = "none";
|
||
@@ -211,7 +216,7 @@ public class TlsTest {
|
||
}
|
||
|
||
@Test
|
||
- public void serverRejectsSSLClient() throws Exception {
|
||
+ public void disabledServerRejectsSSLClient() throws Exception {
|
||
try {
|
||
RemotingCommand response = remotingClient.invokeSync(getServerAddress(), createRequest(), 1000 * 5);
|
||
failBecauseExceptionWasNotThrown(RemotingSendRequestException.class);
|
||
@@ -219,6 +224,11 @@ public class TlsTest {
|
||
}
|
||
}
|
||
|
||
+ @Test
|
||
+ public void disabledServerAcceptUnAuthClient() throws Exception {
|
||
+ requestThenAssertResponse();
|
||
+ }
|
||
+
|
||
/**
|
||
* Tests that a server configured to require client authentication refuses to accept connections
|
||
* from a client that has an untrusted certificate.
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From 72d796f2b20b3ec6aebca8c004d9275d7c749a95 Mon Sep 17 00:00:00 2001
|
||
From: lk <xdkxlk@outlook.com>
|
||
Date: Fri, 18 Aug 2023 11:55:39 +0800
|
||
Subject: [PATCH 6/7] [ISSUE #7205] support batch ack for pop orderly (#7206)
|
||
|
||
---
|
||
.../broker/processor/AckMessageProcessor.java | 99 ++++++-----
|
||
.../rocketmq/client/impl/MQClientAPIImpl.java | 91 ++++++++--
|
||
.../test/client/rmq/RMQPopClient.java | 22 +++
|
||
.../client/consumer/pop/BasePopNormally.java | 6 +
|
||
.../test/client/consumer/pop/BatchAckIT.java | 159 ++++++++++++++++++
|
||
5 files changed, 322 insertions(+), 55 deletions(-)
|
||
create mode 100644 test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
|
||
index 687811409..244b459d6 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
|
||
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
|
||
import com.alibaba.fastjson.JSON;
|
||
import io.netty.channel.Channel;
|
||
import io.netty.channel.ChannelHandlerContext;
|
||
+import java.util.BitSet;
|
||
import org.apache.rocketmq.broker.BrokerController;
|
||
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
|
||
import org.apache.rocketmq.common.KeyBuilder;
|
||
@@ -186,46 +187,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
|
||
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
|
||
|
||
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
|
||
- // order
|
||
- String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
|
||
- long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
|
||
- if (ackOffset < oldOffset) {
|
||
- return;
|
||
- }
|
||
- while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
|
||
- }
|
||
- try {
|
||
- oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
|
||
- if (ackOffset < oldOffset) {
|
||
- return;
|
||
- }
|
||
- long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
|
||
- topic, consumeGroup,
|
||
- qId, ackOffset,
|
||
- popTime);
|
||
- if (nextOffset > -1) {
|
||
- if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
|
||
- topic, consumeGroup, qId)) {
|
||
- this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
|
||
- consumeGroup, topic, qId, nextOffset);
|
||
- }
|
||
- if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
|
||
- consumeGroup, qId, invisibleTime)) {
|
||
- this.brokerController.getPopMessageProcessor().notifyMessageArriving(
|
||
- topic, consumeGroup, qId);
|
||
- }
|
||
- } else if (nextOffset == -1) {
|
||
- String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
|
||
- lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
|
||
- POP_LOGGER.warn(errorInfo);
|
||
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
|
||
- response.setRemark(errorInfo);
|
||
- return;
|
||
- }
|
||
- } finally {
|
||
- this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
|
||
- }
|
||
- brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
|
||
+ ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
|
||
return;
|
||
}
|
||
|
||
@@ -250,17 +212,22 @@ public class AckMessageProcessor implements NettyRequestProcessor {
|
||
}
|
||
|
||
BatchAckMsg batchAckMsg = new BatchAckMsg();
|
||
- for (int i = 0; batchAck.getBitSet() != null && i < batchAck.getBitSet().length(); i++) {
|
||
- if (!batchAck.getBitSet().get(i)) {
|
||
- continue;
|
||
+ BitSet bitSet = batchAck.getBitSet();
|
||
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
|
||
+ if (i == Integer.MAX_VALUE) {
|
||
+ break;
|
||
}
|
||
long offset = startOffset + i;
|
||
if (offset < minOffset || offset > maxOffset) {
|
||
continue;
|
||
}
|
||
- batchAckMsg.getAckOffsetList().add(offset);
|
||
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
|
||
+ ackOrderly(topic, consumeGroup, qId, offset, popTime, invisibleTime, channel, response);
|
||
+ } else {
|
||
+ batchAckMsg.getAckOffsetList().add(offset);
|
||
+ }
|
||
}
|
||
- if (batchAckMsg.getAckOffsetList().isEmpty()) {
|
||
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
|
||
return;
|
||
}
|
||
|
||
@@ -311,4 +278,46 @@ public class AckMessageProcessor implements NettyRequestProcessor {
|
||
PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
|
||
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
|
||
}
|
||
+
|
||
+ protected void ackOrderly(String topic, String consumeGroup, int qId, long ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand response) {
|
||
+ String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + PopAckConstants.SPLIT + qId;
|
||
+ long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
|
||
+ if (ackOffset < oldOffset) {
|
||
+ return;
|
||
+ }
|
||
+ while (!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey)) {
|
||
+ }
|
||
+ try {
|
||
+ oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, topic, qId);
|
||
+ if (ackOffset < oldOffset) {
|
||
+ return;
|
||
+ }
|
||
+ long nextOffset = brokerController.getConsumerOrderInfoManager().commitAndNext(
|
||
+ topic, consumeGroup,
|
||
+ qId, ackOffset,
|
||
+ popTime);
|
||
+ if (nextOffset > -1) {
|
||
+ if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
|
||
+ topic, consumeGroup, qId)) {
|
||
+ this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
|
||
+ consumeGroup, topic, qId, nextOffset);
|
||
+ }
|
||
+ if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
|
||
+ consumeGroup, qId, invisibleTime)) {
|
||
+ this.brokerController.getPopMessageProcessor().notifyMessageArriving(
|
||
+ topic, consumeGroup, qId);
|
||
+ }
|
||
+ } else if (nextOffset == -1) {
|
||
+ String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
|
||
+ lockKey, oldOffset, ackOffset, nextOffset, channel.remoteAddress());
|
||
+ POP_LOGGER.warn(errorInfo);
|
||
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
|
||
+ response.setRemark(errorInfo);
|
||
+ return;
|
||
+ }
|
||
+ } finally {
|
||
+ this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
|
||
+ }
|
||
+ brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, 1);
|
||
+ }
|
||
}
|
||
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
||
index 5101ffc8e..213c26fd6 100644
|
||
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
||
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
|
||
@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
|
||
import java.nio.ByteBuffer;
|
||
import java.util.ArrayList;
|
||
import java.util.Arrays;
|
||
+import java.util.BitSet;
|
||
import java.util.Collections;
|
||
import java.util.HashMap;
|
||
import java.util.Iterator;
|
||
@@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
|
||
import org.apache.rocketmq.client.producer.SendResult;
|
||
import org.apache.rocketmq.client.producer.SendStatus;
|
||
import org.apache.rocketmq.common.AclConfig;
|
||
+import org.apache.rocketmq.common.BoundaryType;
|
||
import org.apache.rocketmq.common.MQVersion;
|
||
import org.apache.rocketmq.common.MixAll;
|
||
import org.apache.rocketmq.common.Pair;
|
||
@@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
|
||
import org.apache.rocketmq.common.namesrv.TopAddressing;
|
||
import org.apache.rocketmq.common.sysflag.PullSysFlag;
|
||
import org.apache.rocketmq.common.topic.TopicValidator;
|
||
-import org.apache.rocketmq.common.BoundaryType;
|
||
+import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
import org.apache.rocketmq.remoting.CommandCustomHeader;
|
||
import org.apache.rocketmq.remoting.InvokeCallback;
|
||
import org.apache.rocketmq.remoting.RPCHook;
|
||
@@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
|
||
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
||
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
|
||
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
|
||
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
|
||
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
|
||
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
|
||
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
|
||
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
|
||
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
|
||
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
|
||
@@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
|
||
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
|
||
import org.apache.rocketmq.remoting.protocol.body.GroupList;
|
||
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
|
||
-import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
|
||
import org.apache.rocketmq.remoting.protocol.body.KVTable;
|
||
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
|
||
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
|
||
@@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
|
||
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
|
||
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
|
||
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
|
||
+import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
|
||
+import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
|
||
@@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
|
||
import org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
|
||
-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
|
||
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
|
||
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
|
||
-import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
|
||
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
|
||
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
|
||
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
|
||
@@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
|
||
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
||
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
|
||
import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
|
||
-import org.apache.rocketmq.logging.org.slf4j.Logger;
|
||
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
||
|
||
import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
|
||
|
||
@@ -885,9 +888,77 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
|
||
final String addr,
|
||
final long timeOut,
|
||
final AckCallback ackCallback,
|
||
- final AckMessageRequestHeader requestHeader //
|
||
+ final AckMessageRequestHeader requestHeader
|
||
+ ) throws RemotingException, MQBrokerException, InterruptedException {
|
||
+ ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
|
||
+ }
|
||
+
|
||
+ public void batchAckMessageAsync(
|
||
+ final String addr,
|
||
+ final long timeOut,
|
||
+ final AckCallback ackCallback,
|
||
+ final String topic,
|
||
+ final String consumerGroup,
|
||
+ final List<String> extraInfoList
|
||
+ ) throws RemotingException, MQBrokerException, InterruptedException {
|
||
+ String brokerName = null;
|
||
+ Map<String, BatchAck> batchAckMap = new HashMap<>();
|
||
+ for (String extraInfo : extraInfoList) {
|
||
+ String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
|
||
+ if (brokerName == null) {
|
||
+ brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
|
||
+ }
|
||
+ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
|
||
+ ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
|
||
+ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
|
||
+ ExtraInfoUtil.getPopTime(extraInfoData);
|
||
+ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
|
||
+ BatchAck newBatchAck = new BatchAck();
|
||
+ newBatchAck.setConsumerGroup(consumerGroup);
|
||
+ newBatchAck.setTopic(topic);
|
||
+ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
|
||
+ newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
|
||
+ newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
|
||
+ newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
|
||
+ newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
|
||
+ newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
|
||
+ newBatchAck.setBitSet(new BitSet());
|
||
+ return newBatchAck;
|
||
+ });
|
||
+ bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
|
||
+ }
|
||
+
|
||
+ BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
|
||
+ requestBody.setBrokerName(brokerName);
|
||
+ requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
|
||
+ batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
|
||
+ }
|
||
+
|
||
+ public void batchAckMessageAsync(
|
||
+ final String addr,
|
||
+ final long timeOut,
|
||
+ final AckCallback ackCallback,
|
||
+ final BatchAckMessageRequestBody requestBody
|
||
) throws RemotingException, MQBrokerException, InterruptedException {
|
||
- final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
|
||
+ ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
|
||
+ }
|
||
+
|
||
+ protected void ackMessageAsync(
|
||
+ final String addr,
|
||
+ final long timeOut,
|
||
+ final AckCallback ackCallback,
|
||
+ final AckMessageRequestHeader requestHeader,
|
||
+ final BatchAckMessageRequestBody requestBody
|
||
+ ) throws RemotingException, MQBrokerException, InterruptedException {
|
||
+ RemotingCommand request;
|
||
+ if (requestHeader != null) {
|
||
+ request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
|
||
+ } else {
|
||
+ request = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
|
||
+ if (requestBody != null) {
|
||
+ request.setBody(requestBody.encode());
|
||
+ }
|
||
+ }
|
||
this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
|
||
|
||
@Override
|
||
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
|
||
index 496bd6da4..09c60c0b4 100644
|
||
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
|
||
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
|
||
@@ -17,6 +17,7 @@
|
||
|
||
package org.apache.rocketmq.test.client.rmq;
|
||
|
||
+import java.util.List;
|
||
import java.util.concurrent.CompletableFuture;
|
||
import org.apache.rocketmq.client.ClientConfig;
|
||
import org.apache.rocketmq.client.consumer.AckCallback;
|
||
@@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer {
|
||
return future;
|
||
}
|
||
|
||
+ public CompletableFuture<AckResult> batchAckMessageAsync(String brokerAddr, String topic, String consumerGroup,
|
||
+ List<String> extraInfoList) {
|
||
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
|
||
+ try {
|
||
+ this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new AckCallback() {
|
||
+ @Override
|
||
+ public void onSuccess(AckResult ackResult) {
|
||
+ future.complete(ackResult);
|
||
+ }
|
||
+
|
||
+ @Override
|
||
+ public void onException(Throwable e) {
|
||
+ future.completeExceptionally(e);
|
||
+ }
|
||
+ }, topic, consumerGroup, extraInfoList);
|
||
+ } catch (Throwable t) {
|
||
+ future.completeExceptionally(t);
|
||
+ }
|
||
+ return future;
|
||
+ }
|
||
+
|
||
public CompletableFuture<AckResult> changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic,
|
||
String consumerGroup, String extraInfo, long invisibleTime) {
|
||
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
|
||
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
|
||
index 952fbe3f5..2e29b95a5 100644
|
||
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
|
||
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
|
||
@@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop {
|
||
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true,
|
||
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
|
||
}
|
||
+
|
||
+ protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, int maxNums) {
|
||
+ return client.popMessageAsync(
|
||
+ brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, false,
|
||
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
|
||
+ }
|
||
}
|
||
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
|
||
new file mode 100644
|
||
index 000000000..ec9153ccc
|
||
--- /dev/null
|
||
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
|
||
@@ -0,0 +1,159 @@
|
||
+/*
|
||
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
||
+ * contributor license agreements. See the NOTICE file distributed with
|
||
+ * this work for additional information regarding copyright ownership.
|
||
+ * The ASF licenses this file to You under the Apache License, Version 2.0
|
||
+ * (the "License"); you may not use this file except in compliance with
|
||
+ * the License. You may obtain a copy of the License at
|
||
+ *
|
||
+ * http://www.apache.org/licenses/LICENSE-2.0
|
||
+ *
|
||
+ * Unless required by applicable law or agreed to in writing, software
|
||
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
||
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
+ * See the License for the specific language governing permissions and
|
||
+ * limitations under the License.
|
||
+ */
|
||
+
|
||
+package org.apache.rocketmq.test.client.consumer.pop;
|
||
+
|
||
+import java.time.Duration;
|
||
+import java.util.ArrayList;
|
||
+import java.util.List;
|
||
+import java.util.concurrent.CompletableFuture;
|
||
+import java.util.concurrent.TimeUnit;
|
||
+import java.util.concurrent.atomic.AtomicInteger;
|
||
+import java.util.function.Supplier;
|
||
+import org.apache.rocketmq.client.consumer.AckResult;
|
||
+import org.apache.rocketmq.client.consumer.AckStatus;
|
||
+import org.apache.rocketmq.client.consumer.PopResult;
|
||
+import org.apache.rocketmq.client.consumer.PopStatus;
|
||
+import org.apache.rocketmq.common.attribute.CQType;
|
||
+import org.apache.rocketmq.common.attribute.TopicMessageType;
|
||
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
|
||
+import org.apache.rocketmq.common.filter.ExpressionType;
|
||
+import org.apache.rocketmq.common.message.MessageConst;
|
||
+import org.apache.rocketmq.common.message.MessageExt;
|
||
+import org.apache.rocketmq.common.message.MessageQueue;
|
||
+import org.apache.rocketmq.test.base.IntegrationTestBase;
|
||
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
|
||
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
|
||
+import org.apache.rocketmq.test.util.MQRandomUtils;
|
||
+import org.junit.After;
|
||
+import org.junit.Before;
|
||
+import org.junit.Test;
|
||
+
|
||
+import static org.awaitility.Awaitility.await;
|
||
+import static org.junit.Assert.assertEquals;
|
||
+
|
||
+public class BatchAckIT extends BasePop {
|
||
+
|
||
+ protected String topic;
|
||
+ protected String group;
|
||
+ protected RMQNormalProducer producer = null;
|
||
+ protected RMQPopClient client = null;
|
||
+ protected String brokerAddr;
|
||
+ protected MessageQueue messageQueue;
|
||
+
|
||
+ @Before
|
||
+ public void setUp() {
|
||
+ brokerAddr = brokerController1.getBrokerAddr();
|
||
+ topic = MQRandomUtils.getRandomTopic();
|
||
+ group = initConsumerGroup();
|
||
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, CQType.SimpleCQ, TopicMessageType.NORMAL);
|
||
+ producer = getProducer(NAMESRV_ADDR, topic);
|
||
+ client = getRMQPopClient();
|
||
+ messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
|
||
+ }
|
||
+
|
||
+ @After
|
||
+ public void tearDown() {
|
||
+ shutdown();
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void testBatchAckNormallyWithPopBuffer() throws Throwable {
|
||
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
|
||
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
|
||
+
|
||
+ testBatchAck(() -> {
|
||
+ try {
|
||
+ return popMessageAsync().get();
|
||
+ } catch (Exception e) {
|
||
+ throw new RuntimeException(e);
|
||
+ }
|
||
+ });
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
|
||
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
|
||
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
|
||
+
|
||
+ testBatchAck(() -> {
|
||
+ try {
|
||
+ return popMessageAsync().get();
|
||
+ } catch (Exception e) {
|
||
+ throw new RuntimeException(e);
|
||
+ }
|
||
+ });
|
||
+ }
|
||
+
|
||
+ @Test
|
||
+ public void testBatchAckOrderly() throws Throwable {
|
||
+ testBatchAck(() -> {
|
||
+ try {
|
||
+ return popMessageOrderlyAsync().get();
|
||
+ } catch (Exception e) {
|
||
+ throw new RuntimeException(e);
|
||
+ }
|
||
+ });
|
||
+ }
|
||
+
|
||
+ public void testBatchAck(Supplier<PopResult> popResultSupplier) throws Throwable {
|
||
+ // Send 10 messages but do not ack, let them enter the retry topic
|
||
+ producer.send(10);
|
||
+ AtomicInteger firstMsgRcvNum = new AtomicInteger();
|
||
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
|
||
+ PopResult popResult = popResultSupplier.get();
|
||
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
|
||
+ firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size());
|
||
+ }
|
||
+ assertEquals(10, firstMsgRcvNum.get());
|
||
+ });
|
||
+ // sleep 6s, expect messages to enter the retry topic
|
||
+ TimeUnit.SECONDS.sleep(6);
|
||
+
|
||
+ producer.send(20);
|
||
+ List<String> extraInfoList = new ArrayList<>();
|
||
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
|
||
+ PopResult popResult = popResultSupplier.get();
|
||
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
|
||
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
|
||
+ extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
|
||
+ }
|
||
+ }
|
||
+ assertEquals(30, extraInfoList.size());
|
||
+ });
|
||
+
|
||
+ AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, group, extraInfoList).get();
|
||
+ assertEquals(AckStatus.OK, ackResult.getStatus());
|
||
+
|
||
+ // sleep 6s, expected that messages that have been acked will not be re-consumed
|
||
+ TimeUnit.SECONDS.sleep(6);
|
||
+ PopResult popResult = popResultSupplier.get();
|
||
+ assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus());
|
||
+ }
|
||
+
|
||
+ private CompletableFuture<PopResult> popMessageAsync() {
|
||
+ return client.popMessageAsync(
|
||
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false,
|
||
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
|
||
+ }
|
||
+
|
||
+ private CompletableFuture<PopResult> popMessageOrderlyAsync() {
|
||
+ return client.popMessageAsync(
|
||
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, group, 3000, false,
|
||
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null);
|
||
+ }
|
||
+}
|
||
--
|
||
2.32.0.windows.2
|
||
|
||
|
||
From cc16a1b51216e1e80c22011b8b01e060bb4af8b3 Mon Sep 17 00:00:00 2001
|
||
From: rongtong <jinrongtong5@163.com>
|
||
Date: Tue, 22 Aug 2023 10:42:25 +0800
|
||
Subject: [PATCH 7/7] Set table reference the same object for
|
||
setSubscriptionGroupTable method (#7204)
|
||
|
||
---
|
||
.../broker/subscription/SubscriptionGroupManager.java | 5 +----
|
||
1 file changed, 1 insertion(+), 4 deletions(-)
|
||
|
||
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
|
||
index 74e39c0fe..e63b93058 100644
|
||
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
|
||
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
|
||
@@ -341,10 +341,7 @@ public class SubscriptionGroupManager extends ConfigManager {
|
||
|
||
|
||
public void setSubscriptionGroupTable(ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
|
||
- this.subscriptionGroupTable.clear();
|
||
- for (String key : subscriptionGroupTable.keySet()) {
|
||
- putSubscriptionGroupConfig(subscriptionGroupTable.get(key));
|
||
- }
|
||
+ this.subscriptionGroupTable = subscriptionGroupTable;
|
||
}
|
||
|
||
public boolean containsSubscriptionGroup(String group) {
|
||
--
|
||
2.32.0.windows.2
|
||
|