rocketmq/patch019-backport-some-bugfix.patch
2023-12-05 13:55:07 +08:00

1500 lines
63 KiB
Diff

From 42fcd278ca84f6988d48a7d11271fc81b921d59a Mon Sep 17 00:00:00 2001
From: lk <xdkxlk@outlook.com>
Date: Wed, 20 Sep 2023 15:41:23 +0800
Subject: [PATCH 01/12] [ISSUE #7374] Prepare to release Apache RocketMQ 5.1.4
(#7375)
---
common/src/main/java/org/apache/rocketmq/common/MQVersion.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
index bfd07a895..4f1990ff8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
- public static final int CURRENT_VERSION = Version.V5_1_3.ordinal();
+ public static final int CURRENT_VERSION = Version.V5_1_4.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
--
2.32.0.windows.2
From b8610d87bb55de1f4413460c05da529dab60c1c1 Mon Sep 17 00:00:00 2001
From: Jixiang Jin <lollipop@apache.org>
Date: Thu, 21 Sep 2023 16:21:44 +0800
Subject: [PATCH 02/12] Replace loggingMetricExporter with
OtlpJsonLoggingMetricExporter. (#7373)
* Replace loggingMetricExporter with OtlpJsonLoggingMetricExporter.
* Fix bazel workspace
* Move OtlpJsonLoggingMetricExporter to proxy and controller.
* Fix Bazel imports.
---
WORKSPACE | 1 +
broker/BUILD.bazel | 1 +
.../rocketmq/broker/metrics/BrokerMetricsManager.java | 9 +++++----
broker/src/main/resources/rmq.broker.logback.xml | 5 +++++
common/BUILD.bazel | 1 +
common/pom.xml | 4 ++++
controller/BUILD.bazel | 1 +
.../controller/metrics/ControllerMetricsManager.java | 9 +++++----
pom.xml | 5 +++++
proxy/BUILD.bazel | 1 +
.../rocketmq/proxy/metrics/ProxyMetricsManager.java | 11 ++++++-----
proxy/src/main/resources/rmq.proxy.logback.xml | 5 +++++
tieredstore/BUILD.bazel | 1 +
13 files changed, 41 insertions(+), 13 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index 3126f2d1d..8640485ba 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -92,6 +92,7 @@ maven_install(
"io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha",
"io.opentelemetry:opentelemetry-exporter-logging:1.29.0",
"io.opentelemetry:opentelemetry-sdk:1.29.0",
+ "io.opentelemetry:opentelemetry-exporter-logging-otlp:1.29.0",
"com.squareup.okio:okio-jvm:3.0.0",
"io.opentelemetry:opentelemetry-api:1.29.0",
"io.opentelemetry:opentelemetry-sdk-metrics:1.29.0",
diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 6adcdc7b9..64cb1b341 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -44,6 +44,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:io_opentelemetry_opentelemetry_sdk",
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 6af5afc14..39af18b9f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -23,7 +23,7 @@ import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
-import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
@@ -36,6 +36,7 @@ import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.ViewBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.resources.Resource;
@@ -113,7 +114,7 @@ public class BrokerMetricsManager {
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
- private LoggingMetricExporter loggingMetricExporter;
+ private MetricExporter loggingMetricExporter;
private Meter brokerMeter;
public static Supplier<AttributesBuilder> attributesBuilderSupplier = Attributes::builder;
@@ -327,8 +328,8 @@ public class BrokerMetricsManager {
if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
- loggingMetricExporter = LoggingMetricExporter.create(brokerConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
- java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+ loggingMetricExporter = OtlpJsonLoggingMetricExporter.create(brokerConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
+ java.util.logging.Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
.setInterval(brokerConfig.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();
diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml
index 3c51e59d4..32dc29736 100644
--- a/broker/src/main/resources/rmq.broker.logback.xml
+++ b/broker/src/main/resources/rmq.broker.logback.xml
@@ -672,6 +672,11 @@
<appender-ref ref="RocketmqTrafficSiftingAppender"/>
</logger>
+ <!-- Use json formatter to log metrics -->
+ <logger name="io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter" additivity="false" level="INFO">
+ <appender-ref ref="RocketmqBrokerMetricsSiftingAppender"/>
+ </logger>
+
<logger name="io.opentelemetry.exporter.logging.LoggingMetricExporter" additivity="false" level="INFO">
<appender-ref ref="RocketmqBrokerMetricsSiftingAppender"/>
</logger>
diff --git a/common/BUILD.bazel b/common/BUILD.bazel
index a95a19ccd..e6701d0fc 100644
--- a/common/BUILD.bazel
+++ b/common/BUILD.bazel
@@ -35,6 +35,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk",
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_lz4_lz4_java",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
diff --git a/common/pom.xml b/common/pom.xml
index 31eb0f087..accc7f0a8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -80,6 +80,10 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-exporter-logging-otlp</artifactId>
+ </dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
diff --git a/controller/BUILD.bazel b/controller/BUILD.bazel
index 843d9dc77..b2b743eb2 100644
--- a/controller/BUILD.bazel
+++ b/controller/BUILD.bazel
@@ -49,6 +49,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:org_slf4j_jul_to_slf4j",
],
)
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
index 650740bcc..be9e77eea 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
@@ -26,7 +26,7 @@ import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
-import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
@@ -38,6 +38,7 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.io.File;
@@ -121,7 +122,7 @@ public class ControllerMetricsManager {
private PrometheusHttpServer prometheusHttpServer;
- private LoggingMetricExporter loggingMetricExporter;
+ private MetricExporter loggingMetricExporter;
public static ControllerMetricsManager getInstance(ControllerManager controllerManager) {
if (instance == null) {
@@ -364,8 +365,8 @@ public class ControllerMetricsManager {
if (type == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
- loggingMetricExporter = LoggingMetricExporter.create(config.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
- java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+ loggingMetricExporter = OtlpJsonLoggingMetricExporter.create(config.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
+ java.util.logging.Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
.setInterval(config.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();
diff --git a/pom.xml b/pom.xml
index 9f0b3eb96..4b382c6da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -974,6 +974,11 @@
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-exporter-logging-otlp</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
diff --git a/proxy/BUILD.bazel b/proxy/BUILD.bazel
index b4f3c16e2..cb7af9254 100644
--- a/proxy/BUILD.bazel
+++ b/proxy/BUILD.bazel
@@ -52,6 +52,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_exporter_otlp",
"@maven//:io_opentelemetry_opentelemetry_exporter_prometheus",
"@maven//:io_opentelemetry_opentelemetry_exporter_logging",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:io_opentelemetry_opentelemetry_sdk",
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
index f5050858f..2b8dac5d8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
@@ -21,15 +21,16 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
-import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import java.util.HashMap;
@@ -42,9 +43,9 @@ import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.utils.StartAndShutdown;
-import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.AGGREGATION_DELTA;
@@ -67,7 +68,7 @@ public class ProxyMetricsManager implements StartAndShutdown {
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
- private LoggingMetricExporter loggingMetricExporter;
+ private MetricExporter loggingMetricExporter;
public static ObservableLongGauge proxyUp = null;
@@ -221,8 +222,8 @@ public class ProxyMetricsManager implements StartAndShutdown {
if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
- loggingMetricExporter = LoggingMetricExporter.create(proxyConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
- java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+ loggingMetricExporter = OtlpJsonLoggingMetricExporter.create(proxyConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
+ java.util.logging.Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
.setInterval(proxyConfig.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.build();
diff --git a/proxy/src/main/resources/rmq.proxy.logback.xml b/proxy/src/main/resources/rmq.proxy.logback.xml
index d38827f92..f968a45e6 100644
--- a/proxy/src/main/resources/rmq.proxy.logback.xml
+++ b/proxy/src/main/resources/rmq.proxy.logback.xml
@@ -418,6 +418,11 @@
<appender-ref ref="RocketmqProxyWatermarkAppender" />
</logger>
+ <!-- Use json formatter to log metrics -->
+ <logger name="io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter" additivity="false" level="INFO">
+ <appender-ref ref="RocketmqProxyMetricsAppender"/>
+ </logger>
+
<logger name="io.opentelemetry.exporter.logging.LoggingMetricExporter" additivity="false" level="INFO">
<appender-ref ref="RocketmqProxyMetricsAppender" />
</logger>
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
index 5b3885a4e..dea2c561b 100644
--- a/tieredstore/BUILD.bazel
+++ b/tieredstore/BUILD.bazel
@@ -36,6 +36,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk",
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
+ "@maven//:io_opentelemetry_opentelemetry_exporter_logging_otlp",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:com_alibaba_fastjson",
--
2.32.0.windows.2
From 1a681bdf9b5c5ab0be446d6394c0cac8768f45d9 Mon Sep 17 00:00:00 2001
From: lk <xdkxlk@outlook.com>
Date: Thu, 21 Sep 2023 19:58:29 +0800
Subject: [PATCH 03/12] [maven-release-plugin] prepare release
rocketmq-all-5.1.4 (#7377)
---
acl/pom.xml | 2 +-
broker/pom.xml | 2 +-
client/pom.xml | 2 +-
common/pom.xml | 2 +-
container/pom.xml | 2 +-
controller/pom.xml | 2 +-
distribution/pom.xml | 2 +-
example/pom.xml | 2 +-
filter/pom.xml | 2 +-
namesrv/pom.xml | 2 +-
openmessaging/pom.xml | 2 +-
pom.xml | 4 ++--
proxy/pom.xml | 2 +-
remoting/pom.xml | 2 +-
srvutil/pom.xml | 2 +-
store/pom.xml | 2 +-
test/pom.xml | 2 +-
tieredstore/pom.xml | 2 +-
tools/pom.xml | 2 +-
19 files changed, 20 insertions(+), 20 deletions(-)
diff --git a/acl/pom.xml b/acl/pom.xml
index 989c0cf77..9f6838b00 100644
--- a/acl/pom.xml
+++ b/acl/pom.xml
@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
diff --git a/broker/pom.xml b/broker/pom.xml
index 16e026276..d483e67ba 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/client/pom.xml b/client/pom.xml
index c59a43889..4febedc6d 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/common/pom.xml b/common/pom.xml
index accc7f0a8..b70873dfa 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/container/pom.xml b/container/pom.xml
index c8499f127..e6c1f4b4d 100644
--- a/container/pom.xml
+++ b/container/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/controller/pom.xml b/controller/pom.xml
index 3346c7c82..46a3834c6 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index dbde2d9d4..346c4de35 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
diff --git a/example/pom.xml b/example/pom.xml
index 862fc3169..9e7db43f8 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/filter/pom.xml b/filter/pom.xml
index 3fe51ceae..84189066d 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 684b2683c..7c218078a 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index aaa4c896c..fd499e3de 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pom.xml b/pom.xml
index 4b382c6da..0e1d04f15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
@@ -37,7 +37,7 @@
<url>git@github.com:apache/rocketmq.git</url>
<connection>scm:git:git@github.com:apache/rocketmq.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq.git</developerConnection>
- <tag>HEAD</tag>
+ <tag>rocketmq-all-5.1.4</tag>
</scm>
<mailingLists>
diff --git a/proxy/pom.xml b/proxy/pom.xml
index 3fbea107a..abf242eee 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 8a43c5c30..fc70cb62e 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index fa54ad019..d7f946cce 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/store/pom.xml b/store/pom.xml
index 38f04009d..6d6983c5d 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/test/pom.xml b/test/pom.xml
index 8f25c35c9..39090e426 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index c476040ba..7b209751f 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/tools/pom.xml b/tools/pom.xml
index 1c3b431bc..806787ec9 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4-SNAPSHOT</version>
+ <version>5.1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
--
2.32.0.windows.2
From 73b3fde83765e066541e3455cd1e6604292a9b7c Mon Sep 17 00:00:00 2001
From: lk <xdkxlk@outlook.com>
Date: Fri, 22 Sep 2023 10:08:59 +0800
Subject: [PATCH 04/12] [maven-release-plugin] prepare for next development
iteration (#7379)
---
acl/pom.xml | 2 +-
broker/pom.xml | 2 +-
client/pom.xml | 2 +-
common/pom.xml | 2 +-
container/pom.xml | 2 +-
controller/pom.xml | 2 +-
distribution/pom.xml | 2 +-
example/pom.xml | 2 +-
filter/pom.xml | 2 +-
namesrv/pom.xml | 2 +-
openmessaging/pom.xml | 2 +-
pom.xml | 4 ++--
proxy/pom.xml | 2 +-
remoting/pom.xml | 2 +-
srvutil/pom.xml | 2 +-
store/pom.xml | 2 +-
test/pom.xml | 2 +-
tieredstore/pom.xml | 2 +-
tools/pom.xml | 2 +-
19 files changed, 20 insertions(+), 20 deletions(-)
diff --git a/acl/pom.xml b/acl/pom.xml
index 9f6838b00..8a296e5ae 100644
--- a/acl/pom.xml
+++ b/acl/pom.xml
@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
diff --git a/broker/pom.xml b/broker/pom.xml
index d483e67ba..add83045d 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/client/pom.xml b/client/pom.xml
index 4febedc6d..d6fb3889b 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/common/pom.xml b/common/pom.xml
index b70873dfa..6104c3ac6 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/container/pom.xml b/container/pom.xml
index e6c1f4b4d..8af231e01 100644
--- a/container/pom.xml
+++ b/container/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/controller/pom.xml b/controller/pom.xml
index 46a3834c6..8432b220b 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 346c4de35..73474d34a 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
diff --git a/example/pom.xml b/example/pom.xml
index 9e7db43f8..a8c7f5382 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/filter/pom.xml b/filter/pom.xml
index 84189066d..892f46e9d 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 7c218078a..e320ed573 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index fd499e3de..f10c8af6f 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/pom.xml b/pom.xml
index 0e1d04f15..4202d4095 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,7 +28,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
@@ -37,7 +37,7 @@
<url>git@github.com:apache/rocketmq.git</url>
<connection>scm:git:git@github.com:apache/rocketmq.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq.git</developerConnection>
- <tag>rocketmq-all-5.1.4</tag>
+ <tag>HEAD</tag>
</scm>
<mailingLists>
diff --git a/proxy/pom.xml b/proxy/pom.xml
index abf242eee..5c5349a8c 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index fc70cb62e..f78480680 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index d7f946cce..894e9cc6f 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/store/pom.xml b/store/pom.xml
index 6d6983c5d..e979030e8 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/test/pom.xml b/test/pom.xml
index 39090e426..168cbab0b 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index 7b209751f..b2ea40bf3 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/tools/pom.xml b/tools/pom.xml
index 806787ec9..e1daa57a6 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
- <version>5.1.4</version>
+ <version>5.1.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
--
2.32.0.windows.2
From 88a9d939ce110381b3b418370d4711c0c214dc7f Mon Sep 17 00:00:00 2001
From: Ji Juntao <juntao.jjt@alibaba-inc.com>
Date: Sat, 23 Sep 2023 17:38:27 +0800
Subject: [PATCH 05/12] [ISSUE #7381] Fix the problem of inaccurate timer
message metric (#7382)
* correct the timerMetrics' result.
* for further extension.
* checkstyle.
* use toLong.
---
.../store/timer/TimerMessageStore.java | 20 +++++++++++++++----
.../rocketmq/store/timer/TimerMetrics.java | 5 ++++-
.../rocketmq/store/timer/TimerRequest.java | 7 +++++--
.../store/timer/TimerMetricsTest.java | 10 ++++++++--
4 files changed, 33 insertions(+), 9 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 0d50de65a..ac4c61cd6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
@@ -599,7 +600,12 @@ public class TimerMessageStore {
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
return;
}
- timerMetrics.addAndGet(msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC), value);
+ if (msg.getProperty(TIMER_ENQUEUE_MS) != null
+ && NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) {
+ return;
+ }
+ // pass msg into addAndGet, for further more judgement extension.
+ timerMetrics.addAndGet(msg, value);
} catch (Throwable t) {
if (frequency.incrementAndGet() % 1000 == 0) {
LOGGER.error("error in adding metric", t);
@@ -1323,6 +1329,7 @@ public class TimerMessageStore {
perfCounterTicks.startTick(ENQUEUE_PUT);
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
+ req.setEnqueueTime(Long.MAX_VALUE);
dequeuePutQueue.put(req);
} else {
boolean doEnqueueRes = doEnqueue(
@@ -1452,9 +1459,14 @@ public class TimerMessageStore {
}
try {
perfCounterTicks.startTick(DEQUEUE_PUT);
- DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
- addMetric(tr.getMsg(), -1);
- MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
+ MessageExt msgExt = tr.getMsg();
+ DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+ if (tr.getEnqueueTime() == Long.MAX_VALUE) {
+ // never enqueue, mark it.
+ MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
+ }
+ addMetric(msgExt, -1);
+ MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
while (!doRes && !isStopped()) {
if (!isRunningDequeue()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
index e7b00cc07..7f8fedd8a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMetrics.java
@@ -38,6 +38,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -78,7 +80,8 @@ public class TimerMetrics extends ConfigManager {
return distPair.getCount().addAndGet(value);
}
- public long addAndGet(String topic, int value) {
+ public long addAndGet(MessageExt msg, int value) {
+ String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
Metric pair = getTopicPair(topic);
getDataVersion().nextVersion();
pair.setTimeStamp(System.currentTimeMillis());
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
index 1dd64f759..1b25d355c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerRequest.java
@@ -27,8 +27,9 @@ public class TimerRequest {
private final int sizePy;
private final long delayTime;
- private final long enqueueTime;
private final int magic;
+
+ private long enqueueTime;
private MessageExt msg;
@@ -94,7 +95,9 @@ public class TimerRequest {
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
-
+ public void setEnqueueTime(long enqueueTime) {
+ this.enqueueTime = enqueueTime;
+ }
public void idempotentRelease() {
idempotentRelease(true);
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
index b7392cc45..3c7b9b67f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMetricsTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.store.timer;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Assert;
import org.junit.Test;
@@ -31,8 +34,11 @@ public class TimerMetricsTest {
TimerMetrics first = new TimerMetrics(baseDir);
Assert.assertTrue(first.load());
- first.addAndGet("AAA", 1000);
- first.addAndGet("BBB", 2000);
+ MessageExt msg = new MessageExt();
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "AAA");
+ first.addAndGet(msg, 1000);
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "BBB");
+ first.addAndGet(msg, 2000);
Assert.assertEquals(1000, first.getTimingCount("AAA"));
Assert.assertEquals(2000, first.getTimingCount("BBB"));
long curr = System.currentTimeMillis();
--
2.32.0.windows.2
From d7e5c4d1a4e048cd97f0b29a96a0fc575927a03e Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Mon, 25 Sep 2023 13:37:36 +0800
Subject: [PATCH 06/12] [ISSUE #7389] Fix the problem that getLastMappedFile
function affects performance
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
.../apache/rocketmq/store/MappedFileQueue.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 32b90d14f..9a0824829 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -377,8 +377,19 @@ public class MappedFileQueue implements Swappable {
}
public MappedFile getLastMappedFile() {
- MappedFile[] mappedFiles = this.mappedFiles.toArray(new MappedFile[0]);
- return mappedFiles.length == 0 ? null : mappedFiles[mappedFiles.length - 1];
+ MappedFile mappedFileLast = null;
+ while (!this.mappedFiles.isEmpty()) {
+ try {
+ mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
+ break;
+ } catch (IndexOutOfBoundsException e) {
+ //continue;
+ } catch (Exception e) {
+ log.error("getLastMappedFile has exception.", e);
+ break;
+ }
+ }
+ return mappedFileLast;
}
public boolean resetOffset(long offset) {
--
2.32.0.windows.2
From 3fd43353fdf880deb5d63ba3ad50cc6e3259dc3a Mon Sep 17 00:00:00 2001
From: Zhouxiang Zhan <zhouxzhan@apache.org>
Date: Tue, 26 Sep 2023 13:53:51 +0800
Subject: [PATCH 07/12] [ISSUE #7393] Add timeout configuration for grpc server
(#7394)
* Add timeout configuration for grpc server
* Add proxyConfig
---
.../java/org/apache/rocketmq/proxy/ProxyStartup.java | 1 +
.../apache/rocketmq/proxy/config/ProxyConfig.java | 9 +++++++++
.../org/apache/rocketmq/proxy/grpc/GrpcServer.java | 10 ++++++++--
.../rocketmq/proxy/grpc/GrpcServerBuilder.java | 12 +++++++++++-
4 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 06d5f4525..3b2ca99bf 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -85,6 +85,7 @@ public class ProxyStartup {
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
.configInterceptor(accessValidators)
+ .shutdownTime(ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(), TimeUnit.SECONDS)
.build();
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index b2478fec3..c0d00d864 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -87,6 +87,7 @@ public class ProxyConfig implements ConfigFile {
*/
private String proxyMode = ProxyMode.CLUSTER.name();
private Integer grpcServerPort = 8081;
+ private long grpcShutdownTimeSeconds = 30;
private int grpcBossLoopNum = 1;
private int grpcWorkerLoopNum = PROCESSOR_NUMBER * 2;
private boolean enableGrpcEpoll = false;
@@ -443,6 +444,14 @@ public class ProxyConfig implements ConfigFile {
this.grpcServerPort = grpcServerPort;
}
+ public long getGrpcShutdownTimeSeconds() {
+ return grpcShutdownTimeSeconds;
+ }
+
+ public void setGrpcShutdownTimeSeconds(long grpcShutdownTimeSeconds) {
+ this.grpcShutdownTimeSeconds = grpcShutdownTimeSeconds;
+ }
+
public boolean isUseEndpointPortFromRequest() {
return useEndpointPortFromRequest;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
index 1bffa3c0b..d5b896fe1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
@@ -29,8 +29,14 @@ public class GrpcServer implements StartAndShutdown {
private final Server server;
- protected GrpcServer(Server server) {
+ private final long timeout;
+
+ private final TimeUnit unit;
+
+ protected GrpcServer(Server server, long timeout, TimeUnit unit) {
this.server = server;
+ this.timeout = timeout;
+ this.unit = unit;
}
public void start() throws Exception {
@@ -40,7 +46,7 @@ public class GrpcServer implements StartAndShutdown {
public void shutdown() {
try {
- this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+ this.server.shutdown().awaitTermination(timeout, unit);
log.info("grpc server shutdown successfully.");
} catch (Exception e) {
e.printStackTrace();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 9cddd3013..0e79006f6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -41,6 +41,10 @@ public class GrpcServerBuilder {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected NettyServerBuilder serverBuilder;
+ protected long time = 30;
+
+ protected TimeUnit unit = TimeUnit.SECONDS;
+
public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor, int port) {
return new GrpcServerBuilder(executor, port);
}
@@ -77,6 +81,12 @@ public class GrpcServerBuilder {
port, bossLoopNum, workerLoopNum, maxInboundMessageSize);
}
+ public GrpcServerBuilder shutdownTime(long time, TimeUnit unit) {
+ this.time = time;
+ this.unit = unit;
+ return this;
+ }
+
public GrpcServerBuilder addService(BindableService service) {
this.serverBuilder.addService(service);
return this;
@@ -93,7 +103,7 @@ public class GrpcServerBuilder {
}
public GrpcServer build() {
- return new GrpcServer(this.serverBuilder.build());
+ return new GrpcServer(this.serverBuilder.build(), time, unit);
}
public GrpcServerBuilder configInterceptor(List<AccessValidator> accessValidators) {
--
2.32.0.windows.2
From c3b86cd1e3c068bc5847671c899a284e49a2ecdc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=9F=B3=E8=87=BB=E8=87=BB=28Steven=20shi=29?=
<shirenchuang@users.noreply.github.com>
Date: Tue, 26 Sep 2023 16:07:13 +0800
Subject: [PATCH 08/12] [ISSUE #7398] Fix ExportConfigsCommand NPE (#7399)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: 石臻臻 <shirenchuang.src@cainiao.com>
---
.../command/export/ExportConfigsCommand.java | 42 ++++++++++++-------
1 file changed, 26 insertions(+), 16 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
index 03613b29c..c3f96d597 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/export/ExportConfigsCommand.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Arrays;
import java.util.Properties;
import com.alibaba.fastjson.JSON;
@@ -106,24 +107,33 @@ public class ExportConfigsCommand implements SubCommand {
}
}
+
private Properties needBrokerProprties(Properties properties) {
+ List<String> propertyKeys = Arrays.asList(
+ "brokerClusterName",
+ "brokerId",
+ "brokerName",
+ "brokerRole",
+ "fileReservedTime",
+ "filterServerNums",
+ "flushDiskType",
+ "maxMessageSize",
+ "messageDelayLevel",
+ "msgTraceTopicName",
+ "slaveReadEnable",
+ "traceOn",
+ "traceTopicEnable",
+ "useTLS",
+ "autoCreateTopicEnable",
+ "autoCreateSubscriptionGroup"
+ );
+
Properties newProperties = new Properties();
- newProperties.setProperty("brokerClusterName", properties.getProperty("brokerClusterName"));
- newProperties.setProperty("brokerId", properties.getProperty("brokerId"));
- newProperties.setProperty("brokerName", properties.getProperty("brokerName"));
- newProperties.setProperty("brokerRole", properties.getProperty("brokerRole"));
- newProperties.setProperty("fileReservedTime", properties.getProperty("fileReservedTime"));
- newProperties.setProperty("filterServerNums", properties.getProperty("filterServerNums"));
- newProperties.setProperty("flushDiskType", properties.getProperty("flushDiskType"));
- newProperties.setProperty("maxMessageSize", properties.getProperty("maxMessageSize"));
- newProperties.setProperty("messageDelayLevel", properties.getProperty("messageDelayLevel"));
- newProperties.setProperty("msgTraceTopicName", properties.getProperty("msgTraceTopicName"));
- newProperties.setProperty("slaveReadEnable", properties.getProperty("slaveReadEnable"));
- newProperties.setProperty("traceOn", properties.getProperty("traceOn"));
- newProperties.setProperty("traceTopicEnable", properties.getProperty("traceTopicEnable"));
- newProperties.setProperty("useTLS", properties.getProperty("useTLS"));
- newProperties.setProperty("autoCreateTopicEnable", properties.getProperty("autoCreateTopicEnable"));
- newProperties.setProperty("autoCreateSubscriptionGroup", properties.getProperty("autoCreateSubscriptionGroup"));
+ propertyKeys.stream()
+ .filter(key -> properties.getProperty(key) != null)
+ .forEach(key -> newProperties.setProperty(key, properties.getProperty(key)));
+
return newProperties;
}
+
}
--
2.32.0.windows.2
From 959a98120cea8022498557a308aff35e3d8def2a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E7=9F=B3=E8=87=BB=E8=87=BB=28Steven=20shi=29?=
<shirenchuang@users.noreply.github.com>
Date: Wed, 27 Sep 2023 01:59:58 +0800
Subject: [PATCH 09/12] [ISSUE #7400] Fix getBrokerEpochSubCommand NPE
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: 石臻臻 <shirenchuang.src@cainiao.com>
---
.../broker/processor/AdminBrokerProcessor.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 9e48431be..e77120e15 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2736,10 +2736,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final ReplicasManager replicasManager = this.brokerController.getReplicasManager();
assert replicasManager != null;
final BrokerConfig brokerConfig = this.brokerController.getBrokerConfig();
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ if (!brokerConfig.isEnableControllerMode()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("this request only for controllerMode ");
+ return response;
+ }
final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(),
- brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
+ brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset());
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setBody(entryCache.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
--
2.32.0.windows.2
From 0a6ae4605fef4eaab6262fbd370aba887d8ae58b Mon Sep 17 00:00:00 2001
From: tiger lee <1026203200@qq.com>
Date: Wed, 27 Sep 2023 14:43:15 +0800
Subject: [PATCH 10/12] [ISSUE #7396] Fix wrong word in
BrokerController#doResterBrokerAll (#7397)
---
.../main/java/org/apache/rocketmq/broker/BrokerController.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 53e2e1b62..d4bded600 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1807,7 +1807,7 @@ public class BrokerController {
TopicConfigSerializeWrapper topicConfigWrapper) {
if (shutdown) {
- BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
+ BrokerController.LOG.info("BrokerController#doRegisterBrokerAll: broker has shutdown, no need to register any more.");
return;
}
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
--
2.32.0.windows.2
From 4f1b42a7c5557bcadd6b9982a0c9bd876622c69e Mon Sep 17 00:00:00 2001
From: ShuangxiDing <dingshuangxi888@gmail.com>
Date: Thu, 28 Sep 2023 16:52:02 +0800
Subject: [PATCH 11/12] [ISSUE #7410] Handle the Exception when the Proxy
requests the client
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: 徒钟 <shuangxi.dsx@alibaba-inc.com>
---
.../remoting/channel/RemotingChannel.java | 23 ++++++++++++++-----
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 40946cabf..d755fdcc4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
+import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.channel.ChannelExtendAttributeGetter;
@@ -158,10 +159,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
if (response.getCode() == ResponseCode.SUCCESS) {
ConsumerRunningInfo consumerRunningInfo = ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", consumerRunningInfo));
+ } else {
+ String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark());
+ RuntimeException e = new RuntimeException(errMsg);
+ responseFuture.completeExceptionally(e);
}
- String errMsg = String.format("get consumer running info failed, code:%s remark:%s", response.getCode(), response.getRemark());
- RuntimeException e = new RuntimeException(errMsg);
- responseFuture.completeExceptionally(e);
+ })
+ .exceptionally(t -> {
+ responseFuture.completeExceptionally(ExceptionUtils.getRealException(t));
+ return null;
});
return CompletableFuture.completedFuture(null);
} catch (Throwable t) {
@@ -183,10 +189,15 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
if (response.getCode() == ResponseCode.SUCCESS) {
ConsumeMessageDirectlyResult result = ConsumeMessageDirectlyResult.decode(response.getBody(), ConsumeMessageDirectlyResult.class);
responseFuture.complete(new ProxyRelayResult<>(ResponseCode.SUCCESS, "", result));
+ } else {
+ String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
+ RuntimeException e = new RuntimeException(errMsg);
+ responseFuture.completeExceptionally(e);
}
- String errMsg = String.format("consume message directly failed, code:%s remark:%s", response.getCode(), response.getRemark());
- RuntimeException e = new RuntimeException(errMsg);
- responseFuture.completeExceptionally(e);
+ })
+ .exceptionally(t -> {
+ responseFuture.completeExceptionally(ExceptionUtils.getRealException(t));
+ return null;
});
return CompletableFuture.completedFuture(null);
} catch (Throwable t) {
--
2.32.0.windows.2
From c36bb78e850129b9db40adc5b0e1b9bfd5c8fd2e Mon Sep 17 00:00:00 2001
From: shriVATSA54 <116296557+shriVATSA54@users.noreply.github.com>
Date: Sat, 7 Oct 2023 12:22:39 +0530
Subject: [PATCH 12/12] [ISSUE 7313] Enhancement Optimization Method name
(#7420)
* Enhancment/method_name/#7313/
* Enhancment/method_name/#7313/
* Enhancment/method_name/#7313/
---
.../tieredstore/provider/TieredStoreTopicBlackListFilter.java | 2 +-
.../rocketmq/tieredstore/provider/TieredStoreTopicFilter.java | 2 +-
.../provider/TieredStoreTopicBlackListFilterTest.java | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
index 50adbb713..f8bf165bc 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilter.java
@@ -39,7 +39,7 @@ public class TieredStoreTopicBlackListFilter implements TieredStoreTopicFilter {
}
@Override
- public void addTopicToWhiteList(String topicName) {
+ public void addTopicToBlackList(String topicName) {
this.topicBlackSet.add(topicName);
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
index 3f26b8b02..f983ed6e9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicFilter.java
@@ -21,5 +21,5 @@ public interface TieredStoreTopicFilter {
boolean filterTopic(String topicName);
- void addTopicToWhiteList(String topicName);
+ void addTopicToBlackList(String topicName);
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
index 2bf48173c..fbaafa1b4 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredStoreTopicBlackListFilterTest.java
@@ -30,7 +30,7 @@ public class TieredStoreTopicBlackListFilterTest {
String topicName = "WhiteTopic";
Assert.assertFalse(topicFilter.filterTopic(topicName));
- topicFilter.addTopicToWhiteList(topicName);
+ topicFilter.addTopicToBlackList(topicName);
Assert.assertTrue(topicFilter.filterTopic(topicName));
}
}
\ No newline at end of file
--
2.32.0.windows.2