kafka/0007-fix-payload-incorrectly.patch

44 lines
2.1 KiB
Diff
Raw Permalink Normal View History

diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 12a0fccea8..3c5f63df18 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -112,12 +112,10 @@ public class ProducerPerformance {
/* setup perf test */
byte[] payload = null;
- Random random = new Random(0);
if (recordSize != null) {
payload = new byte[recordSize];
- for (int i = 0; i < payload.length; ++i)
- payload[i] = (byte) (random.nextInt(26) + 65);
}
+ Random random = new Random(0);
ProducerRecord<byte[], byte[]> record;
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
@@ -127,15 +125,20 @@ public class ProducerPerformance {
int currentTransactionSize = 0;
long transactionStartTime = 0;
for (long i = 0; i < numRecords; i++) {
+ if (payloadFilePath != null) {
+ payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+ } else if (recordSize != null) {
+ for (int j = 0; j < payload.length; ++j)
+ payload[j] = (byte) (random.nextInt(26) + 65);
+ } else {
+ throw new IllegalArgumentException("no payload File Path or record Size provided");
+ }
+
if (transactionsEnabled && currentTransactionSize == 0) {
producer.beginTransaction();
transactionStartTime = System.currentTimeMillis();
}
-
- if (payloadFilePath != null) {
- payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
- }
record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis();