/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.shell;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.shell.MetadataNode;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MetadataNodeManager
implements AutoCloseable {
    /** Leader value that {@code handleCommitImpl} treats as "leader unchanged" when applying a PartitionChangeRecord. */
    private static final int NO_LEADER_CHANGE = -2;
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
    /** The in-memory metadata tree together with the current working directory. */
    private final Data data = new Data();
    /** Listener instance handed out by {@link #logListener()}. */
    private final LogListener logListener = new LogListener();
    /** JSON mapper used to re-read stored partition data; the constructor registers a Jdk8Module on it. */
    private final ObjectMapper objectMapper = new ObjectMapper();
    /** Event queue through which setup, visit and leader-change events are run. */
    private final KafkaEventQueue queue;

    public MetadataNodeManager() {
        this.objectMapper.registerModule((Module)new Jdk8Module());
        this.queue = new KafkaEventQueue(Time.SYSTEM, new LogContext("[node-manager-event-queue] "), "");
    }

    /**
     * Creates the {@code local} directory in the metadata tree and fills its {@code version} and
     * {@code commitId} files from {@link AppInfoParser}. The work runs as an event on the queue;
     * this method blocks until that event has finished.
     *
     * @throws Exception if the event fails (surfaced through the future) or the wait is interrupted
     */
    public void setup() throws Exception {
        // Typed future instead of a raw CompletableFuture; it only signals completion.
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.appendEvent("createShellNodes", () -> {
            MetadataNode.DirectoryNode directory = this.data.root().mkdirs("local");
            directory.create("version").setContents(AppInfoParser.getVersion());
            directory.create("commitId").setContents(AppInfoParser.getCommitId());
            future.complete(null);
        }, future);
        future.get();
    }

    /**
     * Returns the listener that feeds committed batches, snapshots and leader changes into this
     * manager.
     *
     * @return the single {@link LogListener} owned by this manager
     */
    public LogListener logListener() {
        return logListener;
    }

    /**
     * Gives package-level callers direct access to the underlying {@link Data} object, without
     * going through the event queue.
     *
     * @return the metadata tree and working directory held by this manager
     */
    Data getData() {
        return data;
    }

    /**
     * Closes the event queue owned by this manager.
     *
     * @throws Exception if closing the queue fails
     */
    @Override
    public void close() throws Exception {
        queue.close();
    }

    /**
     * Runs {@code consumer} against the manager's {@link Data} as an event on the queue and blocks
     * until it has finished.
     *
     * @param consumer callback given access to the metadata tree and working directory
     * @throws Exception if the consumer throws (surfaced through the future) or the wait is
     *                   interrupted
     */
    public void visit(Consumer<Data> consumer) throws Exception {
        // Typed future instead of a raw CompletableFuture; it only signals completion.
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.appendEvent("visit", () -> {
            consumer.accept(this.data);
            future.complete(null);
        }, future);
        future.get();
    }

    /**
     * Appends {@code runnable} to the event queue.
     *
     * <p>The future is never completed normally here; callers that wait on it complete it from
     * inside {@code runnable}. If the event fails, the error is logged and, when a future was
     * supplied, the future is completed exceptionally so a waiting caller is released.
     *
     * @param name     label used in the error log message
     * @param runnable work to execute as a queue event
     * @param future   future to fail when the event fails, or {@code null} if nobody is waiting
     */
    private void appendEvent(final String name, final Runnable runnable, final CompletableFuture<?> future) {
        this.queue.append(new EventQueue.Event() {
            @Override
            public void run() throws Exception {
                runnable.run();
            }

            @Override
            public void handleException(Throwable e) {
                log.error("Unexpected error while handling event " + name, e);
                if (future != null) {
                    future.completeExceptionally(e);
                }
            }
        });
    }

    /**
     * Applies one metadata record to the tree. Any exception raised while resolving the record
     * type or applying the record is logged and not rethrown.
     *
     * @param message the metadata record to apply
     */
    void handleMessage(ApiMessage message) {
        try {
            final MetadataRecordType recordType = MetadataRecordType.fromId(message.apiKey());
            handleCommitImpl(recordType, message);
        } catch (Exception failure) {
            log.error("Error processing record of type " + message.apiKey(), failure);
        }
    }

    /**
     * Version passed to {@link PartitionRecordJsonConverter} when writing and re-reading the
     * stored partition JSON.
     * NOTE(review): the literal 0 may be an inlined {@code HIGHEST_SUPPORTED_VERSION} constant
     * from the original sources - confirm before bumping the record schema.
     */
    private static final short PARTITION_JSON_VERSION = 0;

    /**
     * Applies a single metadata record to the in-memory tree rooted at {@code data.root}.
     *
     * <p>Each supported record type is handled by a dedicated helper below; this method only
     * dispatches on {@code type} and casts {@code message} to the matching record class.
     *
     * @param type    the record type, used to select the handler
     * @param message the record itself; must be an instance of the class matching {@code type}
     * @throws Exception if a handler fails, e.g. the stored partition JSON cannot be parsed or the
     *                   record type is not handled
     */
    private void handleCommitImpl(MetadataRecordType type, ApiMessage message) throws Exception {
        switch (type) {
            case REGISTER_BROKER_RECORD:
                applyRegisterBroker((RegisterBrokerRecord) message);
                break;
            case UNREGISTER_BROKER_RECORD: {
                UnregisterBrokerRecord record = (UnregisterBrokerRecord) message;
                this.data.root.rmrf("brokers", Integer.toString(record.brokerId()));
                break;
            }
            case TOPIC_RECORD:
                applyTopic((TopicRecord) message);
                break;
            case PARTITION_RECORD:
                applyPartition((PartitionRecord) message);
                break;
            case CONFIG_RECORD:
                applyConfig((ConfigRecord) message);
                break;
            case PARTITION_CHANGE_RECORD:
                applyPartitionChange((PartitionChangeRecord) message);
                break;
            case FENCE_BROKER_RECORD:
                setBrokerFenced(((FenceBrokerRecord) message).id(), true);
                break;
            case UNFENCE_BROKER_RECORD:
                setBrokerFenced(((UnfenceBrokerRecord) message).id(), false);
                break;
            case REMOVE_TOPIC_RECORD:
                applyRemoveTopic((RemoveTopicRecord) message);
                break;
            case CLIENT_QUOTA_RECORD:
                applyClientQuota((ClientQuotaRecord) message);
                break;
            default:
                throw new RuntimeException("Unhandled metadata record type");
        }
    }

    /** Stores the registration under {@code brokers/<id>/registration} and marks the broker as fenced. */
    private void applyRegisterBroker(RegisterBrokerRecord record) {
        MetadataNode.DirectoryNode brokerNode =
            this.data.root.mkdirs("brokers").mkdirs(Integer.toString(record.brokerId()));
        brokerNode.create("registration").setContents(record.toString());
        brokerNode.create("isFenced").setContents("true");
    }

    /** Creates {@code topics/<name>} with {@code id} and {@code name} files and links it under {@code topicIds/<id>}. */
    private void applyTopic(TopicRecord record) {
        String topicId = record.topicId().toString();
        MetadataNode.DirectoryNode topicDirectory =
            this.data.root.mkdirs("topics").mkdirs(record.name());
        topicDirectory.create("id").setContents(topicId);
        topicDirectory.create("name").setContents(record.name());
        // The same directory node is added under topicIds so it can be found by id as well as by name.
        this.data.root.mkdirs("topicIds").addChild(topicId, topicDirectory);
    }

    /** Writes the partition as pretty-printed JSON to {@code topicIds/<topicId>/<partitionId>/data}. */
    private void applyPartition(PartitionRecord record) {
        MetadataNode.DirectoryNode partitionDirectory = this.data.root
            .mkdirs("topicIds")
            .mkdirs(record.topicId().toString())
            .mkdirs(Integer.toString(record.partitionId()));
        JsonNode json = PartitionRecordJsonConverter.write(record, PARTITION_JSON_VERSION);
        partitionDirectory.create("data").setContents(json.toPrettyString());
    }

    /** Sets or, when the record's value is null, removes one entry under {@code configs/<type>/<resourceName>}. */
    private void applyConfig(ConfigRecord record) {
        final String typeString;
        switch (ConfigResource.Type.forId(record.resourceType())) {
            case BROKER:
                typeString = "broker";
                break;
            case TOPIC:
                typeString = "topic";
                break;
            default:
                throw new RuntimeException("Error processing CONFIG_RECORD: Can't handle ConfigResource.Type " + record.resourceType());
        }
        MetadataNode.DirectoryNode configDirectory = this.data.root
            .mkdirs("configs")
            .mkdirs(typeString)
            .mkdirs(record.resourceName());
        if (record.value() == null) {
            configDirectory.rmrf(record.name());
        } else {
            configDirectory.create(record.name()).setContents(record.value());
        }
    }

    /** Re-reads the stored partition JSON, applies the ISR/leader change, bumps the epochs and writes it back. */
    private void applyPartitionChange(PartitionChangeRecord record) throws Exception {
        MetadataNode.FileNode file = this.data.root.file("topicIds", record.topicId().toString(),
            Integer.toString(record.partitionId()), "data");
        JsonNode stored = this.objectMapper.readTree(file.contents());
        PartitionRecord partition = PartitionRecordJsonConverter.read(stored, PARTITION_JSON_VERSION);
        if (record.isr() != null) {
            partition.setIsr(record.isr());
        }
        // The leader epoch only advances when the record actually carries a new leader.
        if (record.leader() != NO_LEADER_CHANGE) {
            partition.setLeader(record.leader());
            partition.setLeaderEpoch(partition.leaderEpoch() + 1);
        }
        // The partition epoch advances on every change record.
        partition.setPartitionEpoch(partition.partitionEpoch() + 1);
        file.setContents(PartitionRecordJsonConverter.write(partition, PARTITION_JSON_VERSION).toPrettyString());
    }

    /** Writes {@code "true"} or {@code "false"} to {@code brokers/<id>/isFenced}, creating the directory if needed. */
    private void setBrokerFenced(int brokerId, boolean fenced) {
        this.data.root.mkdirs("brokers", Integer.toString(brokerId))
            .create("isFenced")
            .setContents(Boolean.toString(fenced));
    }

    /** Removes the topic from both {@code topics/<name>} and {@code topicIds/<id>}; the name is looked up via the id. */
    private void applyRemoveTopic(RemoveTopicRecord record) {
        String topicId = record.topicId().toString();
        MetadataNode.DirectoryNode topicDirectory = this.data.root.directory("topicIds", topicId);
        String name = topicDirectory.file("name").contents();
        this.data.root.rmrf("topics", name);
        this.data.root.rmrf("topicIds", topicId);
    }

    /** Sets or removes one quota key in the directory chain computed by {@code clientQuotaRecordDirectories}. */
    private void applyClientQuota(ClientQuotaRecord record) {
        MetadataNode.DirectoryNode node = this.data.root;
        for (String directory : MetadataNodeManager.clientQuotaRecordDirectories(record.entity())) {
            node = node.mkdirs(directory);
        }
        if (record.remove()) {
            node.rmrf(record.key());
        } else {
            node.create(record.key()).setContents(String.valueOf(record.value()));
        }
    }

    /**
     * Computes the directory path under which a client quota is stored.
     *
     * <p>The path starts with {@code client-quotas}, followed by one
     * {@code <entityType>/<entityName>} pair per entity. Pairs are ordered by entity type, so the
     * result does not depend on the order of {@code entityData}. If two entries share an entity
     * type, the later one wins. A {@code null} entity name is rendered as {@code <default>}.
     *
     * @param entityData the entities identifying the quota
     * @return the path components, outermost directory first
     */
    static List<String> clientQuotaRecordDirectories(List<ClientQuotaRecord.EntityData> entityData) {
        List<String> result = new ArrayList<>();
        result.add("client-quotas");
        // Sorted map: gives a stable, type-ordered path regardless of input order.
        Map<String, ClientQuotaRecord.EntityData> entries = new TreeMap<>();
        entityData.forEach(e -> entries.put(e.entityType(), e));
        for (Map.Entry<String, ClientQuotaRecord.EntityData> entry : entries.entrySet()) {
            result.add(entry.getKey());
            String entityName = entry.getValue().entityName();
            result.add(entityName == null ? "<default>" : entityName);
        }
        return result;
    }

    class LogListener
    implements RaftClient.Listener<ApiMessageAndVersion> {
        LogListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            try {
                while (reader.hasNext()) {
                    Batch batch = (Batch)reader.next();
                    log.debug("handleCommits " + batch.records() + " at offset " + batch.lastOffset());
                    MetadataNode.DirectoryNode dir = MetadataNodeManager.this.data.root.mkdirs("metadataQuorum");
                    dir.create("offset").setContents(String.valueOf(batch.lastOffset()));
                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                        MetadataNodeManager.this.handleMessage(messageAndVersion.message());
                    }
                }
            }
            finally {
                reader.close();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
            try {
                while (reader.hasNext()) {
                    Batch batch = (Batch)reader.next();
                    for (ApiMessageAndVersion messageAndVersion : batch) {
                        MetadataNodeManager.this.handleMessage(messageAndVersion.message());
                    }
                }
            }
            finally {
                reader.close();
            }
        }

        public void handleLeaderChange(LeaderAndEpoch leader) {
            MetadataNodeManager.this.appendEvent("handleNewLeader", () -> {
                log.debug("handleNewLeader " + leader);
                MetadataNode.DirectoryNode dir = MetadataNodeManager.this.data.root.mkdirs("metadataQuorum");
                dir.create("leader").setContents(leader.toString());
            }, null);
        }

        public void beginShutdown() {
            log.debug("Metadata log listener sent beginShutdown");
        }
    }

    /**
     * Holder for the metadata tree and the current working directory within it.
     */
    public static class Data {
        /** Root directory of the metadata tree. */
        private final MetadataNode.DirectoryNode root = new MetadataNode.DirectoryNode();
        /** Current working directory; starts at {@code "/"}. */
        private String workingDirectory = "/";

        /**
         * @return the root directory of the metadata tree
         */
        public MetadataNode.DirectoryNode root() {
            return root;
        }

        /**
         * @return the current working directory
         */
        public String workingDirectory() {
            return workingDirectory;
        }

        /**
         * Replaces the current working directory. The value is stored as given, without validation.
         *
         * @param workingDirectory the new working directory
         */
        public void setWorkingDirectory(String workingDirectory) {
            this.workingDirectory = workingDirectory;
        }
    }
}

