/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.shell;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClusterLinkRecord;
import org.apache.kafka.common.metadata.ClusterLinkRecordJsonConverter;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EncryptedEnvelopeRecord;
import org.apache.kafka.common.metadata.InstallMetadataEncryptorRecord;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.metadata.MirrorTopicRecordJsonConverter;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.ClusterLink;
import org.apache.kafka.metadata.MetadataEncryptor;
import org.apache.kafka.metadata.MetadataEncryptorFactory;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.NoOpMetadataEncryptor;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.shell.MetadataNode;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataNodeManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
    private final Data data = new Data();
    private final LogListener logListener = new LogListener();
    private final KafkaEventQueue queue;
    private final MetadataEncryptorFactory encryptorFactory;
    private volatile MetadataProvenance provenance;
    private volatile MetadataDelta delta;
    private volatile MetadataImage image;
    private volatile boolean encryptorNotSupported = false;
    private volatile boolean recordsSkipped = false;

    public MetadataNodeManager(MetadataEncryptorFactory encryptorFactory) {
        this.encryptorFactory = encryptorFactory;
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule((Module)new Jdk8Module());
        this.queue = new KafkaEventQueue(Time.SYSTEM, new LogContext("[node-manager-event-queue] "), "");
        this.provenance = MetadataProvenance.EMPTY;
        this.delta = this.newMetadataDelta(MetadataImage.EMPTY);
    }

    public void setup(Object source) throws Exception {
        CompletableFuture future = new CompletableFuture();
        this.appendEvent("createShellNodes", () -> {
            MetadataNode.DirectoryNode directory = this.data.root().mkdirs("shell");
            directory.create("release").setContents(AppInfoParser.getVersion());
            directory.create("commitId").setContents(AppInfoParser.getCommitId());
            directory.create("source").setContents(source);
            this.data.root().mkdirs("brokers");
            this.data.root().mkdirs("configs");
            this.data.root().mkdirs("features");
            this.data.root().mkdirs("topics");
            this.data.root().mkdirs("topicIds");
            this.data.root().mkdirs("links");
            this.data.root().mkdirs("linkIds");
            future.complete(null);
        }, future);
        future.get();
    }

    private MetadataDelta newMetadataDelta(MetadataImage image) {
        return new MetadataDelta.Builder().setImage(image).setMetadataEncryptorFactory(this.encryptorFactory).build();
    }

    public LogListener logListener() {
        return this.logListener;
    }

    public boolean recordsSkippedDuringLoading() {
        return this.recordsSkipped;
    }

    Data getData() {
        return this.data;
    }

    @Override
    public void close() throws Exception {
        this.queue.close();
    }

    public void visit(Consumer<Data> consumer) throws Exception {
        CompletableFuture future = new CompletableFuture();
        this.appendEvent("visit", () -> {
            consumer.accept(this.data);
            future.complete(null);
        }, future);
        future.get();
    }

    private void appendEvent(final String name, final Runnable runnable, final CompletableFuture<?> future) {
        this.queue.append(new EventQueue.Event(){

            public void run() throws Exception {
                runnable.run();
            }

            public void handleException(Throwable e) {
                log.error("Unexpected error while handling event " + name, e);
                if (future != null) {
                    future.completeExceptionally(e);
                }
            }
        });
    }

    void refreshNodes() {
        this.image = this.delta.apply(this.provenance);
        this.delta = this.newMetadataDelta(this.image);
        HashSet children = new HashSet(this.data.root.children().keySet());
        children.remove("shell");
        children.forEach(arg_0 -> MetadataNodeManager.lambda$refreshNodes$2(this.data.root, arg_0));
        ImageWriterOptions imageWriterOptions = new ImageWriterOptions.Builder().setMetadataVersion(this.image.features().metadataVersion()).build();
        RecordListWriter clientQuotaWriter = new RecordListWriter();
        this.image.clientQuotas().write((ImageWriter)clientQuotaWriter, imageWriterOptions);
        for (ApiMessageAndVersion record : clientQuotaWriter.records()) {
            ClientQuotaRecord recordClientQuotas = (ClientQuotaRecord)record.message();
            List<String> directoriesClientQuotas = MetadataNodeManager.clientQuotaRecordDirectories(recordClientQuotas.entity());
            MetadataNode.DirectoryNode nodeClientQuotas = this.data.root;
            Iterator<String> iterator = directoriesClientQuotas.iterator();
            while (iterator.hasNext()) {
                String dirClientQuotas = iterator.next();
                nodeClientQuotas = nodeClientQuotas.mkdirs(dirClientQuotas);
            }
            if (recordClientQuotas.remove()) {
                nodeClientQuotas.rmrf(recordClientQuotas.key());
                continue;
            }
            nodeClientQuotas.create(recordClientQuotas.key()).setContents(recordClientQuotas.value() + "");
        }
        MetadataNode.DirectoryNode producerIdsDirectory = this.data.root.mkdirs("producerIds");
        producerIdsDirectory.create("nextBlockStartId").setContents(String.valueOf(this.image.producerIds().highestSeenProducerId()));
        MetadataNode.DirectoryNode aclsById = this.data.root.mkdirs("acls", "by-id");
        RecordListWriter aclsWriter = new RecordListWriter();
        this.image.acls().write((ImageWriter)aclsWriter, imageWriterOptions);
        for (ApiMessageAndVersion record : aclsWriter.records()) {
            AccessControlEntryRecord recordAcls = (AccessControlEntryRecord)record.message();
            StandardAcl acl = StandardAcl.fromRecord((AccessControlEntryRecord)recordAcls);
            aclsById.create(recordAcls.id().toString()).setContents(acl);
            List<String> aclsByTypeDirectory = MetadataNodeManager.aclPath(recordAcls.resourceType(), recordAcls.patternType(), recordAcls.resourceName(), recordAcls.id());
            MetadataNode.DirectoryNode node = this.data.root;
            for (String directory : aclsByTypeDirectory) {
                node = node.mkdirs(directory);
            }
            node.create(recordAcls.id().toString()).setContents(acl);
        }
        MetadataNode.DirectoryNode configsDirectory = this.data.root.mkdirs("configs");
        RecordListWriter configsWriter = new RecordListWriter();
        this.image.configs().write((ImageWriter)configsWriter, imageWriterOptions, (MetadataEncryptor)NoOpMetadataEncryptor.INSTANCE);
        for (ApiMessageAndVersion record : configsWriter.records()) {
            ConfigRecord recordConfigs = (ConfigRecord)record.message();
            String typeString = "";
            switch (ConfigResource.Type.forId((byte)recordConfigs.resourceType())) {
                case BROKER: {
                    typeString = "broker";
                    break;
                }
                case TOPIC: {
                    typeString = "topic";
                    break;
                }
                case CLUSTER_LINK: {
                    typeString = "cluster-link";
                    break;
                }
                default: {
                    throw new RuntimeException("Error processing CONFIG_RECORD: Can't handle ConfigResource.Type " + recordConfigs.resourceType());
                }
            }
            configsDirectory.mkdirs(typeString).mkdirs(recordConfigs.resourceName().isEmpty() ? "<default>" : recordConfigs.resourceName()).create(recordConfigs.name()).setContents(recordConfigs.value());
        }
        MetadataNode.DirectoryNode metadataQuorumDirectory = this.data.root.mkdirs("metadataQuorum");
        metadataQuorumDirectory.create("snapshot-id").setContents(Snapshots.filenameFromSnapshotId((OffsetAndEpoch)this.image.highestOffsetAndEpoch()));
        MetadataNode.DirectoryNode brokersDir = this.data.root.mkdirs("brokers");
        this.image.cluster().brokers().forEach((brokerId, brokerReg) -> {
            MetadataNode.DirectoryNode brokerDir = brokersDir.mkdirs(String.valueOf(brokerId));
            brokerDir.create("isFenced").setContents(String.valueOf(brokerReg.fenced()));
            RegisterBrokerRecord brokRecord = (RegisterBrokerRecord)brokerReg.toRecord(imageWriterOptions).message();
            brokerDir.create("registration").setContents(String.valueOf(brokRecord));
        });
        MetadataNode.DirectoryNode topicsNamesDir = this.data.root.mkdirs("topics");
        MetadataNode.DirectoryNode topicsIdsDir = this.data.root.mkdirs("topicIds");
        this.image.topics().topicsById().forEach((name, topic) -> this.handlePartitions(String.valueOf(name), (TopicImage)topic, topicsIdsDir));
        this.image.topics().topicsByName().forEach((name, topic) -> this.handlePartitions((String)name, (TopicImage)topic, topicsNamesDir));
        MetadataNode.DirectoryNode linkIdsDir = this.data.root.mkdirs("linkIds");
        this.image.clusterLinks().linksById().forEach((linkId, link) -> this.handleLink((ClusterLink)link, linkIdsDir, linkId.toString()));
        MetadataNode.DirectoryNode linksNamesDir = this.data.root.mkdirs("links");
        this.image.clusterLinks().linksByName().forEach((linkName, link) -> this.handleLink((ClusterLink)link, linksNamesDir, (String)linkName));
    }

    private void handleLink(ClusterLink clusterLink, MetadataNode.DirectoryNode node, String dirName) {
        MetadataNode.DirectoryNode nameNode = node.mkdirs(dirName);
        ClusterLinkRecord record = clusterLink.toRecord();
        JsonNode json = ClusterLinkRecordJsonConverter.write((ClusterLinkRecord)record, (short)1);
        nameNode.create("data").setContents(json.toPrettyString());
    }

    private void handlePartitions(String name, TopicImage topic, MetadataNode.DirectoryNode node) {
        MetadataNode.DirectoryNode nameNode = node.mkdirs(name);
        nameNode.create("id").setContents(String.valueOf(topic.id()));
        nameNode.create("name").setContents(topic.name());
        topic.mirrorTopic().ifPresent(mirrorTopic -> {
            MirrorTopicRecord mirrorTopicRecord = MirrorTopic.toSnapshotRecord((MirrorTopic)mirrorTopic, (String)topic.name());
            nameNode.create("mirror").setContents(MirrorTopicRecordJsonConverter.write((MirrorTopicRecord)mirrorTopicRecord, (short)0).toPrettyString());
        });
        topic.partitions().forEach((id, value) -> {
            MetadataNode.DirectoryNode partitionNode = nameNode.mkdirs(String.valueOf(id));
            PartitionRecord record = (PartitionRecord)value.toRecord(topic.id(), id.intValue()).message();
            JsonNode json = PartitionRecordJsonConverter.write((PartitionRecord)record, (short)0);
            partitionNode.create("data").setContents(json.toPrettyString());
        });
    }

    void handleMessage(ApiMessage message, long offset, int epoch, long lastContainedLogTimeMs) {
        try {
            ApiMessage messageToReplay = message;
            if (message instanceof InstallMetadataEncryptorRecord) {
                Uuid keyId = ((InstallMetadataEncryptorRecord)message).keyId();
                if (!this.encryptorFactory.hasKeyConfiguration(keyId)) {
                    log.warn("Skipping installation of encryptor from record {} at offset {} since we have no configuration for the encryption key", (Object)message, (Object)offset);
                    this.encryptorNotSupported = true;
                    this.recordsSkipped = true;
                    messageToReplay = new NoOpRecord();
                }
            } else if (message instanceof EncryptedEnvelopeRecord && this.encryptorNotSupported) {
                log.debug("Skipping record {} at offset {} since the encryptor could not be installed", (Object)message, (Object)offset);
                messageToReplay = new NoOpRecord();
                this.recordsSkipped = true;
            }
            this.delta.replay(messageToReplay);
            this.provenance = new MetadataProvenance(offset, epoch, lastContainedLogTimeMs);
        }
        catch (Exception e) {
            this.recordsSkipped = true;
            log.error("Error processing record at offset={} and epoch={}: {}", new Object[]{offset, epoch, message, e});
        }
    }

    static List<String> aclPath(byte resourceType, byte patternType, String resourceName, Uuid id) {
        return MetadataNodeManager.aclPath(ResourceType.fromCode((byte)resourceType), PatternType.fromCode((byte)patternType), resourceName, id);
    }

    static List<String> aclPath(ResourceType resourceType, PatternType patternType, String resourceName, Uuid id) {
        ArrayList<String> results = new ArrayList<String>();
        results.add("acls");
        results.add("by-type");
        if (resourceType.isUnknown()) {
            throw new RuntimeException("Unable to identify a valid ACL resource type for ACL " + id.toString());
        }
        results.add(resourceType.toString().toLowerCase(Locale.ROOT));
        switch (patternType) {
            case LITERAL: {
                if (resourceName.equals("*")) {
                    results.add("wildcard");
                    break;
                }
                results.add("literal");
                results.add(resourceName);
                break;
            }
            case PREFIXED: {
                results.add("prefixed");
                results.add(resourceName);
                break;
            }
            default: {
                throw new RuntimeException("Unable to identify a valid ACL pattern type for ACL " + id.toString());
            }
        }
        return results;
    }

    static List<String> clientQuotaRecordDirectories(List<ClientQuotaRecord.EntityData> entityData) {
        ArrayList<String> result = new ArrayList<String>();
        result.add("client-quotas");
        TreeMap entries = new TreeMap();
        entityData.forEach(e -> entries.put(e.entityType(), e));
        for (Map.Entry entry : entries.entrySet()) {
            result.add((String)entry.getKey());
            result.add(((ClientQuotaRecord.EntityData)entry.getValue()).entityName() == null ? "<default>" : ((ClientQuotaRecord.EntityData)entry.getValue()).entityName());
        }
        return result;
    }

    private static /* synthetic */ void lambda$refreshNodes$2(MetadataNode.DirectoryNode rec$, String xva$0) {
        rec$.rmrf(xva$0);
    }

    public class LogListener
    implements RaftClient.Listener<ApiMessageAndVersion> {
        public MetadataImage currentImage() {
            return MetadataNodeManager.this.image;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            try {
                while (reader.hasNext()) {
                    Batch batch = (Batch)reader.next();
                    log.debug("Processing committed batch {}", (Object)batch);
                    long recordOffset = batch.baseOffset();
                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                        MetadataNodeManager.this.handleMessage(messageAndVersion.message(), recordOffset, batch.epoch(), batch.appendTimestamp());
                        ++recordOffset;
                    }
                }
                MetadataNodeManager.this.refreshNodes();
            }
            finally {
                reader.close();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
            OffsetAndEpoch snapshotId = reader.snapshotId();
            try {
                while (reader.hasNext()) {
                    Batch batch = (Batch)reader.next();
                    log.debug("Processing batch {} from snapshot {}", (Object)batch, (Object)snapshotId);
                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                        MetadataNodeManager.this.handleMessage(messageAndVersion.message(), snapshotId.offset(), snapshotId.epoch(), reader.lastContainedLogTimestamp());
                    }
                }
                MetadataNodeManager.this.refreshNodes();
            }
            finally {
                log.trace("closing snapshot reader for snapshot {}", (Object)snapshotId);
                reader.close();
            }
        }

        public void handleLeaderChange(LeaderAndEpoch leader) {
            MetadataNodeManager.this.appendEvent("handleNewLeader", () -> {
                log.debug("handleNewLeader " + leader);
                MetadataNode.DirectoryNode dir = MetadataNodeManager.this.data.root.mkdirs("metadataQuorum");
                dir.create("leader").setContents(leader.toString());
            }, null);
        }

        public void beginShutdown() {
            log.debug("Metadata log listener sent beginShutdown");
        }
    }

    public static class Data {
        private final MetadataNode.DirectoryNode root = new MetadataNode.DirectoryNode();
        private String workingDirectory = "/";

        public MetadataNode.DirectoryNode root() {
            return this.root;
        }

        public String workingDirectory() {
            return this.workingDirectory;
        }

        public void setWorkingDirectory(String workingDirectory) {
            this.workingDirectory = workingDirectory;
        }
    }
}

