package org.apache.kafka.tools.consumer;

import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.tools.consumer.ConsoleConsumer;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/tools/consumer/ConsoleConsumerTest.class */
/**
 * Unit tests for {@link ConsoleConsumer} and its nested {@code ConsumerWrapper}.
 *
 * <p>The tests drive the wrapper either through a Mockito mock of {@link Consumer} or through a
 * {@link MockConsumer}, so no broker connection is attempted by the test code itself.
 */
public class ConsoleConsumerTest {

    /** Resets the static {@code ConsoleConsumer.messageCount} so every test starts counting from zero. */
    @BeforeEach
    public void setup() {
        ConsoleConsumer.messageCount = 0;
    }

    /**
     * A wrapper whose consumer only ever returns empty polls must surface a
     * {@link TimeoutException} from {@code receive()}.
     *
     * @throws IOException declared because the tests build {@code ConsoleConsumerOptions} from raw arguments
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() throws IOException {
        final int timeoutMs = 1000;
        MockTime mockTime = new MockTime();

        @SuppressWarnings("unchecked") // Mockito cannot produce a parameterized mock without an unchecked conversion
        Consumer<byte[], byte[]> consumer = Mockito.mock(Consumer.class);

        // Every poll advances the mock clock by just over half the timeout and yields no records.
        // NOTE(review): mockTime is not handed to the wrapper below, so whether advancing it has any
        // effect on the wrapper's timeout accounting depends on ConsumerWrapper internals - confirm.
        Mockito.when(consumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer(invocationOnMock -> {
            mockTime.sleep(timeoutMs / 2 + 1);
            return ConsumerRecords.EMPTY;
        });

        ConsoleConsumerOptions options = new ConsoleConsumerOptions(new String[]{
            "--bootstrap-server", "localhost:9092",
            "--topic", "test",
            "--timeout-ms", String.valueOf(timeoutMs)
        });
        ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, consumer);

        Assertions.assertThrows(TimeoutException.class, consumerWrapper::receive);
    }

    /**
     * After processing fewer messages than were made available, {@code resetUnconsumedOffsets()}
     * must move the consumer positions back so that their sum equals the number of messages
     * actually processed.
     *
     * @throws IOException declared because the tests build {@code ConsoleConsumerOptions} from raw arguments
     */
    @Test
    public void shouldResetUnConsumedOffsetsBeforeExit() throws IOException {
        final String topic = "test";
        final int totalMessages = 700;
        final int maxMessages = 123;

        MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition firstPartition = new TopicPartition(topic, 0);
        TopicPartition secondPartition = new TopicPartition(topic, 1);

        ConsoleConsumerOptions options = new ConsoleConsumerOptions(new String[]{
            "--bootstrap-server", "localhost:9092",
            "--topic", topic,
            "--timeout-ms", "1000"
        });
        ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, mockConsumer);

        mockConsumer.rebalance(Arrays.asList(firstPartition, secondPartition));
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(firstPartition, 0L);
        beginningOffsets.put(secondPartition, 0L);
        mockConsumer.updateBeginningOffsets(beginningOffsets);

        // Alternate records between the two partitions; offsets within each partition are consecutive.
        for (int i = 0; i < totalMessages; i++) {
            mockConsumer.addRecord(new ConsumerRecord<>(topic, i % 2, i / 2, "key".getBytes(), "value".getBytes()));
        }

        MessageFormatter messageFormatter = Mockito.mock(MessageFormatter.class);
        ConsoleConsumer.process(maxMessages, messageFormatter, consumerWrapper, System.out, false);

        // Before the reset the positions account for every record that was added ...
        Assertions.assertEquals(totalMessages,
            mockConsumer.position(firstPartition) + mockConsumer.position(secondPartition));

        consumerWrapper.resetUnconsumedOffsets();

        // ... afterwards only for the records that were handed to the formatter.
        Assertions.assertEquals(maxMessages,
            mockConsumer.position(firstPartition) + mockConsumer.position(secondPartition));
        Mockito.verify(messageFormatter, Mockito.times(maxMessages))
            .writeTo(ArgumentMatchers.any(), ArgumentMatchers.any());

        consumerWrapper.cleanup();
    }

    /**
     * {@code ConsoleConsumer.process} must stop after the requested number of messages even though
     * the wrapper keeps returning records.
     */
    @Test
    public void shouldLimitReadsToMaxMessageLimit() {
        final int messageLimit = 10;
        ConsoleConsumer.ConsumerWrapper consumerWrapper = Mockito.mock(ConsoleConsumer.ConsumerWrapper.class);
        MessageFormatter messageFormatter = Mockito.mock(MessageFormatter.class);
        ConsumerRecord<byte[], byte[]> record =
            new ConsumerRecord<>(ConsumerGroupCommandTest.TOPIC, 1, 1L, new byte[0], new byte[0]);

        Mockito.when(consumerWrapper.receive()).thenReturn(record);

        ConsoleConsumer.process(messageLimit, messageFormatter, consumerWrapper, System.out, true);

        Mockito.verify(consumerWrapper, Mockito.times(messageLimit)).receive();
        Mockito.verify(messageFormatter, Mockito.times(messageLimit))
            .writeTo(ArgumentMatchers.any(), ArgumentMatchers.any());
        consumerWrapper.cleanup();
    }

    /**
     * When the output stream reports an error via {@code checkError()}, processing must stop after
     * a single record has been received and written.
     */
    @Test
    public void shouldStopWhenOutputCheckErrorFails() {
        ConsoleConsumer.ConsumerWrapper consumerWrapper = Mockito.mock(ConsoleConsumer.ConsumerWrapper.class);
        MessageFormatter messageFormatter = Mockito.mock(MessageFormatter.class);
        PrintStream printStream = Mockito.mock(PrintStream.class);
        ConsumerRecord<byte[], byte[]> record =
            new ConsumerRecord<>(ConsumerGroupCommandTest.TOPIC, 1, 1L, new byte[0], new byte[0]);

        Mockito.when(consumerWrapper.receive()).thenReturn(record);
        Mockito.when(printStream.checkError()).thenReturn(true);

        // -1 is passed as the message limit; presumably this means "no limit" - verify against process().
        ConsoleConsumer.process(-1, messageFormatter, consumerWrapper, printStream, true);

        Mockito.verify(messageFormatter).writeTo(ArgumentMatchers.any(), ArgumentMatchers.eq(printStream));
        Mockito.verify(consumerWrapper).receive();
        Mockito.verify(printStream).checkError();
        consumerWrapper.cleanup();
    }

    /**
     * Constructing a wrapper for an explicit partition must assign that partition and position the
     * consumer according to {@code --offset}: end of log when the option is absent, the given
     * numeric offset when one is supplied, and the beginning of the log for {@code earliest}.
     *
     * @throws IOException declared because the tests build {@code ConsoleConsumerOptions} from raw arguments
     */
    @Test
    @SuppressWarnings("unchecked") // parameterized mock creation and generic varargs in Mockito.reset
    public void shouldSeekWhenOffsetIsSet() throws IOException {
        Consumer<byte[], byte[]> consumer = Mockito.mock(Consumer.class);
        TopicPartition topicPartition = new TopicPartition("test", 0);
        String partition = String.valueOf(topicPartition.partition());

        // Case 1: no --offset option, so the wrapper seeks to the end.
        ConsoleConsumer.ConsumerWrapper latestWrapper = new ConsoleConsumer.ConsumerWrapper(
            new ConsoleConsumerOptions(new String[]{
                "--bootstrap-server", "localhost:9092",
                "--topic", topicPartition.topic(),
                "--partition", partition,
                "--timeout-ms", "1000"
            }),
            consumer);
        Mockito.verify(consumer).assign(ArgumentMatchers.eq(Collections.singletonList(topicPartition)));
        Mockito.verify(consumer).seekToEnd(ArgumentMatchers.eq(Collections.singletonList(topicPartition)));
        latestWrapper.cleanup();
        Mockito.reset(consumer);

        // Case 2: numeric --offset, so the wrapper seeks to exactly that offset.
        ConsoleConsumer.ConsumerWrapper explicitOffsetWrapper = new ConsoleConsumer.ConsumerWrapper(
            new ConsoleConsumerOptions(new String[]{
                "--bootstrap-server", "localhost:9092",
                "--topic", topicPartition.topic(),
                "--partition", partition,
                "--offset", "123",
                "--timeout-ms", "1000"
            }),
            consumer);
        Mockito.verify(consumer).assign(ArgumentMatchers.eq(Collections.singletonList(topicPartition)));
        Mockito.verify(consumer).seek(ArgumentMatchers.eq(topicPartition), ArgumentMatchers.eq(123L));
        explicitOffsetWrapper.cleanup();
        Mockito.reset(consumer);

        // Case 3: --offset earliest, so the wrapper seeks to the beginning.
        ConsoleConsumer.ConsumerWrapper earliestWrapper = new ConsoleConsumer.ConsumerWrapper(
            new ConsoleConsumerOptions(new String[]{
                "--bootstrap-server", "localhost:9092",
                "--topic", topicPartition.topic(),
                "--partition", partition,
                "--offset", "earliest",
                "--timeout-ms", "1000"
            }),
            consumer);
        Mockito.verify(consumer).assign(ArgumentMatchers.eq(Collections.singletonList(topicPartition)));
        Mockito.verify(consumer).seekToBeginning(ArgumentMatchers.eq(Collections.singletonList(topicPartition)));
        earliestWrapper.cleanup();
        Mockito.reset(consumer);
    }

    /**
     * With {@code --include} in place of {@code --topic}, the wrapper must subscribe using a
     * {@link Pattern}.
     *
     * @throws IOException declared because the tests build {@code ConsoleConsumerOptions} from raw arguments
     */
    @Test
    public void shouldWorkWithoutTopicOption() throws IOException {
        @SuppressWarnings("unchecked") // Mockito cannot produce a parameterized mock without an unchecked conversion
        Consumer<byte[], byte[]> consumer = Mockito.mock(Consumer.class);

        ConsoleConsumerOptions options = new ConsoleConsumerOptions(new String[]{
            "--bootstrap-server", "localhost:9092",
            "--include", "includeTest*",
            "--from-beginning"
        });
        ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, consumer);

        Mockito.verify(consumer).subscribe(ArgumentMatchers.any(Pattern.class));
        consumerWrapper.cleanup();
    }
}
