!49 Cast SMT should allow null value records to pass through
From: @sundapeng001 Reviewed-by: @hu-zongtang Signed-off-by: @hu-zongtang
This commit is contained in:
commit
026ed38997
63
0008-Cast-SMT-allow-null.patch
Normal file
63
0008-Cast-SMT-allow-null.patch
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
|
||||||
|
index e872b336e8..7ffd0a90f3 100644
|
||||||
|
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
|
||||||
|
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
|
||||||
|
@@ -116,6 +116,10 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public R apply(R record) {
|
||||||
|
+ if (operatingValue(record) == null) {
|
||||||
|
+ return record;
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
if (operatingSchema(record) == null) {
|
||||||
|
return applySchemaless(record);
|
||||||
|
} else {
|
||||||
|
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
|
||||||
|
index ae90c1956b..d25fd8cf2a 100644
|
||||||
|
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
|
||||||
|
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
|
||||||
|
@@ -88,6 +88,43 @@ public class CastTest {
|
||||||
|
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")));
|
||||||
|
}
|
||||||
|
|
||||||
|
+ @Test
|
||||||
|
+ public void castNullValueRecordWithSchema() {
|
||||||
|
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||||
|
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||||
|
+ Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null);
|
||||||
|
+ SourceRecord transformed = xformValue.apply(original);
|
||||||
|
+ assertEquals(original, transformed);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ @Test
|
||||||
|
+ public void castNullValueRecordSchemaless() {
|
||||||
|
+ xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||||
|
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||||
|
+ Schema.STRING_SCHEMA, "key", null, null);
|
||||||
|
+ SourceRecord transformed = xformValue.apply(original);
|
||||||
|
+ assertEquals(original, transformed);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ @Test
|
||||||
|
+ public void castNullKeyRecordWithSchema() {
|
||||||
|
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||||
|
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||||
|
+ Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value");
|
||||||
|
+ SourceRecord transformed = xformKey.apply(original);
|
||||||
|
+ assertEquals(original, transformed);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+ @Test
|
||||||
|
+ public void castNullKeyRecordSchemaless() {
|
||||||
|
+ xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||||
|
+ SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||||
|
+ null, null, Schema.STRING_SCHEMA, "value");
|
||||||
|
+ SourceRecord transformed = xformKey.apply(original);
|
||||||
|
+ assertEquals(original, transformed);
|
||||||
|
+ }
|
||||||
|
+
|
||||||
|
+
|
||||||
|
@Test
|
||||||
|
public void castWholeRecordKeyWithSchema() {
|
||||||
|
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
|
||||||
@ -4,7 +4,7 @@
|
|||||||
|
|
||||||
Name: kafka
|
Name: kafka
|
||||||
Version: 2.8.2
|
Version: 2.8.2
|
||||||
Release: 7
|
Release: 8
|
||||||
Summary: A Distributed Streaming Platform.
|
Summary: A Distributed Streaming Platform.
|
||||||
|
|
||||||
License: Apache-2.0
|
License: Apache-2.0
|
||||||
@ -19,6 +19,7 @@ Patch3: 0004-CVE-2022-42004.patch
|
|||||||
Patch4: 0005-CVE-2016-3189.patch
|
Patch4: 0005-CVE-2016-3189.patch
|
||||||
Patch5: 0006-NPE-subscriptionState.patch
|
Patch5: 0006-NPE-subscriptionState.patch
|
||||||
Patch6: 0007-fix-payload-incorrectly.patch
|
Patch6: 0007-fix-payload-incorrectly.patch
|
||||||
|
Patch7: 0008-Cast-SMT-allow-null.patch
|
||||||
|
|
||||||
BuildRequires: systemd java-1.8.0-openjdk-devel
|
BuildRequires: systemd java-1.8.0-openjdk-devel
|
||||||
Provides: kafka = %{version}
|
Provides: kafka = %{version}
|
||||||
@ -70,6 +71,8 @@ cp -pr licenses/* $RPM_BUILD_ROOT%{kafka_home}/licenses
|
|||||||
rm -rf %{buildroot}
|
rm -rf %{buildroot}
|
||||||
|
|
||||||
%changelog
|
%changelog
|
||||||
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-8
|
||||||
|
- Cast SMT should allow null value records to pass through
|
||||||
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-7
|
* Fri Dec 08 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-7
|
||||||
- Fix using random payload in ProducerPerformance incorrectly
|
- Fix using random payload in ProducerPerformance incorrectly
|
||||||
* Mon Nov 27 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-6
|
* Mon Nov 27 2023 sundapeng <sundapeng_yewu@cmss.chinamobile.com> - 2.8.2-6
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user