package org.apache.linkis.engineconnplugin.flink.client.deployment;

import java.io.Closeable;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.linkis.common.conf.TimeType;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;
import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.class */
/**
 * Base adapter around a Flink-on-YARN cluster client for a single submitted job.
 *
 * <p>Holds the job id, the YARN application id and a lazily retrieved {@link ClusterClient},
 * and exposes job-level operations (status, cancel, savepoint) that are all routed through
 * {@link #bridgeClientRequest}. Subclasses decide when the job is in a globally terminal state.
 */
public abstract class ClusterDescriptorAdapter implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterDescriptorAdapter.class);

    /** Timeout applied to every cluster-client request; used as milliseconds when waiting on the future. */
    public static final long CLIENT_REQUEST_TIMEOUT = ((TimeType) FlinkEnvConfiguration.FLINK_CLIENT_REQUEST_TIMEOUT().getValue()).toLong();

    protected final ExecutionContext executionContext;
    private JobID jobId;
    protected ApplicationId clusterID;
    protected ClusterClient<ApplicationId> clusterClient;
    private YarnClusterDescriptor clusterDescriptor;
    protected String webInterfaceUrl;

    /**
     * Creates an adapter bound to the given execution context.
     *
     * @param executionContext context used to create cluster descriptors and to read Flink configuration
     */
    public ClusterDescriptorAdapter(ExecutionContext executionContext) {
        this.executionContext = executionContext;
    }

    public void setJobId(JobID jobID) {
        this.jobId = jobID;
    }

    public JobID getJobId() {
        return this.jobId;
    }

    public ApplicationId getClusterID() {
        return this.clusterID;
    }

    public String getWebInterfaceUrl() {
        return this.webInterfaceUrl;
    }

    /**
     * Queries the cluster for the status of the current job.
     *
     * @return the status reported by the cluster client
     * @throws JobExecutionException if no job id has been set, or if the request fails
     */
    public JobStatus getJobStatus() throws JobExecutionException {
        if (this.jobId == null) {
            throw new JobExecutionException("No job has been submitted. This is a bug.");
        }
        return bridgeClientRequest(
                this.executionContext,
                this.jobId,
                () -> this.clusterClient.getJobStatus(this.jobId),
                false);
    }

    /**
     * Cancels the current job. Does nothing (apart from logging) when no job id has been set.
     * Failures of the cancel request itself are ignored by design (best-effort cancel).
     *
     * @throws JobExecutionException if the cluster client cannot be retrieved
     */
    public void cancelJob() throws JobExecutionException {
        if (this.jobId == null) {
            LOG.info("No job has been submitted, ignore the method of cancelJob.");
            return;
        }
        LOG.info("Start to cancel job {}.", this.jobId);
        bridgeClientRequest(
                this.executionContext,
                this.jobId,
                () -> this.clusterClient.cancel(this.jobId),
                true);
    }

    /**
     * Performs a savepoint operation on the current job.
     *
     * @param savepoint target path handed to the cluster client
     * @param mode one of {@code "trigger"}, {@code "cancel"} or {@code "stop"}
     * @return the value produced by the corresponding cluster-client savepoint call
     * @throws JobExecutionException if {@code mode} is null or unknown, or if the request fails
     */
    public String doSavepoint(String savepoint, String mode) throws JobExecutionException {
        LOG.info("try to {} savepoint in path {}.", mode, savepoint);
        if (mode == null) {
            // switch on a null String would raise a NullPointerException; report it like any other bad mode.
            throw new JobExecutionException("not supported savepoint operator mode " + mode);
        }
        final Supplier<CompletableFuture<String>> request;
        switch (mode) {
            case "trigger":
                request = () -> this.clusterClient.triggerSavepoint(this.jobId, savepoint);
                break;
            case "cancel":
                request = () -> this.clusterClient.cancelWithSavepoint(this.jobId, savepoint);
                break;
            case "stop":
                request = () -> this.clusterClient.stopWithSavepoint(this.jobId, false, savepoint);
                break;
            default:
                throw new JobExecutionException("not supported savepoint operator mode " + mode);
        }
        return bridgeClientRequest(this.executionContext, this.jobId, request, false);
    }

    /**
     * Runs a request against the cluster client, retrieving the client first if it has not been
     * obtained yet, and waits up to {@link #CLIENT_REQUEST_TIMEOUT} milliseconds for the result.
     *
     * @param executionContext context used to create the cluster descriptor when the client is missing
     * @param jobID job id used in log and error messages
     * @param supplier produces the asynchronous request; invoked after the client is available
     * @param ignoreError when {@code true}, a failed request is logged and {@code null} is returned
     *     instead of throwing
     * @param <R> result type of the request
     * @return the request result, or {@code null} when the request failed and {@code ignoreError} is set
     * @throws JobExecutionException if no cluster id is known, the cluster cannot be retrieved, or the
     *     request fails while {@code ignoreError} is {@code false}
     */
    protected <R> R bridgeClientRequest(ExecutionContext executionContext, JobID jobID, Supplier<CompletableFuture<R>> supplier, boolean ignoreError) throws JobExecutionException {
        if (this.clusterClient == null) {
            if (this.clusterID == null) {
                LOG.error("Cluster information don't exist.");
                throw new JobExecutionException("Cluster information don't exist.");
            }
            this.clusterDescriptor = executionContext.createClusterDescriptor();
            try {
                this.clusterClient = this.clusterDescriptor.retrieve(this.clusterID).getClusterClient();
            } catch (ClusterRetrieveException e) {
                String message = String.format("Job: %s could not retrieve or create a cluster.", jobID);
                LOG.error(message, e);
                throw new JobExecutionException(message, e);
            }
        }
        try {
            return supplier.get().get(CLIENT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            if (ignoreError) {
                // Best-effort request: keep the failure visible in the log instead of dropping it silently.
                LOG.warn(String.format("Job: %s operation failed, the error is ignored.", jobID), e);
                return null;
            }
            String message = String.format("Job: %s operation failed!", jobID);
            LOG.error(message, e);
            throw new JobExecutionException(message, e);
        }
    }

    /**
     * Makes sure the {@link StreamExecutionEnvironment} configuration carries a YARN application id.
     *
     * <p>The configuration is obtained reflectively through {@code getConfiguration}. If it already
     * holds a non-blank application id nothing is changed. Otherwise the id from the execution
     * context's Flink config is used, falling back to {@link #clusterID}.
     *
     * @throws JobExecutionException if {@code getConfiguration} is missing or fails, or if no
     *     application id is available from either source
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void bindApplicationId() throws JobExecutionException {
        Method getConfiguration;
        try {
            getConfiguration = StreamExecutionEnvironment.class.getDeclaredMethod("getConfiguration");
        } catch (NoSuchMethodException e) {
            throw new JobExecutionException("Not support flink version, StreamExecutionEnvironment.class is not exists getConfiguration method!", e);
        }
        getConfiguration.setAccessible(true);

        // Only the reflective call is guarded, so that the "no applicationId" error below
        // is reported as itself rather than wrapped as a getConfiguration() failure.
        Configuration configuration;
        try {
            configuration = (Configuration) getConfiguration.invoke(this.executionContext.getStreamExecutionEnvironment());
        } catch (Exception e) {
            throw new JobExecutionException("StreamExecutionEnvironment.getConfiguration() execute failed!", e);
        }

        String boundApplicationId = configuration.getString(YarnConfigOptions.APPLICATION_ID);
        if (StringUtils.isNotBlank(boundApplicationId)) {
            LOG.info("The applicationId {} is exists in StreamExecutionEnvironment, ignore to bind applicationId to StreamExecutionEnvironment.", boundApplicationId);
            return;
        }

        String configuredApplicationId = this.executionContext.getFlinkConfig().getString(YarnConfigOptions.APPLICATION_ID);
        if (StringUtils.isNotBlank(configuredApplicationId)) {
            configuration.setString(YarnConfigOptions.APPLICATION_ID, configuredApplicationId);
            LOG.info("Bind applicationId {} to StreamExecutionEnvironment.", configuredApplicationId);
            return;
        }
        if (this.clusterID == null) {
            throw new JobExecutionException("No applicationId is exists!");
        }
        configuration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(this.clusterID));
        LOG.info("Bind applicationId {} to StreamExecutionEnvironment.", this.clusterID);
    }

    @Override
    public String toString() {
        return "ClusterDescriptorAdapter{jobId=" + this.jobId + ", clusterID=" + this.clusterID + '}';
    }

    /**
     * Shuts down the cluster through the client (if one was obtained), then closes the client and
     * the cluster descriptor. Each step runs even if an earlier one throws.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            if (this.clusterClient != null) {
                try {
                    this.clusterClient.shutDownCluster();
                } finally {
                    this.clusterClient.close();
                }
            }
        } finally {
            if (this.clusterDescriptor != null) {
                this.clusterDescriptor.close();
            }
        }
    }

    /**
     * Tells whether the job has reached a globally terminal state.
     *
     * @return {@code true} if the job is in a globally terminal state
     * @throws JobExecutionException if the state cannot be determined
     */
    public abstract boolean isGloballyTerminalState() throws JobExecutionException;
}
