diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java index e872b336e8..7ffd0a90f3 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java @@ -116,6 +116,10 @@ public abstract class Cast> implements Transformation @Override public R apply(R record) { + if (operatingValue(record) == null) { + return record; + } + if (operatingSchema(record) == null) { return applySchemaless(record); } else { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java index ae90c1956b..d25fd8cf2a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java @@ -88,6 +88,43 @@ public class CastTest { assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32"))); } + @Test + public void castNullValueRecordWithSchema() { + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null); + SourceRecord transformed = xformValue.apply(original); + assertEquals(original, transformed); + } + + @Test + public void castNullValueRecordSchemaless() { + xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + Schema.STRING_SCHEMA, "key", null, null); + SourceRecord transformed = xformValue.apply(original); + assertEquals(original, transformed); + } + + @Test + public void castNullKeyRecordWithSchema() { + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value"); + SourceRecord transformed = xformKey.apply(original); + assertEquals(original, transformed); + } + + @Test + public void castNullKeyRecordSchemaless() { + xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64")); + SourceRecord original = new SourceRecord(null, null, "topic", 0, + null, null, Schema.STRING_SCHEMA, "value"); + SourceRecord transformed = xformKey.apply(original); + assertEquals(original, transformed); + } + + @Test public void castWholeRecordKeyWithSchema() { xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));