/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway.local;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.local.ChangelogResult;
import org.apache.flink.table.client.gateway.local.DynamicResult;
import org.apache.flink.table.client.gateway.local.ExecutionContext;
import org.apache.flink.table.client.gateway.local.MaterializedResult;
import org.apache.flink.table.client.gateway.local.ResultStore;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Executor} implementation that runs SQL Client statements from the local JVM.
 *
 * <p>The executor keeps one {@link ExecutionContext} at a time (re-created whenever a call
 * arrives with a different {@link SessionContext}) and tracks running queries in a
 * {@link ResultStore}, keyed by the string form of the job id.
 */
public class LocalExecutor implements Executor {
    private static final Logger LOG = LoggerFactory.getLogger(LocalExecutor.class);
    private static final String DEFAULT_ENV_FILE = "sql-client-defaults.yaml";

    private final Environment defaultEnvironment;
    private final List<URL> dependencies;
    private final Configuration flinkConfig;
    private final List<CustomCommandLine<?>> commandLines;
    private final Options commandLineOptions;
    private final ResultStore resultStore;

    /** Most recently created context; guarded by the monitor of {@code this}. */
    private ExecutionContext<?> executionContext;

    /**
     * Creates an executor that loads the Flink configuration from the configuration directory
     * reported by {@link CliFrontend#getConfigurationDirectoryFromEnv()}.
     *
     * @param defaultEnv location of the default environment file; if {@code null}, the file
     *     {@code sql-client-defaults.yaml} in the Flink configuration directory is used when it
     *     exists, otherwise an empty {@link Environment} is used
     * @param jars JAR files to add to the dependencies; each one is validated
     * @param libraries directories whose {@code .jar} files are added to the dependencies
     * @throws SqlClientException if the configuration, the environment file or a dependency
     *     cannot be loaded
     */
    public LocalExecutor(URL defaultEnv, List<URL> jars, List<URL> libraries) {
        final String flinkConfigDir;
        try {
            flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
            this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);
            try {
                FileSystem.initialize(this.flinkConfig);
            } catch (IOException e) {
                throw new SqlClientException(
                        "Error while setting the default filesystem scheme from configuration.", e);
            }
            this.commandLines = CliFrontend.loadCustomCommandLines(this.flinkConfig, flinkConfigDir);
            this.commandLineOptions = collectCommandLineOptions(this.commandLines);
        } catch (Exception e) {
            // Note: this also wraps the SqlClientException raised for the file system above.
            throw new SqlClientException("Could not load Flink configuration.", e);
        }

        final URL envUrl = defaultEnv != null ? defaultEnv : findDefaultEnvironmentFile(flinkConfigDir);
        if (envUrl != null) {
            System.out.println("Reading default environment from: " + envUrl);
            try {
                this.defaultEnvironment = Environment.parse(envUrl);
            } catch (IOException e) {
                throw new SqlClientException("Could not read default environment file at: " + envUrl, e);
            }
        } else {
            this.defaultEnvironment = new Environment();
        }

        this.dependencies = discoverDependencies(jars, libraries);
        this.resultStore = new ResultStore(this.flinkConfig);
    }

    /**
     * Creates an executor from already prepared components, using a single command line.
     *
     * @param defaultEnvironment environment handed to every execution context
     * @param dependencies dependency URLs handed to every execution context (not validated here)
     * @param flinkConfig Flink configuration to use
     * @param commandLine the only custom command line made available
     */
    public LocalExecutor(Environment defaultEnvironment, List<URL> dependencies, Configuration flinkConfig, CustomCommandLine<?> commandLine) {
        this.defaultEnvironment = defaultEnvironment;
        this.dependencies = dependencies;
        this.flinkConfig = flinkConfig;
        this.commandLines = Collections.singletonList(commandLine);
        this.commandLineOptions = collectCommandLineOptions(this.commandLines);
        this.resultStore = new ResultStore(flinkConfig);
    }

    /** Does nothing; this executor requires no start-up work. */
    @Override
    public void start() {
    }

    /**
     * Returns the execution and deployment properties of the session's merged environment.
     *
     * @throws SqlExecutionException if the execution context cannot be created
     */
    @Override
    public Map<String, String> getSessionProperties(SessionContext session) throws SqlExecutionException {
        final Environment env = this.getOrCreateExecutionContext(session).getMergedEnvironment();
        final Map<String, String> properties = new HashMap<>();
        properties.putAll(env.getExecution().toProperties());
        // Added second, so deployment entries win if a key occurs in both maps.
        properties.putAll(env.getDeployment().toProperties());
        return properties;
    }

    /**
     * Lists the tables known to a fresh table environment of the session.
     *
     * @throws SqlExecutionException if the execution context cannot be created
     */
    @Override
    public List<String> listTables(SessionContext session) throws SqlExecutionException {
        final TableEnvironment tableEnv = this.createTableEnvironment(session);
        return Arrays.asList(tableEnv.listTables());
    }

    /**
     * Returns the schema of the table with the given name.
     *
     * @throws SqlExecutionException if the execution context cannot be created or scanning the
     *     table fails for any reason
     */
    @Override
    public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
        final TableEnvironment tableEnv = this.createTableEnvironment(session);
        try {
            return tableEnv.scan(name).getSchema();
        } catch (Throwable t) {
            throw new SqlExecutionException("No table with this name could be found.", t);
        }
    }

    /**
     * Returns the explanation produced by the table environment for the given statement.
     *
     * @throws SqlExecutionException if the statement cannot be parsed or explained
     */
    @Override
    public String explainStatement(SessionContext session, String statement) throws SqlExecutionException {
        final TableEnvironment tableEnv = this.createTableEnvironment(session);
        try {
            final Table table = this.createTable(tableEnv, statement);
            return tableEnv.explain(table);
        } catch (Throwable t) {
            throw new SqlExecutionException("Invalid SQL statement.", t);
        }
    }

    /**
     * Translates the query into a job, registers its result and starts result retrieval.
     *
     * @return descriptor carrying the result identifier, the result schema and the result mode
     * @throws SqlExecutionException if the context cannot be created or the query is invalid
     */
    @Override
    public ResultDescriptor executeQuery(SessionContext session, String query) throws SqlExecutionException {
        final ExecutionContext<?> context = this.getOrCreateExecutionContext(session);
        return this.executeQueryInternal(context, query);
    }

    /**
     * Retrieves the pending changes of a non-materialized (changelog) result.
     *
     * @throws SqlExecutionException if the result is unknown or is a materialized result
     */
    @Override
    public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges(SessionContext session, String resultId) throws SqlExecutionException {
        final DynamicResult<?> result = this.lookupResult(resultId);
        if (result.isMaterialized()) {
            throw new SqlExecutionException("Invalid result retrieval mode.");
        }
        return ((ChangelogResult) result).retrieveChanges();
    }

    /**
     * Takes a snapshot of a materialized result using the given page size.
     *
     * @throws SqlExecutionException if the result is unknown or is not a materialized result
     */
    @Override
    public TypedResult<Integer> snapshotResult(SessionContext session, String resultId, int pageSize) throws SqlExecutionException {
        return this.lookupMaterializedResult(resultId).snapshot(pageSize);
    }

    /**
     * Returns one page of a materialized result.
     *
     * @throws SqlExecutionException if the result is unknown or is not a materialized result
     */
    @Override
    public List<Row> retrieveResultPage(String resultId, int page) throws SqlExecutionException {
        return this.lookupMaterializedResult(resultId).retrievePage(page);
    }

    /**
     * Closes the result with the given identifier and asks the cluster to cancel its job.
     *
     * @throws SqlExecutionException if the context cannot be created, the result is unknown or
     *     the cluster cannot be reached
     */
    @Override
    public void cancelQuery(SessionContext session, String resultId) throws SqlExecutionException {
        final ExecutionContext<?> context = this.getOrCreateExecutionContext(session);
        this.cancelQueryInternal(context, resultId);
    }

    /**
     * Cancels every query currently registered in the result store. Failures of individual
     * cancellations are logged at debug level and do not stop the clean-up of the others.
     */
    @Override
    public void stop(SessionContext session) {
        this.resultStore.getResults().forEach(resultId -> {
            try {
                this.cancelQuery(session, resultId);
            } catch (Throwable t) {
                // Deliberately best-effort: keep cleaning up the remaining results.
                LOG.debug("Could not cancel query {} while stopping; continuing clean-up.", resultId, t);
            }
        });
    }

    /**
     * Closes and unregisters the result first, then retrieves the cluster and requests
     * cancellation of the job whose id is encoded in {@code resultId}.
     */
    private <T> void cancelQueryInternal(ExecutionContext<T> context, String resultId) {
        final DynamicResult<?> result = this.lookupResult(resultId);
        LOG.info("Cancelling job {} and result retrieval.", resultId);
        result.close();
        this.resultStore.removeResult(resultId);

        try (ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
            ClusterClient clusterClient = null;
            try {
                clusterClient = clusterDescriptor.retrieve(context.getClusterId());
                try {
                    clusterClient.cancel(new JobID(StringUtils.hexStringToByte(resultId)));
                } catch (Throwable t) {
                    // A failed cancel is not treated as an error; the result is already closed.
                    LOG.debug("Cancel request for job {} failed; ignoring.", resultId, t);
                }
            } catch (Exception e) {
                throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
            } finally {
                shutdownQuietly(clusterClient);
            }
        } catch (SqlExecutionException e) {
            throw e;
        } catch (Exception e) {
            throw new SqlExecutionException("Could not locate a cluster.", e);
        }
    }

    /**
     * Builds the job graph for the query, stores the result under the job id and starts result
     * retrieval with a program that deploys the job.
     */
    private <T> ResultDescriptor executeQueryInternal(ExecutionContext<T> context, String query) {
        final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();
        final Table table = this.createTable(envInst.getTableEnvironment(), query);
        final TableSchema resultSchema = table.getSchema().withoutTimeAttributes();
        final DynamicResult result = this.resultStore.createResult(context.getMergedEnvironment(), resultSchema, envInst.getExecutionConfig());
        final String jobName = context.getSessionContext().getName() + ": " + query;

        final JobGraph jobGraph;
        try {
            table.writeToSink(result.getTableSink(), envInst.getQueryConfig());
            jobGraph = envInst.createJobGraph(jobName);
        } catch (Throwable t) {
            // The result was never stored, so it has to be released here.
            result.close();
            throw new SqlExecutionException("Invalid SQL statement.", t);
        }

        final String resultId = jobGraph.getJobID().toString();
        this.resultStore.storeResult(resultId, result);

        final Runnable program = () -> {
            LOG.info("Submitting job {} for query {}", jobGraph.getJobID(), jobName);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Submitting job {} with the following environment: \n{}", jobGraph.getJobID(), context.getMergedEnvironment());
            }
            this.deployJob(context, jobGraph, result);
        };
        result.startRetrieval(program);

        return new ResultDescriptor(resultId, resultSchema, result.isMaterialized());
    }

    /** Parses the query into a {@link Table}, mapping any failure to a {@link SqlExecutionException}. */
    private Table createTable(TableEnvironment tableEnv, String query) {
        try {
            return tableEnv.sqlQuery(query);
        } catch (Throwable t) {
            throw new SqlExecutionException("Invalid SQL statement.", t);
        }
    }

    /** Creates a new environment instance for the session and returns its table environment. */
    private TableEnvironment createTableEnvironment(SessionContext session) {
        return this.getOrCreateExecutionContext(session).createEnvironmentInstance().getTableEnvironment();
    }

    /**
     * Returns the cached execution context, replacing it when none exists yet or when it was
     * created for a session context that is not equal to the given one.
     */
    private synchronized ExecutionContext<?> getOrCreateExecutionContext(SessionContext session) throws SqlExecutionException {
        final boolean reusable = this.executionContext != null
                && this.executionContext.getSessionContext().equals(session);
        if (!reusable) {
            try {
                this.executionContext = new ExecutionContext<>(this.defaultEnvironment, session, this.dependencies, this.flinkConfig, this.commandLineOptions, this.commandLines);
            } catch (Throwable t) {
                throw new SqlExecutionException("Could not create execution context.", t);
            }
        }
        return this.executionContext;
    }

    /**
     * Runs the job. Without a configured cluster id a job cluster is deployed and the call waits
     * for the job result; otherwise the existing cluster is retrieved and the job is submitted
     * with detached mode switched off. In both cases the cluster id is reported to the result.
     */
    private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
        try (ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
            ClusterClient clusterClient = null;
            try {
                if (context.getClusterId() == null) {
                    clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
                    result.setClusterId(clusterClient.getClusterId());
                    // The converted execution result is discarded; the call is made to wait for
                    // the job and to surface a failure as an exception.
                    ((JobResult) ((RestClusterClient) clusterClient).requestJobResult(jobGraph.getJobID()).get()).toJobExecutionResult(context.getClassLoader());
                } else {
                    clusterClient = clusterDescriptor.retrieve(context.getClusterId());
                    result.setClusterId(clusterClient.getClusterId());
                    clusterClient.setDetached(false);
                    clusterClient.submitJob(jobGraph, context.getClassLoader());
                }
            } catch (Exception e) {
                throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
            } finally {
                shutdownQuietly(clusterClient);
            }
        } catch (SqlExecutionException e) {
            throw e;
        } catch (Exception e) {
            throw new SqlExecutionException("Could not locate a cluster.", e);
        }
    }

    /**
     * Looks up a stored result.
     *
     * @throws SqlExecutionException if no result is registered under {@code resultId}
     */
    private DynamicResult<?> lookupResult(String resultId) {
        final DynamicResult<?> result = this.resultStore.getResult(resultId);
        if (result == null) {
            throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'.");
        }
        return result;
    }

    /**
     * Looks up a stored result that must be materialized.
     *
     * @throws SqlExecutionException if the result is unknown or is not materialized
     */
    private MaterializedResult lookupMaterializedResult(String resultId) {
        final DynamicResult<?> result = this.lookupResult(resultId);
        if (!result.isMaterialized()) {
            throw new SqlExecutionException("Invalid result retrieval mode.");
        }
        return (MaterializedResult) result;
    }

    /** Shuts the client down if it is non-null; a failing shutdown is only logged. */
    private static void shutdownQuietly(ClusterClient clusterClient) {
        if (clusterClient == null) {
            return;
        }
        try {
            clusterClient.shutdown();
        } catch (Exception e) {
            // Best-effort clean-up; must not mask the outcome of the surrounding operation.
            LOG.debug("Could not shut down the cluster client cleanly; ignoring.", e);
        }
    }

    /**
     * Searches the Flink configuration directory for {@code sql-client-defaults.yaml}, reporting
     * progress on standard output.
     *
     * @return URL of the file, or {@code null} if it does not exist
     * @throws SqlClientException if the file location cannot be converted to a URL
     */
    private static URL findDefaultEnvironmentFile(String flinkConfigDir) {
        final String defaultFilePath = flinkConfigDir + "/" + DEFAULT_ENV_FILE;
        System.out.println("No default environment specified.");
        System.out.print("Searching for '" + defaultFilePath + "'...");
        final File file = new File(defaultFilePath);
        if (!file.exists()) {
            System.out.println("not found.");
            return null;
        }
        System.out.println("found.");
        final URL envUrl;
        try {
            envUrl = Path.fromLocalFile(file).toUri().toURL();
        } catch (MalformedURLException e) {
            throw new SqlClientException(e);
        }
        LOG.info("Using default environment file: {}", envUrl);
        return envUrl;
    }

    /**
     * Collects the given JAR files plus every regular file ending in {@code .jar}
     * (case-insensitive) found directly inside the given library directories. Each JAR is
     * validated with {@link JobWithJars#checkJarFile(URL)}.
     *
     * @throws SqlClientException if anything goes wrong; problems detected inside the loops
     *     (including the directory checks) are reported as the cause of a
     *     "Could not load all required JAR files." exception
     */
    private static List<URL> discoverDependencies(List<URL> jars, List<URL> libraries) {
        final List<URL> dependencies = new ArrayList<>();
        try {
            for (URL jarUrl : jars) {
                JobWithJars.checkJarFile(jarUrl);
                dependencies.add(jarUrl);
            }
            for (URL libUrl : libraries) {
                final File dir = new File(libUrl.toURI());
                if (!dir.isDirectory()) {
                    throw new SqlClientException("Directory expected: " + dir);
                }
                if (!dir.canRead()) {
                    throw new SqlClientException("Directory cannot be read: " + dir);
                }
                final File[] files = dir.listFiles();
                if (files == null) {
                    throw new SqlClientException("Directory cannot be read: " + dir);
                }
                for (File f : files) {
                    if (f.isFile() && f.getAbsolutePath().toLowerCase().endsWith(".jar")) {
                        final URL jarUrl = f.toURI().toURL();
                        JobWithJars.checkJarFile(jarUrl);
                        dependencies.add(jarUrl);
                    }
                }
            }
        } catch (Exception e) {
            throw new SqlClientException("Could not load all required JAR files.", e);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Using the following dependencies: {}", dependencies);
        }
        return dependencies;
    }

    /** Merges the run options of all given command lines with the CLI frontend's run options. */
    private static Options collectCommandLineOptions(List<CustomCommandLine<?>> commandLines) {
        final Options customOptions = new Options();
        for (CustomCommandLine<?> customCommandLine : commandLines) {
            customCommandLine.addRunOptions(customOptions);
        }
        return CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), customOptions);
    }
}

