package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.worker.FunctionAction;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.javax.ws.rs.client.ClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionRuntimeManager.class */
public class FunctionRuntimeManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionRuntimeManager.class);

    @VisibleForTesting
    final WorkerConfig workerConfig;

    @VisibleForTesting
    LinkedBlockingQueue<FunctionAction> actionQueue;
    private final FunctionAssignmentTailer functionAssignmentTailer;
    private FunctionActioner functionActioner;
    private RuntimeFactory runtimeFactory;
    private MembershipManager membershipManager;
    private final ConnectorsManager connectorsManager;

    @VisibleForTesting
    Map<String, Map<String, Function.Assignment>> workerIdToAssignments = new ConcurrentHashMap();

    @VisibleForTesting
    Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap();
    private long currentAssignmentVersion = 0;

    public FunctionRuntimeManager(WorkerConfig workerConfig, PulsarClient pulsarClient, Namespace namespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
        this.workerConfig = workerConfig;
        this.connectorsManager = connectorsManager;
        this.functionAssignmentTailer = new FunctionAssignmentTailer(this, pulsarClient.newReader().topic(this.workerConfig.getFunctionAssignmentTopic()).startMessageId(MessageId.earliest).create());
        AuthenticationConfig build = AuthenticationConfig.builder().clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin()).clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters()).tlsTrustCertsFilePath(workerConfig.getTlsTrustCertsFilePath()).useTls(workerConfig.isUseTls()).tlsAllowInsecureConnection(workerConfig.isTlsAllowInsecureConnection()).tlsHostnameVerificationEnable(workerConfig.isTlsHostnameVerificationEnable()).build();
        if (workerConfig.getThreadContainerFactory() != null) {
            this.runtimeFactory = new ThreadRuntimeFactory(workerConfig.getThreadContainerFactory().getThreadGroupName(), workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), build);
        } else {
            if (workerConfig.getProcessContainerFactory() == null) {
                throw new RuntimeException("Either Thread or Process Container Factory need to be set");
            }
            this.runtimeFactory = new ProcessRuntimeFactory(workerConfig.getPulsarServiceUrl(), workerConfig.getStateStorageServiceUrl(), build, workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(), workerConfig.getProcessContainerFactory().getPythonInstanceLocation(), workerConfig.getProcessContainerFactory().getLogDirectory());
        }
        this.actionQueue = new LinkedBlockingQueue<>();
        this.functionActioner = new FunctionActioner(this.workerConfig, this.runtimeFactory, namespace, this.actionQueue, connectorsManager);
        this.membershipManager = membershipManager;
    }

    public void start() {
        log.info("/** Starting Function Runtime Manager **/");
        log.info("Initialize metrics sink...");
        log.info("Starting function actioner...");
        this.functionActioner.start();
        log.info("Starting function assignment tailer...");
        this.functionAssignmentTailer.start();
    }

    public synchronized Map<String, Map<String, Function.Assignment>> getCurrentAssignments() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Map<String, Function.Assignment>> entry : this.workerIdToAssignments.entrySet()) {
            HashMap hashMap2 = new HashMap();
            hashMap2.putAll(entry.getValue());
            hashMap.put(entry.getKey(), hashMap2);
        }
        return hashMap;
    }

    public synchronized Function.Assignment findFunctionAssignment(String str, String str2, String str3, int i) {
        return findAssignment(str, str2, str3, i);
    }

    public synchronized Collection<Function.Assignment> findFunctionAssignments(String str, String str2, String str3) {
        return findFunctionAssignments(str, str2, str3, this.workerIdToAssignments);
    }

    public static Collection<Function.Assignment> findFunctionAssignments(String str, String str2, String str3, Map<String, Map<String, Function.Assignment>> map) {
        LinkedList linkedList = new LinkedList();
        Iterator<Map<String, Function.Assignment>> it = map.values().iterator();
        while (it.hasNext()) {
            linkedList.addAll((Collection) it.next().values().stream().filter(assignment -> {
                return str.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant()) && str2.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace()) && str3.equals(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName());
            }).collect(Collectors.toList()));
        }
        return linkedList;
    }

    public synchronized long getCurrentAssignmentVersion() {
        return new Long(this.currentAssignmentVersion).longValue();
    }

    public synchronized void removeAssignments(Collection<Function.Assignment> collection) {
        Iterator<Function.Assignment> it = collection.iterator();
        while (it.hasNext()) {
            deleteAssignment(it.next());
        }
    }

    public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String str, String str2, String str3, int i) {
        InstanceCommunication.FunctionStatus build;
        Function.Assignment findAssignment = findAssignment(str, str2, str3, i);
        String workerId = findAssignment.getWorkerId();
        String workerId2 = this.workerConfig.getWorkerId();
        if (findAssignment == null) {
            InstanceCommunication.FunctionStatus.Builder newBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            newBuilder.setRunning(false);
            newBuilder.setFailureException("Function has not been scheduled");
            return newBuilder.build();
        }
        if (workerId.equals(workerId2)) {
            FunctionRuntimeInfo functionRuntimeInfo = getFunctionRuntimeInfo(Utils.getFullyQualifiedInstanceId(findAssignment.getInstance()));
            if (functionRuntimeInfo.getRuntimeSpawner() != null) {
                try {
                    InstanceCommunication.FunctionStatus.Builder newBuilder2 = InstanceCommunication.FunctionStatus.newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
                    newBuilder2.setWorkerId(workerId);
                    build = newBuilder2.build();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            } else {
                InstanceCommunication.FunctionStatus.Builder newBuilder3 = InstanceCommunication.FunctionStatus.newBuilder();
                newBuilder3.setRunning(false);
                newBuilder3.setInstanceId(String.valueOf(i));
                if (functionRuntimeInfo.getStartupException() != null) {
                    newBuilder3.setFailureException(functionRuntimeInfo.getStartupException().getMessage());
                }
                newBuilder3.setWorkerId(workerId);
                build = newBuilder3.build();
            }
        } else {
            WorkerInfo workerInfo = null;
            for (WorkerInfo workerInfo2 : this.membershipManager.getCurrentMembership()) {
                if (findAssignment.getWorkerId().equals(workerInfo2.getWorkerId())) {
                    workerInfo = workerInfo2;
                }
            }
            if (workerInfo == null) {
                InstanceCommunication.FunctionStatus.Builder newBuilder4 = InstanceCommunication.FunctionStatus.newBuilder();
                newBuilder4.setRunning(false);
                newBuilder4.setInstanceId(String.valueOf(i));
                newBuilder4.setFailureException("Function has not been scheduled");
                return newBuilder4.build();
            }
            String str4 = (String) ClientBuilder.newClient().target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status", workerInfo.getWorkerHostname(), Integer.valueOf(workerInfo.getPort()), str, str2, str3, Integer.valueOf(i))).request("text/plain").get(String.class);
            InstanceCommunication.FunctionStatus.Builder newBuilder5 = InstanceCommunication.FunctionStatus.newBuilder();
            try {
                org.apache.pulsar.functions.utils.Utils.mergeJson(str4, newBuilder5);
                newBuilder5.setWorkerId(workerId);
                build = newBuilder5.build();
            } catch (IOException e2) {
                log.warn("Got invalid function status response from {}", workerInfo, e2);
                throw new RuntimeException(e2);
            }
        }
        return build;
    }

    public InstanceCommunication.FunctionStatusList getAllFunctionStatus(String str, String str2, String str3) {
        Collection<Function.Assignment> findFunctionAssignments = findFunctionAssignments(str, str2, str3);
        InstanceCommunication.FunctionStatusList.Builder newBuilder = InstanceCommunication.FunctionStatusList.newBuilder();
        if (findFunctionAssignments.isEmpty()) {
            return newBuilder.build();
        }
        for (Function.Assignment assignment : findFunctionAssignments) {
            newBuilder.addFunctionStatusList(getFunctionInstanceStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId()));
        }
        return newBuilder.build();
    }

    public synchronized void processAssignmentUpdate(MessageId messageId, Request.AssignmentsUpdate assignmentsUpdate) {
        if (assignmentsUpdate.getVersion() <= this.currentAssignmentVersion) {
            log.debug("Received out of date assignment update: {}", assignmentsUpdate);
            return;
        }
        HashMap hashMap = new HashMap();
        for (Function.Assignment assignment : assignmentsUpdate.getAssignmentsList()) {
            hashMap.put(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), assignment);
        }
        HashMap hashMap2 = new HashMap();
        Iterator<Map<String, Function.Assignment>> it = this.workerIdToAssignments.values().iterator();
        while (it.hasNext()) {
            hashMap2.putAll(it.next());
        }
        Map<String, Function.Assignment> diff = diff(hashMap, hashMap2);
        Map<String, Function.Assignment> diff2 = diff(hashMap2, hashMap);
        Map<String, Function.Assignment> inCommon = inCommon(hashMap, hashMap2);
        for (Map.Entry<String, Function.Assignment> entry : diff.entrySet()) {
            String key = entry.getKey();
            Function.Assignment value = entry.getValue();
            setAssignment(value);
            if (value.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                if (this.functionRuntimeInfoMap.containsKey(key)) {
                    log.warn("Function {} already running. Going to restart function.", this.functionRuntimeInfoMap.get(key));
                    insertStopAction(this.functionRuntimeInfoMap.get(key));
                } else {
                    setFunctionRuntimeInfo(key, new FunctionRuntimeInfo().setFunctionInstance(value.getInstance()));
                }
                insertStartAction(this.functionRuntimeInfoMap.get(key));
            }
        }
        for (Map.Entry<String, Function.Assignment> entry2 : diff2.entrySet()) {
            String key2 = entry2.getKey();
            Function.Assignment value2 = entry2.getValue();
            FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(key2);
            if (functionRuntimeInfo != null) {
                insertStopAction(functionRuntimeInfo);
                deleteFunctionRuntimeInfo(key2);
            }
            deleteAssignment(value2);
        }
        for (Map.Entry<String, Function.Assignment> entry3 : inCommon.entrySet()) {
            String key3 = entry3.getKey();
            Function.Assignment value3 = entry3.getValue();
            if (!findAssignment(value3).equals(value3)) {
                FunctionRuntimeInfo functionRuntimeInfo2 = this.functionRuntimeInfoMap.get(key3);
                if (functionRuntimeInfo2 != null) {
                    insertStopAction(functionRuntimeInfo2);
                }
                if (value3.getWorkerId().equals(this.workerConfig.getWorkerId())) {
                    FunctionRuntimeInfo functionRuntimeInfo3 = new FunctionRuntimeInfo();
                    functionRuntimeInfo3.setFunctionInstance(value3.getInstance());
                    insertStartAction(functionRuntimeInfo3);
                    setFunctionRuntimeInfo(key3, functionRuntimeInfo3);
                }
                Function.Assignment findAssignment = findAssignment(value3);
                if (findAssignment != null) {
                    deleteAssignment(findAssignment);
                }
                setAssignment(value3);
            }
        }
        this.currentAssignmentVersion = assignmentsUpdate.getVersion();
    }

    public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
        return this.functionRuntimeInfoMap;
    }

    public void updateRates() {
        Runtime runtime;
        for (Map.Entry<String, FunctionRuntimeInfo> entry : this.functionRuntimeInfoMap.entrySet()) {
            RuntimeSpawner runtimeSpawner = entry.getValue().getRuntimeSpawner();
            if (runtimeSpawner != null && (runtime = runtimeSpawner.getRuntime()) != null) {
                try {
                    runtime.resetMetrics().get();
                } catch (Exception e) {
                    log.error("Failed to update stats for {}-{}", entry.getKey(), e.getMessage());
                }
            }
        }
    }

    @VisibleForTesting
    void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
        FunctionAction functionAction = new FunctionAction();
        functionAction.setAction(FunctionAction.Action.STOP);
        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
        try {
            this.actionQueue.put(functionAction);
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while putting action");
        }
    }

    @VisibleForTesting
    void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
        FunctionAction functionAction = new FunctionAction();
        functionAction.setAction(FunctionAction.Action.START);
        functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
        try {
            this.actionQueue.put(functionAction);
        } catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while putting action");
        }
    }

    private Function.Assignment findAssignment(String str, String str2, String str3, int i) {
        String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(str, str2, str3, i);
        Iterator<Map.Entry<String, Map<String, Function.Assignment>>> it = this.workerIdToAssignments.entrySet().iterator();
        while (it.hasNext()) {
            Function.Assignment assignment = it.next().getValue().get(fullyQualifiedInstanceId);
            if (assignment != null) {
                return assignment;
            }
        }
        return null;
    }

    private Function.Assignment findAssignment(Function.Assignment assignment) {
        return findAssignment(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
    }

    @VisibleForTesting
    void setAssignment(Function.Assignment assignment) {
        if (!this.workerIdToAssignments.containsKey(assignment.getWorkerId())) {
            this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap());
        }
        this.workerIdToAssignments.get(assignment.getWorkerId()).put(Utils.getFullyQualifiedInstanceId(assignment.getInstance()), assignment);
    }

    @VisibleForTesting
    void deleteAssignment(Function.Assignment assignment) {
        Map<String, Function.Assignment> map = this.workerIdToAssignments.get(assignment.getWorkerId());
        if (map != null) {
            String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
            if (map.containsKey(fullyQualifiedInstanceId)) {
                map.remove(fullyQualifiedInstanceId);
            }
            if (map.isEmpty()) {
                this.workerIdToAssignments.remove(assignment.getWorkerId());
            }
        }
    }

    private void deleteFunctionRuntimeInfo(String str) {
        this.functionRuntimeInfoMap.remove(str);
    }

    private void setFunctionRuntimeInfo(String str, FunctionRuntimeInfo functionRuntimeInfo) {
        this.functionRuntimeInfoMap.put(str, functionRuntimeInfo);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.functionActioner.close();
        this.functionAssignmentTailer.close();
        if (this.runtimeFactory != null) {
            this.runtimeFactory.close();
        }
    }

    private Map<String, Function.Assignment> diff(Map<String, Function.Assignment> map, Map<String, Function.Assignment> map2) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Function.Assignment> entry : map.entrySet()) {
            if (!map2.containsKey(entry.getKey())) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    private Map<String, Function.Assignment> inCommon(Map<String, Function.Assignment> map, Map<String, Function.Assignment> map2) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, Function.Assignment> entry : map.entrySet()) {
            if (map2.containsKey(entry.getKey())) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        return hashMap;
    }

    private FunctionRuntimeInfo getFunctionRuntimeInfo(String str) {
        return this.functionRuntimeInfoMap.get(str);
    }
}
