diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 12a0fccea8..3c5f63df18 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -112,12 +112,10 @@ public class ProducerPerformance { /* setup perf test */ byte[] payload = null; - Random random = new Random(0); if (recordSize != null) { payload = new byte[recordSize]; - for (int i = 0; i < payload.length; ++i) - payload[i] = (byte) (random.nextInt(26) + 65); } + Random random = new Random(0); ProducerRecord record; Stats stats = new Stats(numRecords, 5000); long startMs = System.currentTimeMillis(); @@ -127,15 +125,20 @@ public class ProducerPerformance { int currentTransactionSize = 0; long transactionStartTime = 0; for (long i = 0; i < numRecords; i++) { + if (payloadFilePath != null) { + payload = payloadByteList.get(random.nextInt(payloadByteList.size())); + } else if (recordSize != null) { + for (int j = 0; j < payload.length; ++j) + payload[j] = (byte) (random.nextInt(26) + 65); + } else { + throw new IllegalArgumentException("no payload File Path or record Size provided"); + } + if (transactionsEnabled && currentTransactionSize == 0) { producer.beginTransaction(); transactionStartTime = System.currentTimeMillis(); } - - if (payloadFilePath != null) { - payload = payloadByteList.get(random.nextInt(payloadByteList.size())); - } record = new ProducerRecord<>(topicName, payload); long sendStartMs = System.currentTimeMillis();