/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime;

import com.google.protobuf.MessageOrBuilder;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RuntimeSpawner
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(RuntimeSpawner.class);
    private final InstanceConfig instanceConfig;
    private final RuntimeFactory runtimeFactory;
    private final String codeFile;
    private final String originalCodeFileName;
    private Runtime runtime;
    private ScheduledFuture processLivenessCheckTimer;
    private int numRestarts;
    private long instanceLivenessCheckFreqMs;
    private Throwable runtimeDeathException;

    public RuntimeSpawner(InstanceConfig instanceConfig, String codeFile, String originalCodeFileName, RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
        this.instanceConfig = instanceConfig;
        this.runtimeFactory = containerFactory;
        this.codeFile = codeFile;
        this.originalCodeFileName = originalCodeFileName;
        this.numRestarts = 0;
        this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
        try {
            this.runtime = this.runtimeFactory.createContainer(this.instanceConfig, codeFile, originalCodeFileName, instanceLivenessCheckFreqMs / 1000L);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void start() throws Exception {
        Function.FunctionDetails details = this.instanceConfig.getFunctionDetails();
        log.info("{}/{}/{}-{} RuntimeSpawner starting function", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), this.instanceConfig.getInstanceId()});
        this.runtime.start();
        if (!this.runtimeFactory.externallyManaged() && this.instanceLivenessCheckFreqMs > 0L) {
            this.processLivenessCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
                Runtime runtime = this.runtime;
                if (runtime != null && !runtime.isAlive()) {
                    log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), runtime.getDeathException()});
                    try {
                        runtime.stop();
                        this.runtimeDeathException = runtime.getDeathException();
                        runtime.start();
                    }
                    catch (Exception e) {
                        log.error("{}/{}/{}-{} Function Restart failed", new Object[]{details.getTenant(), details.getNamespace(), details.getName(), e, e});
                    }
                    ++this.numRestarts;
                }
            }, this.instanceLivenessCheckFreqMs, this.instanceLivenessCheckFreqMs, TimeUnit.MILLISECONDS);
        }
    }

    public void join() throws Exception {
        if (null != this.runtime) {
            this.runtime.join();
        }
    }

    public CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId) {
        Runtime runtime = this.runtime;
        if (null == runtime) {
            return FutureUtil.failedFuture((Throwable)new IllegalStateException("Function runtime is not started yet"));
        }
        return runtime.getFunctionStatus(instanceId).thenApply(f -> {
            InstanceCommunication.FunctionStatus.Builder builder = InstanceCommunication.FunctionStatus.newBuilder();
            builder.mergeFrom(f).setNumRestarts((long)this.numRestarts).setInstanceId(String.valueOf(instanceId));
            if (!f.getRunning() && this.runtimeDeathException != null) {
                builder.setFailureException(this.runtimeDeathException.getMessage());
            }
            return builder.build();
        });
    }

    public CompletableFuture<String> getFunctionStatusAsJson(int instanceId) {
        return this.getFunctionStatus(instanceId).thenApply(msg -> {
            try {
                return FunctionCommon.printJson((MessageOrBuilder)msg);
            }
            catch (IOException e) {
                throw new RuntimeException(this.instanceConfig.getFunctionDetails().getName() + " Exception parsing getStatus", e);
            }
        });
    }

    @Override
    public void close() {
        if (this.processLivenessCheckTimer != null) {
            this.processLivenessCheckTimer.cancel(true);
            this.processLivenessCheckTimer = null;
        }
        if (null != this.runtime) {
            try {
                this.runtime.stop();
            }
            catch (Exception e) {
                log.warn("Failed to stop function runtime: {}", (Object)e, (Object)e);
            }
            this.runtime = null;
        }
    }

    public InstanceConfig getInstanceConfig() {
        return this.instanceConfig;
    }

    public RuntimeFactory getRuntimeFactory() {
        return this.runtimeFactory;
    }

    public Runtime getRuntime() {
        return this.runtime;
    }
}

