rocketmq/patch028-backport-Fix-proxy-client-language-error.patch
2023-12-08 16:50:34 +08:00

188 lines
9.6 KiB
Diff

From 3968c186a59db96701ade8c343bc6a5d31ee2d24 Mon Sep 17 00:00:00 2001
From: weihubeats <weihu@apache.org>
Date: Fri, 20 Oct 2023 14:49:00 +0800
Subject: [PATCH 1/2] [ISSUE #7231] Fix: proxy client language error (#7200)
* Adding null does not update
* add language code
* add language code
* add language code
* add language code
* add language code
* Rerun ci
* Rerun ci
* Rerun ci
* remove redundant package imports
* redundant line
* modify the parameter passed as proxyContext to language
* format
---
.../proxy/service/message/LocalMessageService.java | 12 ++++++------
.../proxy/service/message/LocalRemotingCommand.java | 8 ++++++--
.../rocketmq/remoting/protocol/LanguageCode.java | 11 +++++++++++
3 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index ca7dcc9eb..aaa688fee 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService {
body = message.getBody();
messageId = MessageClientIDSetter.getUniqID(message);
}
- RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+ RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
request.setBody(body);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
@@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService {
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getSendMessageProcessor()
@@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService {
CompletableFuture<Void> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage());
try {
brokerController.getEndTransactionProcessor()
.processRequest(channelHandlerContext, command);
@@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService {
public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
PopMessageRequestHeader requestHeader, long timeoutMillis) {
requestHeader.setBornTime(System.currentTimeMillis());
- RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
+ RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
InvocationContext invocationContext = new InvocationContext(future);
@@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService {
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor()
@@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService {
AckMessageRequestHeader requestHeader, long timeoutMillis) {
SimpleChannel channel = channelManager.createChannel(ctx);
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage());
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
RemotingCommand response = brokerController.getAckMessageProcessor()
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
index 73048dbbc..915cafcd5 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.proxy.service.message;
-import java.util.HashMap;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.util.HashMap;
+
public class LocalRemotingCommand extends RemotingCommand {
- public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+ public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) {
LocalRemotingCommand cmd = new LocalRemotingCommand();
cmd.setCode(code);
+ cmd.setLanguage(LanguageCode.getCode(language));
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
@@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand {
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
return classHeader.cast(readCustomHeader());
}
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
index 19280f996..2df9fbf02 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
@@ -17,6 +17,11 @@
package org.apache.rocketmq.remoting.protocol;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
public enum LanguageCode {
JAVA((byte) 0),
CPP((byte) 1),
@@ -50,4 +55,10 @@ public enum LanguageCode {
public byte getCode() {
return code;
}
+
+ private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity()));
+
+ public static LanguageCode getCode(String language) {
+ return MAP.get(language);
+ }
}
--
2.32.0.windows.2
From 8f020b397a3afdd75429ae91e3624812b8ffc9e1 Mon Sep 17 00:00:00 2001
From: Ao Qiao <qiao_ao@foxmail.com>
Date: Mon, 23 Oct 2023 16:34:10 +0800
Subject: [PATCH 2/2] [ISSUE #7489] Code comment enhancement in example (#7490)
* Doc: How to debug in Idea
* en
* enhance code comment
---
.../java/org/apache/rocketmq/example/simple/PullConsumer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index 5ac8d247d..e1a02aa26 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -75,7 +75,7 @@ public class PullConsumer {
if (msgs != null && !msgs.isEmpty()) {
this.doSomething(msgs);
- //update offset to broker
+ //update offset to local memory, eventually to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
//print pull tps
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
--
2.32.0.windows.2