102 lines
4.5 KiB
Diff
102 lines
4.5 KiB
Diff
From 547bab53e4956653fbea32d1b88af5dc3b582db4 Mon Sep 17 00:00:00 2001
|
|
From: root <root@localhost.localdomain>
|
|
Date: Mon, 22 Aug 2022 16:05:52 +0800
|
|
Subject: [PATCH] FLUME-3428 - Validate the parameter
|
|
|
|
author Ralph Goers <rgoers@apache.org>
|
|
Sat, 30 Jul 2022 16:16:17 +0800 (01:16 -0700)
|
|
committer Ralph Goers <rgoers@apache.org>
|
|
Sat, 30 Jul 2022 16:16:17 +0800 (01:16 -0700)
|
|
---
|
|
.../flume/source/jms/JMSMessageConsumer.java | 15 +++++++++++++++
|
|
.../source/jms/JMSMessageConsumerTestBase.java | 8 +++++++-
|
|
.../flume/source/jms/TestJMSMessageConsumer.java | 6 ++++++
|
|
3 files changed, 28 insertions(+), 1 deletion(-)
|
|
|
|
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
|
|
index 645cbcc..5375bd0 100644
|
|
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
|
|
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
|
|
@@ -35,11 +35,14 @@ import javax.jms.Session;
|
|
import javax.jms.Topic;
|
|
import javax.naming.InitialContext;
|
|
import javax.naming.NamingException;
|
|
+import java.net.URI;
|
|
+import java.net.URISyntaxException;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
|
|
class JMSMessageConsumer {
|
|
private static final Logger logger = LoggerFactory.getLogger(JMSMessageConsumer.class);
|
|
+ private static final String JAVA_SCHEME = "java";
|
|
|
|
private final int batchSize;
|
|
private final long pollTimeout;
|
|
@@ -99,6 +102,14 @@ class JMSMessageConsumer {
|
|
throw new IllegalStateException(String.valueOf(destinationType));
|
|
}
|
|
} else {
|
|
+ try {
|
|
+ URI uri = new URI(destinationName);
|
|
+ String scheme = uri.getScheme();
|
|
+ assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
|
|
+ "Unsupported JNDI URI: " + destinationName);
|
|
+ } catch (URISyntaxException ex) {
|
|
+ logger.warn("Invalid JNDI URI - {}", destinationName);
|
|
+ }
|
|
destination = (Destination) initialContext.lookup(destinationName);
|
|
}
|
|
} catch (JMSException e) {
|
|
@@ -209,4 +220,8 @@ class JMSMessageConsumer {
|
|
logger.error("Could not destroy connection", e);
|
|
}
|
|
}
|
|
+
|
|
+ private void assertTrue(boolean arg, String msg) {
|
|
+ Preconditions.checkArgument(arg, msg);
|
|
+ }
|
|
}
|
|
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
|
|
index b3bce78..aa96458 100644
|
|
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
|
|
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
|
|
@@ -129,11 +129,17 @@ public abstract class JMSMessageConsumerTestBase {
|
|
}
|
|
}
|
|
|
|
- JMSMessageConsumer create() {
|
|
+ JMSMessageConsumer create(JMSDestinationType destinationType,
|
|
+ JMSDestinationLocator destinationLocator, String destinationName) {
|
|
return new JMSMessageConsumer(WONT_USE, connectionFactory, destinationName,
|
|
destinationLocator, destinationType, messageSelector, batchSize,
|
|
pollTimeout, converter, userName, password, Optional.<String>absent(), false, "");
|
|
}
|
|
+
|
|
+ JMSMessageConsumer create() {
|
|
+ return create(this.destinationType, this.destinationLocator, this.destinationName);
|
|
+ }
|
|
+
|
|
@After
|
|
public void tearDown() throws Exception {
|
|
beforeTearDown();
|
|
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
|
|
index e7c5f29..636ffc3 100644
|
|
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
|
|
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
|
|
@@ -93,6 +93,12 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
|
|
verify(connection).close();
|
|
}
|
|
}
|
|
+
|
|
+ @Test(expected = IllegalArgumentException.class)
|
|
+ public void testInvalidDestination() throws Exception {
|
|
+ create(null, JMSDestinationLocator.JNDI, "ldap://localhost:389/test");
|
|
+ }
|
|
+
|
|
@Test(expected = IllegalArgumentException.class)
|
|
public void testInvalidBatchSizeZero() throws Exception {
|
|
batchSize = 0;
|
|
--
|
|
2.33.0
|
|
|