rocketmq/patch017-backport-Convergent-thread-pool-creation.patch
2023-11-20 10:37:37 +08:00

2244 lines
122 KiB
Diff

From c100d815d754d7cb330bc63e145bafd2d9b59cb1 Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Mon, 11 Sep 2023 10:13:56 +0800
Subject: [PATCH 1/6] [ISSUE #7328] Convergent thread pool creation (#7329)
* Converge thread pool creation to facilitate subsequent iteration management
* Converge thread pool creation in ThreadPoolMonitor.java
* fix unit test
* Converge ThreadPool constructor
* Converge ScheduledThreadPool constructor
* remove unused import
* Converge ScheduledThreadPool constructor
* remove unused import
---------
---
.../rocketmq/broker/BrokerController.java | 39 +++++-----
.../client/ClientHousekeepingService.java | 4 +-
.../DefaultConsumerIdsChangeListener.java | 3 +-
.../broker/controller/ReplicasManager.java | 9 +--
.../dledger/DLedgerRoleChangeHandler.java | 4 +-
.../broker/failover/EscapeBridge.java | 4 +-
.../broker/latency/BrokerFastFailure.java | 5 +-
.../BrokerFixedThreadPoolExecutor.java | 57 --------------
.../broker/latency/FutureTaskExt.java | 39 ----------
.../rocketmq/broker/out/BrokerOuterAPI.java | 7 +-
.../schedule/ScheduleMessageService.java | 7 +-
.../broker/topic/TopicRouteInfoManager.java | 4 +-
...ractTransactionalMessageCheckListener.java | 4 +-
.../rocketmq/broker/BrokerControllerTest.java | 2 +-
.../broker/latency/BrokerFastFailureTest.java | 1 +
.../common/config/AbstractRocksDBStorage.java | 6 +-
.../FutureTaskExtThreadPoolExecutor.java | 3 +-
.../common/thread/ThreadPoolMonitor.java | 6 +-
.../rocketmq/common/utils/ThreadUtils.java | 74 ++++++++++++++++---
.../rocketmq/container/BrokerContainer.java | 6 +-
.../controller/ControllerManager.java | 14 +---
.../controller/impl/DLedgerController.java | 10 +--
.../DefaultBrokerHeartbeatManager.java | 3 +-
.../rocketmq/namesrv/NamesrvController.java | 22 ++----
.../grpc/v2/channel/GrpcChannelManager.java | 6 +-
.../remoting/RemotingProtocolServer.java | 4 +-
.../proxy/service/ClusterServiceManager.java | 12 +--
.../proxy/service/LocalServiceManager.java | 4 +-
.../receipt/DefaultReceiptHandleManager.java | 8 +-
.../service/route/TopicRouteService.java | 9 +--
.../remoting/netty/NettyRemotingClient.java | 4 +-
.../remoting/netty/NettyRemotingServer.java | 4 +-
.../rocketmq/store/DefaultMessageStore.java | 8 +-
.../ha/autoswitch/AutoSwitchHAService.java | 38 +++++-----
.../rocketmq/store/kv/CompactionStore.java | 21 +++---
.../store/queue/ConsumeQueueStore.java | 4 +-
.../store/stats/BrokerStatsManager.java | 14 ++--
.../store/timer/TimerMessageStore.java | 6 +-
.../apache/rocketmq/test/util/StatUtil.java | 1 -
.../common/TieredStoreExecutor.java | 14 ++--
.../tools/admin/DefaultMQAdminExtImpl.java | 3 +-
41 files changed, 215 insertions(+), 278 deletions(-)
delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6aba70cb2..275b64b1a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
@@ -98,6 +97,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.Configuration;
@@ -160,7 +160,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -455,10 +454,10 @@ public class BrokerController {
* Initialize resources including remoting server and thread executors.
*/
protected void initializeResources() {
- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));
- this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
@@ -466,7 +465,7 @@ public class BrokerController {
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
- this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
@@ -474,7 +473,7 @@ public class BrokerController {
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
- this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getLitePullMessageThreadPoolNums(),
this.brokerConfig.getLitePullMessageThreadPoolNums(),
1000 * 60,
@@ -482,7 +481,7 @@ public class BrokerController {
this.litePullThreadPoolQueue,
new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));
- this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
+ this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
1000 * 60,
@@ -490,7 +489,7 @@ public class BrokerController {
this.putThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
- this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getAckMessageThreadPoolNums(),
this.brokerConfig.getAckMessageThreadPoolNums(),
1000 * 60,
@@ -498,7 +497,7 @@ public class BrokerController {
this.ackThreadPoolQueue,
new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
- this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
@@ -506,7 +505,7 @@ public class BrokerController {
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
- this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
+ this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getAdminBrokerThreadPoolNums(),
this.brokerConfig.getAdminBrokerThreadPoolNums(),
1000 * 60,
@@ -514,7 +513,7 @@ public class BrokerController {
this.adminBrokerThreadPoolQueue,
new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
- this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
@@ -522,7 +521,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
- this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
+ this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
@@ -530,7 +529,7 @@ public class BrokerController {
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));
- this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getConsumerManageThreadPoolNums(),
this.brokerConfig.getConsumerManageThreadPoolNums(),
1000 * 60,
@@ -538,7 +537,7 @@ public class BrokerController {
this.consumerManagerThreadPoolQueue,
new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));
- this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
@@ -546,7 +545,7 @@ public class BrokerController {
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));
- this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
+ this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
@@ -554,7 +553,7 @@ public class BrokerController {
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));
- this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
+ this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor(
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
1000 * 60,
@@ -562,9 +561,9 @@ public class BrokerController {
this.loadBalanceThreadPoolQueue,
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
- this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
- this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
@@ -828,8 +827,6 @@ public class BrokerController {
initializeResources();
- registerProcessor();
-
initializeScheduledTasks();
initialTransaction();
@@ -1690,6 +1687,8 @@ public class BrokerController {
}
}
}, 10, 5, TimeUnit.SECONDS);
+
+ registerProcessor();
}
protected void scheduleSendHeartbeat() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index 98e5f450f..cbb81f632 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -18,11 +18,11 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -35,7 +35,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
public ClientHousekeepingService(final BrokerController brokerController) {
this.brokerController = brokerController;
- scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("ClientHousekeepingScheduledThread", brokerController.getBrokerIdentity()));
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index 2ce036a0f..d17a2a547 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
private final BrokerController brokerController;
private final int cacheSize = 8096;
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));
private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 37c82e434..a989e6e68 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -27,10 +27,8 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -42,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -107,9 +106,9 @@ public class ReplicasManager {
public ReplicasManager(final BrokerController brokerController) {
this.brokerController = brokerController;
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
- this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
- this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+ this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
+ this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
+ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
this.brokerConfig = brokerController.getBrokerConfig();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
index 75023ee1b..e6cb97640 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
@@ -21,12 +21,12 @@ import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -49,7 +49,7 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange
this.messageStore = messageStore;
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
- this.executorService = Executors.newSingleThreadExecutor(
+ this.executorService = ThreadUtils.newSingleThreadExecutor(
new ThreadFactoryImpl("DLegerRoleChangeHandler_", brokerController.getBrokerIdentity()));
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 7c350fc1d..6a0817480 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
@@ -43,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -72,7 +72,7 @@ public class EscapeBridge {
public void start() throws Exception {
if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && brokerController.getBrokerConfig().isEnableRemoteEscape()) {
final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000);
- this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+ this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index d3d0bc8ba..3b6e9dc67 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -18,13 +18,14 @@ package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.netty.RequestTask;
@@ -43,7 +44,7 @@ public class BrokerFastFailure {
public BrokerFastFailure(final BrokerController brokerController) {
this.brokerController = brokerController;
- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
brokerController == null ? null : brokerController.getBrokerConfig()));
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
deleted file mode 100644
index d2d1143a3..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.latency;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
- final TimeUnit unit,
- final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory,
- final RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
- return new FutureTaskExt<>(runnable, value);
- }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
deleted file mode 100644
index f132efaeb..000000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.latency;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-public class FutureTaskExt<V> extends FutureTask<V> {
- private final Runnable runnable;
-
- public FutureTaskExt(final Callable<V> callable) {
- super(callable);
- this.runnable = null;
- }
-
- public FutureTaskExt(final Runnable runnable, final V result) {
- super(runnable, result);
- this.runnable = runnable;
- }
-
- public Runnable getRunnable() {
- return runnable;
- }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index ae81e8b11..9dfb8127d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.DefaultTopAddressing;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -144,7 +145,7 @@ public class BrokerOuterAPI {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr());
- private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+ private final ExecutorService brokerOuterExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
private final ClientMetadata clientMetadata;
private final RpcClient rpcClient;
@@ -1092,7 +1093,7 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() {
+ public ExecutorService getBrokerOuterExecutor() {
return brokerOuterExecutor;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 297b14207..0c2e6507b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager {
public ScheduleMessageService(final BrokerController brokerController) {
this.brokerController = brokerController;
this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
- scheduledPersistService = new ScheduledThreadPoolExecutor(1,
+ scheduledPersistService = ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
}
@@ -134,9 +133,9 @@ public class ScheduleMessageService extends ConfigManager {
public void start() {
if (started.compareAndSet(false, true)) {
this.load();
- this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
+ this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
- this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
+ this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index b35564725..11bde5f5f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -23,7 +23,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -66,7 +66,7 @@ public class TopicRouteInfoManager {
}
public void start() {
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
+ this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 771d84300..982355d78 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
@@ -97,7 +97,7 @@ public abstract class AbstractTransactionalMessageCheckListener {
public synchronized void initExecutorService() {
if (executorService == null) {
- executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
+ executorService = ThreadUtils.newThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
new ThreadFactoryImpl("Transaction-msg-check-thread", brokerController.getBrokerIdentity()), new CallerRunsPolicy());
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 75ad961ce..6035a20ac 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -23,9 +23,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.latency.FutureTaskExt;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
index 5d0f7f9d7..31b547cf1 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.latency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.junit.Test;
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index a720a5be3..6f19a9815 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import com.google.common.collect.Maps;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -82,8 +82,8 @@ public abstract class AbstractRocksDBStorage {
private volatile boolean closed;
private final Semaphore reloadPermit = new Semaphore(1);
- private final ScheduledExecutorService reloadScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("RocksDBStorageReloadService_"));
- private final ThreadPoolExecutor manualCompactionThread = new ThreadPoolExecutor(
+ private final ScheduledExecutorService reloadScheduler = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("RocksDBStorageReloadService_"));
+ private final ThreadPoolExecutor manualCompactionThread = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue(1),
new ThreadFactoryImpl("RocksDBManualCompactionService_"),
diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
index 411da9221..7b68873a9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
@@ -29,7 +29,8 @@ public class FutureTaskExtThreadPoolExecutor extends ThreadPoolExecutor {
public FutureTaskExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index 49d97a5d7..1bfabbffe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -22,12 +22,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ public class ThreadPoolMonitor {
private static Logger waterMarkLogger = LoggerFactory.getLogger(ThreadPoolMonitor.class);
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new CopyOnWriteArrayList<>();
- private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor(
+ private static final ScheduledExecutorService MONITOR_SCHEDULED = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
);
@@ -81,7 +81,7 @@ public class ThreadPoolMonitor {
String name,
int queueCapacity,
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
- ThreadPoolExecutor executor = new FutureTaskExtThreadPoolExecutor(
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
index 4b366d4e3..1644c6360 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
@@ -20,38 +20,94 @@ package org.apache.rocketmq.common.utils;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.thread.FutureTaskExtThreadPoolExecutor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public final class ThreadUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
- public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
- TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
- return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+ public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
+ return ThreadUtils.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
}
- public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
- return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
+ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
+ return ThreadUtils.newThreadPoolExecutor(1, threadFactory);
+ }
+
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, corePoolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ threadFactory);
+ }
+
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit, BlockingQueue<Runnable> workQueue,
+ String processName,
+ boolean isDaemon) {
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
+ }
+
+ public static ExecutorService newThreadPoolExecutor(final int corePoolSize,
+ final int maximumPoolSize,
+ final long keepAliveTime,
+ final TimeUnit unit,
+ final BlockingQueue<Runnable> workQueue,
+ final ThreadFactory threadFactory) {
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ return new FutureTaskExtThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
- return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
+ return ThreadUtils.newScheduledThreadPool(1, processName, isDaemon);
+ }
+
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
+ return ThreadUtils.newScheduledThreadPool(1, threadFactory);
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, Executors.defaultThreadFactory());
}
- public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, String processName,
boolean isDaemon) {
- return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, newThreadFactory(processName, isDaemon));
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler);
}
public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
- return newGenericThreadFactory("Remoting-" + processName, isDaemon);
+ return newGenericThreadFactory("ThreadUtils-" + processName, isDaemon);
}
public static ThreadFactory newGenericThreadFactory(String processName) {
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
index c6446f058..5b712bc30 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
@@ -47,14 +47,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class BrokerContainer implements IBrokerContainer {
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new BasicThreadFactory.Builder()
.namingPattern("BrokerContainerScheduledThread")
.daemon(true)
@@ -143,7 +141,7 @@ public class BrokerContainer implements IBrokerContainer {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.containerClientHouseKeepingService);
this.fastRemotingServer = this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 2);
- this.brokerContainerExecutor = new ThreadPoolExecutor(
+ this.brokerContainerExecutor = ThreadUtils.newThreadPoolExecutor(
1,
1,
1000 * 60,
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 7c91e70da..3e6b0eba5 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
@@ -34,8 +32,8 @@ import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.future.FutureTaskExt;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
@@ -93,18 +91,14 @@ public class ControllerManager {
public boolean initialize() {
this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
- this.controllerRequestExecutor = new ThreadPoolExecutor(
+ this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor(
this.controllerConfig.getControllerThreadPoolNums(),
this.controllerConfig.getControllerThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.controllerRequestThreadPoolQueue,
- new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
- return new FutureTaskExt<T>(runnable, value);
- }
- };
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_"));
+
this.notifyService.initialize();
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index fa91f288e..33e4406e4 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -44,6 +43,7 @@ import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.Controller;
import org.apache.rocketmq.controller.elect.ElectPolicy;
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
@@ -66,11 +66,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
@@ -136,7 +136,7 @@ public class DLedgerController implements Controller {
this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
this.dLedgerServer.registerStateMachine(this.statemachine);
this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
- this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
+ this.scanInactiveMasterService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
this.brokerLifecycleListeners = new ArrayList<>();
}
@@ -513,7 +513,7 @@ public class DLedgerController implements Controller {
class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
private final String selfId;
- private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
+ private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;
public RoleChangeHandler(final String selfId) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index 2fbddb9cd..6ebb2c994 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
@Override
public void initialize() {
- this.scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
+ this.scheduledService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
this.executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 15c65ebec..be327cffa 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -20,10 +20,7 @@ import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -31,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
@@ -62,10 +60,10 @@ public class NamesrvController {
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
new BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build());
- private final ScheduledExecutorService scanExecutorService = new ScheduledThreadPoolExecutor(1,
+ private final ScheduledExecutorService scanExecutorService = ThreadUtils.newScheduledThreadPool(1,
new BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build());
private final KVConfigManager kvConfigManager;
@@ -138,20 +136,10 @@ public class NamesrvController {
private void initiateThreadExecutors() {
this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
- this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
- return new FutureTaskExt<>(runnable, value);
- }
- };
+ this.defaultExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_"));
this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
- this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
- return new FutureTaskExt<>(runnable, value);
- }
- };
+ this.clientRequestExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_"));
}
private void initiateSslContext() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 14330dd8d..a18cf7600 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -21,13 +21,13 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -43,7 +43,7 @@ public class GrpcChannelManager implements StartAndShutdown {
protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
protected final ConcurrentMap<String /* nonce */, ResultFuture> resultNonceFutureMap = new ConcurrentHashMap<>();
- protected final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ protected final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("GrpcChannelManager_")
);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index bcc9edd09..fe07090d5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.common.future.FutureTaskExt;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -178,7 +178,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue())
);
- this.timerExecutor = Executors.newSingleThreadScheduledExecutor(
+ this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
);
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index d2ddfc352..9786cec55 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.proxy.service;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
@@ -27,23 +26,24 @@ import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.common.NameserverAccessConfig;
+import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager;
+import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
import org.apache.rocketmq.proxy.service.message.ClusterMessageService;
import org.apache.rocketmq.proxy.service.message.MessageService;
import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
-import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
-import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
import org.apache.rocketmq.proxy.service.relay.ClusterProxyRelayService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ClusterTopicRouteService;
@@ -73,7 +73,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
NameserverAccessConfig nameserverAccessConfig = new NameserverAccessConfig(proxyConfig.getNamesrvAddr(),
proxyConfig.getNamesrvDomain(), proxyConfig.getNamesrvDomainSubgroup());
- this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(3);
this.messagingClientAPIFactory = new MQClientAPIFactory(
nameserverAccessConfig,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
index 4d1ca7b66..59cd92685 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.proxy.service;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
@@ -28,6 +27,7 @@ import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.admin.AdminService;
@@ -58,7 +58,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
private final MQClientAPIFactory mqClientAPIFactory;
private final ChannelManager channelManager;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("LocalServiceManagerScheduledThread"));
public LocalServiceManager(BrokerController brokerController, RPCHook rpcHook) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index 69f44344a..207603fe8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -42,20 +41,21 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
+import org.apache.rocketmq.proxy.common.RenewEvent;
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -68,7 +68,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
protected final StateEventListener<RenewEvent> eventListener;
protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
protected final ScheduledExecutorService scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
protected final ThreadPoolExecutor renewalWorkerService;
public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index caf62a1e0..ccf094c03 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -19,25 +19,24 @@ package org.apache.rocketmq.proxy.service.route;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.base.Optional;
import java.time.Duration;
import java.util.List;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Optional;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.latency.Resolver;
import org.apache.rocketmq.client.latency.ServiceDetector;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.Address;
@@ -63,7 +62,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("TopicRouteService_")
);
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 8491f4354..64621dd6c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +70,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_"));
- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
+ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_"));
if (eventLoopGroup != null) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index e626260c9..aa0d46542 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.constant.HAProxyConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.BinaryUtil;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -83,7 +84,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -171,7 +171,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
private ScheduledExecutorService buildScheduleExecutor() {
- return new ScheduledThreadPoolExecutor(1,
+ return ThreadUtils.newScheduledThreadPool(1,
new ThreadFactoryImpl("NettyServerScheduler_", true),
new ThreadPoolExecutor.DiscardOldestPolicy());
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index f2a54ddf6..02ea47f13 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
@@ -83,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -205,7 +205,7 @@ public class DefaultMessageStore implements MessageStore {
private ConcurrentMap<String, TopicConfig> topicConfigTable;
private final ScheduledExecutorService scheduledCleanQueueExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws IOException {
@@ -253,7 +253,7 @@ public class DefaultMessageStore implements MessageStore {
this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
this.scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
@@ -2915,7 +2915,7 @@ public class DefaultMessageStore implements MessageStore {
private final ExecutorService batchDispatchRequestExecutor;
public MainBatchDispatchRequestService() {
- batchDispatchRequestExecutor = new ThreadPoolExecutor(
+ batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor(
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
1000 * 60,
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index d5393fdca..f20bc3e28 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -17,10 +17,26 @@
package org.apache.rocketmq.store.ha.autoswitch;
-
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.EpochEntry;
@@ -36,30 +52,12 @@ import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
/**
* SwitchAble ha service, support switch role to master or slave.
*/
public class AutoSwitchHAService extends DefaultHAService {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
+ private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
private final ConcurrentHashMap<Long/*brokerId*/, Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
private final List<Consumer<Set<Long/*brokerId*/>>> syncStateSetChangedListeners = new ArrayList<>();
private final Set<Long/*brokerId*/> syncStateSet = new HashSet<>();
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index b37c90726..639084fa2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -16,17 +16,25 @@
*/
package org.apache.rocketmq.store.kv;
-import java.util.Random;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.CleanupPolicy;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -35,15 +43,6 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
public class CompactionStore {
public static final String COMPACTION_DIR = "compaction";
@@ -76,7 +75,7 @@ public class CompactionStore {
this.positionMgr = new CompactionPositionMgr(compactionPath);
this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1, config.getCompactionThreadNum()));
- this.compactionSchedule = Executors.newScheduledThreadPool(this.compactionThreadNum,
+ this.compactionSchedule = ThreadUtils.newScheduledThreadPool(this.compactionThreadNum,
new ThreadFactoryImpl("compactionSchedule_"));
this.offsetMapSize = config.getMaxOffsetMapSize() / compactionThreadNum;
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 8d38503b3..d03d15d65 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
@@ -34,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.CommitLog;
@@ -175,7 +175,7 @@ public class ConsumeQueueStore {
}
private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQueue, String threadNamePrefix) {
- return new ThreadPoolExecutor(
+ return ThreadUtils.newThreadPoolExecutor(
this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
1000 * 60,
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 2dd3fc5b5..489d7b4fb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.store.stats;
import java.util.HashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BrokerConfig;
@@ -32,13 +31,14 @@ import org.apache.rocketmq.common.statistics.StatisticsItemScheduledPrinter;
import org.apache.rocketmq.common.statistics.StatisticsItemStateGetter;
import org.apache.rocketmq.common.statistics.StatisticsKindMeta;
import org.apache.rocketmq.common.statistics.StatisticsManager;
+import org.apache.rocketmq.common.stats.MomentStatsItemSet;
import org.apache.rocketmq.common.stats.Stats;
+import org.apache.rocketmq.common.stats.StatsItem;
+import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.stats.MomentStatsItemSet;
-import org.apache.rocketmq.common.stats.StatsItem;
-import org.apache.rocketmq.common.stats.StatsItemSet;
public class BrokerStatsManager {
@@ -281,11 +281,11 @@ public class BrokerStatsManager {
private void initScheduleService() {
this.scheduledExecutorService =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
this.commercialExecutor =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
this.accountExecutor =
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
}
public MomentStatsItemSet getMomentStatsItemSetFallSize() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 181f7087a..0d50de65a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -35,7 +35,6 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -54,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue;
@@ -174,11 +174,11 @@ public class TimerMessageStore {
this.lastBrokerRole = storeConfig.getBrokerRole();
if (messageStore instanceof DefaultMessageStore) {
- scheduler = Executors.newSingleThreadScheduledExecutor(
+ scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("TimerScheduledThread",
((DefaultMessageStore) messageStore).getBrokerIdentity()));
} else {
- scheduler = Executors.newSingleThreadScheduledExecutor(
+ scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
new ThreadFactoryImpl("TimerScheduledThread"));
}
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
index f3d105bc6..080b7e385 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.annotation.Generated;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 6dd0e8846..65d586f43 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -20,10 +20,10 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.utils.ThreadUtils;
public class TieredStoreExecutor {
@@ -43,20 +43,20 @@ public class TieredStoreExecutor {
public static ExecutorService compactIndexFileExecutor;
public static void init() {
- commonScheduledExecutor = new ScheduledThreadPoolExecutor(
+ commonScheduledExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryImpl("TieredCommonExecutor_"));
- commitExecutor = new ScheduledThreadPoolExecutor(
+ commitExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
new ThreadFactoryImpl("TieredCommitExecutor_"));
- cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
+ cleanExpiredFileExecutor = ThreadUtils.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryImpl("TieredCleanFileExecutor_"));
dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- dispatchExecutor = new ThreadPoolExecutor(
+ dispatchExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(2, Runtime.getRuntime().availableProcessors()),
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
1000 * 60,
@@ -66,7 +66,7 @@ public class TieredStoreExecutor {
new ThreadPoolExecutor.DiscardOldestPolicy());
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- fetchDataExecutor = new ThreadPoolExecutor(
+ fetchDataExecutor = ThreadUtils.newThreadPoolExecutor(
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
Math.max(64, Runtime.getRuntime().availableProcessors() * 8),
1000 * 60,
@@ -75,7 +75,7 @@ public class TieredStoreExecutor {
new ThreadFactoryImpl("TieredFetchExecutor_"));
compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
- compactIndexFileExecutor = new ThreadPoolExecutor(
+ compactIndexFileExecutor = ThreadUtils.newThreadPoolExecutor(
1,
1,
1000 * 60,
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index fa3596d51..1ebff6d8a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
@@ -193,7 +194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
int threadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20"));
- this.threadPoolExecutor = new ThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
+ this.threadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
break;
case RUNNING:
--
2.32.0.windows.2
From dad6b4dadfec7a58e78a6715ec16c2eb6b17ff27 Mon Sep 17 00:00:00 2001
From: Ziyi Tan <ajb459684460@gmail.com>
Date: Mon, 11 Sep 2023 14:34:10 +0800
Subject: [PATCH 2/6] [ISSUE #7334] `registerIncrementBrokerData` for single
topic update (#7335)
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
---
.../broker/topic/TopicConfigManager.java | 30 +++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 4e3c1736c..754605438 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -290,7 +290,11 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true, true);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
return topicConfig;
@@ -394,7 +398,11 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true, true);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
return topicConfig;
@@ -435,7 +443,11 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true, true);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
return topicConfig;
@@ -461,7 +473,11 @@ public class TopicConfigManager extends ConfigManager {
dataVersion.nextVersion(stateMachineVersion);
this.persist();
- this.brokerController.registerBrokerAll(false, true, true);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
}
@@ -484,7 +500,11 @@ public class TopicConfigManager extends ConfigManager {
dataVersion.nextVersion(stateMachineVersion);
this.persist();
- this.brokerController.registerBrokerAll(false, true, true);
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
}
}
--
2.32.0.windows.2
From 0dbd0772b99f618f757d42cd64542b83e2100e4f Mon Sep 17 00:00:00 2001
From: Ziyi Tan <ajb459684460@gmail.com>
Date: Mon, 11 Sep 2023 15:48:07 +0800
Subject: [PATCH 3/6] [ISSUE #7326] Split the request to register to the
nameserver (#7325)
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
---
.../rocketmq/broker/BrokerController.java | 41 +++++++++++--------
.../broker/topic/TopicConfigManager.java | 21 ++++++++++
.../apache/rocketmq/common/BrokerConfig.java | 24 +++++++++++
.../test/route/CreateAndUpdateTopicIT.java | 31 ++++++++++++++
4 files changed, 99 insertions(+), 18 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 275b64b1a..9e49f636d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1765,29 +1765,35 @@ public class BrokerController {
}
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
+ ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
+ ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
- TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
-
- topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
- topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
-
- topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
- entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
- ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
-
- if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
- ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
- for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
- TopicConfig tmp =
+ for (TopicConfig topicConfig : topicConfigMap.values()) {
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ // Register a copy whose perm is masked by the broker permission; the topic's sys flag is carried over as before.
+ topicConfigTable.put(topicConfig.getTopicName(),
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
- topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
- topicConfigTable.put(topicConfig.getTopicName(), tmp);
+ topicConfig.getPerm() & getBrokerConfig().getBrokerPermission(), topicConfig.getTopicSysFlag()));
+ } else {
+ topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
+
+ if (this.brokerConfig.isEnableSplitRegistration()
+ && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
+ doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+ topicConfigTable.clear();
}
- topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
- if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
+ .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
+ buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
+ if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 754605438..8537929be 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -47,7 +48,9 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -609,6 +612,24 @@ public class TopicConfigManager extends ConfigManager {
return topicConfigSerializeWrapper;
}
+ public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) {
+ return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
+ }
+
+ public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
+ final ConcurrentMap<String, TopicConfig> topicConfigTable,
+ final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
+ ) {
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
+ topicConfigWrapper.setTopicConfigTable(topicConfigTable);
+ topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
+ topicConfigWrapper.setDataVersion(this.getDataVersion());
+ if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) {
+ this.getDataVersion().nextVersion();
+ }
+ return topicConfigWrapper;
+ }
+
@Override
public String encode() {
return encode(false);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 45d26b29c..0d248c4e1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity {
private boolean enableMixedMessageType = false;
+ /**
+ * This flag and the NameServer's deleteTopicWithBrokerRegistration flag must not both be set to true;
+ * otherwise topic routing information will be lost.
+ */
+ private boolean enableSplitRegistration = false;
+
+ private int splitRegistrationSize = 800;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1731,4 +1739,20 @@ public class BrokerConfig extends BrokerIdentity {
public void setEnableMixedMessageType(boolean enableMixedMessageType) {
this.enableMixedMessageType = enableMixedMessageType;
}
+
+ public boolean isEnableSplitRegistration() {
+ return enableSplitRegistration;
+ }
+
+ public void setEnableSplitRegistration(boolean enableSplitRegistration) {
+ this.enableSplitRegistration = enableSplitRegistration;
+ }
+
+ public int getSplitRegistrationSize() {
+ return splitRegistrationSize;
+ }
+
+ public void setSplitRegistrationSize(int splitRegistrationSize) {
+ this.splitRegistrationSize = splitRegistrationSize;
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
index 7e3c7b871..2370e68c0 100644
--- a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.route;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
@@ -111,4 +112,34 @@ public class CreateAndUpdateTopicIT extends BaseConf {
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
}
+
+ @Test
+ public void testCreateOrUpdateTopic_EnableSplitRegistration() {
+ brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
+ brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
+ brokerController3.getBrokerConfig().setEnableSplitRegistration(true);
+
+ String testTopic = "test-topic-";
+
+ for (int i = 0; i < 1000; i++) {
+ TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
+ brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
+ brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
+ brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
+ }
+
+ brokerController1.registerBrokerAll(false, true, true);
+ brokerController2.registerBrokerAll(false, true, true);
+ brokerController3.registerBrokerAll(false, true, true);
+
+ for (int i = 0; i < 1000; i++) {
+ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
+ assertThat(route.getBrokerDatas()).hasSize(3);
+ assertThat(route.getQueueDatas()).hasSize(3);
+ }
+
+ brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
+ brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
+ brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
+ }
}
--
2.32.0.windows.2
From a9e353285cea762b0c5eab567bdfa8e5c8c2d279 Mon Sep 17 00:00:00 2001
From: rongtong <jinrongtong5@163.com>
Date: Mon, 11 Sep 2023 15:55:18 +0800
Subject: [PATCH 4/6] Add the configuration of topicQueueLock number to better
support different scenarios (#7317)
---
.../main/java/org/apache/rocketmq/store/CommitLog.java | 2 +-
.../java/org/apache/rocketmq/store/TopicQueueLock.java | 8 ++++++++
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++++
3 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index e6ee3bacc..456bf2b86 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -122,7 +122,7 @@ public class CommitLog implements Swappable {
this.flushDiskWatcher = new FlushDiskWatcher();
- this.topicQueueLock = new TopicQueueLock();
+ this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
index a78eeed23..5a131b5c3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
@@ -34,6 +34,23 @@ public class TopicQueueLock {
}
}
+ /**
+ * Creates a lock table holding the given number of locks.
+ *
+ * @param size number of locks; must be positive because lock lookup takes the key hash modulo this value
+ * @throws IllegalArgumentException if {@code size} is not positive
+ */
+ public TopicQueueLock(int size) {
+ if (size <= 0) {
+ throw new IllegalArgumentException("TopicQueueLock size must be positive, but was " + size);
+ }
+ this.size = size;
+ this.lockList = new ArrayList<>(size);
+ for (int i = 0; i < this.size; i++) {
+ this.lockList.add(new ReentrantLock());
+ }
+ }
+
public void lock(String topicQueueKey) {
Lock lock = this.lockList.get((topicQueueKey.hashCode() & 0x7fffffff) % this.size);
lock.lock();
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index efb728ac0..9fa448043 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -401,6 +401,8 @@ public class MessageStoreConfig {
private long memTableFlushInterval = 60 * 60 * 1000L;
private boolean enableRocksDBLog = false;
+ private int topicQueueLockNum = 32;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -1751,4 +1753,12 @@ public class MessageStoreConfig {
public void setEnableRocksDBLog(boolean enableRocksDBLog) {
this.enableRocksDBLog = enableRocksDBLog;
}
+
+ public int getTopicQueueLockNum() {
+ return topicQueueLockNum;
+ }
+
+ public void setTopicQueueLockNum(int topicQueueLockNum) {
+ this.topicQueueLockNum = topicQueueLockNum;
+ }
}
--
2.32.0.windows.2
From 57f04c95d3a2ba6b91583058a6e4eda209f72d6e Mon Sep 17 00:00:00 2001
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
Date: Mon, 11 Sep 2023 18:23:25 +0800
Subject: [PATCH 5/6] [ISSUE #7343] Rollback modifications to registerProcessor
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
---
.../java/org/apache/rocketmq/broker/BrokerController.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9e49f636d..13a3feb4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -827,6 +827,8 @@ public class BrokerController {
initializeResources();
+ registerProcessor();
+
initializeScheduledTasks();
initialTransaction();
@@ -1687,8 +1689,6 @@ public class BrokerController {
}
}
}, 10, 5, TimeUnit.SECONDS);
-
- registerProcessor();
}
protected void scheduleSendHeartbeat() {
--
2.32.0.windows.2
From dad6ad09d13dadc36b6342671c77f619bbb8c522 Mon Sep 17 00:00:00 2001
From: Ao Qiao <qiao_ao@foxmail.com>
Date: Tue, 12 Sep 2023 08:28:45 +0800
Subject: [PATCH 6/6] [ISSUE #7340] Abstract Duplicate code into a method in
`TopicConfigManager` (#7341)
---
.../broker/topic/TopicConfigManager.java | 44 ++++++-------------
1 file changed, 14 insertions(+), 30 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 8537929be..511d29e12 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -293,11 +293,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
- this.brokerController.registerSingleTopicAll(topicConfig);
- } else {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
- }
+ registerBrokerData(topicConfig);
}
return topicConfig;
@@ -337,11 +333,7 @@ public class TopicConfigManager extends ConfigManager {
log.error("createTopicIfAbsent ", e);
}
if (createNew && register) {
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
- this.brokerController.registerSingleTopicAll(topicConfig);
- } else {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
- }
+ registerBrokerData(topicConfig);
}
return getTopicConfig(topicConfig.getTopicName());
}
@@ -401,11 +393,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
- this.brokerController.registerSingleTopicAll(topicConfig);
- } else {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
- }
+ registerBrokerData(topicConfig);
}
return topicConfig;
@@ -446,11 +434,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
- this.brokerController.registerSingleTopicAll(topicConfig);
- } else {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
- }
+ registerBrokerData(topicConfig);
}
return topicConfig;
@@ -476,11 +460,7 @@ public class TopicConfigManager extends ConfigManager {
dataVersion.nextVersion(stateMachineVersion);
this.persist();
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
- this.brokerController.registerSingleTopicAll(topicConfig);
- } else {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
- }
+ registerBrokerData(topicConfig);
}
}
@@ -503,11 +483,7 @@ public class TopicConfigManager extends ConfigManager {
dataVersion.nextVersion(stateMachineVersion);
this.persist();
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
- this.brokerController.registerSingleTopicAll(topicConfig);
- } else {
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
- }
+ registerBrokerData(topicConfig);
}
}
@@ -699,6 +675,14 @@ public class TopicConfigManager extends ConfigManager {
}
}
+ private void registerBrokerData(TopicConfig topicConfig) {
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
+ }
+ }
+
public boolean containsTopic(String topic) {
return topicConfigTable.containsKey(topic);
}
--
2.32.0.windows.2