/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.FunctionMetaDataTopicTailer;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Maintains an in-memory view of function metadata, keyed by tenant, namespace and
 * function name, and keeps that view in step with the function metadata topic.
 *
 * <p>The manager either follows the topic through a {@link FunctionMetaDataTopicTailer}
 * or, after {@link #acquireLeadership(Producer)}, writes to it through an exclusive
 * producer. {@link #giveupLeadership()} closes the producer and starts a tailer again.
 */
public class FunctionMetaDataManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionMetaDataManager.class);

    /** Message property that carries the metadata version of each published message. */
    private static final String versionTag = "version";

    /** tenant -> namespace -> function name -> metadata. */
    @VisibleForTesting
    final Map<String, Map<String, Map<String, Function.FunctionMetaData>>> functionMetaDataMap = new ConcurrentHashMap<>();

    private final SchedulerManager schedulerManager;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;
    private final ErrorNotifier errorNotifier;

    /** Non-null while this manager follows the metadata topic. */
    private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;

    /** Non-null while this manager holds leadership; see {@link #updateFunctionOnLeader}. */
    private Producer<byte[]> exclusiveLeaderProducer;

    /** Id of the last topic message that was read or written by this manager. */
    private volatile MessageId lastMessageSeen = MessageId.earliest;

    /** Completed once {@link #initialize()} has replayed the topic successfully. */
    private CompletableFuture<Void> isInitialized = new CompletableFuture<>();

    /**
     * Creates a manager that is neither leader nor tailing the topic yet.
     *
     * @param workerConfig     worker configuration (topic name, worker id, compaction flag)
     * @param schedulerManager scheduler invoked after leader-side changes that need scheduling
     * @param pulsarClient     client used to create readers and producers
     * @param errorNotifier    sink for errors that cannot be handled locally
     * @throws PulsarClientException declared for callers; not thrown by the visible code
     */
    public FunctionMetaDataManager(WorkerConfig workerConfig, SchedulerManager schedulerManager, PulsarClient pulsarClient, ErrorNotifier errorNotifier) throws PulsarClientException {
        this.workerConfig = workerConfig;
        this.pulsarClient = pulsarClient;
        this.schedulerManager = schedulerManager;
        this.errorNotifier = errorNotifier;
        this.exclusiveLeaderProducer = null;
    }

    /**
     * Replays the metadata topic from the earliest message into the in-memory map and
     * then completes {@link #getIsInitialized()}.
     *
     * @throws RuntimeException if the reader cannot be created or a message cannot be processed
     */
    public synchronized void initialize() {
        try (Reader<byte[]> reader = FunctionMetaDataTopicTailer.createReader(this.workerConfig, this.pulsarClient.newReader(), MessageId.earliest)) {
            while (reader.hasMessageAvailable()) {
                this.processMetaDataTopicMessage(reader.readNext());
            }
            this.isInitialized.complete(null);
        } catch (Exception e) {
            log.error("Failed to initialize meta data store", e);
            throw new RuntimeException("Failed to initialize Metadata Manager", e);
        }
        log.info("FunctionMetaData Manager initialization complete");
    }

    /**
     * Starts tailing the metadata topic unless this manager already holds the exclusive
     * leader producer.
     *
     * @throws RuntimeException if the tailer cannot be started
     */
    public synchronized void start() {
        if (this.exclusiveLeaderProducer != null) {
            return;
        }
        try {
            this.initializeTailer();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Could not start MetaData topic tailer", e);
        }
    }

    /**
     * Closes the tailer and the exclusive producer, whichever are present.
     *
     * @throws Exception if closing the producer fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.functionMetaDataTopicTailer != null) {
                this.functionMetaDataTopicTailer.close();
            }
        } finally {
            // Release the producer even when closing the tailer fails.
            if (this.exclusiveLeaderProducer != null) {
                this.exclusiveLeaderProducer.close();
            }
        }
    }

    /**
     * Looks up the metadata of a single function.
     *
     * @return the stored metadata, or {@code null} when the tenant, namespace or function
     *         is unknown
     */
    public synchronized Function.FunctionMetaData getFunctionMetaData(String tenant, String namespace, String functionName) {
        Map<String, Function.FunctionMetaData> functions = this.lookupNamespace(tenant, namespace);
        return functions == null ? null : functions.get(functionName);
    }

    /**
     * Returns a new list holding the metadata of every known function.
     */
    public synchronized List<Function.FunctionMetaData> getAllFunctionMetaData() {
        List<Function.FunctionMetaData> all = new LinkedList<>();
        for (Map<String, Map<String, Function.FunctionMetaData>> namespaces : this.functionMetaDataMap.values()) {
            for (Map<String, Function.FunctionMetaData> functions : namespaces.values()) {
                all.addAll(functions.values());
            }
        }
        return all;
    }

    /**
     * Returns a new collection with the metadata of every function in the given namespace;
     * empty when the tenant or namespace is unknown.
     */
    public synchronized Collection<Function.FunctionMetaData> listFunctions(String tenant, String namespace) {
        List<Function.FunctionMetaData> result = new LinkedList<>();
        Map<String, Function.FunctionMetaData> functions = this.lookupNamespace(tenant, namespace);
        if (functions != null) {
            result.addAll(functions.values());
        }
        return result;
    }

    /**
     * Reports whether metadata is stored for the given function.
     */
    public synchronized boolean containsFunction(String tenant, String namespace, String functionName) {
        return this.containsFunctionMetaData(tenant, namespace, functionName);
    }

    /**
     * Publishes an update or delete to the metadata topic and, once the write succeeded,
     * applies it to the in-memory map and triggers scheduling if required.
     *
     * <p>The staleness check runs before anything is published and the in-memory change is
     * applied only after {@code send()} returned, so a failed write leaves the local view
     * untouched.
     *
     * @param functionMetaData the metadata to store, or identifying the function to delete
     * @param delete           {@code true} to delete, {@code false} to create/update
     * @throws IllegalStateException    if this manager is not the leader or the write fails
     * @throws IllegalArgumentException if the request version is not newer than the stored one
     */
    public synchronized void updateFunctionOnLeader(Function.FunctionMetaData functionMetaData, boolean delete) throws IllegalStateException, IllegalArgumentException {
        if (this.exclusiveLeaderProducer == null) {
            throw new IllegalStateException("Not the leader");
        }
        // Reject stale requests up front so nothing outdated reaches the topic.
        this.checkRequestOutdated(functionMetaData, delete);

        boolean compacted = this.workerConfig.getUseCompactedMetadataTopic();
        byte[] toWrite;
        if (compacted) {
            // On the compacted topic an empty payload marks a deletion.
            toWrite = delete ? new byte[0] : functionMetaData.toByteArray();
        } else {
            Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
                    .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
                    .setFunctionMetaData(functionMetaData)
                    .setWorkerId(this.workerConfig.getWorkerId())
                    .setRequestId(UUID.randomUUID().toString())
                    .build();
            toWrite = serviceRequest.toByteArray();
        }
        try {
            TypedMessageBuilder<byte[]> builder = this.exclusiveLeaderProducer.newMessage()
                    .value(toWrite)
                    .property(versionTag, Long.toString(functionMetaData.getVersion()));
            if (compacted) {
                builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
            }
            this.lastMessageSeen = builder.send();
        } catch (Exception e) {
            log.error("Could not write into Function Metadata topic", e);
            throw new IllegalStateException("Internal Error updating function at the leader", e);
        }
        // The write is on the topic; now mirror it locally.
        boolean needsScheduling = delete ? this.proccessDeregister(functionMetaData) : this.processUpdate(functionMetaData);
        if (needsScheduling) {
            this.schedulerManager.schedule();
        }
    }

    /**
     * Creates the exclusive producer on the function metadata topic, delegating to
     * {@link WorkerUtils#createExclusiveProducerWithRetry}.
     *
     * @param isLeader supplier passed through to the retry helper
     * @throws WorkerUtils.NotLeaderAnymore propagated from the retry helper
     */
    public Producer<byte[]> acquireExclusiveWrite(Supplier<Boolean> isLeader) throws WorkerUtils.NotLeaderAnymore {
        return WorkerUtils.createExclusiveProducerWithRetry(this.pulsarClient, this.workerConfig.getFunctionMetadataTopic(), this.workerConfig.getWorkerId() + "-leader", isLeader, 1000);
    }

    /**
     * Switches this manager to leader mode: stores the exclusive producer, detaches the
     * tailer, waits for it to drain and closes it.
     *
     * <p>NOTE(review): unlike most methods here this one is not {@code synchronized};
     * presumably because the tailer calls back into synchronized methods while this method
     * waits for it to finish. Confirm before changing.
     *
     * @param exclusiveProducer producer obtained from {@link #acquireExclusiveWrite(Supplier)}
     */
    public void acquireLeadership(Producer<byte[]> exclusiveProducer) {
        log.info("FunctionMetaDataManager becoming leader by creating exclusive producer");
        if (this.exclusiveLeaderProducer != null) {
            log.error("FunctionMetaData Manager entered invalid state");
            this.errorNotifier.triggerError(new IllegalStateException("FunctionMetaData Manager entered invalid state"));
        }
        this.exclusiveLeaderProducer = exclusiveProducer;
        FunctionMetaDataTopicTailer tailer = this.functionMetaDataTopicTailer;
        this.functionMetaDataTopicTailer = null;
        if (tailer != null) {
            try {
                // Block until the tailer has consumed everything already on the topic.
                tailer.stopWhenNoMoreMessages().get();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for code further up the stack.
                    Thread.currentThread().interrupt();
                }
                log.error("Error while waiting for metadata tailer thread to finish", e);
                this.errorNotifier.triggerError(e);
            }
            tailer.close();
        }
        log.info("FunctionMetaDataManager done becoming leader");
    }

    /**
     * Leaves leader mode: closes the exclusive producer and starts tailing the topic again
     * from the last message seen. A client error is reported to the error notifier.
     */
    public synchronized void giveupLeadership() {
        log.info("FunctionMetaDataManager giving up leadership by closing exclusive producer");
        try {
            this.exclusiveLeaderProducer.close();
            this.exclusiveLeaderProducer = null;
            this.initializeTailer();
        } catch (PulsarClientException e) {
            log.error("Error closing exclusive producer", e);
            this.errorNotifier.triggerError(e);
        }
    }

    /**
     * Applies one metadata topic message to the in-memory map and records its id as the
     * last message seen.
     *
     * <p>Messages rejected with {@link IllegalArgumentException} (outdated requests, or an
     * unparsable version property, since {@link NumberFormatException} is a subclass) are
     * skipped on purpose; their id is still recorded.
     *
     * @throws IOException if the message payload cannot be parsed
     */
    public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
        try {
            if (this.workerConfig.getUseCompactedMetadataTopic()) {
                this.processCompactedMetaDataTopicMessage(message);
            } else {
                this.processUncompactedMetaDataTopicMessage(message);
            }
        } catch (IllegalArgumentException e) {
            // Deliberately best-effort: nothing can be done about a stale or malformed
            // message already on the topic, so move past it.
            log.debug("Skipping metadata topic message {}: {}", message.getMessageId(), e.getMessage());
        }
        this.lastMessageSeen = message.getMessageId();
    }

    /**
     * Handles a message whose payload is a serialized {@code ServiceRequest}.
     */
    private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
        Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
        if (log.isDebugEnabled()) {
            log.debug("Received Service Request: {}", serviceRequest);
        }
        switch (serviceRequest.getServiceRequestType()) {
            case UPDATE: {
                this.processUpdate(serviceRequest.getFunctionMetaData());
                break;
            }
            case DELETE: {
                this.proccessDeregister(serviceRequest.getFunctionMetaData());
                break;
            }
            default: {
                log.warn("Received request with unrecognized type: {}", serviceRequest);
            }
        }
    }

    /**
     * Handles a message from the compacted topic: the key is the fully qualified function
     * name, the version travels as a property, and an empty payload means deletion.
     */
    private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
        // Parsed first so a missing or malformed version rejects the message either way.
        long version = Long.parseLong(message.getProperty(versionTag));
        String key = message.getKey();
        String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(key);
        String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(key);
        String functionName = FunctionCommon.extractNameFromFullyQualifiedName(key);
        byte[] payload = message.getData();
        if (payload == null || payload.length == 0) {
            this.proccessDeregister(tenant, namespace, functionName, version);
        } else {
            this.processUpdate(Function.FunctionMetaData.parseFrom(payload));
        }
    }

    private boolean containsFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        return this.containsFunctionMetaData(functionMetaData.getFunctionDetails());
    }

    private boolean containsFunctionMetaData(Function.FunctionDetails functionDetails) {
        return this.containsFunctionMetaData(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
    }

    private boolean containsFunctionMetaData(String tenant, String namespace, String functionName) {
        Map<String, Function.FunctionMetaData> functions = this.lookupNamespace(tenant, namespace);
        return functions != null && functions.containsKey(functionName);
    }

    /**
     * Returns the function map of a namespace, or {@code null} when the tenant or the
     * namespace has no entry.
     */
    private Map<String, Function.FunctionMetaData> lookupNamespace(String tenant, String namespace) {
        Map<String, Map<String, Function.FunctionMetaData>> namespaces = this.functionMetaDataMap.get(tenant);
        return namespaces == null ? null : namespaces.get(namespace);
    }

    /**
     * Throws if the request targets a stored function whose version is already at or
     * beyond the request's version. Requests for unknown functions always pass.
     *
     * @throws IllegalArgumentException if the request is out of date
     */
    private void checkRequestOutdated(Function.FunctionMetaData functionMetaData, boolean delete) {
        if (!this.containsFunctionMetaData(functionMetaData) || !this.isRequestOutdated(functionMetaData)) {
            return;
        }
        if (log.isDebugEnabled()) {
            Function.FunctionDetails details = functionMetaData.getFunctionDetails();
            log.debug("{}/{}/{} Ignoring outdated request version: {}", details.getTenant(), details.getNamespace(), details.getName(), functionMetaData.getVersion());
        }
        throw new IllegalArgumentException(delete
                ? "Delete request ignored because it is out of date. Please try again."
                : "Update request ignored because it is out of date. Please try again.");
    }

    /**
     * Removes the function named by the request's details from the in-memory map.
     *
     * @return {@code true} if an entry was removed and scheduling is needed
     * @throws IllegalArgumentException if the request is out of date
     */
    @VisibleForTesting
    synchronized boolean proccessDeregister(Function.FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {
        Function.FunctionDetails details = deregisterRequestFs.getFunctionDetails();
        return this.proccessDeregister(details.getTenant(), details.getNamespace(), details.getName(), deregisterRequestFs.getVersion());
    }

    /**
     * Removes the given function from the in-memory map when the request version is newer
     * than the stored one. Unknown functions are ignored.
     *
     * @return {@code true} if an entry was removed and scheduling is needed
     * @throws IllegalArgumentException if the function exists and the request is out of date
     */
    synchronized boolean proccessDeregister(String tenant, String namespace, String functionName, long version) throws IllegalArgumentException {
        log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version);
        if (!this.containsFunctionMetaData(tenant, namespace, functionName)) {
            return false;
        }
        if (this.isRequestOutdated(tenant, namespace, functionName, version)) {
            if (log.isDebugEnabled()) {
                log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName, version);
            }
            throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
        }
        this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
        return true;
    }

    /**
     * Stores the given metadata when the function is new or the request version is newer
     * than the stored one.
     *
     * @return {@code true} when the metadata was stored and scheduling is needed
     * @throws IllegalArgumentException if the function exists and the request is out of date
     */
    @VisibleForTesting
    synchronized boolean processUpdate(Function.FunctionMetaData updateRequestFs) throws IllegalArgumentException {
        log.debug("Process update request: {}", updateRequestFs);
        if (this.containsFunctionMetaData(updateRequestFs) && this.isRequestOutdated(updateRequestFs)) {
            throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again.");
        }
        this.setFunctionMetaData(updateRequestFs);
        return true;
    }

    private boolean isRequestOutdated(Function.FunctionMetaData requestFunctionMetaData) {
        Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
        return this.isRequestOutdated(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), requestFunctionMetaData.getVersion());
    }

    /**
     * Compares a request version with the stored one; a request is outdated when the
     * stored version is greater than or equal to it. Callers must have verified that the
     * function exists.
     */
    private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
        Function.FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);
        return currentFunctionMetaData.getVersion() >= version;
    }

    /**
     * Stores the metadata under its tenant, namespace and name, creating the intermediate
     * maps on demand. No version check is performed.
     */
    @VisibleForTesting
    void setFunctionMetaData(Function.FunctionMetaData functionMetaData) {
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        this.functionMetaDataMap
                .computeIfAbsent(functionDetails.getTenant(), tenant -> new ConcurrentHashMap<>())
                .computeIfAbsent(functionDetails.getNamespace(), namespace -> new ConcurrentHashMap<>())
                .put(functionDetails.getName(), functionMetaData);
    }

    /**
     * Creates and starts a tailer that resumes from {@link #lastMessageSeen}.
     */
    private void initializeTailer() throws PulsarClientException {
        this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, this.pulsarClient.newReader(), this.workerConfig, this.lastMessageSeen, this.errorNotifier);
        this.functionMetaDataTopicTailer.start();
        log.info("MetaData Manager Tailer started");
    }

    /** Returns the id of the last topic message read or written by this manager. */
    public MessageId getLastMessageSeen() {
        return this.lastMessageSeen;
    }

    /** Returns the future that {@link #initialize()} completes on success. */
    public CompletableFuture<Void> getIsInitialized() {
        return this.isInitialized;
    }
}

