package org.apache.pulsar.functions.worker;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.FunctionAction;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.io.MoreFiles;
import org.apache.pulsar.shade.com.google.common.io.RecursiveDeleteOption;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.nar.NarClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/FunctionActioner.class */
public class FunctionActioner implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(FunctionActioner.class);
    private final WorkerConfig workerConfig;
    private final RuntimeFactory runtimeFactory;
    private final Namespace dlogNamespace;
    private LinkedBlockingQueue<FunctionAction> actionQueue;
    private volatile boolean running;
    private Thread actioner;
    private final ConnectorsManager connectorsManager;

    public FunctionActioner(WorkerConfig workerConfig, RuntimeFactory runtimeFactory, Namespace namespace, LinkedBlockingQueue<FunctionAction> linkedBlockingQueue, ConnectorsManager connectorsManager) {
        this.workerConfig = workerConfig;
        this.runtimeFactory = runtimeFactory;
        this.dlogNamespace = namespace;
        this.actionQueue = linkedBlockingQueue;
        this.connectorsManager = connectorsManager;
        this.actioner = new Thread(() -> {
            log.info("Starting Actioner Thread...");
            while (this.running) {
                try {
                    FunctionAction functionAction = (FunctionAction) linkedBlockingQueue.poll(1L, TimeUnit.SECONDS);
                    if (functionAction != null) {
                        if (functionAction.getAction() == FunctionAction.Action.START) {
                            try {
                                startFunction(functionAction.getFunctionRuntimeInfo());
                            } catch (Exception e) {
                                log.info("Error starting function", e);
                                functionAction.getFunctionRuntimeInfo().setStartupException(e);
                            }
                        } else {
                            stopFunction(functionAction.getFunctionRuntimeInfo());
                        }
                    }
                } catch (InterruptedException e2) {
                }
            }
        });
        this.actioner.setName("FunctionActionerThread");
    }

    public void start() {
        this.running = true;
        this.actioner.start();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.running = false;
    }

    public void join() throws InterruptedException {
        this.actioner.join();
    }

    @VisibleForTesting
    protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
        File file;
        Function.FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
        int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
        Function.FunctionDetails.Builder newBuilder = Function.FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
        log.info("Starting function {} - {} ...", newBuilder.getName(), Integer.valueOf(instanceId));
        String packagePath = functionMetaData.getPackageLocation().getPackagePath();
        boolean isFunctionPackageUrlSupported = org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported(packagePath);
        if (isFunctionPackageUrlSupported && packagePath.startsWith(org.apache.pulsar.functions.utils.Utils.FILE)) {
            file = new File(new URL(packagePath).toURI());
        } else if (isFunctionCodeBuiltin(newBuilder)) {
            file = getBuiltinArchive(newBuilder);
        } else {
            File file2 = new File(this.workerConfig.getDownloadDirectory(), getDownloadPackagePath(functionMetaData, instanceId));
            file2.mkdirs();
            file = new File(file2, new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails())).getName());
            downloadFile(file, isFunctionPackageUrlSupported, functionMetaData, instanceId);
        }
        InstanceConfig instanceConfig = new InstanceConfig();
        instanceConfig.setFunctionDetails(newBuilder.build());
        instanceConfig.setFunctionId(UUID.randomUUID().toString());
        instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
        instanceConfig.setInstanceId(String.valueOf(instanceId));
        instanceConfig.setMaxBufferedTuples(1024);
        instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
        log.info("start process with instance config {}", instanceConfig);
        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, file.getAbsolutePath(), this.runtimeFactory, this.workerConfig.getInstanceLivenessCheckFreqMs());
        functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
        runtimeSpawner.start();
    }

    private void downloadFile(File file, boolean z, Function.FunctionMetaData functionMetaData, int i) throws FileNotFoundException, IOException {
        File file2;
        File parentFile = file.getParentFile();
        if (file.exists()) {
            log.warn("Function package exists already {} deleting it", file);
            file.delete();
        }
        while (true) {
            file2 = new File(parentFile, file.getName() + "." + i + "." + UUID.randomUUID().toString());
            if (!file2.exists() && file2.createNewFile()) {
                break;
            }
        }
        String packagePath = functionMetaData.getPackageLocation().getPackagePath();
        boolean z2 = z && packagePath.startsWith(org.apache.pulsar.functions.utils.Utils.HTTP);
        log.info("Function package file {} will be downloaded from {}", file2, z2 ? packagePath : functionMetaData.getPackageLocation());
        if (z2) {
            Utils.downloadFromHttpUrl(packagePath, new FileOutputStream(file2));
        } else {
            Utils.downloadFromBookkeeper(this.dlogNamespace, new FileOutputStream(file2), packagePath);
        }
        try {
            try {
                Files.createLink(Paths.get(file.toURI()), Paths.get(file2.toURI()));
                log.info("Function package file is linked from {} to {}", file2, file);
            } catch (FileAlreadyExistsException e) {
                log.warn("Function package has been downloaded from {} and saved at {}", functionMetaData.getPackageLocation(), file);
            }
        } finally {
            file2.delete();
        }
    }

    private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        Function.Instance functionInstance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData functionMetaData = functionInstance.getFunctionMetaData();
        log.info("Stopping function {} - {}...", functionMetaData.getFunctionDetails().getName(), Integer.valueOf(functionInstance.getInstanceId()));
        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
            functionRuntimeInfo.getRuntimeSpawner().close();
            functionRuntimeInfo.setRuntimeSpawner(null);
        }
        File file = new File(this.workerConfig.getDownloadDirectory(), getDownloadPackagePath(functionMetaData, functionInstance.getInstanceId()));
        if (file.exists()) {
            try {
                MoreFiles.deleteRecursively(Paths.get(file.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
            } catch (IOException e) {
                log.warn("Failed to delete package for function: {}", FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
            }
        }
    }

    private String getDownloadPackagePath(Function.FunctionMetaData functionMetaData, int i) {
        return StringUtils.join(new String[]{functionMetaData.getFunctionDetails().getTenant(), functionMetaData.getFunctionDetails().getNamespace(), functionMetaData.getFunctionDetails().getName(), Integer.toString(i)}, File.separatorChar);
    }

    public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetailsOrBuilder) {
        if (!functionDetailsOrBuilder.hasSource() || StringUtils.isEmpty(functionDetailsOrBuilder.getSource().getBuiltin())) {
            return functionDetailsOrBuilder.hasSink() && !StringUtils.isEmpty(functionDetailsOrBuilder.getSink().getBuiltin());
        }
        return true;
    }

    private File getBuiltinArchive(Function.FunctionDetails.Builder builder) throws IOException {
        if (builder.hasSource()) {
            Function.SourceSpec source = builder.getSource();
            if (!StringUtils.isEmpty(source.getBuiltin())) {
                File file = this.connectorsManager.getSourceArchive(source.getBuiltin()).toFile();
                String sourceClass = ConnectorUtils.getConnectorDefinition(file.toString()).getSourceClass();
                Function.SourceSpec.Builder newBuilder = Function.SourceSpec.newBuilder(builder.getSource());
                newBuilder.setClassName(sourceClass);
                builder.setSource(newBuilder);
                fillSourceTypeClass(builder, file, sourceClass);
                return file;
            }
        }
        if (builder.hasSink()) {
            Function.SinkSpec sink = builder.getSink();
            if (!StringUtils.isEmpty(sink.getBuiltin())) {
                File file2 = this.connectorsManager.getSinkArchive(sink.getBuiltin()).toFile();
                String sinkClass = ConnectorUtils.getConnectorDefinition(file2.toString()).getSinkClass();
                Function.SinkSpec.Builder newBuilder2 = Function.SinkSpec.newBuilder(builder.getSink());
                newBuilder2.setClassName(sinkClass);
                builder.setSink(newBuilder2);
                fillSinkTypeClass(builder, file2, sinkClass);
                return file2;
            }
        }
        throw new IOException("Could not find built in archive definition");
    }

    private void fillSourceTypeClass(Function.FunctionDetails.Builder builder, File file, String str) throws IOException {
        NarClassLoader fromArchive = NarClassLoader.getFromArchive(file, Collections.emptySet());
        Throwable th = null;
        try {
            try {
                String name = org.apache.pulsar.functions.utils.Utils.getSourceType(str, fromArchive).getName();
                Function.SourceSpec.Builder newBuilder = Function.SourceSpec.newBuilder(builder.getSource());
                newBuilder.setTypeClassName(name);
                builder.setSource(newBuilder);
                Function.SinkSpec sink = builder.getSink();
                if (null == sink || StringUtils.isEmpty(sink.getTypeClassName())) {
                    Function.SinkSpec.Builder newBuilder2 = Function.SinkSpec.newBuilder(sink);
                    newBuilder2.setTypeClassName(name);
                    builder.setSink(newBuilder2);
                }
                if (fromArchive != null) {
                    if (0 == 0) {
                        fromArchive.close();
                        return;
                    }
                    try {
                        fromArchive.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fromArchive != null) {
                if (th != null) {
                    try {
                        fromArchive.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fromArchive.close();
                }
            }
            throw th4;
        }
    }

    private void fillSinkTypeClass(Function.FunctionDetails.Builder builder, File file, String str) throws IOException {
        NarClassLoader fromArchive = NarClassLoader.getFromArchive(file, Collections.emptySet());
        Throwable th = null;
        try {
            try {
                String name = org.apache.pulsar.functions.utils.Utils.getSinkType(str, fromArchive).getName();
                Function.SinkSpec.Builder newBuilder = Function.SinkSpec.newBuilder(builder.getSink());
                newBuilder.setTypeClassName(name);
                builder.setSink(newBuilder);
                Function.SourceSpec source = builder.getSource();
                if (null == source || StringUtils.isEmpty(source.getTypeClassName())) {
                    Function.SourceSpec.Builder newBuilder2 = Function.SourceSpec.newBuilder(source);
                    newBuilder2.setTypeClassName(name);
                    builder.setSource(newBuilder2);
                }
                if (fromArchive != null) {
                    if (0 == 0) {
                        fromArchive.close();
                        return;
                    }
                    try {
                        fromArchive.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fromArchive != null) {
                if (th != null) {
                    try {
                        fromArchive.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fromArchive.close();
                }
            }
            throw th4;
        }
    }

    public void setActionQueue(LinkedBlockingQueue<FunctionAction> linkedBlockingQueue) {
        this.actionQueue = linkedBlockingQueue;
    }

    public void setRunning(boolean z) {
        this.running = z;
    }

    public void setActioner(Thread thread) {
        this.actioner = thread;
    }

    public WorkerConfig getWorkerConfig() {
        return this.workerConfig;
    }

    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    public Namespace getDlogNamespace() {
        return this.dlogNamespace;
    }

    public LinkedBlockingQueue<FunctionAction> getActionQueue() {
        return this.actionQueue;
    }

    public boolean isRunning() {
        return this.running;
    }

    public Thread getActioner() {
        return this.actioner;
    }

    public ConnectorsManager getConnectorsManager() {
        return this.connectorsManager;
    }

    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof FunctionActioner)) {
            return false;
        }
        FunctionActioner functionActioner = (FunctionActioner) obj;
        if (!functionActioner.canEqual(this)) {
            return false;
        }
        WorkerConfig workerConfig = getWorkerConfig();
        WorkerConfig workerConfig2 = functionActioner.getWorkerConfig();
        if (workerConfig == null) {
            if (workerConfig2 != null) {
                return false;
            }
        } else if (!workerConfig.equals(workerConfig2)) {
            return false;
        }
        RuntimeFactory runtimeFactory = getRuntimeFactory();
        RuntimeFactory runtimeFactory2 = functionActioner.getRuntimeFactory();
        if (runtimeFactory == null) {
            if (runtimeFactory2 != null) {
                return false;
            }
        } else if (!runtimeFactory.equals(runtimeFactory2)) {
            return false;
        }
        Namespace dlogNamespace = getDlogNamespace();
        Namespace dlogNamespace2 = functionActioner.getDlogNamespace();
        if (dlogNamespace == null) {
            if (dlogNamespace2 != null) {
                return false;
            }
        } else if (!dlogNamespace.equals(dlogNamespace2)) {
            return false;
        }
        LinkedBlockingQueue<FunctionAction> actionQueue = getActionQueue();
        LinkedBlockingQueue<FunctionAction> actionQueue2 = functionActioner.getActionQueue();
        if (actionQueue == null) {
            if (actionQueue2 != null) {
                return false;
            }
        } else if (!actionQueue.equals(actionQueue2)) {
            return false;
        }
        if (isRunning() != functionActioner.isRunning()) {
            return false;
        }
        Thread actioner = getActioner();
        Thread actioner2 = functionActioner.getActioner();
        if (actioner == null) {
            if (actioner2 != null) {
                return false;
            }
        } else if (!actioner.equals(actioner2)) {
            return false;
        }
        ConnectorsManager connectorsManager = getConnectorsManager();
        ConnectorsManager connectorsManager2 = functionActioner.getConnectorsManager();
        return connectorsManager == null ? connectorsManager2 == null : connectorsManager.equals(connectorsManager2);
    }

    protected boolean canEqual(Object obj) {
        return obj instanceof FunctionActioner;
    }

    public int hashCode() {
        WorkerConfig workerConfig = getWorkerConfig();
        int hashCode = (1 * 59) + (workerConfig == null ? 43 : workerConfig.hashCode());
        RuntimeFactory runtimeFactory = getRuntimeFactory();
        int hashCode2 = (hashCode * 59) + (runtimeFactory == null ? 43 : runtimeFactory.hashCode());
        Namespace dlogNamespace = getDlogNamespace();
        int hashCode3 = (hashCode2 * 59) + (dlogNamespace == null ? 43 : dlogNamespace.hashCode());
        LinkedBlockingQueue<FunctionAction> actionQueue = getActionQueue();
        int hashCode4 = (((hashCode3 * 59) + (actionQueue == null ? 43 : actionQueue.hashCode())) * 59) + (isRunning() ? 79 : 97);
        Thread actioner = getActioner();
        int hashCode5 = (hashCode4 * 59) + (actioner == null ? 43 : actioner.hashCode());
        ConnectorsManager connectorsManager = getConnectorsManager();
        return (hashCode5 * 59) + (connectorsManager == null ? 43 : connectorsManager.hashCode());
    }

    public String toString() {
        return "FunctionActioner(workerConfig=" + getWorkerConfig() + ", runtimeFactory=" + getRuntimeFactory() + ", dlogNamespace=" + getDlogNamespace() + ", actionQueue=" + getActionQueue() + ", running=" + isRunning() + ", actioner=" + getActioner() + ", connectorsManager=" + getConnectorsManager() + ")";
    }
}
