!33 fix proxy client language error
From: @zhiliatox Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
8b5e76093f
187
patch028-backport-Fix-proxy-client-language-error.patch
Normal file
187
patch028-backport-Fix-proxy-client-language-error.patch
Normal file
@ -0,0 +1,187 @@
|
||||
From 3968c186a59db96701ade8c343bc6a5d31ee2d24 Mon Sep 17 00:00:00 2001
|
||||
From: weihubeats <weihu@apache.org>
|
||||
Date: Fri, 20 Oct 2023 14:49:00 +0800
|
||||
Subject: [PATCH 1/2] [ISSUE #7231] Fix: proxy client language error (#7200)
|
||||
|
||||
* Adding null does not update
|
||||
|
||||
* add langeuga code
|
||||
|
||||
* add langeuga code
|
||||
|
||||
* add langeuga code
|
||||
|
||||
* add langeuga code
|
||||
|
||||
* add langeuga code
|
||||
|
||||
* Rerun ci
|
||||
|
||||
* Rerun ci
|
||||
|
||||
* Rerun ci
|
||||
|
||||
* remove redundant package imports
|
||||
|
||||
* redundant line
|
||||
|
||||
* modify the parameter passed as proxyContext to language
|
||||
|
||||
* format
|
||||
---
|
||||
.../proxy/service/message/LocalMessageService.java | 12 ++++++------
|
||||
.../proxy/service/message/LocalRemotingCommand.java | 8 ++++++--
|
||||
.../rocketmq/remoting/protocol/LanguageCode.java | 11 +++++++++++
|
||||
3 files changed, 23 insertions(+), 8 deletions(-)
|
||||
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
||||
index ca7dcc9eb..aaa688fee 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
|
||||
@@ -104,7 +104,7 @@ public class LocalMessageService implements MessageService {
|
||||
body = message.getBody();
|
||||
messageId = MessageClientIDSetter.getUniqID(message);
|
||||
}
|
||||
- RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
|
||||
+ RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader, ctx.getLanguage());
|
||||
request.setBody(body);
|
||||
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
||||
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
|
||||
@@ -162,7 +162,7 @@ public class LocalMessageService implements MessageService {
|
||||
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
|
||||
SimpleChannel channel = channelManager.createChannel(ctx);
|
||||
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
||||
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
|
||||
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader, ctx.getLanguage());
|
||||
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
||||
try {
|
||||
RemotingCommand response = brokerController.getSendMessageProcessor()
|
||||
@@ -181,7 +181,7 @@ public class LocalMessageService implements MessageService {
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
SimpleChannel channel = channelManager.createChannel(ctx);
|
||||
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
||||
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
|
||||
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader, ctx.getLanguage());
|
||||
try {
|
||||
brokerController.getEndTransactionProcessor()
|
||||
.processRequest(channelHandlerContext, command);
|
||||
@@ -196,7 +196,7 @@ public class LocalMessageService implements MessageService {
|
||||
public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMessageQueue messageQueue,
|
||||
PopMessageRequestHeader requestHeader, long timeoutMillis) {
|
||||
requestHeader.setBornTime(System.currentTimeMillis());
|
||||
- RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader);
|
||||
+ RemotingCommand request = LocalRemotingCommand.createRequestCommand(RequestCode.POP_MESSAGE, requestHeader, ctx.getLanguage());
|
||||
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
||||
SimpleChannel channel = channelManager.createInvocationChannel(ctx);
|
||||
InvocationContext invocationContext = new InvocationContext(future);
|
||||
@@ -307,7 +307,7 @@ public class LocalMessageService implements MessageService {
|
||||
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
|
||||
SimpleChannel channel = channelManager.createChannel(ctx);
|
||||
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
||||
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader);
|
||||
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, requestHeader, ctx.getLanguage());
|
||||
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
||||
try {
|
||||
RemotingCommand response = brokerController.getChangeInvisibleTimeProcessor()
|
||||
@@ -346,7 +346,7 @@ public class LocalMessageService implements MessageService {
|
||||
AckMessageRequestHeader requestHeader, long timeoutMillis) {
|
||||
SimpleChannel channel = channelManager.createChannel(ctx);
|
||||
ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
|
||||
- RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
|
||||
+ RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader, ctx.getLanguage());
|
||||
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
|
||||
try {
|
||||
RemotingCommand response = brokerController.getAckMessageProcessor()
|
||||
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
||||
index 73048dbbc..915cafcd5 100644
|
||||
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
||||
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalRemotingCommand.java
|
||||
@@ -16,16 +16,19 @@
|
||||
*/
|
||||
package org.apache.rocketmq.proxy.service.message;
|
||||
|
||||
-import java.util.HashMap;
|
||||
import org.apache.rocketmq.remoting.CommandCustomHeader;
|
||||
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
|
||||
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
|
||||
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
|
||||
|
||||
+import java.util.HashMap;
|
||||
+
|
||||
public class LocalRemotingCommand extends RemotingCommand {
|
||||
|
||||
- public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
|
||||
+ public static LocalRemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader, String language) {
|
||||
LocalRemotingCommand cmd = new LocalRemotingCommand();
|
||||
cmd.setCode(code);
|
||||
+ cmd.setLanguage(LanguageCode.getCode(language));
|
||||
cmd.writeCustomHeader(customHeader);
|
||||
cmd.setExtFields(new HashMap<>());
|
||||
setCmdVersion(cmd);
|
||||
@@ -37,4 +40,5 @@ public class LocalRemotingCommand extends RemotingCommand {
|
||||
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
|
||||
return classHeader.cast(readCustomHeader());
|
||||
}
|
||||
+
|
||||
}
|
||||
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
|
||||
index 19280f996..2df9fbf02 100644
|
||||
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
|
||||
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
|
||||
@@ -17,6 +17,11 @@
|
||||
|
||||
package org.apache.rocketmq.remoting.protocol;
|
||||
|
||||
+import java.util.Arrays;
|
||||
+import java.util.Map;
|
||||
+import java.util.function.Function;
|
||||
+import java.util.stream.Collectors;
|
||||
+
|
||||
public enum LanguageCode {
|
||||
JAVA((byte) 0),
|
||||
CPP((byte) 1),
|
||||
@@ -50,4 +55,10 @@ public enum LanguageCode {
|
||||
public byte getCode() {
|
||||
return code;
|
||||
}
|
||||
+
|
||||
+ private static final Map<String, LanguageCode> MAP = Arrays.stream(LanguageCode.values()).collect(Collectors.toMap(LanguageCode::name, Function.identity()));
|
||||
+
|
||||
+ public static LanguageCode getCode(String language) {
|
||||
+ return MAP.get(language);
|
||||
+ }
|
||||
}
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
|
||||
From 8f020b397a3afdd75429ae91e3624812b8ffc9e1 Mon Sep 17 00:00:00 2001
|
||||
From: Ao Qiao <qiao_ao@foxmail.com>
|
||||
Date: Mon, 23 Oct 2023 16:34:10 +0800
|
||||
Subject: [PATCH 2/2] [ISSUE #7489] Code comment enhancement in example (#7490)
|
||||
|
||||
* Doc: How to debug in Idea
|
||||
|
||||
* en
|
||||
|
||||
* enhance code comment
|
||||
---
|
||||
.../java/org/apache/rocketmq/example/simple/PullConsumer.java | 2 +-
|
||||
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||
|
||||
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
|
||||
index 5ac8d247d..e1a02aa26 100644
|
||||
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
|
||||
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
|
||||
@@ -75,7 +75,7 @@ public class PullConsumer {
|
||||
|
||||
if (msgs != null && !msgs.isEmpty()) {
|
||||
this.doSomething(msgs);
|
||||
- //update offset to broker
|
||||
+ //update offset to local memory, eventually to broker
|
||||
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
|
||||
//print pull tps
|
||||
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
|
||||
--
|
||||
2.32.0.windows.2
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
Summary: Cloud-Native, Distributed Messaging and Streaming
|
||||
Name: rocketmq
|
||||
Version: 5.1.5
|
||||
Release: 28
|
||||
Release: 29
|
||||
License: Apache-2.0
|
||||
Group: Applications/Message
|
||||
URL: https://rocketmq.apache.org/
|
||||
@ -37,6 +37,7 @@ Patch0024: patch024-backport-some-format.patch
|
||||
Patch0025: patch025-backport-Fix-channel-connect-issue.patch
|
||||
Patch0026: patch026-backport-AddBroker-removes-parsing-configuration-from-body.patch
|
||||
Patch0027: patch027-backport-Utilizing-cache-to-avoid-duplicate-parsing.patch
|
||||
Patch0028: patch028-backport-Fix-proxy-client-language-error.patch
|
||||
BuildRequires: java-1.8.0-openjdk-devel, maven, maven-local, git
|
||||
Requires: java-1.8.0-openjdk-devel
|
||||
|
||||
@ -77,6 +78,9 @@ exit 0
|
||||
|
||||
|
||||
%changelog
|
||||
* Fri Dec 8 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-29
|
||||
- backport fix proxy client language error
|
||||
|
||||
* Fri Dec 8 2023 ShiZhili <shizhili_yewu@cmss.chinamobile.com> - 5.1.3-28
|
||||
- backport Utilizing cache to avoid duplicate parsing
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user