2244 lines
122 KiB
Diff
2244 lines
122 KiB
Diff
From c100d815d754d7cb330bc63e145bafd2d9b59cb1 Mon Sep 17 00:00:00 2001
|
|
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
|
Date: Mon, 11 Sep 2023 10:13:56 +0800
|
|
Subject: [PATCH 1/6] [ISSUE #7328] Convergent thread pool creation (#7329)
|
|
|
|
* Convergence thread pool creation to facilitate subsequent iteration management
|
|
|
|
* Convergence thread pool creation in ThreadPoolMonitor.java
|
|
|
|
* fix unit test
|
|
|
|
* Convergence ThreadPool constructor
|
|
|
|
* Convergence ScheduledThreadPool constructor
|
|
|
|
* remove unused import
|
|
|
|
* Convergence ScheduledThreadPool constructor
|
|
|
|
* remove unused import
|
|
|
|
---------
|
|
---
|
|
.../rocketmq/broker/BrokerController.java | 39 +++++-----
|
|
.../client/ClientHousekeepingService.java | 4 +-
|
|
.../DefaultConsumerIdsChangeListener.java | 3 +-
|
|
.../broker/controller/ReplicasManager.java | 9 +--
|
|
.../dledger/DLedgerRoleChangeHandler.java | 4 +-
|
|
.../broker/failover/EscapeBridge.java | 4 +-
|
|
.../broker/latency/BrokerFastFailure.java | 5 +-
|
|
.../BrokerFixedThreadPoolExecutor.java | 57 --------------
|
|
.../broker/latency/FutureTaskExt.java | 39 ----------
|
|
.../rocketmq/broker/out/BrokerOuterAPI.java | 7 +-
|
|
.../schedule/ScheduleMessageService.java | 7 +-
|
|
.../broker/topic/TopicRouteInfoManager.java | 4 +-
|
|
...ractTransactionalMessageCheckListener.java | 4 +-
|
|
.../rocketmq/broker/BrokerControllerTest.java | 2 +-
|
|
.../broker/latency/BrokerFastFailureTest.java | 1 +
|
|
.../common/config/AbstractRocksDBStorage.java | 6 +-
|
|
.../FutureTaskExtThreadPoolExecutor.java | 3 +-
|
|
.../common/thread/ThreadPoolMonitor.java | 6 +-
|
|
.../rocketmq/common/utils/ThreadUtils.java | 74 ++++++++++++++++---
|
|
.../rocketmq/container/BrokerContainer.java | 6 +-
|
|
.../controller/ControllerManager.java | 14 +---
|
|
.../controller/impl/DLedgerController.java | 10 +--
|
|
.../DefaultBrokerHeartbeatManager.java | 3 +-
|
|
.../rocketmq/namesrv/NamesrvController.java | 22 ++----
|
|
.../grpc/v2/channel/GrpcChannelManager.java | 6 +-
|
|
.../remoting/RemotingProtocolServer.java | 4 +-
|
|
.../proxy/service/ClusterServiceManager.java | 12 +--
|
|
.../proxy/service/LocalServiceManager.java | 4 +-
|
|
.../receipt/DefaultReceiptHandleManager.java | 8 +-
|
|
.../service/route/TopicRouteService.java | 9 +--
|
|
.../remoting/netty/NettyRemotingClient.java | 4 +-
|
|
.../remoting/netty/NettyRemotingServer.java | 4 +-
|
|
.../rocketmq/store/DefaultMessageStore.java | 8 +-
|
|
.../ha/autoswitch/AutoSwitchHAService.java | 38 +++++-----
|
|
.../rocketmq/store/kv/CompactionStore.java | 21 +++---
|
|
.../store/queue/ConsumeQueueStore.java | 4 +-
|
|
.../store/stats/BrokerStatsManager.java | 14 ++--
|
|
.../store/timer/TimerMessageStore.java | 6 +-
|
|
.../apache/rocketmq/test/util/StatUtil.java | 1 -
|
|
.../common/TieredStoreExecutor.java | 14 ++--
|
|
.../tools/admin/DefaultMQAdminExtImpl.java | 3 +-
|
|
41 files changed, 215 insertions(+), 278 deletions(-)
|
|
delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
|
|
delete mode 100644 broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
index 6aba70cb2..275b64b1a 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
@@ -34,7 +34,6 @@ import org.apache.rocketmq.broker.failover.EscapeBridge;
|
|
import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
|
|
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
|
|
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
|
|
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
|
|
import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
|
|
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
|
|
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
|
|
@@ -98,6 +97,7 @@ import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.common.stats.MomentStatsItem;
|
|
import org.apache.rocketmq.common.utils.ServiceProvider;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.Configuration;
|
|
@@ -160,7 +160,6 @@ import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
@@ -455,10 +454,10 @@ public class BrokerController {
|
|
* Initialize resources including remoting server and thread executors.
|
|
*/
|
|
protected void initializeResources() {
|
|
- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));
|
|
|
|
- this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getSendMessageThreadPoolNums(),
|
|
this.brokerConfig.getSendMessageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -466,7 +465,7 @@ public class BrokerController {
|
|
this.sendThreadPoolQueue,
|
|
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getPullMessageThreadPoolNums(),
|
|
this.brokerConfig.getPullMessageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -474,7 +473,7 @@ public class BrokerController {
|
|
this.pullThreadPoolQueue,
|
|
new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.litePullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getLitePullMessageThreadPoolNums(),
|
|
this.brokerConfig.getLitePullMessageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -482,7 +481,7 @@ public class BrokerController {
|
|
this.litePullThreadPoolQueue,
|
|
new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.putMessageFutureExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
|
|
this.brokerConfig.getPutMessageFutureThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -490,7 +489,7 @@ public class BrokerController {
|
|
this.putThreadPoolQueue,
|
|
new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.ackMessageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getAckMessageThreadPoolNums(),
|
|
this.brokerConfig.getAckMessageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -498,7 +497,7 @@ public class BrokerController {
|
|
this.ackThreadPoolQueue,
|
|
new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.queryMessageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getQueryMessageThreadPoolNums(),
|
|
this.brokerConfig.getQueryMessageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -506,7 +505,7 @@ public class BrokerController {
|
|
this.queryThreadPoolQueue,
|
|
new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.adminBrokerExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getAdminBrokerThreadPoolNums(),
|
|
this.brokerConfig.getAdminBrokerThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -514,7 +513,7 @@ public class BrokerController {
|
|
this.adminBrokerThreadPoolQueue,
|
|
new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));
|
|
|
|
- this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.clientManageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getClientManageThreadPoolNums(),
|
|
this.brokerConfig.getClientManageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -522,7 +521,7 @@ public class BrokerController {
|
|
this.clientManagerThreadPoolQueue,
|
|
new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));
|
|
|
|
- this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getHeartbeatThreadPoolNums(),
|
|
this.brokerConfig.getHeartbeatThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -530,7 +529,7 @@ public class BrokerController {
|
|
this.heartbeatThreadPoolQueue,
|
|
new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));
|
|
|
|
- this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getConsumerManageThreadPoolNums(),
|
|
this.brokerConfig.getConsumerManageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -538,7 +537,7 @@ public class BrokerController {
|
|
this.consumerManagerThreadPoolQueue,
|
|
new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));
|
|
|
|
- this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.replyMessageExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
|
|
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -546,7 +545,7 @@ public class BrokerController {
|
|
this.replyThreadPoolQueue,
|
|
new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));
|
|
|
|
- this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.endTransactionExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getEndTransactionThreadPoolNums(),
|
|
this.brokerConfig.getEndTransactionThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -554,7 +553,7 @@ public class BrokerController {
|
|
this.endTransactionThreadPoolQueue,
|
|
new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));
|
|
|
|
- this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
|
|
+ this.loadBalanceExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
|
|
this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
|
|
1000 * 60,
|
|
@@ -562,9 +561,9 @@ public class BrokerController {
|
|
this.loadBalanceThreadPoolQueue,
|
|
new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));
|
|
|
|
- this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ this.syncBrokerMemberGroupExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
|
|
- this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ this.brokerHeartbeatExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("BrokerControllerHeartbeatScheduledThread", getBrokerIdentity()));
|
|
|
|
this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
|
|
@@ -828,8 +827,6 @@ public class BrokerController {
|
|
|
|
initializeResources();
|
|
|
|
- registerProcessor();
|
|
-
|
|
initializeScheduledTasks();
|
|
|
|
initialTransaction();
|
|
@@ -1690,6 +1687,8 @@ public class BrokerController {
|
|
}
|
|
}
|
|
}, 10, 5, TimeUnit.SECONDS);
|
|
+
|
|
+ registerProcessor();
|
|
}
|
|
|
|
protected void scheduleSendHeartbeat() {
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
|
|
index 98e5f450f..cbb81f632 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
|
|
@@ -18,11 +18,11 @@ package org.apache.rocketmq.broker.client;
|
|
|
|
import io.netty.channel.Channel;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.ChannelEventListener;
|
|
@@ -35,7 +35,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
|
|
|
|
public ClientHousekeepingService(final BrokerController brokerController) {
|
|
this.brokerController = brokerController;
|
|
- scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("ClientHousekeepingScheduledThread", brokerController.getBrokerIdentity()));
|
|
}
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
|
|
index 2ce036a0f..d17a2a547 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
|
|
@@ -22,7 +22,6 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
import org.apache.rocketmq.common.AbstractBrokerRunnable;
|
|
@@ -37,7 +36,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
|
|
private final BrokerController brokerController;
|
|
private final int cacheSize = 8096;
|
|
|
|
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
ThreadUtils.newGenericThreadFactory("DefaultConsumerIdsChangeListener", true));
|
|
|
|
private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new ConcurrentHashMap<>(cacheSize);
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
|
|
index 37c82e434..a989e6e68 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
|
|
@@ -27,10 +27,8 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
@@ -42,6 +40,7 @@ import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.protocol.EpochEntry;
|
|
@@ -107,9 +106,9 @@ public class ReplicasManager {
|
|
public ReplicasManager(final BrokerController brokerController) {
|
|
this.brokerController = brokerController;
|
|
this.brokerOuterAPI = brokerController.getBrokerOuterAPI();
|
|
- this.scheduledService = Executors.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
|
|
- this.executorService = Executors.newFixedThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
|
|
- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
|
|
+ this.scheduledService = ThreadUtils.newScheduledThreadPool(3, new ThreadFactoryImpl("ReplicasManager_ScheduledService_", brokerController.getBrokerIdentity()));
|
|
+ this.executorService = ThreadUtils.newThreadPoolExecutor(3, new ThreadFactoryImpl("ReplicasManager_ExecutorService_", brokerController.getBrokerIdentity()));
|
|
+ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
|
|
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("ReplicasManager_scan_thread_", brokerController.getBrokerIdentity()));
|
|
this.haService = (AutoSwitchHAService) brokerController.getMessageStore().getHaService();
|
|
this.brokerConfig = brokerController.getBrokerConfig();
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
|
|
index 75023ee1b..e6cb97640 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/dledger/DLedgerRoleChangeHandler.java
|
|
@@ -21,12 +21,12 @@ import io.openmessaging.storage.dledger.DLedgerServer;
|
|
import io.openmessaging.storage.dledger.MemberState;
|
|
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
|
|
import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.DefaultMessageStore;
|
|
@@ -49,7 +49,7 @@ public class DLedgerRoleChangeHandler implements DLedgerLeaderElector.RoleChange
|
|
this.messageStore = messageStore;
|
|
this.dLedgerCommitLog = (DLedgerCommitLog) messageStore.getCommitLog();
|
|
this.dLegerServer = dLedgerCommitLog.getdLedgerServer();
|
|
- this.executorService = Executors.newSingleThreadExecutor(
|
|
+ this.executorService = ThreadUtils.newSingleThreadExecutor(
|
|
new ThreadFactoryImpl("DLegerRoleChangeHandler_", brokerController.getBrokerIdentity()));
|
|
}
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
|
|
index 7c350fc1d..6a0817480 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
|
|
@@ -24,7 +24,6 @@ import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
@@ -43,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
|
@@ -72,7 +72,7 @@ public class EscapeBridge {
|
|
public void start() throws Exception {
|
|
if (brokerController.getBrokerConfig().isEnableSlaveActingMaster() && brokerController.getBrokerConfig().isEnableRemoteEscape()) {
|
|
final BlockingQueue<Runnable> asyncSenderThreadPoolQueue = new LinkedBlockingQueue<>(50000);
|
|
- this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
|
|
+ this.defaultAsyncSenderExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
Runtime.getRuntime().availableProcessors(),
|
|
Runtime.getRuntime().availableProcessors(),
|
|
1000 * 60,
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
|
|
index d3d0bc8ba..3b6e9dc67 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
|
|
@@ -18,13 +18,14 @@ package org.apache.rocketmq.broker.latency;
|
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
import org.apache.rocketmq.common.AbstractBrokerRunnable;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.UtilAll;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.netty.RequestTask;
|
|
@@ -43,7 +44,7 @@ public class BrokerFastFailure {
|
|
|
|
public BrokerFastFailure(final BrokerController brokerController) {
|
|
this.brokerController = brokerController;
|
|
- this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
|
|
brokerController == null ? null : brokerController.getBrokerConfig()));
|
|
}
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
|
|
deleted file mode 100644
|
|
index d2d1143a3..000000000
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
|
|
+++ /dev/null
|
|
@@ -1,57 +0,0 @@
|
|
-/*
|
|
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
- * contributor license agreements. See the NOTICE file distributed with
|
|
- * this work for additional information regarding copyright ownership.
|
|
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
- * (the "License"); you may not use this file except in compliance with
|
|
- * the License. You may obtain a copy of the License at
|
|
- *
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
- *
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
- * See the License for the specific language governing permissions and
|
|
- * limitations under the License.
|
|
- */
|
|
-
|
|
-package org.apache.rocketmq.broker.latency;
|
|
-
|
|
-import java.util.concurrent.BlockingQueue;
|
|
-import java.util.concurrent.RejectedExecutionHandler;
|
|
-import java.util.concurrent.RunnableFuture;
|
|
-import java.util.concurrent.ThreadFactory;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
-import java.util.concurrent.TimeUnit;
|
|
-
|
|
-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
|
|
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
|
|
- final TimeUnit unit,
|
|
- final BlockingQueue<Runnable> workQueue) {
|
|
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
|
- }
|
|
-
|
|
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
|
|
- final TimeUnit unit,
|
|
- final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
|
|
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
|
- }
|
|
-
|
|
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
|
|
- final TimeUnit unit,
|
|
- final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
|
|
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
|
|
- }
|
|
-
|
|
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
|
|
- final TimeUnit unit,
|
|
- final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory,
|
|
- final RejectedExecutionHandler handler) {
|
|
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
|
- }
|
|
-
|
|
- @Override
|
|
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
|
|
- return new FutureTaskExt<>(runnable, value);
|
|
- }
|
|
-}
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
|
|
deleted file mode 100644
|
|
index f132efaeb..000000000
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
|
|
+++ /dev/null
|
|
@@ -1,39 +0,0 @@
|
|
-/*
|
|
- * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
- * contributor license agreements. See the NOTICE file distributed with
|
|
- * this work for additional information regarding copyright ownership.
|
|
- * The ASF licenses this file to You under the Apache License, Version 2.0
|
|
- * (the "License"); you may not use this file except in compliance with
|
|
- * the License. You may obtain a copy of the License at
|
|
- *
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
- *
|
|
- * Unless required by applicable law or agreed to in writing, software
|
|
- * distributed under the License is distributed on an "AS IS" BASIS,
|
|
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
- * See the License for the specific language governing permissions and
|
|
- * limitations under the License.
|
|
- */
|
|
-
|
|
-package org.apache.rocketmq.broker.latency;
|
|
-
|
|
-import java.util.concurrent.Callable;
|
|
-import java.util.concurrent.FutureTask;
|
|
-
|
|
-public class FutureTaskExt<V> extends FutureTask<V> {
|
|
- private final Runnable runnable;
|
|
-
|
|
- public FutureTaskExt(final Callable<V> callable) {
|
|
- super(callable);
|
|
- this.runnable = null;
|
|
- }
|
|
-
|
|
- public FutureTaskExt(final Runnable runnable, final V result) {
|
|
- super(runnable, result);
|
|
- this.runnable = runnable;
|
|
- }
|
|
-
|
|
- public Runnable getRunnable() {
|
|
- return runnable;
|
|
- }
|
|
-}
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
|
|
index ae81e8b11..9dfb8127d 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
|
|
@@ -27,9 +27,9 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.CountDownLatch;
|
|
+import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
-import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
|
|
import org.apache.rocketmq.client.consumer.PullResult;
|
|
import org.apache.rocketmq.client.consumer.PullStatus;
|
|
import org.apache.rocketmq.client.exception.MQBrokerException;
|
|
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.DefaultTopAddressing;
|
|
import org.apache.rocketmq.common.namesrv.TopAddressing;
|
|
import org.apache.rocketmq.common.sysflag.PullSysFlag;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.InvokeCallback;
|
|
@@ -144,7 +145,7 @@ public class BrokerOuterAPI {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
|
|
private final RemotingClient remotingClient;
|
|
private final TopAddressing topAddressing = new DefaultTopAddressing(MixAll.getWSAddr());
|
|
- private final BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
|
|
+ private final ExecutorService brokerOuterExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
|
|
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
|
|
private final ClientMetadata clientMetadata;
|
|
private final RpcClient rpcClient;
|
|
@@ -1092,7 +1093,7 @@ public class BrokerOuterAPI {
|
|
throw new MQBrokerException(response.getCode(), response.getRemark());
|
|
}
|
|
|
|
- public BrokerFixedThreadPoolExecutor getBrokerOuterExecutor() {
|
|
+ public ExecutorService getBrokerOuterExecutor() {
|
|
return brokerOuterExecutor;
|
|
}
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
|
index 297b14207..0c2e6507b 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
|
|
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -91,7 +90,7 @@ public class ScheduleMessageService extends ConfigManager {
|
|
public ScheduleMessageService(final BrokerController brokerController) {
|
|
this.brokerController = brokerController;
|
|
this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
|
|
- scheduledPersistService = new ScheduledThreadPoolExecutor(1,
|
|
+ scheduledPersistService = ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
|
|
}
|
|
|
|
@@ -134,9 +133,9 @@ public class ScheduleMessageService extends ConfigManager {
|
|
public void start() {
|
|
if (started.compareAndSet(false, true)) {
|
|
this.load();
|
|
- this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
|
|
+ this.deliverExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
|
|
if (this.enableAsyncDeliver) {
|
|
- this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
|
|
+ this.handleExecutorService = ThreadUtils.newScheduledThreadPool(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
|
|
}
|
|
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
|
|
Integer level = entry.getKey();
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
|
|
index b35564725..11bde5f5f 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
|
|
@@ -23,7 +23,6 @@ import java.util.Objects;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.locks.Lock;
|
|
@@ -36,6 +35,7 @@ import org.apache.rocketmq.common.MixAll;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
|
@@ -66,7 +66,7 @@ public class TopicRouteInfoManager {
|
|
}
|
|
|
|
public void start() {
|
|
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
|
|
+ this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("TopicRouteInfoManagerScheduledThread"));
|
|
|
|
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
|
|
try {
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
|
|
index 771d84300..982355d78 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
|
|
@@ -19,7 +19,6 @@ package org.apache.rocketmq.broker.transaction;
|
|
import io.netty.channel.Channel;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
@@ -27,6 +26,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.message.MessageConst;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
|
|
@@ -97,7 +97,7 @@ public abstract class AbstractTransactionalMessageCheckListener {
|
|
|
|
public synchronized void initExecutorService() {
|
|
if (executorService == null) {
|
|
- executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
|
|
+ executorService = ThreadUtils.newThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
|
|
new ThreadFactoryImpl("Transaction-msg-check-thread", brokerController.getBrokerIdentity()), new CallerRunsPolicy());
|
|
}
|
|
}
|
|
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
|
|
index 75ad961ce..6035a20ac 100644
|
|
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
|
|
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
|
|
@@ -23,9 +23,9 @@ import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
-import org.apache.rocketmq.broker.latency.FutureTaskExt;
|
|
import org.apache.rocketmq.common.BrokerConfig;
|
|
import org.apache.rocketmq.common.UtilAll;
|
|
+import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
|
|
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
|
|
import org.apache.rocketmq.remoting.netty.RequestTask;
|
|
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
|
|
index 5d0f7f9d7..31b547cf1 100644
|
|
--- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
|
|
+++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
|
|
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.latency;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.TimeUnit;
|
|
+import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
import org.apache.rocketmq.remoting.netty.RequestTask;
|
|
import org.junit.Test;
|
|
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
|
|
index a720a5be3..6f19a9815 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
|
|
@@ -23,7 +23,6 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.Semaphore;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -33,6 +32,7 @@ import com.google.common.collect.Maps;
|
|
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.rocksdb.ColumnFamilyDescriptor;
|
|
@@ -82,8 +82,8 @@ public abstract class AbstractRocksDBStorage {
|
|
private volatile boolean closed;
|
|
|
|
private final Semaphore reloadPermit = new Semaphore(1);
|
|
- private final ScheduledExecutorService reloadScheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("RocksDBStorageReloadService_"));
|
|
- private final ThreadPoolExecutor manualCompactionThread = new ThreadPoolExecutor(
|
|
+ private final ScheduledExecutorService reloadScheduler = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("RocksDBStorageReloadService_"));
|
|
+ private final ThreadPoolExecutor manualCompactionThread = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
|
|
1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
|
|
new ArrayBlockingQueue(1),
|
|
new ThreadFactoryImpl("RocksDBManualCompactionService_"),
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
|
|
index 411da9221..7b68873a9 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/thread/FutureTaskExtThreadPoolExecutor.java
|
|
@@ -29,7 +29,8 @@ public class FutureTaskExtThreadPoolExecutor extends ThreadPoolExecutor {
|
|
|
|
public FutureTaskExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
|
|
TimeUnit unit,
|
|
- BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
|
|
+ BlockingQueue<Runnable> workQueue,
|
|
+ ThreadFactory threadFactory,
|
|
RejectedExecutionHandler handler) {
|
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
|
}
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
|
|
index 49d97a5d7..1bfabbffe 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
|
|
@@ -22,12 +22,12 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.UtilAll;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
|
|
@@ -36,7 +36,7 @@ public class ThreadPoolMonitor {
|
|
private static Logger waterMarkLogger = LoggerFactory.getLogger(ThreadPoolMonitor.class);
|
|
|
|
private static final List<ThreadPoolWrapper> MONITOR_EXECUTOR = new CopyOnWriteArrayList<>();
|
|
- private static final ScheduledExecutorService MONITOR_SCHEDULED = Executors.newSingleThreadScheduledExecutor(
|
|
+ private static final ScheduledExecutorService MONITOR_SCHEDULED = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryBuilder().setNameFormat("ThreadPoolMonitor-%d").build()
|
|
);
|
|
|
|
@@ -81,7 +81,7 @@ public class ThreadPoolMonitor {
|
|
String name,
|
|
int queueCapacity,
|
|
List<ThreadPoolStatusMonitor> threadPoolStatusMonitors) {
|
|
- ThreadPoolExecutor executor = new FutureTaskExtThreadPoolExecutor(
|
|
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
|
|
corePoolSize,
|
|
maximumPoolSize,
|
|
keepAliveTime,
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
|
|
index 4b366d4e3..1644c6360 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ThreadUtils.java
|
|
@@ -20,38 +20,94 @@ package org.apache.rocketmq.common.utils;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
+import java.util.concurrent.RejectedExecutionHandler;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.thread.FutureTaskExtThreadPoolExecutor;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
|
|
public final class ThreadUtils {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TOOLS_LOGGER_NAME);
|
|
|
|
- public static ExecutorService newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
|
|
- TimeUnit unit, BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) {
|
|
- return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
|
|
+ public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
|
|
+ return ThreadUtils.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
|
|
}
|
|
|
|
- public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
|
|
- return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon));
|
|
+ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
|
|
+ return ThreadUtils.newThreadPoolExecutor(1, threadFactory);
|
|
+ }
|
|
+
|
|
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
|
|
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, corePoolSize,
|
|
+ 0L, TimeUnit.MILLISECONDS,
|
|
+ new LinkedBlockingQueue<>(),
|
|
+ threadFactory);
|
|
+ }
|
|
+
|
|
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
|
|
+ int maximumPoolSize,
|
|
+ long keepAliveTime,
|
|
+ TimeUnit unit, BlockingQueue<Runnable> workQueue,
|
|
+ String processName,
|
|
+ boolean isDaemon) {
|
|
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon));
|
|
+ }
|
|
+
|
|
+ public static ExecutorService newThreadPoolExecutor(final int corePoolSize,
|
|
+ final int maximumPoolSize,
|
|
+ final long keepAliveTime,
|
|
+ final TimeUnit unit,
|
|
+ final BlockingQueue<Runnable> workQueue,
|
|
+ final ThreadFactory threadFactory) {
|
|
+ return ThreadUtils.newThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
|
|
+ }
|
|
+
|
|
+ public static ExecutorService newThreadPoolExecutor(int corePoolSize,
|
|
+ int maximumPoolSize,
|
|
+ long keepAliveTime,
|
|
+ TimeUnit unit,
|
|
+ BlockingQueue<Runnable> workQueue,
|
|
+ ThreadFactory threadFactory,
|
|
+ RejectedExecutionHandler handler) {
|
|
+ return new FutureTaskExtThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
|
}
|
|
|
|
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
|
|
- return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon));
|
|
+ return ThreadUtils.newScheduledThreadPool(1, processName, isDaemon);
|
|
+ }
|
|
+
|
|
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
|
|
+ return ThreadUtils.newScheduledThreadPool(1, threadFactory);
|
|
+ }
|
|
+
|
|
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
|
|
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, Executors.defaultThreadFactory());
|
|
}
|
|
|
|
- public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
|
|
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, String processName,
|
|
boolean isDaemon) {
|
|
- return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon));
|
|
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, newThreadFactory(processName, isDaemon));
|
|
+ }
|
|
+
|
|
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
|
|
+ return ThreadUtils.newScheduledThreadPool(corePoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy());
|
|
+ }
|
|
+
|
|
+ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,
|
|
+ ThreadFactory threadFactory,
|
|
+ RejectedExecutionHandler handler) {
|
|
+ return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler);
|
|
}
|
|
|
|
public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) {
|
|
- return newGenericThreadFactory("Remoting-" + processName, isDaemon);
|
|
+ return newGenericThreadFactory("ThreadUtils-" + processName, isDaemon);
|
|
}
|
|
|
|
public static ThreadFactory newGenericThreadFactory(String processName) {
|
|
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
|
|
index c6446f058..5b712bc30 100644
|
|
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
|
|
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainer.java
|
|
@@ -47,14 +47,12 @@ import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
public class BrokerContainer implements IBrokerContainer {
|
|
private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
|
|
|
|
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new BasicThreadFactory.Builder()
|
|
.namingPattern("BrokerContainerScheduledThread")
|
|
.daemon(true)
|
|
@@ -143,7 +141,7 @@ public class BrokerContainer implements IBrokerContainer {
|
|
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.containerClientHouseKeepingService);
|
|
this.fastRemotingServer = this.remotingServer.newRemotingServer(this.nettyServerConfig.getListenPort() - 2);
|
|
|
|
- this.brokerContainerExecutor = new ThreadPoolExecutor(
|
|
+ this.brokerContainerExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
1,
|
|
1,
|
|
1000 * 60,
|
|
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
|
|
index 7c91e70da..3e6b0eba5 100644
|
|
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
|
|
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
|
|
@@ -25,8 +25,6 @@ import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
-import java.util.concurrent.RunnableFuture;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
@@ -34,8 +32,8 @@ import org.apache.rocketmq.common.ControllerConfig;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
-import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
|
|
import org.apache.rocketmq.controller.impl.DLedgerController;
|
|
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
|
|
@@ -93,18 +91,14 @@ public class ControllerManager {
|
|
|
|
public boolean initialize() {
|
|
this.controllerRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.controllerConfig.getControllerRequestThreadPoolQueueCapacity());
|
|
- this.controllerRequestExecutor = new ThreadPoolExecutor(
|
|
+ this.controllerRequestExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
this.controllerConfig.getControllerThreadPoolNums(),
|
|
this.controllerConfig.getControllerThreadPoolNums(),
|
|
1000 * 60,
|
|
TimeUnit.MILLISECONDS,
|
|
this.controllerRequestThreadPoolQueue,
|
|
- new ThreadFactoryImpl("ControllerRequestExecutorThread_")) {
|
|
- @Override
|
|
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
|
|
- return new FutureTaskExt<T>(runnable, value);
|
|
- }
|
|
- };
|
|
+ new ThreadFactoryImpl("ControllerRequestExecutorThread_"));
|
|
+
|
|
this.notifyService.initialize();
|
|
if (StringUtils.isEmpty(this.controllerConfig.getControllerDLegerPeers())) {
|
|
throw new IllegalArgumentException("Attribute value controllerDLegerPeers of ControllerConfig is null or empty");
|
|
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
|
|
index fa91f288e..33e4406e4 100644
|
|
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
|
|
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
|
|
@@ -32,7 +32,6 @@ import java.util.Map;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
@@ -44,6 +43,7 @@ import org.apache.rocketmq.common.ControllerConfig;
|
|
import org.apache.rocketmq.common.ServiceThread;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.controller.Controller;
|
|
import org.apache.rocketmq.controller.elect.ElectPolicy;
|
|
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
|
|
@@ -66,11 +66,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
import org.apache.rocketmq.remoting.protocol.ResponseCode;
|
|
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
|
|
-import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
|
|
-import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
|
|
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
|
|
+import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdRequestHeader;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
|
|
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
|
|
@@ -136,7 +136,7 @@ public class DLedgerController implements Controller {
|
|
this.dLedgerServer = new DLedgerServer(dLedgerConfig, nettyServerConfig, nettyClientConfig, channelEventListener);
|
|
this.dLedgerServer.registerStateMachine(this.statemachine);
|
|
this.dLedgerServer.getDLedgerLeaderElector().addRoleChangeHandler(this.roleHandler);
|
|
- this.scanInactiveMasterService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
|
|
+ this.scanInactiveMasterService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DLedgerController_scanInactiveService_"));
|
|
this.brokerLifecycleListeners = new ArrayList<>();
|
|
}
|
|
|
|
@@ -513,7 +513,7 @@ public class DLedgerController implements Controller {
|
|
class RoleChangeHandler implements DLedgerLeaderElector.RoleChangeHandler {
|
|
|
|
private final String selfId;
|
|
- private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
|
|
+ private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("DLedgerControllerRoleChangeHandler_"));
|
|
private volatile MemberState.Role currentRole = MemberState.Role.FOLLOWER;
|
|
|
|
public RoleChangeHandler(final String selfId) {
|
|
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
|
|
index 2fbddb9cd..6ebb2c994 100644
|
|
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
|
|
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
|
|
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.ControllerConfig;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
|
|
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
@@ -66,7 +67,7 @@ public class DefaultBrokerHeartbeatManager implements BrokerHeartbeatManager {
|
|
|
|
@Override
|
|
public void initialize() {
|
|
- this.scheduledService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
|
|
+ this.scheduledService = ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_scheduledService_"));
|
|
this.executor = Executors.newFixedThreadPool(2, new ThreadFactoryImpl("DefaultBrokerHeartbeatManager_executorService_"));
|
|
}
|
|
|
|
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
|
|
index 15c65ebec..be327cffa 100644
|
|
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
|
|
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
|
|
@@ -20,10 +20,7 @@ import java.util.Collections;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
-import java.util.concurrent.RunnableFuture;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
@@ -31,6 +28,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.namesrv.kvconfig.KVConfigManager;
|
|
@@ -62,10 +60,10 @@ public class NamesrvController {
|
|
private final NettyServerConfig nettyServerConfig;
|
|
private final NettyClientConfig nettyClientConfig;
|
|
|
|
- private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build());
|
|
|
|
- private final ScheduledExecutorService scanExecutorService = new ScheduledThreadPoolExecutor(1,
|
|
+ private final ScheduledExecutorService scanExecutorService = ThreadUtils.newScheduledThreadPool(1,
|
|
new BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build());
|
|
|
|
private final KVConfigManager kvConfigManager;
|
|
@@ -138,20 +136,10 @@ public class NamesrvController {
|
|
|
|
private void initiateThreadExecutors() {
|
|
this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
|
|
- this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {
|
|
- @Override
|
|
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
|
|
- return new FutureTaskExt<>(runnable, value);
|
|
- }
|
|
- };
|
|
+ this.defaultExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_"));
|
|
|
|
this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
|
|
- this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {
|
|
- @Override
|
|
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
|
|
- return new FutureTaskExt<>(runnable, value);
|
|
- }
|
|
- };
|
|
+ this.clientRequestExecutor = ThreadUtils.newThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_"));
|
|
}
|
|
|
|
private void initiateSslContext() {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
|
|
index 14330dd8d..a18cf7600 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
|
|
@@ -21,13 +21,13 @@ import java.util.Set;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
-import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
+import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
|
|
@@ -43,7 +43,7 @@ public class GrpcChannelManager implements StartAndShutdown {
|
|
protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
|
|
protected final ConcurrentMap<String /* nonce */, ResultFuture> resultNonceFutureMap = new ConcurrentHashMap<>();
|
|
|
|
- protected final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
|
|
+ protected final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryImpl("GrpcChannelManager_")
|
|
);
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
|
|
index bcc9edd09..fe07090d5 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
|
|
@@ -22,7 +22,6 @@ import io.netty.channel.Channel;
|
|
import java.util.List;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.CompletableFuture;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -33,6 +32,7 @@ import org.apache.rocketmq.common.future.FutureTaskExt;
|
|
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
|
|
import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
@@ -178,7 +178,7 @@ public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOu
|
|
new ThreadPoolHeadSlowTimeMillsMonitor(config.getRemotingWaitTimeMillsInDefaultQueue())
|
|
);
|
|
|
|
- this.timerExecutor = Executors.newSingleThreadScheduledExecutor(
|
|
+ this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
|
|
);
|
|
this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 10, TimeUnit.SECONDS);
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
|
|
index d2ddfc352..9786cec55 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
|
|
@@ -16,7 +16,6 @@
|
|
*/
|
|
package org.apache.rocketmq.proxy.service;
|
|
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.client.ClientChannelInfo;
|
|
@@ -27,23 +26,24 @@ import org.apache.rocketmq.broker.client.ProducerChangeListener;
|
|
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
|
|
import org.apache.rocketmq.broker.client.ProducerManager;
|
|
import org.apache.rocketmq.client.common.NameserverAccessConfig;
|
|
+import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
|
|
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
-import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
import org.apache.rocketmq.proxy.service.admin.AdminService;
|
|
import org.apache.rocketmq.proxy.service.admin.DefaultAdminService;
|
|
import org.apache.rocketmq.proxy.service.client.ClusterConsumerManager;
|
|
+import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
|
|
import org.apache.rocketmq.proxy.service.message.ClusterMessageService;
|
|
import org.apache.rocketmq.proxy.service.message.MessageService;
|
|
import org.apache.rocketmq.proxy.service.metadata.ClusterMetadataService;
|
|
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
|
|
-import org.apache.rocketmq.client.impl.mqclient.DoNothingClientRemotingProcessor;
|
|
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
|
|
-import org.apache.rocketmq.proxy.service.client.ProxyClientRemotingProcessor;
|
|
import org.apache.rocketmq.proxy.service.relay.ClusterProxyRelayService;
|
|
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
|
|
import org.apache.rocketmq.proxy.service.route.ClusterTopicRouteService;
|
|
@@ -73,7 +73,7 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
|
|
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
|
|
NameserverAccessConfig nameserverAccessConfig = new NameserverAccessConfig(proxyConfig.getNamesrvAddr(),
|
|
proxyConfig.getNamesrvDomain(), proxyConfig.getNamesrvDomainSubgroup());
|
|
- this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
|
|
+ this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(3);
|
|
|
|
this.messagingClientAPIFactory = new MQClientAPIFactory(
|
|
nameserverAccessConfig,
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
|
|
index 4d1ca7b66..59cd92685 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/LocalServiceManager.java
|
|
@@ -16,7 +16,6 @@
|
|
*/
|
|
package org.apache.rocketmq.proxy.service;
|
|
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
@@ -28,6 +27,7 @@ import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
import org.apache.rocketmq.proxy.service.admin.AdminService;
|
|
@@ -58,7 +58,7 @@ public class LocalServiceManager extends AbstractStartAndShutdown implements Ser
|
|
private final MQClientAPIFactory mqClientAPIFactory;
|
|
private final ChannelManager channelManager;
|
|
|
|
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
|
|
+ private final ScheduledExecutorService scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryImpl("LocalServiceManagerScheduledThread"));
|
|
|
|
public LocalServiceManager(BrokerController brokerController, RPCHook rpcHook) {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
index 69f44344a..207603fe8 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
|
|
@@ -24,7 +24,6 @@ import java.util.Set;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -42,20 +41,21 @@ import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
|
|
import org.apache.rocketmq.common.utils.StartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
-import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
|
|
import org.apache.rocketmq.proxy.common.ProxyContext;
|
|
import org.apache.rocketmq.proxy.common.ProxyException;
|
|
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
|
|
import org.apache.rocketmq.proxy.common.ReceiptHandleGroup;
|
|
+import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
|
|
+import org.apache.rocketmq.proxy.common.RenewEvent;
|
|
import org.apache.rocketmq.proxy.common.RenewStrategyPolicy;
|
|
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
|
|
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
|
|
import org.apache.rocketmq.proxy.config.ConfigurationManager;
|
|
import org.apache.rocketmq.proxy.config.ProxyConfig;
|
|
-import org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey;
|
|
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
|
|
import org.apache.rocketmq.remoting.protocol.subscription.RetryPolicy;
|
|
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
|
|
@@ -68,7 +68,7 @@ public class DefaultReceiptHandleManager extends AbstractStartAndShutdown implem
|
|
protected final StateEventListener<RenewEvent> eventListener;
|
|
protected final static RetryPolicy RENEW_POLICY = new RenewStrategyPolicy();
|
|
protected final ScheduledExecutorService scheduledExecutorService =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
|
|
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("RenewalScheduledThread_"));
|
|
protected final ThreadPoolExecutor renewalWorkerService;
|
|
|
|
public DefaultReceiptHandleManager(MetadataService metadataService, ConsumerManager consumerManager, StateEventListener<RenewEvent> eventListener) {
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
index caf62a1e0..ccf094c03 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
|
|
@@ -19,25 +19,24 @@ package org.apache.rocketmq.proxy.service.route;
|
|
import com.github.benmanes.caffeine.cache.CacheLoader;
|
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
|
import com.github.benmanes.caffeine.cache.LoadingCache;
|
|
+import com.google.common.base.Optional;
|
|
import java.time.Duration;
|
|
import java.util.List;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
-
|
|
-import com.google.common.base.Optional;
|
|
import org.apache.rocketmq.client.ClientConfig;
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
|
|
import org.apache.rocketmq.client.latency.MQFaultStrategy;
|
|
import org.apache.rocketmq.client.latency.Resolver;
|
|
import org.apache.rocketmq.client.latency.ServiceDetector;
|
|
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.message.MessageQueue;
|
|
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
|
|
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.proxy.common.Address;
|
|
@@ -63,7 +62,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
|
|
public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
|
|
ProxyConfig config = ConfigurationManager.getProxyConfig();
|
|
|
|
- this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
|
|
+ this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryImpl("TopicRouteService_")
|
|
);
|
|
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
|
index 8491f4354..64621dd6c 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
|
|
@@ -61,7 +61,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
@@ -71,6 +70,7 @@ import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.common.Pair;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.ChannelEventListener;
|
|
@@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
|
|
|
|
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactoryImpl("NettyClientPublicExecutor_"));
|
|
|
|
- this.scanExecutor = new ThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
|
|
+ this.scanExecutor = ThreadUtils.newThreadPoolExecutor(4, 10, 60, TimeUnit.SECONDS,
|
|
new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("NettyClientScan_thread_"));
|
|
|
|
if (eventLoopGroup != null) {
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
index e626260c9..aa0d46542 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
|
|
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.constant.HAProxyConstants;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.utils.BinaryUtil;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.ChannelEventListener;
|
|
@@ -83,7 +84,6 @@ import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
@@ -171,7 +171,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
|
|
}
|
|
|
|
private ScheduledExecutorService buildScheduleExecutor() {
|
|
- return new ScheduledThreadPoolExecutor(1,
|
|
+ return ThreadUtils.newScheduledThreadPool(1,
|
|
new ThreadFactoryImpl("NettyServerScheduler_", true),
|
|
new ThreadPoolExecutor.DiscardOldestPolicy());
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
index f2a54ddf6..02ea47f13 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
|
|
@@ -48,7 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ExecutionException;
|
|
import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
@@ -83,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
|
|
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
|
|
import org.apache.rocketmq.common.utils.QueueTypeUtils;
|
|
import org.apache.rocketmq.common.utils.ServiceProvider;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
|
|
@@ -205,7 +205,7 @@ public class DefaultMessageStore implements MessageStore {
|
|
private ConcurrentMap<String, TopicConfig> topicConfigTable;
|
|
|
|
private final ScheduledExecutorService scheduledCleanQueueExecutorService =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
|
|
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
|
|
|
|
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
|
|
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable) throws IOException {
|
|
@@ -253,7 +253,7 @@ public class DefaultMessageStore implements MessageStore {
|
|
this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
|
|
|
|
this.scheduledExecutorService =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
|
|
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
|
|
|
|
this.dispatcherList = new LinkedList<>();
|
|
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
|
|
@@ -2915,7 +2915,7 @@ public class DefaultMessageStore implements MessageStore {
|
|
private final ExecutorService batchDispatchRequestExecutor;
|
|
|
|
public MainBatchDispatchRequestService() {
|
|
- batchDispatchRequestExecutor = new ThreadPoolExecutor(
|
|
+ batchDispatchRequestExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
|
|
DefaultMessageStore.this.getMessageStoreConfig().getBatchDispatchRequestThreadPoolNums(),
|
|
1000 * 60,
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
|
|
index d5393fdca..f20bc3e28 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
|
|
@@ -17,10 +17,26 @@
|
|
|
|
package org.apache.rocketmq.store.ha.autoswitch;
|
|
|
|
-
|
|
+import java.io.IOException;
|
|
+import java.nio.channels.SocketChannel;
|
|
+import java.util.ArrayList;
|
|
+import java.util.HashSet;
|
|
+import java.util.Iterator;
|
|
+import java.util.List;
|
|
+import java.util.Map;
|
|
+import java.util.Objects;
|
|
+import java.util.Set;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.concurrent.ExecutorService;
|
|
+import java.util.concurrent.locks.Lock;
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
+import java.util.function.Consumer;
|
|
+import java.util.stream.Collectors;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.protocol.EpochEntry;
|
|
@@ -36,30 +52,12 @@ import org.apache.rocketmq.store.ha.HAClient;
|
|
import org.apache.rocketmq.store.ha.HAConnection;
|
|
import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
|
|
|
|
-import java.io.IOException;
|
|
-import java.nio.channels.SocketChannel;
|
|
-import java.util.ArrayList;
|
|
-import java.util.HashSet;
|
|
-import java.util.List;
|
|
-import java.util.Iterator;
|
|
-import java.util.Map;
|
|
-import java.util.Objects;
|
|
-import java.util.Set;
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
-import java.util.concurrent.ExecutorService;
|
|
-import java.util.concurrent.Executors;
|
|
-import java.util.concurrent.locks.Lock;
|
|
-import java.util.concurrent.locks.ReadWriteLock;
|
|
-import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
-import java.util.function.Consumer;
|
|
-import java.util.stream.Collectors;
|
|
-
|
|
/**
|
|
* SwitchAble ha service, support switch role to master or slave.
|
|
*/
|
|
public class AutoSwitchHAService extends DefaultHAService {
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
|
|
- private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
|
|
+ private final ExecutorService executorService = ThreadUtils.newSingleThreadExecutor(new ThreadFactoryImpl("AutoSwitchHAService_Executor_"));
|
|
private final ConcurrentHashMap<Long/*brokerId*/, Long/*lastCaughtUpTimestamp*/> connectionCaughtUpTimeTable = new ConcurrentHashMap<>();
|
|
private final List<Consumer<Set<Long/*brokerId*/>>> syncStateSetChangedListeners = new ArrayList<>();
|
|
private final Set<Long/*brokerId*/> syncStateSet = new HashSet<>();
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
|
|
index b37c90726..639084fa2 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
|
|
@@ -16,17 +16,25 @@
|
|
*/
|
|
package org.apache.rocketmq.store.kv;
|
|
|
|
-import java.util.Random;
|
|
+import java.io.File;
|
|
+import java.io.IOException;
|
|
+import java.nio.file.Files;
|
|
+import java.nio.file.Paths;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Objects;
|
|
import java.util.Optional;
|
|
+import java.util.Random;
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
+import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.TopicConfig;
|
|
import org.apache.rocketmq.common.attribute.CleanupPolicy;
|
|
import org.apache.rocketmq.common.constant.LoggerName;
|
|
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.DefaultMessageStore;
|
|
@@ -35,15 +43,6 @@ import org.apache.rocketmq.store.GetMessageResult;
|
|
import org.apache.rocketmq.store.SelectMappedBufferResult;
|
|
import org.apache.rocketmq.store.config.MessageStoreConfig;
|
|
|
|
-import java.io.File;
|
|
-import java.io.IOException;
|
|
-import java.nio.file.Files;
|
|
-import java.nio.file.Paths;
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
-import java.util.concurrent.Executors;
|
|
-import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.TimeUnit;
|
|
-
|
|
public class CompactionStore {
|
|
|
|
public static final String COMPACTION_DIR = "compaction";
|
|
@@ -76,7 +75,7 @@ public class CompactionStore {
|
|
this.positionMgr = new CompactionPositionMgr(compactionPath);
|
|
this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1, config.getCompactionThreadNum()));
|
|
|
|
- this.compactionSchedule = Executors.newScheduledThreadPool(this.compactionThreadNum,
|
|
+ this.compactionSchedule = ThreadUtils.newScheduledThreadPool(this.compactionThreadNum,
|
|
new ThreadFactoryImpl("compactionSchedule_"));
|
|
this.offsetMapSize = config.getMaxOffsetMapSize() / compactionThreadNum;
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
|
|
index 8d38503b3..d03d15d65 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
|
|
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.FutureTask;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
import org.apache.rocketmq.common.TopicConfig;
|
|
@@ -34,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
import org.apache.rocketmq.common.utils.QueueTypeUtils;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.CommitLog;
|
|
@@ -175,7 +175,7 @@ public class ConsumeQueueStore {
|
|
}
|
|
|
|
private ExecutorService buildExecutorService(BlockingQueue<Runnable> blockingQueue, String threadNamePrefix) {
|
|
- return new ThreadPoolExecutor(
|
|
+ return ThreadUtils.newThreadPoolExecutor(
|
|
this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
|
|
this.messageStore.getBrokerConfig().getRecoverThreadPoolNums(),
|
|
1000 * 60,
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
|
|
index 2dd3fc5b5..489d7b4fb 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
|
|
@@ -17,7 +17,6 @@
|
|
package org.apache.rocketmq.store.stats;
|
|
|
|
import java.util.HashMap;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import org.apache.commons.lang3.tuple.Pair;
|
|
import org.apache.rocketmq.common.BrokerConfig;
|
|
@@ -32,13 +31,14 @@ import org.apache.rocketmq.common.statistics.StatisticsItemScheduledPrinter;
|
|
import org.apache.rocketmq.common.statistics.StatisticsItemStateGetter;
|
|
import org.apache.rocketmq.common.statistics.StatisticsKindMeta;
|
|
import org.apache.rocketmq.common.statistics.StatisticsManager;
|
|
+import org.apache.rocketmq.common.stats.MomentStatsItemSet;
|
|
import org.apache.rocketmq.common.stats.Stats;
|
|
+import org.apache.rocketmq.common.stats.StatsItem;
|
|
+import org.apache.rocketmq.common.stats.StatsItemSet;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
-import org.apache.rocketmq.common.stats.MomentStatsItemSet;
|
|
-import org.apache.rocketmq.common.stats.StatsItem;
|
|
-import org.apache.rocketmq.common.stats.StatsItemSet;
|
|
|
|
public class BrokerStatsManager {
|
|
|
|
@@ -281,11 +281,11 @@ public class BrokerStatsManager {
|
|
|
|
private void initScheduleService() {
|
|
this.scheduledExecutorService =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
|
|
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerStatsThread", true, brokerConfig));
|
|
this.commercialExecutor =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
|
|
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
|
|
this.accountExecutor =
|
|
- Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
|
|
+ ThreadUtils.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
|
|
}
|
|
|
|
public MomentStatsItemSet getMomentStatsItemSetFallSize() {
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
|
index 181f7087a..0d50de65a 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
|
|
@@ -35,7 +35,6 @@ import java.util.Set;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ConcurrentSkipListSet;
|
|
import java.util.concurrent.CountDownLatch;
|
|
-import java.util.concurrent.Executors;
|
|
import java.util.concurrent.LinkedBlockingDeque;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -54,6 +53,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.store.ConsumeQueue;
|
|
@@ -174,11 +174,11 @@ public class TimerMessageStore {
|
|
this.lastBrokerRole = storeConfig.getBrokerRole();
|
|
|
|
if (messageStore instanceof DefaultMessageStore) {
|
|
- scheduler = Executors.newSingleThreadScheduledExecutor(
|
|
+ scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryImpl("TimerScheduledThread",
|
|
((DefaultMessageStore) messageStore).getBrokerIdentity()));
|
|
} else {
|
|
- scheduler = Executors.newSingleThreadScheduledExecutor(
|
|
+ scheduler = ThreadUtils.newSingleThreadScheduledExecutor(
|
|
new ThreadFactoryImpl("TimerScheduledThread"));
|
|
}
|
|
|
|
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
|
|
index f3d105bc6..080b7e385 100644
|
|
--- a/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
|
|
+++ b/test/src/main/java/org/apache/rocketmq/test/util/StatUtil.java
|
|
@@ -28,7 +28,6 @@ import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
-
|
|
import javax.annotation.Generated;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
|
index 6dd0e8846..65d586f43 100644
|
|
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
|
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
|
|
@@ -20,10 +20,10 @@ import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
-import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.rocketmq.common.ThreadFactoryImpl;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
|
|
public class TieredStoreExecutor {
|
|
|
|
@@ -43,20 +43,20 @@ public class TieredStoreExecutor {
|
|
public static ExecutorService compactIndexFileExecutor;
|
|
|
|
public static void init() {
|
|
- commonScheduledExecutor = new ScheduledThreadPoolExecutor(
|
|
+ commonScheduledExecutor = ThreadUtils.newScheduledThreadPool(
|
|
Math.max(4, Runtime.getRuntime().availableProcessors()),
|
|
new ThreadFactoryImpl("TieredCommonExecutor_"));
|
|
|
|
- commitExecutor = new ScheduledThreadPoolExecutor(
|
|
+ commitExecutor = ThreadUtils.newScheduledThreadPool(
|
|
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
|
|
new ThreadFactoryImpl("TieredCommitExecutor_"));
|
|
|
|
- cleanExpiredFileExecutor = new ScheduledThreadPoolExecutor(
|
|
+ cleanExpiredFileExecutor = ThreadUtils.newScheduledThreadPool(
|
|
Math.max(4, Runtime.getRuntime().availableProcessors()),
|
|
new ThreadFactoryImpl("TieredCleanFileExecutor_"));
|
|
|
|
dispatchThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
|
- dispatchExecutor = new ThreadPoolExecutor(
|
|
+ dispatchExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
Math.max(2, Runtime.getRuntime().availableProcessors()),
|
|
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
|
|
1000 * 60,
|
|
@@ -66,7 +66,7 @@ public class TieredStoreExecutor {
|
|
new ThreadPoolExecutor.DiscardOldestPolicy());
|
|
|
|
fetchDataThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
|
- fetchDataExecutor = new ThreadPoolExecutor(
|
|
+ fetchDataExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
|
|
Math.max(64, Runtime.getRuntime().availableProcessors() * 8),
|
|
1000 * 60,
|
|
@@ -75,7 +75,7 @@ public class TieredStoreExecutor {
|
|
new ThreadFactoryImpl("TieredFetchExecutor_"));
|
|
|
|
compactIndexFileThreadPoolQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
|
|
- compactIndexFileExecutor = new ThreadPoolExecutor(
|
|
+ compactIndexFileExecutor = ThreadUtils.newThreadPoolExecutor(
|
|
1,
|
|
1,
|
|
1000 * 60,
|
|
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
|
|
index fa3596d51..1ebff6d8a 100644
|
|
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
|
|
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
|
|
@@ -66,6 +66,7 @@ import org.apache.rocketmq.common.namesrv.NamesrvUtil;
|
|
import org.apache.rocketmq.common.topic.TopicValidator;
|
|
import org.apache.rocketmq.common.utils.NetworkUtil;
|
|
import org.apache.rocketmq.common.BoundaryType;
|
|
+import org.apache.rocketmq.common.utils.ThreadUtils;
|
|
import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.RPCHook;
|
|
@@ -193,7 +194,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
|
|
|
|
int threadPoolCoreSize = Integer.parseInt(System.getProperty("rocketmq.admin.threadpool.coresize", "20"));
|
|
|
|
- this.threadPoolExecutor = new ThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
|
|
+ this.threadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(threadPoolCoreSize, 100, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("DefaultMQAdminExtImpl_"));
|
|
|
|
break;
|
|
case RUNNING:
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From dad6b4dadfec7a58e78a6715ec16c2eb6b17ff27 Mon Sep 17 00:00:00 2001
|
|
From: Ziyi Tan <ajb459684460@gmail.com>
|
|
Date: Mon, 11 Sep 2023 14:34:10 +0800
|
|
Subject: [PATCH 2/6] [ISSUE #7334] `registerIncrementBrokerData` for single
|
|
topic update (#7335)
|
|
|
|
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
|
|
---
|
|
.../broker/topic/TopicConfigManager.java | 30 +++++++++++++++----
|
|
1 file changed, 25 insertions(+), 5 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
index 4e3c1736c..754605438 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
@@ -290,7 +290,11 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
|
|
if (createNew) {
|
|
- this.brokerController.registerBrokerAll(false, true, true);
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
}
|
|
|
|
return topicConfig;
|
|
@@ -394,7 +398,11 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
|
|
if (createNew) {
|
|
- this.brokerController.registerBrokerAll(false, true, true);
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
}
|
|
|
|
return topicConfig;
|
|
@@ -435,7 +443,11 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
|
|
if (createNew) {
|
|
- this.brokerController.registerBrokerAll(false, true, true);
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
}
|
|
|
|
return topicConfig;
|
|
@@ -461,7 +473,11 @@ public class TopicConfigManager extends ConfigManager {
|
|
dataVersion.nextVersion(stateMachineVersion);
|
|
|
|
this.persist();
|
|
- this.brokerController.registerBrokerAll(false, true, true);
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
}
|
|
}
|
|
|
|
@@ -484,7 +500,11 @@ public class TopicConfigManager extends ConfigManager {
|
|
dataVersion.nextVersion(stateMachineVersion);
|
|
|
|
this.persist();
|
|
- this.brokerController.registerBrokerAll(false, true, true);
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
}
|
|
}
|
|
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 0dbd0772b99f618f757d42cd64542b83e2100e4f Mon Sep 17 00:00:00 2001
|
|
From: Ziyi Tan <ajb459684460@gmail.com>
|
|
Date: Mon, 11 Sep 2023 15:48:07 +0800
|
|
Subject: [PATCH 3/6] [ISSUE #7326] Split the request to register to the
|
|
nameserver (#7325)
|
|
|
|
Signed-off-by: Ziy1-Tan <ajb459684460@gmail.com>
|
|
---
|
|
.../rocketmq/broker/BrokerController.java | 41 +++++++++++--------
|
|
.../broker/topic/TopicConfigManager.java | 21 ++++++++++
|
|
.../apache/rocketmq/common/BrokerConfig.java | 24 +++++++++++
|
|
.../test/route/CreateAndUpdateTopicIT.java | 31 ++++++++++++++
|
|
4 files changed, 99 insertions(+), 18 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
index 275b64b1a..9e49f636d 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
@@ -1765,29 +1765,34 @@ public class BrokerController {
|
|
}
|
|
|
|
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
|
|
+ ConcurrentMap<String, TopicConfig> topicConfigMap = this.getTopicConfigManager().getTopicConfigTable();
|
|
+ ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
|
|
|
|
- TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
|
|
-
|
|
- topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
|
|
- topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
|
|
-
|
|
- topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
|
|
- entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
|
|
- ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
|
-
|
|
- if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|
|
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
|
|
- ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
|
|
- for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
|
|
- TopicConfig tmp =
|
|
+ for (TopicConfig topicConfig : topicConfigMap.values()) {
|
|
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|
|
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
|
|
+ topicConfigTable.put(topicConfig.getTopicName(),
|
|
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
|
|
- topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
|
|
- topicConfigTable.put(topicConfig.getTopicName(), tmp);
|
|
+ topicConfig.getPerm() & getBrokerConfig().getBrokerPermission()));
|
|
+ } else {
|
|
+ topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
|
|
+ }
|
|
+
|
|
+ if (this.brokerConfig.isEnableSplitRegistration()
|
|
+ && topicConfigTable.size() >= this.brokerConfig.getSplitRegistrationSize()) {
|
|
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildSerializeWrapper(topicConfigTable);
|
|
+ doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
|
|
+ topicConfigTable.clear();
|
|
}
|
|
- topicConfigWrapper.setTopicConfigTable(topicConfigTable);
|
|
}
|
|
|
|
- if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
|
|
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream()
|
|
+ .map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue())))
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
+
|
|
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().
|
|
+ buildSerializeWrapper(topicConfigTable, topicQueueMappingInfoMap);
|
|
+ if (this.brokerConfig.isEnableSplitRegistration() || forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
|
|
this.getBrokerAddr(),
|
|
this.brokerConfig.getBrokerName(),
|
|
this.brokerConfig.getBrokerId(),
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
index 754605438..8537929be 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
|
+import com.google.common.collect.Maps;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.rocketmq.broker.BrokerController;
|
|
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
|
|
@@ -47,7 +48,9 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
|
|
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
|
|
import org.apache.rocketmq.remoting.protocol.DataVersion;
|
|
import org.apache.rocketmq.remoting.protocol.body.KVTable;
|
|
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
|
|
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
|
|
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
|
|
|
|
import static com.google.common.base.Preconditions.checkNotNull;
|
|
|
|
@@ -609,6 +612,24 @@ public class TopicConfigManager extends ConfigManager {
|
|
return topicConfigSerializeWrapper;
|
|
}
|
|
|
|
+ public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(final ConcurrentMap<String, TopicConfig> topicConfigTable) {
|
|
+ return buildSerializeWrapper(topicConfigTable, Maps.newHashMap());
|
|
+ }
|
|
+
|
|
+ public TopicConfigAndMappingSerializeWrapper buildSerializeWrapper(
|
|
+ final ConcurrentMap<String, TopicConfig> topicConfigTable,
|
|
+ final Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap
|
|
+ ) {
|
|
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();
|
|
+ topicConfigWrapper.setTopicConfigTable(topicConfigTable);
|
|
+ topicConfigWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
|
|
+ topicConfigWrapper.setDataVersion(this.getDataVersion());
|
|
+ if (this.brokerController.getBrokerConfig().isEnableSplitRegistration()) {
|
|
+ this.getDataVersion().nextVersion();
|
|
+ }
|
|
+ return topicConfigWrapper;
|
|
+ }
|
|
+
|
|
@Override
|
|
public String encode() {
|
|
return encode(false);
|
|
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
|
index 45d26b29c..0d248c4e1 100644
|
|
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
|
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
|
|
@@ -396,6 +396,14 @@ public class BrokerConfig extends BrokerIdentity {
|
|
|
|
private boolean enableMixedMessageType = false;
|
|
|
|
+ /**
|
|
+ * This flag and deleteTopicWithBrokerRegistration flag in the NameServer cannot be set to true at the same time,
|
|
+ * otherwise there will be a loss of routing
|
|
+ */
|
|
+ private boolean enableSplitRegistration = false;
|
|
+
|
|
+ private int splitRegistrationSize = 800;
|
|
+
|
|
public long getMaxPopPollingSize() {
|
|
return maxPopPollingSize;
|
|
}
|
|
@@ -1731,4 +1739,20 @@ public class BrokerConfig extends BrokerIdentity {
|
|
public void setEnableMixedMessageType(boolean enableMixedMessageType) {
|
|
this.enableMixedMessageType = enableMixedMessageType;
|
|
}
|
|
+
|
|
+ public boolean isEnableSplitRegistration() {
|
|
+ return enableSplitRegistration;
|
|
+ }
|
|
+
|
|
+ public void setEnableSplitRegistration(boolean enableSplitRegistration) {
|
|
+ this.enableSplitRegistration = enableSplitRegistration;
|
|
+ }
|
|
+
|
|
+ public int getSplitRegistrationSize() {
|
|
+ return splitRegistrationSize;
|
|
+ }
|
|
+
|
|
+ public void setSplitRegistrationSize(int splitRegistrationSize) {
|
|
+ this.splitRegistrationSize = splitRegistrationSize;
|
|
+ }
|
|
}
|
|
diff --git a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
|
|
index 7e3c7b871..2370e68c0 100644
|
|
--- a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
|
|
+++ b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
|
|
@@ -17,6 +17,7 @@
|
|
|
|
package org.apache.rocketmq.test.route;
|
|
|
|
+import org.apache.rocketmq.common.TopicConfig;
|
|
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
|
|
import org.apache.rocketmq.test.base.BaseConf;
|
|
import org.apache.rocketmq.test.util.MQAdminTestUtils;
|
|
@@ -111,4 +112,34 @@ public class CreateAndUpdateTopicIT extends BaseConf {
|
|
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
|
|
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
|
|
}
|
|
+
|
|
+ @Test
|
|
+ public void testCreateOrUpdateTopic_EnableSplitRegistration() {
|
|
+ brokerController1.getBrokerConfig().setEnableSplitRegistration(true);
|
|
+ brokerController2.getBrokerConfig().setEnableSplitRegistration(true);
|
|
+ brokerController3.getBrokerConfig().setEnableSplitRegistration(true);
|
|
+
|
|
+ String testTopic = "test-topic-";
|
|
+
|
|
+ for (int i = 0; i < 1000; i++) {
|
|
+ TopicConfig topicConfig = new TopicConfig(testTopic + i, 8, 8);
|
|
+ brokerController1.getTopicConfigManager().updateTopicConfig(topicConfig);
|
|
+ brokerController2.getTopicConfigManager().updateTopicConfig(topicConfig);
|
|
+ brokerController3.getTopicConfigManager().updateTopicConfig(topicConfig);
|
|
+ }
|
|
+
|
|
+ brokerController1.registerBrokerAll(false, true, true);
|
|
+ brokerController2.registerBrokerAll(false, true, true);
|
|
+ brokerController3.registerBrokerAll(false, true, true);
|
|
+
|
|
+ for (int i = 0; i < 1000; i++) {
|
|
+ TopicRouteData route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic + i);
|
|
+ assertThat(route.getBrokerDatas()).hasSize(3);
|
|
+ assertThat(route.getQueueDatas()).hasSize(3);
|
|
+ }
|
|
+
|
|
+ brokerController1.getBrokerConfig().setEnableSplitRegistration(false);
|
|
+ brokerController2.getBrokerConfig().setEnableSplitRegistration(false);
|
|
+ brokerController3.getBrokerConfig().setEnableSplitRegistration(false);
|
|
+ }
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From a9e353285cea762b0c5eab567bdfa8e5c8c2d279 Mon Sep 17 00:00:00 2001
|
|
From: rongtong <jinrongtong5@163.com>
|
|
Date: Mon, 11 Sep 2023 15:55:18 +0800
|
|
Subject: [PATCH 4/6] Add the configuration of topicQueueLock number to better
|
|
support different scenarios (#7317)
|
|
|
|
---
|
|
.../main/java/org/apache/rocketmq/store/CommitLog.java | 2 +-
|
|
.../java/org/apache/rocketmq/store/TopicQueueLock.java | 8 ++++++++
|
|
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++++
|
|
3 files changed, 19 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
index e6ee3bacc..456bf2b86 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
|
|
@@ -122,7 +122,7 @@ public class CommitLog implements Swappable {
|
|
|
|
this.flushDiskWatcher = new FlushDiskWatcher();
|
|
|
|
- this.topicQueueLock = new TopicQueueLock();
|
|
+ this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
|
|
|
|
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
|
|
}
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
|
|
index a78eeed23..5a131b5c3 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
|
|
@@ -34,6 +34,14 @@ public class TopicQueueLock {
|
|
}
|
|
}
|
|
|
|
+ public TopicQueueLock(int size) {
|
|
+ this.size = size;
|
|
+ this.lockList = new ArrayList<>(size);
|
|
+ for (int i = 0; i < this.size; i++) {
|
|
+ this.lockList.add(new ReentrantLock());
|
|
+ }
|
|
+ }
|
|
+
|
|
public void lock(String topicQueueKey) {
|
|
Lock lock = this.lockList.get((topicQueueKey.hashCode() & 0x7fffffff) % this.size);
|
|
lock.lock();
|
|
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
|
|
index efb728ac0..9fa448043 100644
|
|
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
|
|
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
|
|
@@ -401,6 +401,8 @@ public class MessageStoreConfig {
|
|
private long memTableFlushInterval = 60 * 60 * 1000L;
|
|
private boolean enableRocksDBLog = false;
|
|
|
|
+ private int topicQueueLockNum = 32;
|
|
+
|
|
public boolean isDebugLockEnable() {
|
|
return debugLockEnable;
|
|
}
|
|
@@ -1751,4 +1753,12 @@ public class MessageStoreConfig {
|
|
public void setEnableRocksDBLog(boolean enableRocksDBLog) {
|
|
this.enableRocksDBLog = enableRocksDBLog;
|
|
}
|
|
+
|
|
+ public int getTopicQueueLockNum() {
|
|
+ return topicQueueLockNum;
|
|
+ }
|
|
+
|
|
+ public void setTopicQueueLockNum(int topicQueueLockNum) {
|
|
+ this.topicQueueLockNum = topicQueueLockNum;
|
|
+ }
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 57f04c95d3a2ba6b91583058a6e4eda209f72d6e Mon Sep 17 00:00:00 2001
|
|
From: guyinyou <36399867+guyinyou@users.noreply.github.com>
|
|
Date: Mon, 11 Sep 2023 18:23:25 +0800
|
|
Subject: [PATCH 5/6] [ISSUE #7343] Rollback modifications to registerProcessor
|
|
|
|
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
|
|
---
|
|
.../java/org/apache/rocketmq/broker/BrokerController.java | 4 ++--
|
|
1 file changed, 2 insertions(+), 2 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
index 9e49f636d..13a3feb4e 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
|
|
@@ -827,6 +827,8 @@ public class BrokerController {
|
|
|
|
initializeResources();
|
|
|
|
+ registerProcessor();
|
|
+
|
|
initializeScheduledTasks();
|
|
|
|
initialTransaction();
|
|
@@ -1687,8 +1689,6 @@ public class BrokerController {
|
|
}
|
|
}
|
|
}, 10, 5, TimeUnit.SECONDS);
|
|
-
|
|
- registerProcessor();
|
|
}
|
|
|
|
protected void scheduleSendHeartbeat() {
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From dad6ad09d13dadc36b6342671c77f619bbb8c522 Mon Sep 17 00:00:00 2001
|
|
From: Ao Qiao <qiao_ao@foxmail.com>
|
|
Date: Tue, 12 Sep 2023 08:28:45 +0800
|
|
Subject: [PATCH 6/6] [ISSUE #7340] Abstract Duplicate code into a method in
|
|
`TopicConfigManager` (#7341)
|
|
|
|
---
|
|
.../broker/topic/TopicConfigManager.java | 44 ++++++-------------
|
|
1 file changed, 14 insertions(+), 30 deletions(-)
|
|
|
|
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
index 8537929be..511d29e12 100644
|
|
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
|
|
@@ -293,11 +293,7 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
|
|
if (createNew) {
|
|
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
- this.brokerController.registerSingleTopicAll(topicConfig);
|
|
- } else {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
- }
|
|
+ registerBrokerData(topicConfig);
|
|
}
|
|
|
|
return topicConfig;
|
|
@@ -337,11 +333,7 @@ public class TopicConfigManager extends ConfigManager {
|
|
log.error("createTopicIfAbsent ", e);
|
|
}
|
|
if (createNew && register) {
|
|
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
- this.brokerController.registerSingleTopicAll(topicConfig);
|
|
- } else {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
- }
|
|
+ registerBrokerData(topicConfig);
|
|
}
|
|
return getTopicConfig(topicConfig.getTopicName());
|
|
}
|
|
@@ -401,11 +393,7 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
|
|
if (createNew) {
|
|
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
- this.brokerController.registerSingleTopicAll(topicConfig);
|
|
- } else {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
- }
|
|
+ registerBrokerData(topicConfig);
|
|
}
|
|
|
|
return topicConfig;
|
|
@@ -446,11 +434,7 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
|
|
if (createNew) {
|
|
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
- this.brokerController.registerSingleTopicAll(topicConfig);
|
|
- } else {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
- }
|
|
+ registerBrokerData(topicConfig);
|
|
}
|
|
|
|
return topicConfig;
|
|
@@ -476,11 +460,7 @@ public class TopicConfigManager extends ConfigManager {
|
|
dataVersion.nextVersion(stateMachineVersion);
|
|
|
|
this.persist();
|
|
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
- this.brokerController.registerSingleTopicAll(topicConfig);
|
|
- } else {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
- }
|
|
+ registerBrokerData(topicConfig);
|
|
}
|
|
}
|
|
|
|
@@ -503,11 +483,7 @@ public class TopicConfigManager extends ConfigManager {
|
|
dataVersion.nextVersion(stateMachineVersion);
|
|
|
|
this.persist();
|
|
- if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
- this.brokerController.registerSingleTopicAll(topicConfig);
|
|
- } else {
|
|
- this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
- }
|
|
+ registerBrokerData(topicConfig);
|
|
}
|
|
}
|
|
|
|
@@ -699,6 +675,14 @@ public class TopicConfigManager extends ConfigManager {
|
|
}
|
|
}
|
|
|
|
+ private void registerBrokerData(TopicConfig topicConfig) {
|
|
+ if (brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
|
|
+ this.brokerController.registerSingleTopicAll(topicConfig);
|
|
+ } else {
|
|
+ this.brokerController.registerIncrementBrokerData(topicConfig, dataVersion);
|
|
+ }
|
|
+ }
|
|
+
|
|
public boolean containsTopic(String topic) {
|
|
return topicConfigTable.containsKey(topic);
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|