/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway.local;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.ExecutionContext;
import org.apache.flink.table.client.gateway.local.result.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable that submits a {@link JobGraph} either to a newly deployed job cluster or to an
 * already existing cluster, depending on whether the {@link ExecutionContext} carries a
 * cluster id.
 *
 * <p>When constructed with {@code awaitJobResult == true}, the deployer waits for the job to
 * finish and stores the resulting {@link JobExecutionResult} in an internal single-slot queue,
 * from which it can be picked up via {@link #fetchExecutionResult()}.
 *
 * @param <C> type of the cluster id used by the execution context and the result
 */
public class ProgramDeployer<C>
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ProgramDeployer.class);
    private final ExecutionContext<C> context;
    private final JobGraph jobGraph;
    private final String jobName;
    private final Result<C> result;
    private final boolean awaitJobResult;
    /** Holds at most one execution result (capacity 1). */
    private final BlockingQueue<JobExecutionResult> executionResultBucket;

    /**
     * Creates a deployer for a single job.
     *
     * @param context execution context used to create the cluster descriptor and class loader
     * @param jobName name of the query; only used for logging in this class
     * @param jobGraph job graph to submit
     * @param result result object that receives the cluster id and web interface URL
     * @param awaitJobResult whether to wait for the job to finish and keep its execution result
     */
    public ProgramDeployer(ExecutionContext<C> context, String jobName, JobGraph jobGraph, Result<C> result, boolean awaitJobResult) {
        this.context = context;
        this.jobGraph = jobGraph;
        this.jobName = jobName;
        this.result = result;
        this.awaitJobResult = awaitJobResult;
        this.executionResultBucket = new LinkedBlockingDeque<>(1);
    }

    /**
     * Logs the submission and deploys the job.
     *
     * @throws SqlExecutionException if the cluster cannot be located, created, or retrieved, or
     *     if job submission fails
     */
    @Override
    public void run() {
        LOG.info("Submitting job {} for query {}", this.jobGraph.getJobID(), this.jobName);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitting job {} with the following environment: \n{}", this.jobGraph.getJobID(), this.context.getMergedEnvironment());
        }
        this.deployJob(this.context, this.jobGraph, this.result);
    }

    /**
     * Retrieves and removes the stored execution result without blocking.
     *
     * @return the execution result, or {@code null} if none is currently available
     */
    public JobExecutionResult fetchExecutionResult() {
        return this.executionResultBucket.poll();
    }

    /**
     * Creates the cluster descriptor and dispatches to the new-cluster or existing-cluster path.
     *
     * <p>Failures while deploying are reported as "Could not retrieve or create a cluster.";
     * failures while creating or closing the descriptor as "Could not locate a cluster.".
     */
    private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
        try (ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
            try {
                if (context.getClusterId() == null) {
                    // no cluster id configured: deploy a dedicated job cluster
                    this.deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
                } else {
                    // cluster id configured: reuse the existing cluster
                    this.deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
                }
            } catch (Exception e) {
                throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
            }
        } catch (SqlExecutionException e) {
            // already carries the right message; do not wrap it a second time
            throw e;
        } catch (Exception e) {
            throw new SqlExecutionException("Could not locate a cluster.", e);
        }
    }

    /**
     * Deploys a job cluster for the given job graph, records the cluster information in
     * {@code result} and, if requested, waits for the job result.
     *
     * <p>The cluster client is always shut down afterwards.
     *
     * @param classLoader class loader used to convert the job result into an execution result
     * @throws Exception if deployment or result retrieval fails
     */
    private <T> void deployJobOnNewCluster(ClusterDescriptor<T> clusterDescriptor, JobGraph jobGraph, Result<T> result, ClassLoader classLoader) throws Exception {
        ClusterClient<T> clusterClient = null;
        try {
            clusterClient = clusterDescriptor.deployJobCluster(this.context.getClusterSpec(), jobGraph, false);
            result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
            if (this.awaitJobResult) {
                // requestJobResult is only available on the REST client, hence the hard cast
                JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
                        .requestJobResult(jobGraph.getJobID())
                        .get()
                        .toJobExecutionResult(classLoader);
                this.executionResultBucket.add(jobResult);
            }
        } finally {
            shutdownQuietly(clusterClient);
        }
    }

    /**
     * Retrieves the client of an existing cluster, records the cluster information in
     * {@code result} and submits the job, attached or detached depending on
     * {@code awaitJobResult}.
     *
     * <p>The cluster client is always shut down afterwards.
     *
     * @throws Exception if the cluster cannot be retrieved or the submission fails
     */
    private <T> void deployJobOnExistingCluster(T clusterId, ClusterDescriptor<T> clusterDescriptor, JobGraph jobGraph, Result<T> result) throws Exception {
        ClusterClient<T> clusterClient = null;
        try {
            clusterClient = clusterDescriptor.retrieve(clusterId);
            String webInterfaceUrl;
            try {
                webInterfaceUrl = clusterClient.getWebInterfaceURL();
            } catch (Exception e) {
                // the URL is informational only; fall back to a placeholder
                webInterfaceUrl = "N/A";
            }
            result.setClusterInformation(clusterClient.getClusterId(), webInterfaceUrl);
            if (this.awaitJobResult) {
                clusterClient.setDetached(false);
                JobExecutionResult jobResult = clusterClient.submitJob(jobGraph, this.context.getClassLoader()).getJobExecutionResult();
                this.executionResultBucket.add(jobResult);
            } else {
                clusterClient.setDetached(true);
                clusterClient.submitJob(jobGraph, this.context.getClassLoader());
            }
        } finally {
            shutdownQuietly(clusterClient);
        }
    }

    /**
     * Shuts down the given client if it is non-null. A failing shutdown is logged and otherwise
     * ignored so that it never masks an exception thrown by the deployment itself.
     */
    private static void shutdownQuietly(ClusterClient<?> clusterClient) {
        if (clusterClient == null) {
            return;
        }
        try {
            clusterClient.shutdown();
        } catch (Exception e) {
            LOG.warn("Could not properly shut down the cluster client.", e);
        }
    }
}

