package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.Gettable;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/AbstractFlinkService.class */
public abstract class AbstractFlinkService implements FlinkService {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
    private static final String EMPTY_JAR_FILENAME = "empty.jar";
    protected final KubernetesClient kubernetesClient;
    protected final FlinkConfigManager configManager;
    protected final ArtifactManager artifactManager;
    private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
    private final String emptyJar = createEmptyJar();

    public AbstractFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager flinkConfigManager) {
        this.kubernetesClient = kubernetesClient;
        this.configManager = flinkConfigManager;
        this.artifactManager = new ArtifactManager(flinkConfigManager);
    }

    protected abstract PodList getJmPodList(String str, String str2);

    protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) throws Exception;

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public KubernetesClient getKubernetesClient() {
        return this.kubernetesClient;
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void submitApplicationCluster(JobSpec jobSpec, Configuration configuration, boolean z) throws Exception {
        LOG.info("Deploying application cluster{}", z ? " requiring last-state from HA metadata" : "");
        if (FlinkUtils.isKubernetesHAActivated(configuration)) {
            FlinkUtils.deleteJobGraphInKubernetesHA((String) configuration.get(KubernetesConfigOptions.CLUSTER_ID), (String) configuration.get(KubernetesConfigOptions.NAMESPACE), this.kubernetesClient);
        }
        if (z) {
            validateHaMetadataExists(configuration);
        }
        deployApplicationCluster(jobSpec, removeOperatorConfigs(configuration));
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public boolean isHaMetadataAvailable(Configuration configuration) {
        return FlinkUtils.isHaMetadataAvailable(configuration, this.kubernetesClient);
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public JobID submitJobToSessionCluster(ObjectMeta objectMeta, FlinkSessionJobSpec flinkSessionJobSpec, Configuration configuration, @Nullable String str) throws Exception {
        JobID generateSessionJobFixedJobID = FlinkUtils.generateSessionJobFixedJobID(objectMeta);
        runJar(flinkSessionJobSpec.getJob(), generateSessionJobFixedJobID, uploadJar(objectMeta, flinkSessionJobSpec, configuration), configuration, str);
        LOG.info("Submitted job: {} to session cluster.", generateSessionJobFixedJobID);
        return generateSessionJobFixedJobID;
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public boolean isJobManagerPortReady(Configuration configuration) {
        try {
            ClusterClient<String> clusterClient = getClusterClient(configuration);
            try {
                URI create = URI.create(clusterClient.getWebInterfaceURL());
                if (clusterClient != null) {
                    clusterClient.close();
                }
                InetSocketAddress inetSocketAddress = new InetSocketAddress(create.getHost(), create.getPort());
                Socket socket = new Socket();
                try {
                    socket.connect(inetSocketAddress, 1000);
                    socket.close();
                    return true;
                } catch (IOException e) {
                    return false;
                }
            } finally {
            }
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public Collection<JobStatusMessage> listJobs(Configuration configuration) throws Exception {
        RestClusterClient clusterClient = getClusterClient(configuration);
        try {
            Collection<JobStatusMessage> collection = (Collection) clusterClient.sendRequest(JobsOverviewHeaders.getInstance(), EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()).thenApply(AbstractFlinkService::toJobStatusMessage).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
            if (clusterClient != null) {
                clusterClient.close();
            }
            return collection;
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public JobResult requestJobResult(Configuration configuration, JobID jobID) throws Exception {
        ClusterClient<String> clusterClient = getClusterClient(configuration);
        try {
            JobResult jobResult = (JobResult) clusterClient.requestJobResult(jobID).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
            if (clusterClient != null) {
                clusterClient.close();
            }
            return jobResult;
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0043. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:19:0x01cb  */
    /* JADX WARN: Removed duplicated region for block: B:22:0x0213  */
    /* JADX WARN: Removed duplicated region for block: B:27:0x022c  */
    /* JADX WARN: Removed duplicated region for block: B:30:? A[RETURN, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void cancelJob(org.apache.flink.kubernetes.operator.crd.FlinkDeployment r9, org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode r10, org.apache.flink.configuration.Configuration r11, boolean r12) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 570
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(org.apache.flink.kubernetes.operator.crd.FlinkDeployment, org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode, org.apache.flink.configuration.Configuration, boolean):void");
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:4:0x0042. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0146  */
    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void cancelSessionJob(org.apache.flink.kubernetes.operator.crd.FlinkSessionJob r9, org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode r10, org.apache.flink.configuration.Configuration r11) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 389
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelSessionJob(org.apache.flink.kubernetes.operator.crd.FlinkSessionJob, org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode, org.apache.flink.configuration.Configuration):void");
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void triggerSavepoint(String str, SavepointTriggerType savepointTriggerType, SavepointInfo savepointInfo, Configuration configuration) throws Exception {
        LOG.info("Triggering new savepoint");
        RestClusterClient clusterClient = getClusterClient(configuration);
        try {
            SavepointTriggerHeaders savepointTriggerHeaders = SavepointTriggerHeaders.getInstance();
            SavepointTriggerMessageParameters unresolvedMessageParameters = savepointTriggerHeaders.getUnresolvedMessageParameters();
            unresolvedMessageParameters.jobID.resolve(JobID.fromHexString(str));
            TriggerResponse triggerResponse = (TriggerResponse) clusterClient.sendRequest(savepointTriggerHeaders, unresolvedMessageParameters, new SavepointTriggerRequestBody((String) Preconditions.checkNotNull((String) configuration.get(CheckpointingOptions.SAVEPOINT_DIRECTORY)), false, ((FlinkVersion) configuration.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14) ? (SavepointFormatType) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE) : null, null)).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
            LOG.info("Savepoint successfully triggered: " + triggerResponse.getTriggerId().toHexString());
            savepointInfo.setTrigger(triggerResponse.getTriggerId().toHexString(), savepointTriggerType);
            if (clusterClient != null) {
                clusterClient.close();
            }
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public Optional<Savepoint> getLastCheckpoint(JobID jobID, Configuration configuration) throws Exception {
        RestClusterClient clusterClient = getClusterClient(configuration);
        try {
            CustomCheckpointingStatisticsHeaders customCheckpointingStatisticsHeaders = CustomCheckpointingStatisticsHeaders.getInstance();
            JobMessageParameters m54getUnresolvedMessageParameters = customCheckpointingStatisticsHeaders.m54getUnresolvedMessageParameters();
            m54getUnresolvedMessageParameters.jobPathParameter.resolve(jobID);
            Optional<String> latestCheckpointPath = ((CheckpointHistoryWrapper) clusterClient.sendRequest(customCheckpointingStatisticsHeaders, m54getUnresolvedMessageParameters, EmptyRequestBody.getInstance()).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS)).getLatestCheckpointPath();
            if (latestCheckpointPath.isPresent() && latestCheckpointPath.get().equals("<checkpoint-not-externally-addressable>")) {
                throw new DeploymentFailedException("Latest checkpoint not externally addressable, manual recovery required.", "CheckpointNotFound");
            }
            Optional map = latestCheckpointPath.map(str -> {
                return Savepoint.of(str, SavepointTriggerType.UNKNOWN);
            });
            if (clusterClient != null) {
                clusterClient.close();
            }
            return map;
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void disposeSavepoint(String str, Configuration configuration) throws Exception {
        RestClusterClient clusterClient = getClusterClient(configuration);
        try {
            clusterClient.sendRequest(SavepointDisposalTriggerHeaders.getInstance(), EmptyMessageParameters.getInstance(), new SavepointDisposalRequest(str)).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
            if (clusterClient != null) {
                clusterClient.close();
            }
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public SavepointFetchResult fetchSavepointInfo(String str, String str2, Configuration configuration) {
        LOG.info("Fetching savepoint result with triggerId: " + str);
        try {
            RestClusterClient clusterClient = getClusterClient(configuration);
            try {
                SavepointStatusHeaders savepointStatusHeaders = SavepointStatusHeaders.getInstance();
                SavepointStatusMessageParameters unresolvedMessageParameters = savepointStatusHeaders.getUnresolvedMessageParameters();
                unresolvedMessageParameters.jobIdPathParameter.resolve(JobID.fromHexString(str2));
                unresolvedMessageParameters.triggerIdPathParameter.resolve(TriggerId.fromHexString(str));
                CompletableFuture sendRequest = clusterClient.sendRequest(savepointStatusHeaders, unresolvedMessageParameters, EmptyRequestBody.getInstance());
                if (sendRequest.get() == null || ((AsynchronousOperationResult) sendRequest.get()).resource() == null) {
                    SavepointFetchResult pending = SavepointFetchResult.pending();
                    if (clusterClient != null) {
                        clusterClient.close();
                    }
                    return pending;
                }
                if (((org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo) ((AsynchronousOperationResult) sendRequest.get()).resource()).getLocation() != null) {
                    String location = ((org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo) ((AsynchronousOperationResult) sendRequest.get()).resource()).getLocation();
                    LOG.info("Savepoint result: {}", location);
                    SavepointFetchResult completed = SavepointFetchResult.completed(location);
                    if (clusterClient != null) {
                        clusterClient.close();
                    }
                    return completed;
                }
                if (((org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo) ((AsynchronousOperationResult) sendRequest.get()).resource()).getFailureCause() == null) {
                    SavepointFetchResult pending2 = SavepointFetchResult.pending();
                    if (clusterClient != null) {
                        clusterClient.close();
                    }
                    return pending2;
                }
                LOG.error("Failure occurred while fetching the savepoint result", ((org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo) ((AsynchronousOperationResult) sendRequest.get()).resource()).getFailureCause());
                SavepointFetchResult error = SavepointFetchResult.error(((org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo) ((AsynchronousOperationResult) sendRequest.get()).resource()).getFailureCause().toString());
                if (clusterClient != null) {
                    clusterClient.close();
                }
                return error;
            } finally {
            }
        } catch (Exception e) {
            LOG.error("Exception while fetching the savepoint result", e);
            return SavepointFetchResult.error(e.getMessage());
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public Map<String, String> getClusterInfo(Configuration configuration) throws Exception {
        HashMap hashMap = new HashMap();
        RestClusterClient clusterClient = getClusterClient(configuration);
        try {
            CustomDashboardConfiguration customDashboardConfiguration = (CustomDashboardConfiguration) clusterClient.sendRequest(CustomDashboardConfigurationHeaders.getInstance(), EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance()).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
            hashMap.put(CustomDashboardConfiguration.FIELD_NAME_FLINK_VERSION, customDashboardConfiguration.getFlinkVersion());
            hashMap.put(CustomDashboardConfiguration.FIELD_NAME_FLINK_REVISION, customDashboardConfiguration.getFlinkRevision());
            if (clusterClient != null) {
                clusterClient.close();
            }
            return hashMap;
        } catch (Throwable th) {
            if (clusterClient != null) {
                try {
                    clusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public PodList getJmPodList(FlinkDeployment flinkDeployment, Configuration configuration) {
        String string = configuration.getString(KubernetesConfigOptions.NAMESPACE);
        try {
            ClusterClient<String> clusterClient = getClusterClient(configuration);
            try {
                String str = (String) clusterClient.getClusterId();
                if (clusterClient != null) {
                    clusterClient.close();
                }
                return getJmPodList(string, str);
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.kubernetes.operator.service.FlinkService
    public void waitForClusterShutdown(Configuration configuration) {
        waitForClusterShutdown(configuration.getString(KubernetesConfigOptions.NAMESPACE), configuration.getString(KubernetesConfigOptions.CLUSTER_ID), this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds());
    }

    @VisibleForTesting
    protected ClusterClient<String> getClusterClient(Configuration configuration) throws Exception {
        String str = (String) configuration.get(KubernetesConfigOptions.CLUSTER_ID);
        String format = String.format("http://%s:%s", (String) ObjectUtils.firstNonNull(new String[]{this.configManager.getOperatorConfiguration().getFlinkServiceHostOverride(), ExternalServiceDecorator.getNamespacedExternalServiceName(str, (String) configuration.get(KubernetesConfigOptions.NAMESPACE))}), Integer.valueOf(configuration.getInteger(RestOptions.PORT)));
        LOG.debug("Creating RestClusterClient({})", format);
        return new RestClusterClient(configuration, str, (configuration2, fatalErrorHandler) -> {
            return new StandaloneClientHAServices(format);
        });
    }

    private JarRunResponseBody runJar(JobSpec jobSpec, JobID jobID, JarUploadResponseBody jarUploadResponseBody, Configuration configuration, String str) {
        String substring = jarUploadResponseBody.getFilename().substring(jarUploadResponseBody.getFilename().lastIndexOf("/") + 1);
        try {
            try {
                RestClusterClient clusterClient = getClusterClient(configuration);
                try {
                    JarRunHeaders jarRunHeaders = JarRunHeaders.getInstance();
                    JarRunMessageParameters unresolvedMessageParameters = jarRunHeaders.getUnresolvedMessageParameters();
                    unresolvedMessageParameters.jarIdPathParameter.resolve(substring);
                    JarRunRequestBody jarRunRequestBody = new JarRunRequestBody(jobSpec.getEntryClass(), (String) null, jobSpec.getArgs() == null ? null : Arrays.asList(jobSpec.getArgs()), jobSpec.getParallelism() > 0 ? Integer.valueOf(jobSpec.getParallelism()) : null, jobID, jobSpec.getAllowNonRestoredState(), str, ((FlinkVersion) configuration.get(FlinkConfigBuilder.FLINK_VERSION)).isNewerVersionThan(FlinkVersion.v1_14) ? RestoreMode.DEFAULT : null);
                    LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
                    JarRunResponseBody jarRunResponseBody = (JarRunResponseBody) clusterClient.sendRequest(jarRunHeaders, unresolvedMessageParameters, jarRunRequestBody).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
                    if (clusterClient != null) {
                        clusterClient.close();
                    }
                    return jarRunResponseBody;
                } catch (Throwable th) {
                    if (clusterClient != null) {
                        try {
                            clusterClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
                deleteJar(configuration, substring);
            }
        } catch (Exception e) {
            LOG.error("Failed to submit job to session cluster.", e);
            throw new FlinkRuntimeException(e);
        }
    }

    private JarUploadResponseBody uploadJar(ObjectMeta objectMeta, FlinkSessionJobSpec flinkSessionJobSpec, Configuration configuration) throws Exception {
        File fetch = this.artifactManager.fetch(findJarURI(flinkSessionJobSpec.getJob()), configuration, this.artifactManager.generateJarDir(objectMeta, flinkSessionJobSpec));
        Preconditions.checkArgument(fetch.exists(), String.format("The jar file %s not exists", fetch.getAbsolutePath()));
        JarUploadHeaders jarUploadHeaders = JarUploadHeaders.getInstance();
        String deploymentName = flinkSessionJobSpec.getDeploymentName();
        String namespace = objectMeta.getNamespace();
        int integer = configuration.getInteger(RestOptions.PORT);
        String str = (String) ObjectUtils.firstNonNull(new String[]{this.configManager.getOperatorConfiguration().getFlinkServiceHostOverride(), ExternalServiceDecorator.getNamespacedExternalServiceName(deploymentName, namespace)});
        try {
            RestClient restClient = new RestClient(configuration, this.executorService);
            try {
                JarUploadResponseBody jarUploadResponseBody = (JarUploadResponseBody) restClient.sendRequest(str, integer, jarUploadHeaders, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), Collections.singletonList(new FileUpload(fetch.toPath(), "application/java-archive"))).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
                restClient.close();
                LOG.debug("Deleting the jar file {}", fetch);
                FileUtils.deleteFileOrDirectory(fetch);
                return jarUploadResponseBody;
            } finally {
            }
        } catch (Throwable th) {
            LOG.debug("Deleting the jar file {}", fetch);
            FileUtils.deleteFileOrDirectory(fetch);
            throw th;
        }
    }

    private String findJarURI(JobSpec jobSpec) {
        return jobSpec.getJarURI() != null ? jobSpec.getJarURI() : this.emptyJar;
    }

    private void deleteJar(Configuration configuration, String str) {
        LOG.debug("Deleting the jar: {}", str);
        try {
            RestClusterClient clusterClient = getClusterClient(configuration);
            try {
                JarDeleteHeaders jarDeleteHeaders = JarDeleteHeaders.getInstance();
                JarDeleteMessageParameters unresolvedMessageParameters = jarDeleteHeaders.getUnresolvedMessageParameters();
                unresolvedMessageParameters.jarIdPathParameter.resolve(str);
                clusterClient.sendRequest(jarDeleteHeaders, unresolvedMessageParameters, EmptyRequestBody.getInstance()).get(this.configManager.getOperatorConfiguration().getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
                if (clusterClient != null) {
                    clusterClient.close();
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.error("Failed to delete the jar: {}.", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void waitForClusterShutdown(String str, String str2, long j) {
        PodList jmPodList;
        boolean z = true;
        boolean z2 = true;
        for (int i = 0; i < j; i++) {
            if (z && ((jmPodList = getJmPodList(str, str2)) == null || jmPodList.getItems().isEmpty())) {
                z = false;
            }
            if (z2 && ((Service) ((Gettable) ((ServiceResource) ((NonNamespaceOperation) this.kubernetesClient.services().inNamespace(str)).withName(ExternalServiceDecorator.getExternalServiceName(str2))).fromServer()).get()) == null) {
                z2 = false;
            }
            if (!z && !z2) {
                break;
            }
            if ((i + 1) % 5 == 0) {
                LOG.info("Waiting for cluster shutdown... ({}s)", Integer.valueOf(i + 1));
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        LOG.info("Cluster shutdown completed.");
    }

    private static List<JobStatusMessage> toJobStatusMessage(MultipleJobsDetails multipleJobsDetails) {
        return (List) multipleJobsDetails.getJobs().stream().map(jobDetails -> {
            return new JobStatusMessage(jobDetails.getJobId(), jobDetails.getJobName(), getEffectiveStatus(jobDetails), jobDetails.getStartTime());
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public static Configuration removeOperatorConfigs(Configuration configuration) {
        Configuration configuration2 = new Configuration();
        configuration.toMap().forEach((str, str2) -> {
            if (str.startsWith(KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX)) {
                return;
            }
            configuration2.setString(str, str2);
        });
        return configuration2;
    }

    @VisibleForTesting
    protected static JobStatus getEffectiveStatus(JobDetails jobDetails) {
        boolean z = jobDetails.getNumTasks() == jobDetails.getTasksPerState()[ExecutionState.RUNNING.ordinal()] + jobDetails.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
        JobStatus status = jobDetails.getStatus();
        if (JobStatus.RUNNING.equals(status) && !z) {
            status = JobStatus.CREATED;
            LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, status);
        }
        return status;
    }

    private void validateHaMetadataExists(Configuration configuration) {
        if (!isHaMetadataAvailable(configuration)) {
            throw new DeploymentFailedException("HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "RestoreFailed");
        }
    }

    private String createEmptyJar() {
        try {
            String str = Files.createTempDirectory("flink", new FileAttribute[0]).toString() + "/empty.jar";
            LOG.debug("Creating empty jar to {}", str);
            new JarOutputStream(new FileOutputStream(str), new Manifest()).close();
            return str;
        } catch (Exception e) {
            throw new RuntimeException("Failed to create empty jar", e);
        }
    }
}
