/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.shell.confluent;

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.raft.KafkaMetadataLog;
import kafka.raft.MetadataLogConfig;
import kafka.server.KafkaRaftServer;
import kafka.testkit.ControllerNode;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.util.ClusterMetadataSource;
import org.apache.kafka.metadata.util.LocalMetadataLogReader;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.shell.command.Commands;
import org.apache.kafka.shell.confluent.ShellSourceManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=120L)
@Tag(value="integration")
public class MetadataShellIntegrationTest {
    // Log configuration passed to KafkaMetadataLog.createWithoutRecovery by the tests in this class.
    // The constructor is positional: 102400 is 100 KiB, 0x800000 is 8 MiB, and both Long.MAX_VALUE
    // arguments are the largest possible long.
    // NOTE(review): what each position means is defined by MetadataLogConfig, which is not visible
    // in this file -- confirm against that class before changing any value.
    private static final MetadataLogConfig METADATA_LOG_CONFIG = new MetadataLogConfig(102400, 102400, 10000L, Long.MAX_VALUE, Long.MAX_VALUE, 0x800000, 0x800000, 60000L, 1);

    /**
     * Compares two line lists position by position and fails on the first difference.
     *
     * <p>An expected line that begins with {@code ^} is treated as a regular expression that must
     * match the whole actual line; any other expected line must be equal to the actual line.
     * When one list is shorter than the other, the missing positions are compared as the empty
     * string, so a trailing empty line and an absent line are indistinguishable.
     *
     * @param expected the expected lines (literal text or {@code ^}-prefixed regular expressions)
     * @param actual the lines that were actually produced
     * @throws RuntimeException on the first mismatching position; the message carries the
     *         zero-based line index, both lines, and the complete actual output
     */
    static void assertLinesMatch(List<String> expected, List<String> actual) {
        int lineCount = Math.max(expected.size(), actual.size());
        for (int lineNo = 0; lineNo < lineCount; lineNo++) {
            String want = lineNo < expected.size() ? expected.get(lineNo) : "";
            String got = lineNo < actual.size() ? actual.get(lineNo) : "";
            boolean ok;
            if (want.startsWith("^")) {
                ok = Pattern.matches(want, got);
            } else {
                ok = want.equals(got);
            }
            if (!ok) {
                throw new RuntimeException("Mismatch on line " + lineNo + ": Expected: " + want + ", but got: " + got + ". FULL OUTPUT: " + String.join("\n", actual));
            }
        }
    }

    /**
     * Runs one shell command and checks the text it wrote against the expected lines.
     *
     * <p>The command writes into a fresh {@link CaptureStream}; the captured lines are then
     * compared with {@link #assertLinesMatch(List, List)}.
     *
     * @param shell the shell that executes the command
     * @param args the command name followed by its arguments
     * @param expected the expected output lines, in order
     * @throws Exception if running the command fails or the output does not match
     */
    static void assertCommandOutput(Shell shell, List<String> args, String ... expected) throws Exception {
        try (CaptureStream capture = new CaptureStream()) {
            shell.run(capture.out, args);
            List<String> expectedLines = Arrays.asList(expected);
            List<String> actualLines = capture.outputLines();
            assertLinesMatch(expectedLines, actualLines);
        }
    }

    /**
     * Resolves the metadata partition directory of the first controller node and verifies that
     * it contains exactly one {@code .log} file.
     *
     * <p>Despite the method name, the value returned is the directory path
     * ({@code <metadataDirectory>/<metadata topic>-0}), not the path of the {@code .log} file
     * itself; callers in this class use it as the log's data directory.
     *
     * @param cluster the test cluster whose first controller is inspected
     * @return the path of the metadata partition directory
     * @throws Exception if the directory cannot be listed
     */
    private static String getFirstMetadataLogFile(KafkaClusterTestKit cluster) throws Exception {
        ControllerNode firstController = cluster.nodes().controllerNodes().values().iterator().next();
        String metadataDir = firstController.metadataDirectory() + File.separator + KafkaRaftServer.MetadataTopic() + "-0";
        List<Path> logFiles = new ArrayList<>();
        // A DirectoryStream holds an open directory handle, so it must be closed after iteration.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(Paths.get(metadataDir), p -> p.getFileName().toString().endsWith(".log"))) {
            for (Path entry : entries) {
                logFiles.add(entry);
            }
        }
        Assertions.assertEquals(1, logFiles.size(), "Expected exactly one .log file");
        return metadataDir;
    }

    /**
     * Builds a cluster with three broker nodes and one controller node, creates two topics in
     * it, and shuts every node down again before returning.
     *
     * <p>Topics created, with explicit replica assignments:
     * <ul>
     *   <li>{@code foo}: partition 0 on [0, 1, 2], partition 1 on [1, 2, 0]</li>
     *   <li>{@code bar}: partition 0 on [2, 0, 1]</li>
     * </ul>
     *
     * <p>The returned test kit has already had {@code shutdownAll()} called on it but has not
     * been closed; the caller is responsible for closing it.
     *
     * @return the shut-down cluster test kit
     * @throws Exception if formatting, startup, topic creation or shutdown fails; in that case
     *         the test kit is closed before the failure is rethrown
     */
    private static KafkaClusterTestKit createCluster1() throws Exception {
        TestKitNodes nodes = new TestKitNodes.Builder().setNumBrokerNodes(3).setNumControllerNodes(1).build();
        KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build();
        try {
            cluster.format();
            cluster.startup();
            try (Admin admin = Admin.create(cluster.clientProperties())) {
                HashMap<Integer, List<Integer>> fooPartitions = new HashMap<>();
                fooPartitions.put(0, Arrays.asList(0, 1, 2));
                fooPartitions.put(1, Arrays.asList(1, 2, 0));
                HashMap<Integer, List<Integer>> barPartitions = new HashMap<>();
                barPartitions.put(0, Arrays.asList(2, 0, 1));

                List<NewTopic> newTopics = new ArrayList<>();
                newTopics.add(new NewTopic("foo", fooPartitions));
                newTopics.add(new NewTopic("bar", barPartitions));
                // Block until both topics have been created.
                admin.createTopics(newTopics).all().get();
            }
            cluster.shutdownAll();
            return cluster;
        }
        catch (Throwable e) {
            // Clean up, but never let a failure during cleanup hide the original problem.
            try {
                cluster.close();
            }
            catch (Throwable closeFailure) {
                if (closeFailure != e) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }

    /**
     * Opens the metadata log written by {@link #createCluster1()} and checks the output of the
     * {@code ls} and {@code find} shell commands against it.
     *
     * <p>Covers the root listing, the children of {@code image}, the three entries under
     * {@code image/cluster}, the entries of topics {@code foo} and {@code bar}, and the error
     * message printed for a topic that does not exist.
     */
    @Test
    void testLoadSnapshotAndList() throws Exception {
        try (KafkaClusterTestKit cluster = createCluster1()) {
            File dataDir = new File(getFirstMetadataLogFile(cluster));
            // Resources are closed in reverse order: shell, reader, log, metrics, scheduler.
            // Metrics is Closeable, so it is managed here instead of being created inline and leaked.
            try (ClosableScheduler scheduler = new ClosableScheduler();
                 Metrics metrics = new Metrics();
                 KafkaMetadataLog metadataLog = KafkaMetadataLog.createWithoutRecovery(Topic.CLUSTER_METADATA_TOPIC_PARTITION, Uuid.METADATA_TOPIC_ID, dataDir, Time.SYSTEM, metrics, (Scheduler) scheduler.scheduler, METADATA_LOG_CONFIG, l -> {}, o -> {});
                 LocalMetadataLogReader reader = new LocalMetadataLogReader((ReplicatedLog) metadataLog, OptionalLong.of(Long.MAX_VALUE));
                 Shell shell = new Shell(reader, KafkaConfigSchema.EMPTY, true)) {
                assertCommandOutput(shell, Arrays.asList("ls"), "image", "local");
                assertCommandOutput(shell, Arrays.asList("ls", "image"), "acls", "cells", "clientQuotas", "cluster", "clusterLinks", "configs", "encryptor", "features", "producerIds", "provenance", "replicaExclusions", "scram", "tenants", "topics");
                assertCommandOutput(shell, Arrays.asList("find", "image/cluster"), "image/cluster", "image/cluster/0", "image/cluster/1", "image/cluster/2");
                assertCommandOutput(shell, Arrays.asList("ls", "image/topics/byName/foo"), "0", "1", "id", "name");
                assertCommandOutput(shell, Arrays.asList("ls", "image/topics/byName/bar"), "0", "id", "name");
                assertCommandOutput(shell, Arrays.asList("ls", "image/topics/byName/baz"), "ls: image/topics/byName/baz: no such file or directory.");
            }
        }
    }

    /**
     * Opens the metadata log written by {@link #createCluster1()} and checks the shell's
     * working-directory commands.
     *
     * <p>Changes into {@code image/cluster}, verifies {@code pwd} and both forms of {@code ls}
     * there, checks the {@code man pwd} text, then verifies that {@code cd} without arguments
     * leaves {@code pwd} reporting {@code /}.
     */
    @Test
    void testLoadSnapshotAndTryPathCommands() throws Exception {
        try (KafkaClusterTestKit cluster = createCluster1()) {
            File dataDir = new File(getFirstMetadataLogFile(cluster));
            // Resources are closed in reverse order: shell, reader, log, metrics, scheduler.
            // Metrics is Closeable, so it is managed here instead of being created inline and leaked.
            try (ClosableScheduler scheduler = new ClosableScheduler();
                 Metrics metrics = new Metrics();
                 KafkaMetadataLog metadataLog = KafkaMetadataLog.createWithoutRecovery(Topic.CLUSTER_METADATA_TOPIC_PARTITION, Uuid.METADATA_TOPIC_ID, dataDir, Time.SYSTEM, metrics, (Scheduler) scheduler.scheduler, METADATA_LOG_CONFIG, l -> {}, o -> {});
                 LocalMetadataLogReader reader = new LocalMetadataLogReader((ReplicatedLog) metadataLog, OptionalLong.of(Long.MAX_VALUE));
                 Shell shell = new Shell(reader, KafkaConfigSchema.EMPTY, true)) {
                assertCommandOutput(shell, Arrays.asList("cd", "image/cluster"), "");
                assertCommandOutput(shell, Arrays.asList("pwd"), "/image/cluster");
                assertCommandOutput(shell, Arrays.asList("ls"), "0", "1", "2");
                assertCommandOutput(shell, Arrays.asList("ls", "."), "0", "1", "2");
                assertCommandOutput(shell, Arrays.asList("man", "pwd"), "pwd: Print the current working directory.", "", "usage: pwd");
                // No expected lines: a bare "cd" must print nothing.
                assertCommandOutput(shell, Arrays.asList("cd"));
                assertCommandOutput(shell, Arrays.asList("pwd"), "/");
            }
        }
    }

    /**
     * In-memory sink that collects command output so a test can inspect it line by line.
     *
     * <p>{@code out} is the buffer that {@code assertCommandOutput} hands to the shell.
     * {@code stream} is a UTF-8 {@link PrintStream} view over the same buffer; nothing in this
     * file writes through it, but it is closed together with the buffer.
     */
    static class CaptureStream implements AutoCloseable {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final PrintStream stream = new PrintStream((OutputStream) this.out, true, "utf8");

        /**
         * @throws Exception if the {@code utf8} charset name is rejected by {@link PrintStream}
         */
        CaptureStream() throws Exception {
        }

        /** Closes the print stream and then the buffer via {@code Utils.closeQuietly}. */
        @Override
        public void close() throws Exception {
            Utils.closeQuietly(this.stream, "stream");
            Utils.closeQuietly(this.out, "out");
        }

        /**
         * Returns everything captured so far, split on {@code \n}.
         *
         * <p>The bytes are decoded as UTF-8 explicitly, matching the encoding used by
         * {@code Shell.run} when writing; relying on the platform default charset would
         * corrupt non-ASCII output on machines whose default is not UTF-8.
         *
         * @return the captured lines; a single empty string when nothing was written
         * @throws IOException if flushing the buffer fails
         */
        List<String> outputLines() throws IOException {
            this.out.flush();
            String captured = new String(this.out.toByteArray(), StandardCharsets.UTF_8);
            return Arrays.asList(captured.split("\n"));
        }
    }

    /**
     * Minimal shell harness for tests: owns a {@link ShellSourceManager} and executes single
     * commands against its state.
     */
    static class Shell implements AutoCloseable {
        private final boolean interactive;
        private final ShellSourceManager sourceManager;

        /**
         * Creates the source manager, starts it and calls {@code waitUntilCaughtUp()} on it.
         *
         * @param source the metadata source handed to the source manager
         * @param configSchema the config schema handed to the source manager
         * @param interactive passed to {@link Commands} for every command that is run
         * @throws Exception if starting or waiting fails; the source manager is closed first
         */
        Shell(ClusterMetadataSource source, KafkaConfigSchema configSchema, boolean interactive) throws Exception {
            this.interactive = interactive;
            ShellSourceManager manager = new ShellSourceManager(source, new Properties(), configSchema, Optional.empty());
            this.sourceManager = manager;
            try {
                manager.start();
                manager.waitUntilCaughtUp();
            }
            catch (Exception startupFailure) {
                manager.close();
                throw startupFailure;
            }
        }

        /**
         * Parses {@code args} as a command and runs it, writing its output to {@code stream}
         * as UTF-8.
         *
         * <p>The writer wrapping {@code stream} is closed when the command finishes, which
         * closes {@code stream} as well.
         *
         * @param stream destination for the command's output
         * @param args the command name followed by its arguments
         * @throws Exception if parsing or running the command fails
         */
        void run(OutputStream stream, List<String> args) throws Exception {
            Commands commands = new Commands(interactive);
            try (PrintWriter output = new PrintWriter(new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8)))) {
                commands.parseCommand(args).run(Optional.empty(), output, sourceManager.state());
                output.flush();
            }
        }

        /** Closes the underlying source manager. */
        @Override
        public void close() throws Exception {
            sourceManager.close();
        }
    }

    /**
     * Wraps a {@link KafkaScheduler} so it can be used in a try-with-resources statement;
     * closing the wrapper shuts the scheduler down.
     */
    static class ClosableScheduler implements AutoCloseable {
        final KafkaScheduler scheduler;

        ClosableScheduler() {
            // NOTE(review): the meaning of the positional arguments is defined by KafkaScheduler,
            // which is not visible in this file.
            this.scheduler = new KafkaScheduler(1, true, "scheduler", false);
        }

        /** Shuts the wrapped scheduler down. */
        @Override
        public void close() throws Exception {
            scheduler.shutdown();
        }
    }
}

