package org.apache.flink.table.client.gateway.local;

import java.net.URL;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.BatchQueryConfig;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.QueryConfig;
import org.apache.flink.table.api.StreamQueryConfig;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.DeploymentEntry;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.SinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceTableEntry;
import org.apache.flink.table.client.config.entries.TemporalTableEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;

/* loaded from: input_file:org/apache/flink/table/client/gateway/local/ExecutionContext.class */
public class ExecutionContext<T> {
    /** Private copy of the session context this execution context was created for. */
    private final SessionContext sessionContext;
    /** Result of merging the constructor's environment with the session's environment. */
    private final Environment mergedEnv;
    /** Dependency URLs passed to the constructor; also used to build {@link #classLoader}. */
    private final List<URL> dependencies;
    /** Parent-first user-code class loader created over {@link #dependencies}. */
    private final ClassLoader classLoader;
    /** Catalogs created from the merged environment, keyed by name in insertion order. */
    private final Map<String, Catalog> catalogs = new LinkedHashMap<>();
    /** Table sources created from the merged environment's table entries, keyed by table name. */
    private final Map<String, TableSource<?>> tableSources;
    /** Table sinks created from the merged environment's table entries, keyed by table name. */
    private final Map<String, TableSink<?>> tableSinks;
    /** User-defined functions created from the merged environment, keyed by function name. */
    private final Map<String, UserDefinedFunction> functions;
    /** Flink configuration passed to the constructor. */
    private final Configuration flinkConfig;
    /** Command line derived from the merged environment's deployment entry. */
    private final CommandLine commandLine;
    /** The first custom command line that reported itself active for {@link #commandLine}. */
    private final CustomCommandLine<T> activeCommandLine;
    /** Run options parsed from {@link #commandLine}. */
    private final RunOptions runOptions;
    /** Cluster id reported by {@link #activeCommandLine} for {@link #commandLine}. */
    private final T clusterId;
    /** Cluster specification reported by {@link #activeCommandLine} for {@link #commandLine}. */
    private final ClusterSpecification clusterSpec;

    /* loaded from: input_file:org/apache/flink/table/client/gateway/local/ExecutionContext$EnvironmentInstance.class */
    public class EnvironmentInstance {
        /** Query configuration matching the selected planner; built by {@code createQueryConfig()}. */
        private final QueryConfig queryConfig;
        /** Batch execution environment; {@code null} when the streaming planner is selected. */
        private final ExecutionEnvironment execEnv;
        /** Stream execution environment; {@code null} when the batch planner is selected. */
        private final StreamExecutionEnvironment streamExecEnv;
        /** Executor looked up for the streaming planner; {@code null} when the batch planner is selected. */
        private final Executor executor;
        /** Table environment that catalogs, tables, sinks, functions and views are registered with. */
        private final TableEnvironment tableEnv;

        /**
         * Builds the execution and table environments for the enclosing context and registers
         * everything the merged environment declares.
         *
         * <p>Steps, in order: create the planner-specific environments, copy the merged
         * environment's configuration into the table config, register catalogs, create the query
         * config, register table sources, table sinks and functions, register views and temporal
         * tables, and finally select the current catalog and database (the session's value wins
         * over the merged environment's value).
         *
         * @throws SqlExecutionException if the execution entry is neither a streaming nor a batch planner
         */
        private EnvironmentInstance() {
            final EnvironmentSettings settings = ExecutionContext.this.mergedEnv.getExecution().getEnvironmentSettings();
            if (ExecutionContext.this.mergedEnv.getExecution().isStreamingPlanner()) {
                this.streamExecEnv = createStreamExecutionEnvironment();
                this.execEnv = null;
                this.executor = ExecutionContext.lookupExecutor(settings.toExecutorProperties(), this.streamExecEnv);
                this.tableEnv = ExecutionContext.createStreamTableEnvironment(this.streamExecEnv, settings, this.executor);
            } else if (ExecutionContext.this.mergedEnv.getExecution().isBatchPlanner()) {
                this.streamExecEnv = null;
                this.execEnv = createExecutionEnvironment();
                this.executor = null;
                this.tableEnv = BatchTableEnvironment.create(this.execEnv);
            } else {
                throw new SqlExecutionException("Unsupported execution type specified.");
            }

            // Copy user-provided configuration entries into the table configuration.
            ExecutionContext.this.mergedEnv.getConfiguration().asMap().forEach((key, value) -> {
                this.tableEnv.getConfig().getConfiguration().setString(key, value);
            });

            // Catalogs are registered first; tables and views are registered afterwards.
            ExecutionContext.this.catalogs.forEach(this.tableEnv::registerCatalog);

            this.queryConfig = createQueryConfig();

            ExecutionContext.this.tableSources.forEach(this.tableEnv::registerTableSource);
            ExecutionContext.this.tableSinks.forEach(this.tableEnv::registerTableSink);
            registerFunctions();

            // Views and temporal tables are registered after sources, sinks and functions.
            ExecutionContext.this.mergedEnv.getTables().forEach((name, entry) -> {
                if (entry instanceof ViewEntry) {
                    registerView((ViewEntry) entry);
                } else if (entry instanceof TemporalTableEntry) {
                    registerTemporalTable((TemporalTableEntry) entry);
                }
            });

            // The session's current catalog takes precedence over the merged environment's.
            if (ExecutionContext.this.sessionContext.getCurrentCatalog().isPresent()) {
                this.tableEnv.useCatalog(ExecutionContext.this.sessionContext.getCurrentCatalog().get());
            } else if (ExecutionContext.this.mergedEnv.getExecution().getCurrentCatalog().isPresent()) {
                this.tableEnv.useCatalog(ExecutionContext.this.mergedEnv.getExecution().getCurrentCatalog().get());
            }

            // Same precedence for the current database.
            if (ExecutionContext.this.sessionContext.getCurrentDatabase().isPresent()) {
                this.tableEnv.useDatabase(ExecutionContext.this.sessionContext.getCurrentDatabase().get());
            } else if (ExecutionContext.this.mergedEnv.getExecution().getCurrentDatabase().isPresent()) {
                this.tableEnv.useDatabase(ExecutionContext.this.mergedEnv.getExecution().getCurrentDatabase().get());
            }
        }

        /** Returns the query configuration created for this environment instance. */
        public QueryConfig getQueryConfig() {
            return queryConfig;
        }

        /** Returns the batch execution environment, or {@code null} when the streaming planner is in use. */
        public ExecutionEnvironment getExecutionEnvironment() {
            return execEnv;
        }

        /** Returns the stream execution environment, or {@code null} when the batch planner is in use. */
        public StreamExecutionEnvironment getStreamExecutionEnvironment() {
            return streamExecEnv;
        }

        /** Returns the table environment built by this instance. */
        public TableEnvironment getTableEnvironment() {
            return tableEnv;
        }

        /**
         * Returns the execution config of whichever execution environment is present: the
         * stream environment's when it exists, otherwise the batch environment's.
         */
        public ExecutionConfig getExecutionConfig() {
            if (this.streamExecEnv == null) {
                return this.execEnv.getConfig();
            }
            return this.streamExecEnv.getConfig();
        }

        /**
         * Creates the job graph for the program currently defined in this environment.
         *
         * @param str name given to the job
         * @return job graph built from the plan, the context's dependencies, and the run
         *     options' classpaths and savepoint restore settings
         */
        public JobGraph createJobGraph(String str) {
            final Configuration config = ExecutionContext.this.flinkConfig;
            final FlinkPlan plan = createPlan(str, config);
            return ClusterClient.getJobGraph(
                    config,
                    plan,
                    ExecutionContext.this.dependencies,
                    ExecutionContext.this.runOptions.getClasspaths(),
                    ExecutionContext.this.runOptions.getSavepointRestoreSettings());
        }

        /**
         * Creates the plan for the current program.
         *
         * <p>With a stream environment, an {@link ExecutorBase} executor generates the stream
         * graph itself; any other executor falls back to the stream environment's graph. Without
         * a stream environment, the batch program plan is optimized with the given configuration.
         *
         * @param str name given to the job
         * @param configuration configuration handed to the batch optimizer
         * @return the stream graph or the optimized batch plan
         */
        private FlinkPlan createPlan(String str, Configuration configuration) {
            if (this.streamExecEnv != null) {
                if (this.executor instanceof ExecutorBase) {
                    // generateStreamGraph is declared on ExecutorBase, not on the Executor interface.
                    return ((ExecutorBase) this.executor).generateStreamGraph(str);
                }
                return this.streamExecEnv.getStreamGraph(str);
            }
            final int parallelism = this.execEnv.getParallelism();
            final Plan programPlan = this.execEnv.createProgramPlan();
            programPlan.setJobName(str);
            final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
            return ClusterClient.getOptimizedPlan(optimizer, programPlan, parallelism);
        }

        /**
         * Obtains a batch execution environment and applies the restart strategy and
         * parallelism from the merged environment's execution entry.
         */
        private ExecutionEnvironment createExecutionEnvironment() {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            final ExecutionEntry execution = ExecutionContext.this.mergedEnv.getExecution();
            env.setRestartStrategy(execution.getRestartStrategy());
            env.setParallelism(execution.getParallelism());
            return env;
        }

        /**
         * Obtains a stream execution environment and applies restart strategy, parallelism,
         * max parallelism and time characteristic from the merged environment's execution entry.
         * The auto-watermark interval is set only when the time characteristic is event time.
         */
        private StreamExecutionEnvironment createStreamExecutionEnvironment() {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final ExecutionEntry execution = ExecutionContext.this.mergedEnv.getExecution();
            env.setRestartStrategy(execution.getRestartStrategy());
            env.setParallelism(execution.getParallelism());
            env.setMaxParallelism(execution.getMaxParallelism());
            env.setStreamTimeCharacteristic(execution.getTimeCharacteristic());
            final boolean eventTime = env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime;
            if (eventTime) {
                env.getConfig().setAutoWatermarkInterval(execution.getPeriodicWatermarksInterval());
            }
            return env;
        }

        /**
         * Creates the query configuration: a stream query config carrying the configured
         * min/max idle state retention when a stream environment exists, otherwise a batch
         * query config.
         */
        private QueryConfig createQueryConfig() {
            if (this.streamExecEnv != null) {
                final StreamQueryConfig config = new StreamQueryConfig();
                final ExecutionEntry execution = ExecutionContext.this.mergedEnv.getExecution();
                final Time minRetention = Time.milliseconds(execution.getMinStateRetention());
                final Time maxRetention = Time.milliseconds(execution.getMaxStateRetention());
                config.withIdleStateRetentionTime(minRetention, maxRetention);
                return config;
            }
            return new BatchQueryConfig();
        }

        /**
         * Registers every user-defined function of the enclosing context with the table
         * environment.
         *
         * <p>The table environment is downcast to its stream or batch flavour before the scalar,
         * aggregate and table function overloads are called; this method assumes those overloads
         * are declared on the specific interfaces rather than on {@link TableEnvironment}.
         *
         * @throws SqlExecutionException if a function is neither a scalar, aggregate nor table function
         */
        private void registerFunctions() {
            if (this.tableEnv instanceof StreamTableEnvironment) {
                final StreamTableEnvironment streamTableEnv = (StreamTableEnvironment) this.tableEnv;
                ExecutionContext.this.functions.forEach((name, function) -> {
                    if (function instanceof ScalarFunction) {
                        streamTableEnv.registerFunction(name, (ScalarFunction) function);
                    } else if (function instanceof AggregateFunction) {
                        streamTableEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
                    } else if (function instanceof TableFunction) {
                        streamTableEnv.registerFunction(name, (TableFunction<?>) function);
                    } else {
                        throw new SqlExecutionException("Unsupported function type: " + function.getClass().getName());
                    }
                });
            } else {
                final BatchTableEnvironment batchTableEnv = (BatchTableEnvironment) this.tableEnv;
                ExecutionContext.this.functions.forEach((name, function) -> {
                    if (function instanceof ScalarFunction) {
                        batchTableEnv.registerFunction(name, (ScalarFunction) function);
                    } else if (function instanceof AggregateFunction) {
                        batchTableEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
                    } else if (function instanceof TableFunction) {
                        batchTableEnv.registerFunction(name, (TableFunction<?>) function);
                    } else {
                        throw new SqlExecutionException("Unsupported function type: " + function.getClass().getName());
                    }
                });
            }
        }

        /**
         * Registers a view by running its query against the table environment and registering
         * the resulting table under the view's name.
         *
         * @param viewEntry view definition (name and query)
         * @throws SqlExecutionException if the query cannot be evaluated or registered; the
         *     original exception is attached as the cause
         */
        private void registerView(ViewEntry viewEntry) {
            try {
                this.tableEnv.registerTable(viewEntry.getName(), this.tableEnv.sqlQuery(viewEntry.getQuery()));
            } catch (Exception e) {
                // Keep the cause so the full stack trace is not lost; the message text is unchanged.
                throw new SqlExecutionException(
                        "Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery() + "\nCause: " + e.getMessage(),
                        e);
            }
        }

        /**
         * Creates a temporal table function over the entry's history table and registers it
         * under the entry's name.
         *
         * <p>The table environment is downcast to its stream or batch flavour because the
         * table-function overload of {@code registerFunction} is assumed to live on those
         * interfaces rather than on {@link TableEnvironment}.
         *
         * @param temporalTableEntry temporal table definition (name, history table, time
         *     attribute, primary key fields)
         * @throws SqlExecutionException if the function cannot be created or registered; the
         *     original exception is attached as the cause
         */
        private void registerTemporalTable(TemporalTableEntry temporalTableEntry) {
            try {
                final TemporalTableFunction function = this.tableEnv
                        .scan(new String[]{temporalTableEntry.getHistoryTable()})
                        .createTemporalTableFunction(
                                temporalTableEntry.getTimeAttribute(),
                                String.join(",", temporalTableEntry.getPrimaryKeyFields()));
                if (this.tableEnv instanceof StreamTableEnvironment) {
                    ((StreamTableEnvironment) this.tableEnv).registerFunction(temporalTableEntry.getName(), function);
                } else {
                    ((BatchTableEnvironment) this.tableEnv).registerFunction(temporalTableEntry.getName(), function);
                }
            } catch (Exception e) {
                // Closing quote after the history table name was missing; cause is now preserved.
                throw new SqlExecutionException(
                        "Invalid temporal table '" + temporalTableEntry.getName() + "' over table '" + temporalTableEntry.getHistoryTable() + "'.\nCause: " + e.getMessage(),
                        e);
            }
        }
    }

    /**
     * Creates an execution context for one session.
     *
     * <p>The constructor merges the given environment with the session's environment, builds a
     * parent-first class loader over the dependencies, instantiates all catalogs, table sources,
     * table sinks and functions declared in the merged environment, and resolves the deployment
     * (command line, active custom command line, run options, cluster id and specification).
     *
     * @param environment base environment that the session's environment is merged into
     * @param sessionContext session to create the context for; a copy is stored
     * @param list dependency URLs used for the user-code class loader and for job graphs
     * @param configuration Flink configuration
     * @param options command line options used to parse the deployment entry
     * @param list2 candidate custom command lines; the first active one is used
     * @throws SqlExecutionException if the deployment cannot be resolved
     */
    public ExecutionContext(Environment environment, SessionContext sessionContext, List<URL> list, Configuration configuration, Options options, List<CustomCommandLine<?>> list2) {
        this.sessionContext = sessionContext.copy();
        this.mergedEnv = Environment.merge(environment, sessionContext.getEnvironment());
        this.dependencies = list;
        this.flinkConfig = configuration;

        // User code is loaded parent-first on top of this class's own class loader.
        this.classLoader = FlinkUserCodeClassLoaders.parentFirst(list.toArray(new URL[0]), getClass().getClassLoader());

        this.mergedEnv.getCatalogs().forEach((name, entry) -> {
            this.catalogs.put(name, createCatalog(name, entry.asMap(), this.classLoader));
        });

        this.tableSources = new LinkedHashMap<>();
        this.tableSinks = new LinkedHashMap<>();
        this.mergedEnv.getTables().forEach((name, entry) -> {
            // A source-sink entry is registered both as a source and as a sink.
            if ((entry instanceof SourceTableEntry) || (entry instanceof SourceSinkTableEntry)) {
                this.tableSources.put(name, createTableSource(this.mergedEnv.getExecution(), entry.asMap(), this.classLoader));
            }
            if ((entry instanceof SinkTableEntry) || (entry instanceof SourceSinkTableEntry)) {
                this.tableSinks.put(name, createTableSink(this.mergedEnv.getExecution(), entry.asMap(), this.classLoader));
            }
        });

        this.functions = new LinkedHashMap<>();
        this.mergedEnv.getFunctions().forEach((name, entry) -> {
            this.functions.put(name, FunctionService.createFunction(entry.getDescriptor(), this.classLoader, false));
        });

        this.commandLine = createCommandLine(this.mergedEnv.getDeployment(), options);
        this.activeCommandLine = findActiveCommandLine(list2, this.commandLine);
        this.runOptions = createRunOptions(this.commandLine);
        this.clusterId = this.activeCommandLine.getClusterId(this.commandLine);
        this.clusterSpec = createClusterSpecification(this.activeCommandLine, this.commandLine);
    }

    /** Returns the session context copy held by this execution context. */
    public SessionContext getSessionContext() {
        return sessionContext;
    }

    /** Returns the user-code class loader built over the dependencies. */
    public ClassLoader getClassLoader() {
        return classLoader;
    }

    /** Returns the environment obtained by merging the base and session environments. */
    public Environment getMergedEnvironment() {
        return mergedEnv;
    }

    /** Returns the cluster specification resolved from the deployment command line. */
    public ClusterSpecification getClusterSpec() {
        return clusterSpec;
    }

    /** Returns the cluster id reported by the active custom command line. */
    public T getClusterId() {
        return clusterId;
    }

    /**
     * Asks the active custom command line to create a cluster descriptor for the deployment
     * command line.
     *
     * @throws Exception declared by this method; whatever the active command line raises passes through
     */
    public ClusterDescriptor<T> createClusterDescriptor() throws Exception {
        final ClusterDescriptor<T> descriptor = activeCommandLine.createClusterDescriptor(commandLine);
        return descriptor;
    }

    /**
     * Creates a new {@link EnvironmentInstance} while the user-code class loader is active
     * (via {@link #wrapClassLoader(Supplier)}).
     *
     * @throws SqlExecutionException wrapping any {@link Throwable} raised during creation
     */
    public ExecutionContext<T>.EnvironmentInstance createEnvironmentInstance() {
        try {
            final Supplier<EnvironmentInstance> factory = () -> new EnvironmentInstance();
            return wrapClassLoader(factory);
        } catch (Throwable th) {
            throw new SqlExecutionException("Could not create environment instance.", th);
        }
    }

    /** Returns the catalogs of this context, keyed by name (the internal map, not a copy). */
    public Map<String, Catalog> getCatalogs() {
        return catalogs;
    }

    /** Returns the table sources of this context, keyed by table name (the internal map, not a copy). */
    public Map<String, TableSource<?>> getTableSources() {
        return tableSources;
    }

    /** Returns the table sinks of this context, keyed by table name (the internal map, not a copy). */
    public Map<String, TableSink<?>> getTableSinks() {
        return tableSinks;
    }

    /**
     * Runs the supplier inside a {@link TemporaryClassLoaderContext} created for this context's
     * class loader and closes that context afterwards.
     *
     * <p>try-with-resources guarantees the context is closed exactly once, whether the supplier
     * returns or throws. If both the supplier and {@code close()} fail, the close failure is
     * attached to the supplier's exception as suppressed.
     *
     * @param supplier work to execute
     * @param <R> result type of the supplier
     * @return whatever the supplier returns
     */
    public <R> R wrapClassLoader(Supplier<R> supplier) {
        try (TemporaryClassLoaderContext ignored = new TemporaryClassLoaderContext(this.classLoader)) {
            return supplier.get();
        }
    }

    /**
     * Builds the command line for the given deployment entry.
     *
     * @throws SqlExecutionException wrapping any exception raised while building it
     */
    private static CommandLine createCommandLine(DeploymentEntry deploymentEntry, Options options) {
        final CommandLine parsed;
        try {
            parsed = deploymentEntry.getCommandLine(options);
        } catch (Exception e) {
            throw new SqlExecutionException("Invalid deployment options.", e);
        }
        return parsed;
    }

    /**
     * Returns the first custom command line in the list that reports itself active for the
     * given command line.
     *
     * @param list candidates, checked in list order
     * @param commandLine command line the candidates are checked against
     * @param <T> cluster id type expected by the caller
     * @return the first active candidate
     * @throws SqlExecutionException if no candidate is active
     */
    private static <T> CustomCommandLine<T> findActiveCommandLine(List<CustomCommandLine<?>> list, CommandLine commandLine) {
        for (CustomCommandLine<?> candidate : list) {
            if (candidate.isActive(commandLine)) {
                // The cluster id type cannot be checked at runtime; the caller picks T.
                @SuppressWarnings("unchecked")
                final CustomCommandLine<T> active = (CustomCommandLine<T>) candidate;
                return active;
            }
        }
        throw new SqlExecutionException("Could not find a matching deployment.");
    }

    /**
     * Parses run options from the given command line.
     *
     * @throws SqlExecutionException wrapping a {@link CliArgsException} raised while parsing
     */
    private static RunOptions createRunOptions(CommandLine commandLine) {
        final RunOptions options;
        try {
            options = new RunOptions(commandLine);
        } catch (CliArgsException e) {
            throw new SqlExecutionException("Invalid deployment run options.", e);
        }
        return options;
    }

    /**
     * Asks the custom command line for the cluster specification of the given command line.
     *
     * @throws SqlExecutionException wrapping a {@link FlinkException} raised by the command line
     */
    private static ClusterSpecification createClusterSpecification(CustomCommandLine<?> customCommandLine, CommandLine commandLine) {
        final ClusterSpecification specification;
        try {
            specification = customCommandLine.getClusterSpecification(commandLine);
        } catch (FlinkException e) {
            throw new SqlExecutionException("Could not create cluster specification for the given deployment.", e);
        }
        return specification;
    }

    /**
     * Finds a {@link CatalogFactory} matching the properties and lets it create the catalog.
     *
     * @param str catalog name
     * @param map catalog properties, used both for the factory lookup and for creation
     * @param classLoader class loader used for the factory lookup
     */
    private Catalog createCatalog(String str, Map<String, String> map, ClassLoader classLoader) {
        final CatalogFactory factory = TableFactoryService.find(CatalogFactory.class, map, classLoader);
        return factory.createCatalog(str, map);
    }

    /**
     * Creates a table source from the given properties using the factory type that matches
     * the planner: {@link TableSourceFactory} for streaming, {@link BatchTableSourceFactory}
     * for batch.
     *
     * @throws SqlExecutionException if the execution entry is neither streaming nor batch
     */
    private static TableSource<?> createTableSource(ExecutionEntry executionEntry, Map<String, String> map, ClassLoader classLoader) {
        if (executionEntry.isStreamingPlanner()) {
            final TableSourceFactory<?> streamFactory = TableFactoryService.find(TableSourceFactory.class, map, classLoader);
            return streamFactory.createTableSource(map);
        }
        if (!executionEntry.isBatchPlanner()) {
            throw new SqlExecutionException("Unsupported execution type for sources.");
        }
        final BatchTableSourceFactory<?> batchFactory = TableFactoryService.find(BatchTableSourceFactory.class, map, classLoader);
        return batchFactory.createBatchTableSource(map);
    }

    /**
     * Creates a table sink from the given properties using the factory type that matches the
     * planner: {@link TableSinkFactory} for streaming, {@link BatchTableSinkFactory} for batch.
     *
     * @throws SqlExecutionException if the execution entry is neither streaming nor batch
     */
    private static TableSink<?> createTableSink(ExecutionEntry executionEntry, Map<String, String> map, ClassLoader classLoader) {
        if (executionEntry.isStreamingPlanner()) {
            final TableSinkFactory<?> streamFactory = TableFactoryService.find(TableSinkFactory.class, map, classLoader);
            return streamFactory.createTableSink(map);
        }
        if (!executionEntry.isBatchPlanner()) {
            throw new SqlExecutionException("Unsupported execution type for sinks.");
        }
        final BatchTableSinkFactory<?> batchFactory = TableFactoryService.find(BatchTableSinkFactory.class, map, classLoader);
        return batchFactory.createBatchTableSink(map);
    }

    /**
     * Assembles a {@link StreamTableEnvironmentImpl} by hand: a default table config, a catalog
     * manager with an in-memory built-in catalog, a function catalog, and a planner looked up
     * from the settings' planner properties.
     *
     * @param streamExecutionEnvironment stream environment the table environment is bound to
     * @param environmentSettings source of built-in catalog/database names, planner properties
     *     and the streaming-mode flag
     * @param executor executor handed to both the planner and the table environment
     * @return the new stream table environment
     */
    public static TableEnvironment createStreamTableEnvironment(StreamExecutionEnvironment streamExecutionEnvironment, EnvironmentSettings environmentSettings, Executor executor) {
        final TableConfig tableConfig = TableConfig.getDefault();
        final CatalogManager catalogManager = new CatalogManager(
                environmentSettings.getBuiltInCatalogName(),
                new GenericInMemoryCatalog(environmentSettings.getBuiltInCatalogName(), environmentSettings.getBuiltInDatabaseName()));
        final FunctionCatalog functionCatalog = new FunctionCatalog(catalogManager);
        // Must be parameterized: a raw Map would erase the factory type returned by find().
        final Map<String, String> plannerProperties = environmentSettings.toPlannerProperties();
        return new StreamTableEnvironmentImpl(
                catalogManager,
                functionCatalog,
                tableConfig,
                streamExecutionEnvironment,
                ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager),
                executor,
                environmentSettings.isStreamingMode());
    }

    /**
     * Looks up an {@link ExecutorFactory} for the given properties and reflectively invokes
     * its {@code create(Map, StreamExecutionEnvironment)} method.
     *
     * @param map executor properties used for the factory lookup and passed to {@code create}
     * @param streamExecutionEnvironment stream environment passed to {@code create}
     * @return the executor returned by the factory
     * @throws TableException wrapping any exception raised during lookup or invocation
     */
    public static Executor lookupExecutor(Map<String, String> map, StreamExecutionEnvironment streamExecutionEnvironment) {
        try {
            final ExecutorFactory factory = ComponentFactoryService.find(ExecutorFactory.class, map);
            final java.lang.reflect.Method createMethod =
                    factory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
            return (Executor) createMethod.invoke(factory, map, streamExecutionEnvironment);
        } catch (Exception e) {
            throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", e);
        }
    }
}
