From 547bab53e4956653fbea32d1b88af5dc3b582db4 Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 22 Aug 2022 16:05:52 +0800
Subject: [PATCH] FLUME-3428 - Validate the parameter

author Ralph Goers Sat, 30 Jul 2022 16:16:17 +0800 (01:16 -0700)
committer Ralph Goers Sat, 30 Jul 2022 16:16:17 +0800 (01:16 -0700)
---
 .../flume/source/jms/JMSMessageConsumer.java     | 15 +++++++++++++++
 .../source/jms/JMSMessageConsumerTestBase.java   |  8 +++++++-
 .../flume/source/jms/TestJMSMessageConsumer.java |  6 ++++++
 3 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
index 645cbcc..5375bd0 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java
@@ -35,11 +35,14 @@ import javax.jms.Session;
 import javax.jms.Topic;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
 class JMSMessageConsumer {
   private static final Logger logger = LoggerFactory.getLogger(JMSMessageConsumer.class);
+  private static final String JAVA_SCHEME = "java";
 
   private final int batchSize;
   private final long pollTimeout;
@@ -99,6 +102,14 @@ class JMSMessageConsumer {
             throw new IllegalStateException(String.valueOf(destinationType));
         }
       } else {
+        try {
+          URI uri = new URI(destinationName);
+          String scheme = uri.getScheme();
+          assertTrue(scheme == null || scheme.equals(JAVA_SCHEME),
+              "Unsupported JNDI URI: " + destinationName);
+        } catch (URISyntaxException ex) {
+          logger.warn("Invalid JNDI URI - {}", destinationName);
+        }
         destination = (Destination) initialContext.lookup(destinationName);
       }
     } catch (JMSException e) {
@@ -209,4 +220,8 @@ class JMSMessageConsumer {
       logger.error("Could not destroy connection", e);
     }
   }
+
+  private void assertTrue(boolean arg, String msg) {
+    Preconditions.checkArgument(arg, msg);
+  }
 }
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
index b3bce78..aa96458 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
@@ -129,11 +129,17 @@ public abstract class JMSMessageConsumerTestBase {
     }
   }
 
-  JMSMessageConsumer create() {
+  JMSMessageConsumer create(JMSDestinationType destinationType,
+      JMSDestinationLocator destinationLocator, String destinationName) {
     return new JMSMessageConsumer(WONT_USE, connectionFactory, destinationName,
         destinationLocator, destinationType, messageSelector, batchSize,
         pollTimeout, converter, userName, password, Optional.absent(), false, "");
   }
+
+  JMSMessageConsumer create() {
+    return create(this.destinationType, this.destinationLocator, this.destinationName);
+  }
+
   @After
   public void tearDown() throws Exception {
     beforeTearDown();
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
index e7c5f29..636ffc3 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
@@ -93,6 +93,12 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase {
       verify(connection).close();
     }
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidDestination() throws Exception {
+    create(null, JMSDestinationLocator.JNDI, "ldap://localhost:389/test");
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testInvalidBatchSizeZero() throws Exception {
     batchSize = 0;
-- 
2.33.0