/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.GetOffsetShell;
import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(serverProperties={@ClusterConfigProperty(key="auto.create.topics.enable", value="false"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="offsets.topic.num.partitions", value="4")})
public class GetOffsetShellTest {
    private final int topicCount = 4;
    private final ClusterInstance cluster;

    public GetOffsetShellTest(ClusterInstance cluster) {
        this.cluster = cluster;
    }

    private String getTopicName(int i) {
        return "topic" + i;
    }

    private String getRemoteLogStorageEnabledTopicName(int i) {
        return "topicRLS" + i;
    }

    private void setUp() {
        this.setupTopics(this::getTopicName, Collections.emptyMap());
        this.sendProducerRecords(this::getTopicName);
    }

    private void setUpRemoteLogTopics() {
        HashMap<String, String> rlsConfigs = new HashMap<String, String>();
        rlsConfigs.put("remote.storage.enable", "true");
        rlsConfigs.put("local.retention.bytes", "1");
        rlsConfigs.put("internal.segment.bytes", "100");
        this.setupTopics(this::getRemoteLogStorageEnabledTopicName, rlsConfigs);
        this.sendProducerRecords(this::getRemoteLogStorageEnabledTopicName);
    }

    private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
        try (Admin admin = this.cluster.admin();){
            ArrayList topics = new ArrayList();
            IntStream.range(0, 5).forEach(i -> topics.add(new NewTopic((String)topicName.apply(i), i, 1).configs(configs)));
            admin.createTopics(topics);
        }
    }

    private void sendProducerRecords(Function<Integer, String> topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.cluster.bootstrapServers());
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        try (KafkaProducer producer = new KafkaProducer(props);){
            IntStream.range(0, 5).forEach(i -> IntStream.range(0, i * i).forEach(msgCount -> Assertions.assertDoesNotThrow(() -> (RecordMetadata)producer.send(new ProducerRecord((String)topicName.apply(i), Integer.valueOf(msgCount % i), null, (Object)("val" + msgCount))).get())));
        }
    }

    private static List<ClusterConfig> withRemoteStorage() {
        HashMap<String, String> serverProperties = new HashMap<String, String>();
        serverProperties.put("rlmm.config.remote.log.metadata.topic.replication.factor", "1");
        serverProperties.put("rlmm.config.remote.log.metadata.topic.num.partitions", "1");
        serverProperties.put("remote.log.storage.system.enable", "true");
        serverProperties.put("remote.log.storage.manager.class.name", LocalTieredStorage.class.getName());
        serverProperties.put("remote.log.manager.task.interval.ms", "1000");
        serverProperties.put("log.retention.check.interval.ms", "1000");
        serverProperties.put("log.initial.task.delay.ms", "100");
        serverProperties.put("remote.log.metadata.manager.listener.name", "EXTERNAL");
        return Collections.singletonList(ClusterConfig.defaultBuilder().setTypes(Stream.of(Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet())).setServerProperties(serverProperties).build());
    }

    private void createConsumerAndPoll() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.cluster.bootstrapServers());
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("group.id", "test-consumer-group");
        props.put("auto.offset.reset", "earliest");
        try (KafkaConsumer consumer = new KafkaConsumer(props);){
            ArrayList<String> topics = new ArrayList<String>();
            for (int i = 0; i < 5; ++i) {
                topics.add(this.getTopicName(i));
            }
            consumer.subscribe(topics);
            consumer.poll(Duration.ofMillis(1000L));
        }
    }

    @ClusterTest
    public void testNoFilterOptions() {
        this.setUp();
        List<Row> output = this.executeAndParse(new String[0]);
        Assertions.assertEquals(this.expectedTestTopicOffsets(), output);
    }

    @ClusterTest
    public void testInternalExcluded() {
        this.setUp();
        List<Row> output = this.executeAndParse("--exclude-internal-topics");
        Assertions.assertEquals(this.expectedTestTopicOffsets(), output);
    }

    @ClusterTest
    public void testTopicNameArg() {
        this.setUp();
        IntStream.range(1, 5).forEach(i -> {
            List<Row> offsets = this.executeAndParse("--topic", this.getTopicName(i));
            Assertions.assertEquals(this.expectedOffsetsForTopic(i), offsets, () -> "Offset output did not match for " + this.getTopicName(i));
        });
    }

    @ClusterTest
    public void testTopicPatternArg() {
        this.setUp();
        List<Row> offsets = this.executeAndParse("--topic", "topic.*");
        Assertions.assertEquals(this.expectedTestTopicOffsets(), offsets);
    }

    @ClusterTest
    public void testPartitionsArg() {
        this.setUp();
        List<Row> offsets = this.executeAndParse("--partitions", "0,1");
        Assertions.assertEquals(this.expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
    }

    @ClusterTest
    public void testTopicPatternArgWithPartitionsArg() {
        this.setUp();
        List<Row> offsets = this.executeAndParse("--topic", "topic.*", "--partitions", "0,1");
        Assertions.assertEquals(this.expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
    }

    @ClusterTest
    public void testTopicPartitionsArg() {
        this.setUp();
        this.createConsumerAndPoll();
        List<Row> offsets = this.executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
        List<Row> expected = Arrays.asList(new Row("__consumer_offsets", 3, 0L), new Row("topic1", 0, 1L), new Row("topic2", 1, 2L), new Row("topic3", 2, 3L), new Row("topic4", 2, 4L));
        Assertions.assertEquals(expected, offsets);
    }

    @ClusterTest
    public void testGetLatestOffsets() {
        this.setUp();
        for (String time : new String[]{"-1", "latest"}) {
            List<Row> offsets = this.executeAndParse("--topic-partitions", "topic.*:0", "--time", time);
            List<Row> expected = Arrays.asList(new Row("topic1", 0, 1L), new Row("topic2", 0, 2L), new Row("topic3", 0, 3L), new Row("topic4", 0, 4L));
            Assertions.assertEquals(expected, offsets);
        }
    }

    @ClusterTest
    public void testGetEarliestOffsets() {
        this.setUp();
        for (String time : new String[]{"-2", "earliest"}) {
            List<Row> offsets = this.executeAndParse("--topic-partitions", "topic.*:0", "--time", time);
            List<Row> expected = Arrays.asList(new Row("topic1", 0, 0L), new Row("topic2", 0, 0L), new Row("topic3", 0, 0L), new Row("topic4", 0, 0L));
            Assertions.assertEquals(expected, offsets);
        }
    }

    @ClusterTest
    public void testGetOffsetsByMaxTimestamp() {
        this.setUp();
        for (String time : new String[]{"-3", "max-timestamp"}) {
            List<Row> offsets = this.executeAndParse("--topic-partitions", "topic.*", "--time", time);
            offsets.forEach(row -> Assertions.assertTrue((row.offset >= 0L && row.offset <= (long)Integer.parseInt(row.name.replace("topic", "")) ? 1 : 0) != 0));
        }
    }

    @ClusterTemplate(value="withRemoteStorage")
    public void testGetOffsetsByEarliestLocalSpec() throws InterruptedException {
        this.setUp();
        this.setUpRemoteLogTopics();
        for (String time : new String[]{"-4", "earliest-local"}) {
            TestUtils.waitForCondition(() -> Arrays.asList(new Row("topic1", 0, 0L), new Row("topic2", 0, 0L), new Row("topic3", 0, 0L), new Row("topic4", 0, 0L)).equals(this.executeAndParse("--topic-partitions", "topic\\d+.*:0", "--time", time)), (String)"testGetOffsetsByEarliestLocalSpec get topics with remote log disabled result not match");
            TestUtils.waitForCondition(() -> Arrays.asList(new Row("topicRLS1", 0, 0L), new Row("topicRLS2", 0, 1L), new Row("topicRLS3", 0, 2L), new Row("topicRLS4", 0, 3L)).equals(this.executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)), (String)"testGetOffsetsByEarliestLocalSpec get topics with remote log enabled result not match");
        }
    }

    @ClusterTemplate(value="withRemoteStorage")
    public void testGetOffsetsByLatestTieredSpec() throws InterruptedException {
        this.setUp();
        this.setUpRemoteLogTopics();
        for (String time : new String[]{"-5", "latest-tiered"}) {
            Assertions.assertEquals(Collections.emptyList(), this.executeAndParse("--topic-partitions", "topic\\d+:0", "--time", time));
            TestUtils.waitForCondition(() -> Arrays.asList(new Row("topicRLS2", 0, 0L), new Row("topicRLS3", 0, 1L), new Row("topicRLS4", 0, 2L)).equals(this.executeAndParse("--topic-partitions", "topicRLS.*:0", "--time", time)), (String)"testGetOffsetsByLatestTieredSpec result not match");
        }
    }

    @ClusterTest
    public void testGetOffsetsByTimestamp() {
        this.setUp();
        String time = String.valueOf(System.currentTimeMillis() / 2L);
        List<Row> offsets = this.executeAndParse("--topic-partitions", "topic.*:0", "--time", time);
        List<Row> expected = Arrays.asList(new Row("topic1", 0, 0L), new Row("topic2", 0, 0L), new Row("topic3", 0, 0L), new Row("topic4", 0, 0L));
        Assertions.assertEquals(expected, offsets);
    }

    @ClusterTest
    public void testNoOffsetIfTimestampGreaterThanLatestRecord() {
        this.setUp();
        String time = String.valueOf(System.currentTimeMillis() * 2L);
        List<Row> offsets = this.executeAndParse("--topic-partitions", "topic.*", "--time", time);
        Assertions.assertEquals(new ArrayList(), offsets);
    }

    @ClusterTest
    public void testTopicPartitionsArgWithInternalExcluded() {
        this.setUp();
        List<Row> offsets = this.executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics");
        List<Row> expected = Arrays.asList(new Row("topic1", 0, 1L), new Row("topic2", 1, 2L), new Row("topic3", 2, 3L), new Row("topic4", 2, 4L));
        Assertions.assertEquals(expected, offsets);
    }

    @ClusterTest
    public void testTopicPartitionsArgWithInternalIncluded() {
        this.setUp();
        this.createConsumerAndPoll();
        List<Row> offsets = this.executeAndParse("--topic-partitions", "__.*:0");
        Assertions.assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), offsets);
    }

    @ClusterTest
    public void testTopicPartitionsNotFoundForNonExistentTopic() {
        this.assertExitCodeIsOne("--topic", "some_nonexistent_topic");
    }

    @ClusterTest
    public void testTopicPartitionsNotFoundForExcludedInternalTopic() {
        this.assertExitCodeIsOne("--topic", "some_nonexistent_topic:*");
    }

    @ClusterTest
    public void testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern() {
        this.assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--exclude-internal-topics");
    }

    @ClusterTest
    public void testTopicPartitionsFlagWithTopicFlagCauseExit() {
        this.assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--topic", "topic1");
    }

    @ClusterTest
    public void testTopicPartitionsFlagWithPartitionsFlagCauseExit() {
        this.assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--partitions", "0");
    }

    @ClusterTest
    public void testPrintHelp() {
        Exit.setExitProcedure((statusCode, message) -> {});
        try {
            String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit((String[])new String[]{"--help"}));
            Assertions.assertTrue((boolean)out.startsWith("An interactive shell for getting topic-partition offsets."));
        }
        finally {
            Exit.resetExitProcedure();
        }
    }

    @ClusterTest
    public void testPrintVersion() {
        String out = ToolsTestUtils.captureStandardOut(() -> GetOffsetShell.mainNoExit((String[])new String[]{"--version"}));
        Assertions.assertEquals((Object)AppInfoParser.getVersion(), (Object)out);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void assertExitCodeIsOne(String ... args) {
        int[] exitStatus = new int[1];
        Exit.setExitProcedure((statusCode, message) -> {
            exitStatus[0] = statusCode;
            throw new RuntimeException();
        });
        try {
            GetOffsetShell.main((String[])this.addBootstrapServer(args));
        }
        catch (RuntimeException runtimeException) {
        }
        finally {
            Exit.resetExitProcedure();
        }
        Assertions.assertEquals((int)1, (int)exitStatus[0]);
    }

    private List<Row> expectedOffsetsWithInternal() {
        List consOffsets = IntStream.range(0, 4).mapToObj(i -> new Row("__consumer_offsets", i, 0L)).collect(Collectors.toList());
        return Stream.concat(consOffsets.stream(), this.expectedTestTopicOffsets().stream()).collect(Collectors.toList());
    }

    private List<Row> expectedTestTopicOffsets() {
        ArrayList<Row> offsets = new ArrayList<Row>(5);
        for (int i = 0; i < 5; ++i) {
            offsets.addAll(this.expectedOffsetsForTopic(i));
        }
        return offsets;
    }

    private List<Row> expectedOffsetsForTopic(int i) {
        String name = this.getTopicName(i);
        return IntStream.range(0, i).mapToObj(p -> new Row(name, p, Long.valueOf(i))).collect(Collectors.toList());
    }

    private List<Row> executeAndParse(String ... args) {
        String out = ToolsTestUtils.captureStandardOut(() -> GetOffsetShell.mainNoExit((String[])this.addBootstrapServer(args)));
        return Arrays.stream(out.split(System.lineSeparator())).map(i -> i.split(":")).filter(i -> ((String[])i).length >= 2).map(line -> new Row(line[0], Integer.parseInt(line[1]), ((String[])line).length == 2 || line[2].isEmpty() ? null : Long.valueOf(Long.parseLong(line[2])))).collect(Collectors.toList());
    }

    private String[] addBootstrapServer(String ... args) {
        ArrayList<String> newArgs = new ArrayList<String>(Arrays.asList(args));
        newArgs.add("--bootstrap-server");
        newArgs.add(this.cluster.bootstrapServers());
        return newArgs.toArray(new String[0]);
    }

    static class Row {
        private final String name;
        private final int partition;
        private final Long offset;

        public Row(String name, int partition, Long offset) {
            this.name = name;
            this.partition = partition;
            this.offset = offset;
        }

        public String toString() {
            return "Row[name:" + this.name + ",partition:" + this.partition + ",offset:" + this.offset;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Row)) {
                return false;
            }
            Row r = (Row)o;
            return this.name.equals(r.name) && this.partition == r.partition && Objects.equals(this.offset, r.offset);
        }

        public int hashCode() {
            return Objects.hash(this.name, this.partition, this.offset);
        }
    }
}

