/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.consumer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValueJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKeyJsonConverter;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValueJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.tools.consumer.ConsoleConsumer;
import org.apache.kafka.tools.consumer.ConsoleConsumerOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.verification.VerificationMode;

public class ConsoleConsumerTest {
    private final String topic = "test-topic";
    private final String groupId = "test-group";
    private final String transactionId = "transactional-id";
    private final ObjectMapper objectMapper = new ObjectMapper();

    @BeforeEach
    public void setup() {
        ConsoleConsumer.messageCount = 0;
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenTimeoutIsReached() throws IOException {
        MockTime time = new MockTime();
        int timeoutMs = 1000;
        Consumer mockConsumer = (Consumer)Mockito.mock(Consumer.class);
        Mockito.when((Object)mockConsumer.poll(Duration.ofMillis(1000L))).thenAnswer(arg_0 -> ConsoleConsumerTest.lambda$shouldThrowTimeoutExceptionWhenTimeoutIsReached$0((Time)time, arg_0));
        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--topic", "test", "--timeout-ms", String.valueOf(1000)};
        ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
        Assertions.assertThrows(TimeoutException.class, () -> ((ConsoleConsumer.ConsumerWrapper)consumer).receive());
    }

    @Test
    public void shouldResetUnConsumedOffsetsBeforeExit() throws IOException {
        String topic = "test";
        int maxMessages = 123;
        int totalMessages = 700;
        long startOffset = 0L;
        MockConsumer mockConsumer = new MockConsumer(AutoOffsetResetStrategy.EARLIEST.name());
        TopicPartition tp1 = new TopicPartition(topic, 0);
        TopicPartition tp2 = new TopicPartition(topic, 1);
        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--topic", topic, "--timeout-ms", "1000"};
        ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), (Consumer)mockConsumer);
        mockConsumer.rebalance(Arrays.asList(tp1, tp2));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(tp1, startOffset);
        offsets.put(tp2, startOffset);
        mockConsumer.updateBeginningOffsets(offsets);
        for (int i = 0; i < totalMessages; ++i) {
            mockConsumer.addRecord(new ConsumerRecord(topic, i % 2, (long)(i / 2), (Object)"key".getBytes(), (Object)"value".getBytes()));
        }
        MessageFormatter formatter = (MessageFormatter)Mockito.mock(MessageFormatter.class);
        ConsoleConsumer.process((int)maxMessages, (MessageFormatter)formatter, (ConsoleConsumer.ConsumerWrapper)consumer, (PrintStream)System.out, (boolean)false);
        Assertions.assertEquals((long)totalMessages, (long)(mockConsumer.position(tp1) + mockConsumer.position(tp2)));
        consumer.resetUnconsumedOffsets();
        Assertions.assertEquals((long)maxMessages, (long)(mockConsumer.position(tp1) + mockConsumer.position(tp2)));
        ((MessageFormatter)Mockito.verify((Object)formatter, (VerificationMode)Mockito.times((int)maxMessages))).writeTo((ConsumerRecord)ArgumentMatchers.any(), (PrintStream)ArgumentMatchers.any());
        consumer.cleanup();
    }

    @Test
    public void shouldLimitReadsToMaxMessageLimit() {
        ConsoleConsumer.ConsumerWrapper consumer = (ConsoleConsumer.ConsumerWrapper)Mockito.mock(ConsoleConsumer.ConsumerWrapper.class);
        MessageFormatter formatter = (MessageFormatter)Mockito.mock(MessageFormatter.class);
        ConsumerRecord record = new ConsumerRecord("foo", 1, 1L, (Object)new byte[0], (Object)new byte[0]);
        int messageLimit = 10;
        Mockito.when((Object)consumer.receive()).thenReturn((Object)record);
        ConsoleConsumer.process((int)messageLimit, (MessageFormatter)formatter, (ConsoleConsumer.ConsumerWrapper)consumer, (PrintStream)System.out, (boolean)true);
        ((ConsoleConsumer.ConsumerWrapper)Mockito.verify((Object)consumer, (VerificationMode)Mockito.times((int)messageLimit))).receive();
        ((MessageFormatter)Mockito.verify((Object)formatter, (VerificationMode)Mockito.times((int)messageLimit))).writeTo((ConsumerRecord)ArgumentMatchers.any(), (PrintStream)ArgumentMatchers.any());
        consumer.cleanup();
    }

    @Test
    public void shouldStopWhenOutputCheckErrorFails() {
        ConsoleConsumer.ConsumerWrapper consumer = (ConsoleConsumer.ConsumerWrapper)Mockito.mock(ConsoleConsumer.ConsumerWrapper.class);
        MessageFormatter formatter = (MessageFormatter)Mockito.mock(MessageFormatter.class);
        PrintStream printStream = (PrintStream)Mockito.mock(PrintStream.class);
        ConsumerRecord record = new ConsumerRecord("foo", 1, 1L, (Object)new byte[0], (Object)new byte[0]);
        Mockito.when((Object)consumer.receive()).thenReturn((Object)record);
        Mockito.when((Object)printStream.checkError()).thenReturn((Object)true);
        ConsoleConsumer.process((int)-1, (MessageFormatter)formatter, (ConsoleConsumer.ConsumerWrapper)consumer, (PrintStream)printStream, (boolean)true);
        ((MessageFormatter)Mockito.verify((Object)formatter)).writeTo((ConsumerRecord)ArgumentMatchers.any(), (PrintStream)ArgumentMatchers.eq((Object)printStream));
        ((ConsoleConsumer.ConsumerWrapper)Mockito.verify((Object)consumer)).receive();
        ((PrintStream)Mockito.verify((Object)printStream)).checkError();
        consumer.cleanup();
    }

    @Test
    public void shouldSeekWhenOffsetIsSet() throws IOException {
        Consumer mockConsumer = (Consumer)Mockito.mock(Consumer.class);
        TopicPartition tp0 = new TopicPartition("test", 0);
        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--topic", tp0.topic(), "--partition", String.valueOf(tp0.partition()), "--timeout-ms", "1000"};
        ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
        ((Consumer)Mockito.verify((Object)mockConsumer)).assign((Collection)ArgumentMatchers.eq(Collections.singletonList(tp0)));
        ((Consumer)Mockito.verify((Object)mockConsumer)).seekToEnd((Collection)ArgumentMatchers.eq(Collections.singletonList(tp0)));
        consumer.cleanup();
        Mockito.reset((Object[])new Consumer[]{mockConsumer});
        args = new String[]{"--bootstrap-server", "localhost:9092", "--topic", tp0.topic(), "--partition", String.valueOf(tp0.partition()), "--offset", "123", "--timeout-ms", "1000"};
        consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
        ((Consumer)Mockito.verify((Object)mockConsumer)).assign((Collection)ArgumentMatchers.eq(Collections.singletonList(tp0)));
        ((Consumer)Mockito.verify((Object)mockConsumer)).seek((TopicPartition)ArgumentMatchers.eq((Object)tp0), ArgumentMatchers.eq((long)123L));
        consumer.cleanup();
        Mockito.reset((Object[])new Consumer[]{mockConsumer});
        args = new String[]{"--bootstrap-server", "localhost:9092", "--topic", tp0.topic(), "--partition", String.valueOf(tp0.partition()), "--offset", "earliest", "--timeout-ms", "1000"};
        consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
        ((Consumer)Mockito.verify((Object)mockConsumer)).assign((Collection)ArgumentMatchers.eq(Collections.singletonList(tp0)));
        ((Consumer)Mockito.verify((Object)mockConsumer)).seekToBeginning((Collection)ArgumentMatchers.eq(Collections.singletonList(tp0)));
        consumer.cleanup();
        Mockito.reset((Object[])new Consumer[]{mockConsumer});
    }

    @Test
    public void shouldWorkWithoutTopicOption() throws IOException {
        Consumer mockConsumer = (Consumer)Mockito.mock(Consumer.class);
        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--include", "includeTest*", "--from-beginning"};
        ConsoleConsumer.ConsumerWrapper consumer = new ConsoleConsumer.ConsumerWrapper(new ConsoleConsumerOptions(args), mockConsumer);
        ((Consumer)Mockito.verify((Object)mockConsumer)).subscribe((Pattern)ArgumentMatchers.any(Pattern.class));
        consumer.cleanup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ClusterTest(brokers=3)
    public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin();){
            NewTopic newTopic = new NewTopic("test-topic", 1, 1);
            admin.createTopics(Collections.singleton(newTopic));
            this.produceMessagesWithTxn(cluster);
            String[] transactionLogMessageFormatter = this.createConsoleConsumerArgs(cluster, "__transaction_state", "org.apache.kafka.tools.consumer.TransactionLogMessageFormatter");
            ConsoleConsumerOptions options = new ConsoleConsumerOptions(transactionLogMessageFormatter);
            ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, this.createTxnConsumer(cluster));
            try (ByteArrayOutputStream out = new ByteArrayOutputStream();
                 PrintStream output = new PrintStream(out);){
                ConsoleConsumer.process((int)1, (MessageFormatter)options.formatter(), (ConsoleConsumer.ConsumerWrapper)consumerWrapper, (PrintStream)output, (boolean)true);
                JsonNode jsonNode = this.objectMapper.reader().readTree(out.toByteArray());
                JsonNode keyNode = jsonNode.get("key");
                TransactionLogKey logKey = TransactionLogKeyJsonConverter.read((JsonNode)keyNode.get("data"), (short)0);
                Assertions.assertNotNull((Object)logKey);
                Assertions.assertEquals((Object)"transactional-id", (Object)logKey.transactionalId());
                JsonNode valueNode = jsonNode.get("value");
                TransactionLogValue logValue = TransactionLogValueJsonConverter.read((JsonNode)valueNode.get("data"), (short)1);
                Assertions.assertNotNull((Object)logValue);
                Assertions.assertEquals((long)0L, (long)logValue.producerId());
                Assertions.assertEquals((int)0, (int)logValue.transactionStatus());
            }
            finally {
                consumerWrapper.cleanup();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ClusterTest(brokers=3)
    public void testOffsetsMessageFormatter(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin();){
            NewTopic newTopic = new NewTopic("test-topic", 1, 1);
            admin.createTopics(Collections.singleton(newTopic));
            this.produceMessages(cluster);
            String[] offsetsMessageFormatter = this.createConsoleConsumerArgs(cluster, "__consumer_offsets", "org.apache.kafka.tools.consumer.OffsetsMessageFormatter");
            ConsoleConsumerOptions options = new ConsoleConsumerOptions(offsetsMessageFormatter);
            ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, this.createOffsetConsumer(cluster));
            try (ByteArrayOutputStream out = new ByteArrayOutputStream();
                 PrintStream output = new PrintStream(out);){
                ConsoleConsumer.process((int)1, (MessageFormatter)options.formatter(), (ConsoleConsumer.ConsumerWrapper)consumerWrapper, (PrintStream)output, (boolean)true);
                JsonNode jsonNode = this.objectMapper.reader().readTree(out.toByteArray());
                JsonNode keyNode = jsonNode.get("key");
                OffsetCommitKey offsetCommitKey = OffsetCommitKeyJsonConverter.read((JsonNode)keyNode.get("data"), (short)0);
                Assertions.assertNotNull((Object)offsetCommitKey);
                Assertions.assertEquals((Object)"__consumer_offsets", (Object)offsetCommitKey.topic());
                Assertions.assertEquals((Object)"test-group", (Object)offsetCommitKey.group());
                JsonNode valueNode = jsonNode.get("value");
                OffsetCommitValue offsetCommitValue = OffsetCommitValueJsonConverter.read((JsonNode)valueNode.get("data"), (short)4);
                Assertions.assertNotNull((Object)offsetCommitValue);
                Assertions.assertEquals((long)0L, (long)offsetCommitValue.offset());
                Assertions.assertEquals((int)-1, (int)offsetCommitValue.leaderEpoch());
                Assertions.assertNotNull((Object)offsetCommitValue.metadata());
                Assertions.assertEquals((long)-1L, (long)offsetCommitValue.expireTimestamp());
            }
            finally {
                consumerWrapper.cleanup();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ClusterTest(brokers=3)
    public void testGroupMetadataMessageFormatter(ClusterInstance cluster) throws Exception {
        try (Admin admin = cluster.admin();){
            NewTopic newTopic = new NewTopic("test-topic", 1, 1);
            admin.createTopics(Collections.singleton(newTopic));
            this.produceMessages(cluster);
            String[] groupMetadataMessageFormatter = this.createConsoleConsumerArgs(cluster, "__consumer_offsets", "org.apache.kafka.tools.consumer.GroupMetadataMessageFormatter");
            ConsoleConsumerOptions options = new ConsoleConsumerOptions(groupMetadataMessageFormatter);
            ConsoleConsumer.ConsumerWrapper consumerWrapper = new ConsoleConsumer.ConsumerWrapper(options, this.createGroupMetaDataConsumer(cluster));
            try (ByteArrayOutputStream out = new ByteArrayOutputStream();
                 PrintStream output = new PrintStream(out);){
                ConsoleConsumer.process((int)1, (MessageFormatter)options.formatter(), (ConsoleConsumer.ConsumerWrapper)consumerWrapper, (PrintStream)output, (boolean)true);
                JsonNode jsonNode = this.objectMapper.reader().readTree(out.toByteArray());
                JsonNode keyNode = jsonNode.get("key");
                GroupMetadataKey groupMetadataKey = GroupMetadataKeyJsonConverter.read((JsonNode)keyNode.get("data"), (short)0);
                Assertions.assertNotNull((Object)groupMetadataKey);
                Assertions.assertEquals((Object)"test-group", (Object)groupMetadataKey.group());
                JsonNode valueNode = jsonNode.get("value");
                GroupMetadataValue groupMetadataValue = GroupMetadataValueJsonConverter.read((JsonNode)valueNode.get("data"), (short)4);
                Assertions.assertNotNull((Object)groupMetadataValue);
                Assertions.assertEquals((Object)"", (Object)groupMetadataValue.protocolType());
                Assertions.assertEquals((int)0, (int)groupMetadataValue.generation());
                Assertions.assertNull((Object)groupMetadataValue.protocol());
                Assertions.assertNull((Object)groupMetadataValue.leader());
                Assertions.assertEquals((int)0, (int)groupMetadataValue.members().size());
            }
            finally {
                consumerWrapper.cleanup();
            }
        }
    }

    private void produceMessagesWithTxn(ClusterInstance cluster) {
        try (Producer<byte[], byte[]> producer = this.createTxnProducer(cluster);){
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord("test-topic", (Object)new byte[100000]));
            producer.commitTransaction();
        }
    }

    private void produceMessages(ClusterInstance cluster) {
        try (KafkaProducer producer = new KafkaProducer(this.producerProps(cluster));){
            producer.send(new ProducerRecord("test-topic", (Object)new byte[100000]));
        }
    }

    private String[] createConsoleConsumerArgs(ClusterInstance cluster, String topic, String formatter) {
        return new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--topic", topic, "--formatter", formatter};
    }

    private Producer<byte[], byte[]> createTxnProducer(ClusterInstance cluster) {
        Properties props = this.producerProps(cluster);
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("transactional.id", "transactional-id");
        return new KafkaProducer(props);
    }

    private Consumer<byte[], byte[]> createTxnConsumer(ClusterInstance cluster) {
        Properties props = this.consumerProps(cluster);
        props.put("isolation.level", "read_committed");
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer(props);
    }

    private Consumer<byte[], byte[]> createOffsetConsumer(ClusterInstance cluster) {
        Properties props = this.consumerProps(cluster);
        props.put("exclude.internal.topics", "false");
        return new KafkaConsumer(props);
    }

    private Consumer<byte[], byte[]> createGroupMetaDataConsumer(ClusterInstance cluster) {
        Properties props = this.consumerProps(cluster);
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer(props);
    }

    private Properties producerProps(ClusterInstance cluster) {
        Properties props = new Properties();
        props.put("bootstrap.servers", cluster.bootstrapServers());
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        return props;
    }

    private Properties consumerProps(ClusterInstance cluster) {
        Properties props = new Properties();
        props.put("bootstrap.servers", cluster.bootstrapServers());
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("partition.assignment.strategy", RangeAssignor.class.getName());
        props.put("group.id", "test-group");
        return props;
    }

    private static /* synthetic */ Object lambda$shouldThrowTimeoutExceptionWhenTimeoutIsReached$0(Time time, InvocationOnMock invocation) throws Throwable {
        time.sleep(501L);
        return ConsumerRecords.EMPTY;
    }
}

