188 lines
9.6 KiB
Diff
188 lines
9.6 KiB
Diff
From 3968c186a59db96701ade8c343bc6a5d31ee2d24 Mon Sep 17 00:00:00 2001
|
|
From: weihubeats <weihu@apache.org>
|
|
Date: Fri, 20 Oct 2023 14:49:00 +0800
|
|
Subject: [PATCH 1/2] [ISSUE #7231] Fix: proxy client language error (#7200)
|
|
|
|
* Adding null does not update
|
|
|
|
* add language code
|
|
|
|
* add language code
|
|
|
|
* add language code
|
|
|
|
* add language code
|
|
|
|
* add language code
|
|
|
|
* Rerun ci
|
|
|
|
* Rerun ci
|
|
|
|
* Rerun ci
|
|
|
|
* remove redundant package imports
|
|
|
|
* redundant line
|
|
|
|
* modify the parameter passed as proxyContext to language
|
|
|
|
* format
|
|
---
|
|
.../proxy/service/message/LocalMessageService.java | 12 ++++++------
|
|
.../proxy/service/message/LocalRemotingCommand.java | 8 ++++++--
|
|
.../rocketmq/remoting/protocol/LanguageCode.java | 11 +++++++++++
|
|
3 files changed, 23 insertions(+), 8 deletions(-)
|
|
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
|
index ca7dcc9eb..aaa688fee 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
|
@@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService {
|
|
body = message.getBody();
|
|
messageId = MessageClientIDSetter.getUniqID(message);
|
|
}
|
|
- RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
|
|
+ RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
|
|
request.setBody(body);
|
|
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
|
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
|
|
@@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService {
|
|
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
|
|
SimpleChannel channel = channelManager.createChannel(ctx);
|
|
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
|
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
|
|
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage());
|
|
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
|
try {
|
|
RemotingCommand response = brokerController.getSendMessageProcessor()
|
|
@@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService {
|
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
|
SimpleChannel channel = channelManager.createChannel(ctx);
|
|
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
|
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
|
|
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage());
|
|
try {
|
|
brokerController.getEndTransactionProcessor()
|
|
.processRequest(channelHandlerContext, command);
|
|
@@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService {
|
|
public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
|
|
PopMessageRequestHeader requestHeader, long timeoutMillis) {
|
|
requestHeader.setBornTime(System.currentTimeMillis());
|
|
- RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
|
|
+ RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
|
|
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
|
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
|
|
InvocationContext invocationContext = new InvocationContext(future);
|
|
@@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService {
|
|
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
|
|
SimpleChannel channel = channelManager.createChannel(ctx);
|
|
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
|
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
|
|
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage());
|
|
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
|
try {
|
|
RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor()
|
|
@@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService {
|
|
AckMessageRequestHeader requestHeader, long timeoutMillis) {
|
|
SimpleChannel channel = channelManager.createChannel(ctx);
|
|
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
|
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
|
|
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage());
|
|
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
|
try {
|
|
RemotingCommand response = brokerController.getAckMessageProcessor()
|
|
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
|
index 73048dbbc..915cafcd5 100644
|
|
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
|
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
|
@@ -16,16 +16,19 @@
|
|
*/
|
|
package org.apache.rocketmq.proxy.service.message;
|
|
|
|
-import java.util.HashMap;
|
|
import org.apache.rocketmq.remoting.CommandCustomHeader;
|
|
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
|
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
|
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
|
|
|
+import java.util.HashMap;
|
|
+
|
|
public class LocalRemotingCommand extends RemotingCommand {
|
|
|
|
- public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
|
|
+ public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) {
|
|
LocalRemotingCommand cmd = new LocalRemotingCommand();
|
|
cmd.setCode(code);
|
|
+ cmd.setLanguage(LanguageCode.getCode(language));
|
|
cmd.writeCustomHeader(customHeader);
|
|
cmd.setExtFields(new HashMap<>());
|
|
setCmdVersion(cmd);
|
|
@@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand {
|
|
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
|
|
return classHeader.cast(readCustomHeader());
|
|
}
|
|
+
|
|
}
|
|
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
|
|
index 19280f996..2df9fbf02 100644
|
|
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
|
|
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
|
|
@@ -17,6 +17,11 @@
|
|
|
|
package org.apache.rocketmq.remoting.protocol;
|
|
|
|
+import java.util.Arrays;
|
|
+import java.util.Map;
|
|
+import java.util.function.Function;
|
|
+import java.util.stream.Collectors;
|
|
+
|
|
public enum LanguageCode {
|
|
JAVA((byte) 0),
|
|
CPP((byte) 1),
|
|
@@ -50,4 +55,10 @@ public enum LanguageCode {
|
|
public byte getCode() {
|
|
return code;
|
|
}
|
|
+
|
|
+ private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity()));
|
|
+
|
|
+ public static LanguageCode getCode(String language) {
|
|
+ return MAP.get(language);
|
|
+ }
|
|
}
|
|
--
|
|
2.32.0.windows.2
|
|
|
|
|
|
From 8f020b397a3afdd75429ae91e3624812b8ffc9e1 Mon Sep 17 00:00:00 2001
|
|
From: Ao Qiao <qiao_ao@foxmail.com>
|
|
Date: Mon, 23 Oct 2023 16:34:10 +0800
|
|
Subject: [PATCH 2/2] [ISSUE #7489] Code comment enhancement in example (#7490)
|
|
|
|
* Doc: How to debug in Idea
|
|
|
|
* en
|
|
|
|
* enhance code comment
|
|
---
|
|
.../java/org/apache/rocketmq/example/simple/PullConsumer.java | 2 +-
|
|
1 file changed, 1 insertion(+), 1 deletion(-)
|
|
|
|
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
|
|
index 5ac8d247d..e1a02aa26 100644
|
|
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
|
|
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
|
|
@@ -75,7 +75,7 @@ public class PullConsumer {
|
|
|
|
if (msgs != null && !msgs.isEmpty()) {
|
|
this.doSomething(msgs);
|
|
- //update offset to broker
|
|
+ //update offset to local memory, eventually to broker
|
|
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
|
|
//print pull tps
|
|
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
|
|
--
|
|
2.32.0.windows.2
|
|
|