/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.FunctionActioner;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerStatsManager;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.UriBuilder;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionRuntimeManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionRuntimeManager.class);
    @VisibleForTesting
    Map<String, Map<String, Function.Assignment>> workerIdToAssignments = new ConcurrentHashMap<String, Map<String, Function.Assignment>>();
    @VisibleForTesting
    final FunctionRuntimeInfos functionRuntimeInfos = new FunctionRuntimeInfos();
    @VisibleForTesting
    final WorkerConfig workerConfig;
    private FunctionActioner functionActioner;
    private RuntimeFactory runtimeFactory;
    private MembershipManager membershipManager;
    private final PulsarAdmin functionAdmin;
    private WorkerService workerService;
    boolean isInitializePhase = false;
    private final CompletableFuture<Void> isInitialized = new CompletableFuture();
    private final FunctionMetaDataManager functionMetaDataManager;
    private final WorkerStatsManager workerStatsManager;
    private final ErrorNotifier errorNotifier;

    public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager, FunctionsManager functionsManager, FunctionMetaDataManager functionMetaDataManager, WorkerStatsManager workerStatsManager, ErrorNotifier errorNotifier) throws Exception {
        this.workerConfig = workerConfig;
        this.workerService = workerService;
        this.functionAdmin = workerService.getFunctionAdmin();
        SecretsProviderConfigurator secretsProviderConfigurator = !StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName()) ? (SecretsProviderConfigurator)Reflections.createInstance(workerConfig.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader()) : new DefaultSecretsProviderConfigurator();
        log.info("Initializing secrets provider configurator {} with configs: {}", (Object)secretsProviderConfigurator.getClass().getName(), workerConfig.getSecretsProviderConfiguratorConfig());
        secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
        Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
        AuthenticationConfig authConfig = null;
        if (workerConfig.isAuthenticationEnabled()) {
            authConfig = AuthenticationConfig.builder().clientAuthenticationPlugin(workerConfig.getBrokerClientAuthenticationPlugin()).clientAuthenticationParameters(workerConfig.getBrokerClientAuthenticationParameters()).tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()).useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()).tlsHostnameVerificationEnable(workerConfig.isTlsEnableHostnameVerification()).build();
            if (!StringUtils.isEmpty(workerConfig.getFunctionAuthProviderClassName())) {
                functionAuthProvider = Optional.of(FunctionAuthProvider.getAuthProvider(workerConfig.getFunctionAuthProviderClassName()));
            }
        }
        Optional<Object> runtimeCustomizer = Optional.empty();
        if (!StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName())) {
            runtimeCustomizer = Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
            ((RuntimeCustomizer)runtimeCustomizer.get()).initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
        }
        if (!StringUtils.isEmpty(workerConfig.getFunctionRuntimeFactoryClassName())) {
            this.runtimeFactory = RuntimeFactory.getFuntionRuntimeFactory(workerConfig.getFunctionRuntimeFactoryClassName());
        } else if (workerConfig.getThreadContainerFactory() != null) {
            this.runtimeFactory = new ThreadRuntimeFactory();
            workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal().convertValue((Object)workerConfig.getThreadContainerFactory(), Map.class));
        } else if (workerConfig.getProcessContainerFactory() != null) {
            this.runtimeFactory = new ProcessRuntimeFactory();
            workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal().convertValue((Object)workerConfig.getProcessContainerFactory(), Map.class));
        } else if (workerConfig.getKubernetesContainerFactory() != null) {
            this.runtimeFactory = new KubernetesRuntimeFactory();
            workerConfig.setFunctionRuntimeFactoryConfigs(ObjectMapperFactory.getThreadLocal().convertValue((Object)workerConfig.getKubernetesContainerFactory(), Map.class));
        } else {
            throw new RuntimeException("A Function Runtime Factory needs to be set");
        }
        this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider, runtimeCustomizer);
        this.functionActioner = new FunctionActioner(this.workerConfig, this.runtimeFactory, dlogNamespace, connectorsManager, functionsManager, workerService.getBrokerAdmin());
        this.membershipManager = membershipManager;
        this.functionMetaDataManager = functionMetaDataManager;
        this.workerStatsManager = workerStatsManager;
        this.errorNotifier = errorNotifier;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public MessageId initialize() {
        try (Reader<byte[]> reader = WorkerUtils.createReader(this.workerService.getClient().newReader(), this.workerConfig.getWorkerId() + "-function-assignment-initialize", this.workerConfig.getFunctionAssignmentTopic(), MessageId.earliest);){
            this.isInitializePhase = true;
            MessageId lastMessageRead = MessageId.earliest;
            while (reader.hasMessageAvailable()) {
                Message<byte[]> message = reader.readNext();
                lastMessageRead = message.getMessageId();
                this.processAssignmentMessage(message);
            }
            this.isInitializePhase = false;
            Map<String, Function.Assignment> assignmentMap = this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
            if (assignmentMap != null) {
                for (Function.Assignment assignment : assignmentMap.values()) {
                    if (!this.needsStart(assignment)) continue;
                    this.startFunctionInstance(assignment);
                }
            }
            this.isInitialized.complete(null);
            MessageId messageId = lastMessageRead;
            return messageId;
        }
        catch (Exception e) {
            log.error("Failed to initialize function runtime manager: {}", (Object)e.getMessage(), (Object)e);
            throw new RuntimeException(e);
        }
    }

    public synchronized Map<String, Map<String, Function.Assignment>> getCurrentAssignments() {
        HashMap<String, Map<String, Function.Assignment>> copy = new HashMap<String, Map<String, Function.Assignment>>();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : this.workerIdToAssignments.entrySet()) {
            HashMap<String, Function.Assignment> tmp = new HashMap<String, Function.Assignment>();
            tmp.putAll(entry.getValue());
            copy.put(entry.getKey(), tmp);
        }
        return copy;
    }

    public synchronized Function.Assignment findFunctionAssignment(String tenant, String namespace, String functionName, int instanceId) {
        return this.findAssignment(tenant, namespace, functionName, instanceId);
    }

    public synchronized Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName) {
        return FunctionRuntimeManager.findFunctionAssignments(tenant, namespace, functionName, this.workerIdToAssignments);
    }

    public static Collection<Function.Assignment> findFunctionAssignments(String tenant, String namespace, String functionName, Map<String, Map<String, Function.Assignment>> workerIdToAssignments) {
        LinkedList<Function.Assignment> assignments = new LinkedList<Function.Assignment>();
        for (Map<String, Function.Assignment> entryMap : workerIdToAssignments.values()) {
            assignments.addAll(entryMap.values().stream().filter(assignment -> tenant.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant()) && namespace.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace()) && functionName.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName())).collect(Collectors.toList()));
        }
        return assignments;
    }

    public synchronized void removeAssignments(Collection<Function.Assignment> assignments) {
        for (Function.Assignment assignment : assignments) {
            this.deleteAssignment(assignment);
        }
    }

    public synchronized void restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId, URI uri) throws Exception {
        String workerId;
        if (this.runtimeFactory.externallyManaged()) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.NOT_IMPLEMENTED).type("application/json").entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build());
        }
        Function.Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
        String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
        if (assignment == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(fullFunctionName + " doesn't exist")).build());
        }
        String assignedWorkerId = assignment.getWorkerId();
        if (assignedWorkerId.equals(workerId = this.workerConfig.getWorkerId())) {
            this.stopFunction(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
            return;
        }
        List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
        WorkerInfo workerInfo = null;
        for (WorkerInfo entry : workerInfoList) {
            if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
            workerInfo = entry;
        }
        if (workerInfo == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
    }

    public synchronized void restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception {
        String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName);
        Collection<Function.Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
        if (assignments.isEmpty()) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
        }
        if (this.runtimeFactory.externallyManaged()) {
            Function.Assignment assignment = assignments.iterator().next();
            String assignedWorkerId = assignment.getWorkerId();
            String workerId = this.workerConfig.getWorkerId();
            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
            if (assignedWorkerId.equals(workerId)) {
                this.stopFunction(fullyQualifiedInstanceId, true);
            } else {
                List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
                WorkerInfo workerInfo = null;
                for (WorkerInfo entry : workerInfoList) {
                    if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
                    workerInfo = entry;
                }
                if (workerInfo == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] has not been assigned yet", (Object)fullyQualifiedInstanceId);
                    }
                    throw new WebApplicationException(Response.serverError().status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
                }
                Function.FunctionDetails.ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
                if (Function.FunctionDetails.ComponentType.SOURCE == componentType) {
                    this.functionAdmin.sources().restartSource(tenant, namespace, functionName);
                } else if (Function.FunctionDetails.ComponentType.SINK == componentType) {
                    this.functionAdmin.sinks().restartSink(tenant, namespace, functionName);
                } else {
                    this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
                }
            }
        } else {
            for (Function.Assignment assignment : assignments) {
                String assignedWorkerId = assignment.getWorkerId();
                String workerId = this.workerConfig.getWorkerId();
                String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
                if (assignedWorkerId.equals(workerId)) {
                    this.stopFunction(fullyQualifiedInstanceId, true);
                    continue;
                }
                List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
                WorkerInfo workerInfo = null;
                for (WorkerInfo entry : workerInfoList) {
                    if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
                    workerInfo = entry;
                }
                if (workerInfo == null) {
                    if (!log.isDebugEnabled()) continue;
                    log.debug("[{}] has not been assigned yet", (Object)fullyQualifiedInstanceId);
                    continue;
                }
                Function.FunctionDetails.ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
                if (Function.FunctionDetails.ComponentType.SOURCE == componentType) {
                    this.functionAdmin.sources().restartSource(tenant, namespace, functionName, assignment.getInstance().getInstanceId());
                    continue;
                }
                if (Function.FunctionDetails.ComponentType.SINK == componentType) {
                    this.functionAdmin.sinks().restartSink(tenant, namespace, functionName, assignment.getInstance().getInstanceId());
                    continue;
                }
                this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, assignment.getInstance().getInstanceId());
            }
        }
    }

    public void stopAllOwnedFunctions() {
        if (this.runtimeFactory.externallyManaged()) {
            log.warn("Will not stop any functions since they are externally managed");
            return;
        }
        String workerId = this.workerConfig.getWorkerId();
        Map<String, Function.Assignment> assignments = this.workerIdToAssignments.get(workerId);
        if (assignments != null) {
            TreeMap<String, Function.Assignment> copiedAssignments = new TreeMap<String, Function.Assignment>(assignments);
            copiedAssignments.values().forEach(assignment -> {
                String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
                try {
                    this.stopFunction(fullyQualifiedInstanceId, false);
                }
                catch (Exception e) {
                    log.warn("Failed to stop function {} - {}", (Object)fullyQualifiedInstanceId, (Object)e.getMessage());
                }
            });
        }
    }

    private void stopFunction(String fullyQualifiedInstanceId, boolean restart) throws Exception {
        log.info("[{}] {}..", (Object)(restart ? "restarting" : "stopping"), (Object)fullyQualifiedInstanceId);
        FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
        if (functionRuntimeInfo != null) {
            this.conditionallyStopFunction(functionRuntimeInfo);
            try {
                if (restart) {
                    this.conditionallyStartFunction(functionRuntimeInfo);
                }
            }
            catch (Exception ex) {
                log.info("{} Error re-starting function", (Object)fullyQualifiedInstanceId, (Object)ex);
                functionRuntimeInfo.setStartupException(ex);
                throw ex;
            }
        }
    }

    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(String tenant, String namespace, String functionName, int instanceId, URI uri) {
        String workerId;
        Function.Assignment assignment = this.runtimeFactory.externallyManaged() ? this.findAssignment(tenant, namespace, functionName, -1) : this.findAssignment(tenant, namespace, functionName, instanceId);
        if (assignment == null) {
            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
        }
        String assignedWorkerId = assignment.getWorkerId();
        if (assignedWorkerId.equals(workerId = this.workerConfig.getWorkerId())) {
            FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
            RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
            if (runtimeSpawner != null) {
                return WorkerUtils.getFunctionInstanceStats(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
            }
            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
        }
        List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
        WorkerInfo workerInfo = null;
        for (WorkerInfo entry : workerInfoList) {
            if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
            workerInfo = entry;
        }
        if (workerInfo == null) {
            return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
        }
        if (uri == null) {
            throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
        }
        URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
    }

    /*
     * Enabled aggressive block sorting
     */
    public FunctionStats getFunctionStats(String tenant, String namespace, String functionName, URI uri) throws PulsarAdminException {
        Collection<Function.Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);
        FunctionStats functionStats = new FunctionStats();
        if (assignments.isEmpty()) {
            return functionStats;
        }
        if (this.runtimeFactory.externallyManaged()) {
            Function.Assignment assignment = assignments.iterator().next();
            boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
            if (isOwner) {
                int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
                for (int i = 0; i < parallelism; ++i) {
                    FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = this.getFunctionInstanceStats(tenant, namespace, functionName, i, null);
                    FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
                    functionInstanceStats.setInstanceId(i);
                    functionInstanceStats.setMetrics(functionInstanceStatsData);
                    functionStats.addInstance(functionInstanceStats);
                }
                return functionStats.calculateOverall();
            } else {
                List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
                WorkerInfo workerInfo = null;
                for (WorkerInfo entry : workerInfoList) {
                    if (!assignment.getWorkerId().equals(entry.getWorkerId())) continue;
                    workerInfo = entry;
                }
                if (workerInfo == null) {
                    return functionStats;
                }
                if (uri == null) {
                    throw new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
                }
                URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
            }
        }
        for (Function.Assignment assignment : assignments) {
            boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
            FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData = isOwner ? this.getFunctionInstanceStats(tenant, namespace, functionName, assignment.getInstance().getInstanceId(), null) : this.functionAdmin.functions().getFunctionStats(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
            FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
            functionInstanceStats.setInstanceId(assignment.getInstance().getInstanceId());
            functionInstanceStats.setMetrics(functionInstanceStatsData);
            functionStats.addInstance(functionInstanceStats);
        }
        return functionStats.calculateOverall();
    }

    public synchronized void processAssignmentMessage(Message<byte[]> msg) {
        if (msg.getData() == null || msg.getData().length == 0) {
            log.info("Received assignment delete: {}", (Object)msg.getKey());
            this.deleteAssignment(msg.getKey());
        } else {
            Function.Assignment assignment;
            try {
                assignment = Function.Assignment.parseFrom(msg.getData());
            }
            catch (IOException e) {
                log.error("[{}] Received bad assignment update at message {}", (Object)msg.getMessageId(), (Object)e);
                throw new RuntimeException(e);
            }
            log.info("Received assignment update: {}", (Object)assignment);
            this.processAssignment(assignment);
        }
    }

    public synchronized void processAssignment(Function.Assignment newAssignment) {
        boolean exists = false;
        for (Map<String, Function.Assignment> entry : this.workerIdToAssignments.values()) {
            if (!entry.containsKey(FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance()))) continue;
            exists = true;
        }
        if (exists) {
            this.updateAssignment(newAssignment);
        } else {
            this.addAssignment(newAssignment);
        }
    }

    private void updateAssignment(Function.Assignment assignment) {
        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        Function.Assignment existingAssignment = this.findAssignment(assignment);
        if (!existingAssignment.equals(assignment)) {
            Function.Assignment existing_assignment;
            FunctionRuntimeInfo newFunctionRuntimeInfo;
            FunctionRuntimeInfo functionRuntimeInfo = this._getFunctionRuntimeInfo(fullyQualifiedInstanceId);
            if (this.runtimeFactory.externallyManaged()) {
                if (!assignment.getInstance().equals(existingAssignment.getInstance())) {
                    if (functionRuntimeInfo != null) {
                        this.conditionallyStopFunction(functionRuntimeInfo);
                    }
                    if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                        if (this.needsStart(assignment)) {
                            newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                            newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                            this.conditionallyStartFunction(newFunctionRuntimeInfo);
                            this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                        }
                    } else {
                        this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
                    }
                } else if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                    newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                    newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                    RuntimeSpawner runtimeSpawner = this.functionActioner.getRuntimeSpawner(assignment.getInstance(), assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
                    runtimeSpawner.getRuntime().reinitialize();
                    newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
                    this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                } else {
                    this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
                }
            } else {
                if (functionRuntimeInfo != null) {
                    this.conditionallyStopFunction(functionRuntimeInfo);
                }
                if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                    if (this.needsStart(assignment)) {
                        newFunctionRuntimeInfo = new FunctionRuntimeInfo();
                        newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                        this.conditionallyStartFunction(newFunctionRuntimeInfo);
                        this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
                    }
                } else {
                    this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
                }
            }
            if ((existing_assignment = this.findAssignment(assignment)) != null) {
                this.deleteAssignment(existing_assignment);
            }
            this.setAssignment(assignment);
        }
    }

    public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
        Map<String, Function.Assignment> worker;
        FunctionRuntimeInfo functionRuntimeInfo = this._getFunctionRuntimeInfo(fullyQualifiedInstanceId);
        if (functionRuntimeInfo != null) {
            Function.FunctionDetails functionDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
            if (this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName())) {
                this.conditionallyStopFunction(functionRuntimeInfo);
            } else {
                FunctionInstanceId functionInstanceId = new FunctionInstanceId(fullyQualifiedInstanceId);
                String name = functionInstanceId.getName();
                String namespace = functionInstanceId.getNamespace();
                String tenant = functionInstanceId.getTenant();
                Collection<Function.Assignment> assignments = FunctionRuntimeManager.findFunctionAssignments(tenant, namespace, name, this.workerIdToAssignments);
                if (assignments.size() > 1) {
                    this.conditionallyStopFunction(functionRuntimeInfo);
                } else {
                    this.conditionallyTerminateFunction(functionRuntimeInfo);
                }
            }
            this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
        }
        String workerId = null;
        for (Map.Entry<String, Map<String, Function.Assignment>> workerAssignments : this.workerIdToAssignments.entrySet()) {
            if (workerAssignments.getValue().remove(fullyQualifiedInstanceId) == null) continue;
            workerId = workerAssignments.getKey();
            break;
        }
        if (workerId != null && (worker = this.workerIdToAssignments.get(workerId)) != null && worker.isEmpty()) {
            this.workerIdToAssignments.remove(workerId);
        }
    }

    @VisibleForTesting
    void deleteAssignment(Function.Assignment assignment) {
        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        Map<String, Function.Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId());
        if (assignmentMap != null) {
            assignmentMap.remove(fullyQualifiedInstanceId);
            if (assignmentMap.isEmpty()) {
                this.workerIdToAssignments.remove(assignment.getWorkerId());
            }
        }
    }

    private void addAssignment(Function.Assignment assignment) {
        this.setAssignment(assignment);
        if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId()) && this.needsStart(assignment)) {
            this.startFunctionInstance(assignment);
        }
    }

    private void startFunctionInstance(Function.Assignment assignment) {
        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
        FunctionRuntimeInfo functionRuntimeInfo = this._getFunctionRuntimeInfo(fullyQualifiedInstanceId);
        if (functionRuntimeInfo == null) {
            functionRuntimeInfo = new FunctionRuntimeInfo().setFunctionInstance(assignment.getInstance());
            this.functionRuntimeInfos.put(fullyQualifiedInstanceId, functionRuntimeInfo);
        } else {
            log.warn("Function {} already running. Going to restart function.", (Object)functionRuntimeInfo);
            this.conditionallyStopFunction(functionRuntimeInfo);
        }
        this.conditionallyStartFunction(functionRuntimeInfo);
    }

    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
        return this.functionRuntimeInfos.getAll();
    }

    private Function.Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : this.workerIdToAssignments.entrySet()) {
            Map<String, Function.Assignment> assignmentMap = entry.getValue();
            Function.Assignment existingAssignment = assignmentMap.get(fullyQualifiedInstanceId);
            if (existingAssignment == null) continue;
            return existingAssignment;
        }
        return null;
    }

    private Function.Assignment findAssignment(Function.Assignment assignment) {
        return this.findAssignment(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
    }

    @VisibleForTesting
    void setAssignment(Function.Assignment assignment) {
        if (!this.workerIdToAssignments.containsKey(assignment.getWorkerId())) {
            this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap());
        }
        this.workerIdToAssignments.get(assignment.getWorkerId()).put(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), assignment);
    }

    @Override
    public void close() throws Exception {
        this.stopAllOwnedFunctions();
        if (this.runtimeFactory != null) {
            this.runtimeFactory.close();
        }
    }

    public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        return this._getFunctionRuntimeInfo(fullyQualifiedInstanceId);
    }

    private FunctionRuntimeInfo _getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
        FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfos.get(fullyQualifiedInstanceId);
        if (functionRuntimeInfo == null && this.workerIdToAssignments.containsValue(this.workerConfig.getWorkerId()) && this.workerIdToAssignments.get(this.workerConfig.getWorkerId()).containsValue(fullyQualifiedInstanceId)) {
            log.error("Assignments and RuntimeInfos are inconsistent. FunctionRuntimeInfo missing for " + fullyQualifiedInstanceId);
        }
        return functionRuntimeInfo;
    }

    private boolean needsStart(Function.Assignment assignment) {
        boolean toStart = false;
        Function.FunctionMetaData functionMetaData = assignment.getInstance().getFunctionMetaData();
        if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
            toStart = true;
        } else if (assignment.getInstance().getInstanceId() < 0) {
            for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
                if (state != Function.FunctionState.RUNNING) continue;
                toStart = true;
            }
        } else if (functionMetaData.getInstanceStatesOrDefault(assignment.getInstance().getInstanceId(), Function.FunctionState.RUNNING) == Function.FunctionState.RUNNING) {
            toStart = true;
        }
        return toStart;
    }

    private void conditionallyStartFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (!this.isInitializePhase) {
            this.workerStatsManager.startInstanceProcessTimeStart();
            this.functionActioner.startFunction(functionRuntimeInfo);
            this.workerStatsManager.startInstanceProcessTimeEnd();
        }
    }

    private void conditionallyStopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (!this.isInitializePhase) {
            this.workerStatsManager.stopInstanceProcessTimeStart();
            this.functionActioner.stopFunction(functionRuntimeInfo);
            this.workerStatsManager.stopInstanceProcessTimeEnd();
        }
    }

    private void conditionallyTerminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        if (!this.isInitializePhase) {
            this.workerStatsManager.startInstanceProcessTimeStart();
            this.functionActioner.terminateFunction(functionRuntimeInfo);
            this.workerStatsManager.startInstanceProcessTimeEnd();
        }
    }

    public int getMyInstances() {
        Map<String, Function.Assignment> myAssignments = this.workerIdToAssignments.get(this.workerConfig.getWorkerId());
        return myAssignments == null ? 0 : myAssignments.size();
    }

    public WorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public void setFunctionActioner(FunctionActioner functionActioner) {
        this.functionActioner = functionActioner;
    }

    public FunctionActioner getFunctionActioner() {
        return this.functionActioner;
    }

    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    public WorkerService getWorkerService() {
        return this.workerService;
    }

    public CompletableFuture<Void> getIsInitialized() {
        return this.isInitialized;
    }

    @VisibleForTesting
    class FunctionRuntimeInfos {
        private Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<String, FunctionRuntimeInfo>();

        FunctionRuntimeInfos() {
        }

        public FunctionRuntimeInfo get(String fullyQualifiedInstanceId) {
            return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
        }

        public void put(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
            if (!FunctionRuntimeManager.this.isInitializePhase) {
                this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
            }
        }

        public void remove(String fullyQualifiedInstanceId) {
            if (!FunctionRuntimeManager.this.isInitializePhase) {
                this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
            }
        }

        public Map<String, FunctionRuntimeInfo> getAll() {
            return this.functionRuntimeInfoMap;
        }

        public int size() {
            return this.functionRuntimeInfoMap.size();
        }
    }
}

