package org.apache.kafka.tools;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.SplittableRandom;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.tools.ProducerPerformance;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/apache/kafka/tools/ProducerPerformanceTest.class */
public class ProducerPerformanceTest {

    @Mock
    KafkaProducer<byte[], byte[]> producerMock;

    @Spy
    ProducerPerformance producerPerformanceSpy;

    private File createTempFile(String str) throws IOException {
        File createTempFile = File.createTempFile("ProducerPerformanceTest", ".tmp");
        createTempFile.deleteOnExit();
        Files.write(createTempFile.toPath(), str.getBytes(), new OpenOption[0]);
        return createTempFile;
    }

    @Test
    public void testReadPayloadFile() throws Exception {
        List readPayloadFile = ProducerPerformance.readPayloadFile(createTempFile("Hello\nKafka").getAbsolutePath(), "\n");
        Assertions.assertEquals(2, readPayloadFile.size());
        Assertions.assertEquals("Hello", new String((byte[]) readPayloadFile.get(0)));
        Assertions.assertEquals("Kafka", new String((byte[]) readPayloadFile.get(1)));
    }

    @Test
    public void testReadProps() throws Exception {
        Properties readProps = ProducerPerformance.readProps(Collections.singletonList("bootstrap.servers=localhost:9000"), createTempFile("acks=1").getAbsolutePath(), "1234", true);
        Assertions.assertNotNull(readProps);
        Assertions.assertEquals(6, readProps.size());
    }

    @Test
    public void testNumberOfCallsForSendAndClose() throws IOException {
        ((KafkaProducer) Mockito.doReturn((Object) null).when(this.producerMock)).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        ((ProducerPerformance) Mockito.doReturn(this.producerMock).when(this.producerPerformanceSpy)).createKafkaProducer((Properties) ArgumentMatchers.any(Properties.class));
        this.producerPerformanceSpy.start(new String[]{"--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", "--producer-props", "bootstrap.servers=localhost:9000"});
        ((KafkaProducer) Mockito.verify(this.producerMock, Mockito.times(5))).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        ((KafkaProducer) Mockito.verify(this.producerMock, Mockito.times(1))).close();
    }

    @Test
    public void testNumberOfSuccessfulSendAndClose() throws IOException {
        ((ProducerPerformance) Mockito.doReturn(this.producerMock).when(this.producerPerformanceSpy)).createKafkaProducer((Properties) ArgumentMatchers.any(Properties.class));
        ((KafkaProducer) Mockito.doAnswer(invocationOnMock -> {
            this.producerPerformanceSpy.cb.onCompletion((RecordMetadata) null, (Exception) null);
            return null;
        }).when(this.producerMock)).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        this.producerPerformanceSpy.start(new String[]{"--topic", "Hello-Kafka", "--num-records", "10", "--throughput", "1", "--record-size", "100", "--producer-props", "bootstrap.servers=localhost:9000"});
        ((KafkaProducer) Mockito.verify(this.producerMock, Mockito.times(10))).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        Assertions.assertEquals(10L, this.producerPerformanceSpy.stats.totalCount());
        ((KafkaProducer) Mockito.verify(this.producerMock, Mockito.times(1))).close();
    }

    @Test
    public void testNumberOfFailedSendAndClose() throws IOException {
        ((ProducerPerformance) Mockito.doReturn(this.producerMock).when(this.producerPerformanceSpy)).createKafkaProducer((Properties) ArgumentMatchers.any(Properties.class));
        ((KafkaProducer) Mockito.doAnswer(invocationOnMock -> {
            this.producerPerformanceSpy.cb.onCompletion((RecordMetadata) null, new AuthorizationException("not authorized."));
            return null;
        }).when(this.producerMock)).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        this.producerPerformanceSpy.start(new String[]{"--topic", "Hello-Kafka", "--num-records", "10", "--throughput", "1", "--record-size", "100", "--producer-props", "bootstrap.servers=localhost:9000"});
        ((KafkaProducer) Mockito.verify(this.producerMock, Mockito.times(10))).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
        Assertions.assertEquals(0L, this.producerPerformanceSpy.stats.currentWindowCount());
        Assertions.assertEquals(0L, this.producerPerformanceSpy.stats.totalCount());
        ((KafkaProducer) Mockito.verify(this.producerMock, Mockito.times(1))).close();
    }

    @Test
    public void testUnexpectedArg() {
        String[] strArr = {"--test", "test", "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", "--producer-props", "bootstrap.servers=localhost:9000"};
        ArgumentParser argParser = ProducerPerformance.argParser();
        Assertions.assertEquals("unrecognized arguments: '--test'", Assertions.assertThrows(ArgumentParserException.class, () -> {
            argParser.parseArgs(strArr);
        }).getMessage());
    }

    @Test
    public void testGenerateRandomPayloadByPayloadFile() {
        byte[] bytes = "Hello Kafka".getBytes(StandardCharsets.UTF_8);
        ArrayList arrayList = new ArrayList();
        arrayList.add(bytes);
        Assertions.assertEquals("Hello Kafka", new String(ProducerPerformance.generateRandomPayload((Integer) null, arrayList, (byte[]) null, new SplittableRandom(0L))));
    }

    @Test
    public void testGenerateRandomPayloadByRecordSize() {
        Integer num = 100;
        for (byte b : ProducerPerformance.generateRandomPayload(num, new ArrayList(), new byte[num.intValue()], new SplittableRandom(0L))) {
            Assertions.assertNotEquals(0, b);
        }
    }

    @Test
    public void testGenerateRandomPayloadException() {
        Integer num = null;
        byte[] bArr = null;
        ArrayList arrayList = new ArrayList();
        SplittableRandom splittableRandom = new SplittableRandom(0L);
        Assertions.assertEquals("no payload File Path or record Size provided", ((IllegalArgumentException) Assertions.assertThrows(IllegalArgumentException.class, () -> {
            ProducerPerformance.generateRandomPayload(num, arrayList, bArr, splittableRandom);
        })).getMessage());
    }

    @Test
    public void testClientIdOverride() throws Exception {
        Properties readProps = ProducerPerformance.readProps(Collections.singletonList("client.id=producer-1"), (String) null, "1234", true);
        Assertions.assertNotNull(readProps);
        Assertions.assertEquals("producer-1", readProps.getProperty("client.id"));
    }

    @Test
    public void testDefaultClientId() throws Exception {
        Properties readProps = ProducerPerformance.readProps(Collections.singletonList("acks=1"), (String) null, "1234", true);
        Assertions.assertNotNull(readProps);
        Assertions.assertEquals("perf-producer-client", readProps.getProperty("client.id"));
    }

    @Test
    public void testStatsInitializationWithLargeNumRecords() throws Exception {
        long j = Long.MAX_VALUE;
        Assertions.assertDoesNotThrow(() -> {
            return new ProducerPerformance.Stats(j, 5000);
        });
    }

    @Test
    public void testStatsCorrectness() throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ProducerPerformance.Stats stats = new ProducerPerformance.Stats(1000000L, 5000);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 1000000) {
                newSingleThreadExecutor.shutdown();
                Assertions.assertTrue(newSingleThreadExecutor.awaitTermination(60L, TimeUnit.SECONDS), "should have terminated");
                Assertions.assertEquals(1000000L, stats.totalCount());
                Assertions.assertEquals(1000000L, stats.iteration());
                Assertions.assertEquals(500000, stats.index());
                Assertions.assertEquals(100000000L, stats.bytes());
                return;
            }
            ProducerPerformance.PerfCallback perfCallback = new ProducerPerformance.PerfCallback(0L, 100, stats);
            CompletableFuture.runAsync(() -> {
                perfCallback.onCompletion((RecordMetadata) null, (Exception) null);
            }, newSingleThreadExecutor);
            j = j2 + 1;
        }
    }
}
