/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.client.gateway.local;

import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.DeploymentEntry;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.SinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceTableEntry;
import org.apache.flink.table.client.config.entries.TemporalTableEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.FunctionDescriptor;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionContext<ClusterID> {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionContext.class);
    // Environment this context was built from; assigned once in the constructor.
    private final Environment environment;
    // Session context handed to the constructor; exposed via getOriginalSessionContext().
    private final SessionContext originalSessionContext;
    // User-code class loader built in the constructor from the dependency URLs.
    private final ClassLoader classLoader;
    // Flink configuration passed in by the caller; the constructor also adds the
    // execution config derived from the deployment command line to this same instance.
    private final Configuration flinkConfig;
    // Factory obtained from the ClusterClientServiceLoader for flinkConfig.
    private final ClusterClientFactory<ClusterID> clusterClientFactory;
    // Cluster id as reported by clusterClientFactory for flinkConfig.
    private final ClusterID clusterId;
    // Cluster specification as reported by clusterClientFactory for flinkConfig.
    private final ClusterSpecification clusterSpec;
    // Set by createTableEnvironment(): a stream or a batch table environment.
    private TableEnvironment tableEnv;
    // Only set for the batch planner; null when the streaming planner is used.
    private ExecutionEnvironment execEnv;
    // Only set for the streaming planner; null when the batch planner is used.
    private StreamExecutionEnvironment streamExecEnv;
    // Only set for the streaming planner; null when the batch planner is used.
    private Executor executor;
    // Catalog manager, module manager and function catalog backing tableEnv;
    // either inherited from the caller or freshly created in initializeTableEnvironment().
    private SessionState sessionState;

    /**
     * Creates the execution context: builds the user-code class loader, initializes the table
     * environment and resolves the cluster client factory, cluster id and cluster specification.
     *
     * <p>The given {@code flinkConfig} is modified: the execution configuration derived from the
     * deployment command line is added to it.
     *
     * @param environment environment to build the context for
     * @param originalSessionContext session context this execution context belongs to
     * @param sessionState state to reuse, or {@code null} to create fresh state
     * @param dependencies URLs added to the user-code class loader
     * @param flinkConfig Flink configuration (extended in place)
     * @param clusterClientServiceLoader loader used to find the cluster client factory
     * @param commandLineOptions options used to parse the deployment entry
     * @param availableCommandLines candidate command lines for the deployment
     * @throws FlinkException if the command line options cannot be applied to a configuration
     */
    private ExecutionContext(
            Environment environment,
            SessionContext originalSessionContext,
            @Nullable SessionState sessionState,
            List<URL> dependencies,
            Configuration flinkConfig,
            ClusterClientServiceLoader clusterClientServiceLoader,
            Options commandLineOptions,
            List<CustomCommandLine> availableCommandLines) throws FlinkException {
        this.environment = environment;
        this.originalSessionContext = originalSessionContext;
        this.flinkConfig = flinkConfig;

        // Python functions additionally need the flink-python jar among the dependencies.
        final List<URL> effectiveDependencies =
                containsPythonFunction(environment) ? addPythonDependency(dependencies) : dependencies;

        this.classLoader = ClientUtils.buildUserCodeClassLoader(
                effectiveDependencies,
                Collections.emptyList(),
                getClass().getClassLoader(),
                flinkConfig);

        // Requires environment, flinkConfig and classLoader to be set already.
        initializeTableEnvironment(sessionState);

        LOG.debug("Deployment descriptor: {}", environment.getDeployment());
        final CommandLine commandLine =
                createCommandLine(environment.getDeployment(), commandLineOptions);
        flinkConfig.addAll(
                createExecutionConfig(
                        commandLine, commandLineOptions, availableCommandLines, effectiveDependencies));

        final ClusterClientServiceLoader serviceLoader =
                Preconditions.checkNotNull(clusterClientServiceLoader);
        this.clusterClientFactory = serviceLoader.getClusterClientFactory(flinkConfig);
        Preconditions.checkState(clusterClientFactory != null);

        this.clusterId = clusterClientFactory.getClusterId(flinkConfig);
        this.clusterSpec = clusterClientFactory.getClusterSpecification(flinkConfig);
    }

    /** Returns the Flink configuration held by this context. */
    public Configuration getFlinkConfig() {
        return flinkConfig;
    }

    /** Returns the session context this execution context was created with. */
    public SessionContext getOriginalSessionContext() {
        return originalSessionContext;
    }

    /** Returns the user-code class loader built for this context. */
    public ClassLoader getClassLoader() {
        return classLoader;
    }

    /** Returns the environment this context was built from. */
    public Environment getEnvironment() {
        return environment;
    }

    /** Returns the cluster specification obtained from the cluster client factory. */
    public ClusterSpecification getClusterSpec() {
        return clusterSpec;
    }

    /** Returns the cluster id obtained from the cluster client factory. */
    public ClusterID getClusterId() {
        return clusterId;
    }

    /** Asks the cluster client factory for a new cluster descriptor based on the Flink configuration. */
    public ClusterDescriptor<ClusterID> createClusterDescriptor() {
        return clusterClientFactory.createClusterDescriptor(flinkConfig);
    }

    /**
     * Collects every catalog listed by the table environment into a new map.
     *
     * @return a fresh map from catalog name to catalog; names for which the table environment
     *     returns no catalog are left out
     */
    public Map<String, Catalog> getCatalogs() {
        final Map<String, Catalog> result = new HashMap<>();
        for (String catalogName : tableEnv.listCatalogs()) {
            final Optional<Catalog> catalog = tableEnv.getCatalog(catalogName);
            if (catalog.isPresent()) {
                result.put(catalogName, catalog.get());
            }
        }
        return result;
    }

    /** Returns the session state (catalog manager, module manager, function catalog) of this context. */
    public SessionState getSessionState() {
        return sessionState;
    }

    /**
     * Runs the supplier inside a {@link TemporaryClassLoaderContext} created for this context's
     * class loader and returns its result.
     *
     * @param supplier computation to run
     * @param <R> result type
     * @return whatever the supplier returns
     */
    public <R> R wrapClassLoader(Supplier<R> supplier) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            return supplier.get();
        }
    }

    /**
     * Runs the action inside a {@link TemporaryClassLoaderContext} created for this context's
     * class loader.
     *
     * @param runnable action to run
     */
    void wrapClassLoader(Runnable runnable) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            runnable.run();
        }
    }

    /** Returns the table environment created for this context. */
    public TableEnvironment getTableEnvironment() {
        return tableEnv;
    }

    /**
     * Returns the execution config of the stream execution environment if one exists, otherwise
     * that of the batch execution environment.
     */
    public ExecutionConfig getExecutionConfig() {
        return streamExecEnv != null ? streamExecEnv.getConfig() : execEnv.getConfig();
    }

    /**
     * Obtains the pipeline with the given name from the table environment while the context's
     * class loader is active.
     *
     * @param name name handed to the table environment's {@code getPipeline}
     * @return the pipeline produced by the stream or batch table environment
     */
    public Pipeline createPipeline(String name) {
        return wrapClassLoader(() -> {
            if (streamExecEnv == null) {
                // No stream environment: the table environment is the batch implementation.
                final BatchTableEnvironmentImpl batchEnv = (BatchTableEnvironmentImpl) tableEnv;
                return batchEnv.getPipeline(name);
            }
            final StreamTableEnvironmentImpl streamEnv = (StreamTableEnvironmentImpl) tableEnv;
            return streamEnv.getPipeline(name);
        });
    }

    /**
     * Creates a {@link Builder} for execution contexts.
     *
     * @param defaultEnv default environment, merged with the session environment unless an
     *     explicit environment is set on the builder
     * @param sessionContext session the context belongs to
     * @param dependencies URLs for the user-code class loader
     * @param configuration Flink configuration
     * @param serviceLoader loader used to find the cluster client factory
     * @param commandLineOptions options used to parse the deployment entry
     * @param commandLines candidate command lines for the deployment
     * @return a new builder holding the given arguments
     */
    public static Builder builder(
            Environment defaultEnv,
            SessionContext sessionContext,
            List<URL> dependencies,
            Configuration configuration,
            ClusterClientServiceLoader serviceLoader,
            Options commandLineOptions,
            List<CustomCommandLine> commandLines) {
        return new Builder(
                defaultEnv,
                sessionContext,
                dependencies,
                configuration,
                serviceLoader,
                commandLineOptions,
                commandLines);
    }

    /** Closes every catalog returned by {@link #getCatalogs()} while the context's class loader is active. */
    public void close() {
        wrapClassLoader(() -> {
            for (Catalog catalog : getCatalogs().values()) {
                catalog.close();
            }
        });
    }

    /**
     * Derives the execution configuration from the parsed deployment command line.
     *
     * @param commandLine parsed deployment command line
     * @param commandLineOptions available options (only logged)
     * @param availableCommandLines candidates from which the active command line is chosen
     * @param dependencies URLs passed on to the {@link ExecutionConfigAccessor}
     * @return configuration produced by the active command line plus the program options
     * @throws FlinkException if the active command line cannot be applied to a configuration
     * @throws SqlExecutionException if no command line is active or the run options are invalid
     */
    private static Configuration createExecutionConfig(
            CommandLine commandLine,
            Options commandLineOptions,
            List<CustomCommandLine> availableCommandLines,
            List<URL> dependencies) throws FlinkException {
        LOG.debug("Available commandline options: {}", commandLineOptions);

        final List<String> options = Stream.of(commandLine.getOptions())
                .map(o -> o.getOpt() + "=" + o.getValue())
                .collect(Collectors.toList());
        LOG.debug("Instantiated commandline args: {}, options: {}", commandLine.getArgList(), options);

        final CustomCommandLine activeCommandLine =
                findActiveCommandLine(availableCommandLines, commandLine);
        LOG.debug("Available commandlines: {}, active commandline: {}", availableCommandLines, activeCommandLine);

        final Configuration executionConfig =
                activeCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
        try {
            ExecutionConfigAccessor
                    .fromProgramOptions(ProgramOptions.create(commandLine), dependencies)
                    .applyToConfiguration(executionConfig);
        } catch (CliArgsException e) {
            throw new SqlExecutionException("Invalid deployment run options.", e);
        }

        LOG.info("Executor config: {}", executionConfig);
        return executionConfig;
    }

    /**
     * Turns the deployment entry into a command line using the given options.
     *
     * @throws SqlExecutionException if the deployment entry fails to produce a command line
     */
    private static CommandLine createCommandLine(DeploymentEntry deployment, Options commandLineOptions) {
        final CommandLine parsed;
        try {
            parsed = deployment.getCommandLine(commandLineOptions);
        } catch (Exception e) {
            throw new SqlExecutionException("Invalid deployment options.", e);
        }
        return parsed;
    }

    /**
     * Returns the first candidate that reports itself active for the given command line.
     *
     * @throws SqlExecutionException if none of the candidates is active
     */
    private static CustomCommandLine findActiveCommandLine(List<CustomCommandLine> availableCommandLines, CommandLine commandLine) {
        return availableCommandLines.stream()
                .filter(candidate -> candidate.isActive(commandLine))
                .findFirst()
                .orElseThrow(() -> new SqlExecutionException("Could not find a matching deployment."));
    }

    /**
     * Looks up a {@link ModuleFactory} matching the properties and lets it create the module.
     *
     * @param moduleProperties properties used both for discovery and for module creation
     * @param classLoader class loader used for the factory lookup
     */
    private Module createModule(Map<String, String> moduleProperties, ClassLoader classLoader) {
        final ModuleFactory moduleFactory =
                TableFactoryService.find(ModuleFactory.class, moduleProperties, classLoader);
        return moduleFactory.createModule(moduleProperties);
    }

    /**
     * Looks up a {@link CatalogFactory} matching the properties and lets it create the catalog.
     *
     * @param name name handed to the factory for the new catalog
     * @param catalogProperties properties used both for discovery and for catalog creation
     * @param classLoader class loader used for the factory lookup
     */
    private Catalog createCatalog(String name, Map<String, String> catalogProperties, ClassLoader classLoader) {
        final CatalogFactory catalogFactory =
                TableFactoryService.find(CatalogFactory.class, catalogProperties, classLoader);
        return catalogFactory.createCatalog(name, catalogProperties);
    }

    /**
     * Creates a table source from its properties via a discovered factory.
     *
     * <p>The streaming planner is checked first and uses a {@link TableSourceFactory} with a
     * context built from the current catalog/database; the batch planner uses a
     * {@link BatchTableSourceFactory}.
     *
     * @param name table name used for the object identifier
     * @param sourceProperties properties describing the source
     * @throws SqlExecutionException if the execution entry is neither streaming nor batch planner
     */
    private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
        final ExecutionEntry execution = environment.getExecution();

        if (execution.isStreamingPlanner()) {
            final TableSourceFactory<?> streamFactory =
                    TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
            final ObjectIdentifier identifier = ObjectIdentifier.of(
                    tableEnv.getCurrentCatalog(), tableEnv.getCurrentDatabase(), name);
            return streamFactory.createTableSource(
                    new TableSourceFactoryContextImpl(
                            identifier,
                            CatalogTableImpl.fromProperties(sourceProperties),
                            tableEnv.getConfig().getConfiguration()));
        }

        if (execution.isBatchPlanner()) {
            final BatchTableSourceFactory<?> batchFactory =
                    TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
            return batchFactory.createBatchTableSource(sourceProperties);
        }

        throw new SqlExecutionException("Unsupported execution type for sources.");
    }

    /**
     * Creates a table sink from its properties via a discovered factory.
     *
     * <p>The streaming planner is checked first and uses a {@link TableSinkFactory} with a
     * context built from the current catalog/database; the batch planner uses a
     * {@link BatchTableSinkFactory}.
     *
     * @param name table name used for the object identifier
     * @param sinkProperties properties describing the sink
     * @throws SqlExecutionException if the execution entry is neither streaming nor batch planner
     */
    private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
        final ExecutionEntry execution = environment.getExecution();

        if (execution.isStreamingPlanner()) {
            final TableSinkFactory<?> streamFactory =
                    TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
            final ObjectIdentifier identifier = ObjectIdentifier.of(
                    tableEnv.getCurrentCatalog(), tableEnv.getCurrentDatabase(), name);
            // The last context argument is the negation of inStreamingMode().
            return streamFactory.createTableSink(
                    new TableSinkFactoryContextImpl(
                            identifier,
                            CatalogTableImpl.fromProperties(sinkProperties),
                            tableEnv.getConfig().getConfiguration(),
                            !environment.getExecution().inStreamingMode()));
        }

        if (execution.isBatchPlanner()) {
            final BatchTableSinkFactory<?> batchFactory =
                    TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
            return batchFactory.createBatchTableSink(sinkProperties);
        }

        throw new SqlExecutionException("Unsupported execution type for sinks.");
    }

    /**
     * Discovers a {@link PlannerFactory} from the settings' planner properties, creates the
     * planner, and wires everything into a {@link StreamTableEnvironmentImpl}.
     */
    private static TableEnvironment createStreamTableEnvironment(
            StreamExecutionEnvironment env,
            EnvironmentSettings settings,
            TableConfig config,
            Executor executor,
            CatalogManager catalogManager,
            ModuleManager moduleManager,
            FunctionCatalog functionCatalog,
            ClassLoader userClassLoader) {
        final Map<String, String> plannerProperties = settings.toPlannerProperties();
        final PlannerFactory plannerFactory =
                ComponentFactoryService.find(PlannerFactory.class, plannerProperties);
        final Planner planner = plannerFactory.create(
                plannerProperties, executor, config, functionCatalog, catalogManager);

        return new StreamTableEnvironmentImpl(
                catalogManager,
                moduleManager,
                functionCatalog,
                config,
                env,
                planner,
                executor,
                settings.isStreamingMode(),
                userClassLoader);
    }

    /**
     * Discovers an {@link ExecutorFactory} and reflectively invokes its
     * {@code create(Map, StreamExecutionEnvironment)} method.
     *
     * @param executorProperties properties used for discovery and passed to {@code create}
     * @param executionEnvironment stream environment passed to {@code create}
     * @throws TableException if discovery, method lookup or invocation fails for any reason
     */
    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
            final ExecutorFactory factory =
                    ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            // The two-argument create method is looked up by name on the concrete factory class.
            final Method create =
                    factory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
            return (Executor) create.invoke(factory, executorProperties, executionEnvironment);
        } catch (Exception e) {
            throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", e);
        }
    }

    /**
     * Creates the table environment, either on top of inherited session state or on freshly
     * created catalog/module/function managers.
     *
     * <p>With fresh state, the modules, functions, catalogs and tables declared in the
     * environment are registered as well; with inherited state none of that is repeated.
     *
     * @param sessionState state to reuse, or {@code null} to start from scratch
     */
    private void initializeTableEnvironment(@Nullable SessionState sessionState) {
        final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings();
        final TableConfig config = createTableConfig();

        if (sessionState != null) {
            // Inherited state: only the table environment itself is rebuilt.
            this.sessionState = sessionState;
            createTableEnvironment(
                    settings,
                    config,
                    sessionState.catalogManager,
                    sessionState.moduleManager,
                    sessionState.functionCatalog);
            return;
        }

        // Step 1: fresh managers with an in-memory default catalog.
        final ModuleManager moduleManager = new ModuleManager();
        final CatalogManager catalogManager = CatalogManager.newBuilder()
                .classLoader(classLoader)
                .config(config.getConfiguration())
                .defaultCatalog(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(
                                settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
                .build();
        final FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);
        this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);

        // Step 2: the table environment on top of those managers.
        createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);

        // Step 3: configured modules replace the "core" module when any are declared.
        final Map<String, Module> declaredModules = new LinkedHashMap<>();
        environment.getModules().forEach(
                (name, entry) -> declaredModules.put(name, createModule(entry.asMap(), classLoader)));
        if (!declaredModules.isEmpty()) {
            tableEnv.unloadModule("core");
            declaredModules.forEach(tableEnv::loadModule);
        }

        // Step 4: functions first, then catalogs and tables.
        registerFunctions();
        initializeCatalogs();
    }

    /**
     * Builds the {@link TableConfig} for the table environment.
     *
     * <p>Starts from the Flink configuration, overlays the environment's configuration entries,
     * then applies the execution entry: idle state retention, parallelism, max parallelism, time
     * characteristic, the watermark interval (event time only) and the restart strategy.
     *
     * @return the populated table config
     */
    private TableConfig createTableConfig() {
        final TableConfig config = new TableConfig();
        config.addConfiguration(flinkConfig);

        final Configuration conf = config.getConfiguration();
        environment.getConfiguration().asMap().forEach(conf::setString);

        final ExecutionEntry execution = environment.getExecution();
        config.setIdleStateRetentionTime(
                Time.milliseconds(execution.getMinStateRetention()),
                Time.milliseconds(execution.getMaxStateRetention()));

        // Values are passed with their real static types: an (Object) cast here would make the
        // type parameter of Configuration#set(ConfigOption<T>, T) impossible to infer.
        conf.set(CoreOptions.DEFAULT_PARALLELISM, execution.getParallelism());
        conf.set(PipelineOptions.MAX_PARALLELISM, execution.getMaxParallelism());
        conf.set(StreamPipelineOptions.TIME_CHARACTERISTIC, execution.getTimeCharacteristic());
        if (execution.getTimeCharacteristic() == TimeCharacteristic.EventTime) {
            conf.set(
                    PipelineOptions.AUTO_WATERMARK_INTERVAL,
                    Duration.ofMillis(execution.getPeriodicWatermarksInterval()));
        }

        setRestartStrategy(conf);
        return config;
    }

    /**
     * Translates the execution entry's restart strategy into {@link RestartStrategyOptions}.
     *
     * <p>"none", "fixed-delay" and "failure-rate" strategies are written with their parameters;
     * for the fallback strategy the restart-strategy key is removed from the configuration. Any
     * other strategy type leaves the configuration untouched.
     *
     * @param conf configuration to write the restart strategy options to
     */
    private void setRestartStrategy(Configuration conf) {
        final RestartStrategies.RestartStrategyConfiguration restartStrategy =
                environment.getExecution().getRestartStrategy();

        // Values are passed with their real static types: an (Object) cast here would make the
        // type parameter of Configuration#set(ConfigOption<T>, T) impossible to infer.
        if (restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration) {
            conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        } else if (restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration) {
            final RestartStrategies.FixedDelayRestartStrategyConfiguration fixedDelay =
                    (RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy;
            conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
            conf.set(
                    RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
                    fixedDelay.getRestartAttempts());
            conf.set(
                    RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
                    Duration.ofMillis(fixedDelay.getDelayBetweenAttemptsInterval().toMilliseconds()));
        } else if (restartStrategy instanceof RestartStrategies.FailureRateRestartStrategyConfiguration) {
            final RestartStrategies.FailureRateRestartStrategyConfiguration failureRate =
                    (RestartStrategies.FailureRateRestartStrategyConfiguration) restartStrategy;
            conf.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate");
            conf.set(
                    RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
                    failureRate.getMaxFailureRate());
            conf.set(
                    RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
                    Duration.ofMillis(failureRate.getFailureInterval().toMilliseconds()));
            conf.set(
                    RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
                    Duration.ofMillis(failureRate.getDelayBetweenAttemptsInterval().toMilliseconds()));
        } else if (restartStrategy instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
            // Fallback: drop any explicit setting from the configuration.
            conf.removeConfig(RestartStrategyOptions.RESTART_STRATEGY);
        }
    }

    /**
     * Creates the execution environment, executor and table environment for the configured
     * planner and stores them in the corresponding fields.
     *
     * <p>For the streaming planner {@code execEnv} is cleared; for the batch planner
     * {@code streamExecEnv} and {@code executor} are cleared.
     *
     * @throws SqlExecutionException if the execution entry is neither streaming nor batch planner
     */
    private void createTableEnvironment(
            EnvironmentSettings settings,
            TableConfig config,
            CatalogManager catalogManager,
            ModuleManager moduleManager,
            FunctionCatalog functionCatalog) {
        if (environment.getExecution().isStreamingPlanner()) {
            streamExecEnv = createStreamExecutionEnvironment();
            execEnv = null;

            final Map<String, String> executorProperties = settings.toExecutorProperties();
            executor = lookupExecutor(executorProperties, streamExecEnv);
            tableEnv = createStreamTableEnvironment(
                    streamExecEnv,
                    settings,
                    config,
                    executor,
                    catalogManager,
                    moduleManager,
                    functionCatalog,
                    classLoader);
            return;
        }

        if (environment.getExecution().isBatchPlanner()) {
            streamExecEnv = null;
            execEnv = ExecutionEnvironment.getExecutionEnvironment();
            executor = null;
            tableEnv = new BatchTableEnvironmentImpl(execEnv, config, catalogManager, moduleManager);
            return;
        }

        throw new SqlExecutionException("Unsupported execution type specified.");
    }

    /**
     * Registers everything catalog-related that the environment declares, in this order:
     * catalogs, table sources and sinks, temporal tables, views, and finally the current
     * catalog and database if the execution entry names them.
     */
    private void initializeCatalogs() {
        // Step 1: catalogs are created and registered with the user class loader active.
        wrapClassLoader(() -> {
            environment.getCatalogs().forEach((name, entry) -> {
                final Catalog created = createCatalog(name, entry.asMap(), classLoader);
                tableEnv.registerCatalog(name, created);
            });
        });

        // Step 2: create all sources and sinks first, then register them.
        final Map<String, TableSource<?>> tableSources = new HashMap<>();
        final Map<String, TableSink<?>> tableSinks = new HashMap<>();
        environment.getTables().forEach((name, entry) -> {
            final boolean sourceAndSink = entry instanceof SourceSinkTableEntry;
            if (sourceAndSink || entry instanceof SourceTableEntry) {
                tableSources.put(name, createTableSource(name, entry.asMap()));
            }
            if (sourceAndSink || entry instanceof SinkTableEntry) {
                tableSinks.put(name, createTableSink(name, entry.asMap()));
            }
        });
        for (Map.Entry<String, TableSource<?>> source : tableSources.entrySet()) {
            ((TableEnvironmentInternal) tableEnv)
                    .registerTableSourceInternal(source.getKey(), source.getValue());
        }
        for (Map.Entry<String, TableSink<?>> sink : tableSinks.entrySet()) {
            ((TableEnvironmentInternal) tableEnv)
                    .registerTableSinkInternal(sink.getKey(), sink.getValue());
        }

        // Step 3: temporal tables, registered after the sources and sinks.
        environment.getTables().forEach((name, entry) -> {
            if (entry instanceof TemporalTableEntry) {
                registerTemporalTable((TemporalTableEntry) entry);
            }
        });

        // Step 4: views, registered after the temporal tables.
        environment.getTables().forEach((name, entry) -> {
            if (entry instanceof ViewEntry) {
                registerTemporaryView((ViewEntry) entry);
            }
        });

        // Step 5: switch to the configured catalog and database, if any.
        final Optional<String> currentCatalog = environment.getExecution().getCurrentCatalog();
        currentCatalog.ifPresent(tableEnv::useCatalog);
        final Optional<String> currentDatabase = environment.getExecution().getCurrentDatabase();
        currentDatabase.ifPresent(tableEnv::useDatabase);
    }

    /**
     * Obtains a stream execution environment and applies the execution entry's time
     * characteristic; for event time the periodic watermark interval is set as well.
     */
    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());

        final boolean eventTime = streamEnv.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime;
        if (eventTime) {
            streamEnv.getConfig()
                    .setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
        }
        return streamEnv;
    }

    /**
     * Instantiates every function declared in the environment, in declaration order, and
     * registers them with the table environment.
     */
    private void registerFunctions() {
        final Map<String, FunctionDefinition> instantiated = new LinkedHashMap<>();
        environment.getFunctions().forEach((name, entry) -> {
            final UserDefinedFunction udf = FunctionService.createFunction(
                    entry.getDescriptor(),
                    classLoader,
                    false,
                    getTableEnvironment().getConfig().getConfiguration());
            instantiated.put(name, udf);
        });
        registerFunctions(instantiated);
    }

    /**
     * Registers the given functions with the stream or batch table environment.
     *
     * <p>On a stream table environment with the Blink planner, scalar and table functions are
     * created as temporary system functions while aggregate functions go through
     * {@code registerFunction}. In every other case scalar, aggregate and table functions are all
     * registered through {@code registerFunction}.
     *
     * @param functions functions to register, keyed by name
     * @throws SqlExecutionException if a function is of none of the supported kinds
     */
    private void registerFunctions(Map<String, FunctionDefinition> functions) {
        if (tableEnv instanceof StreamTableEnvironment) {
            final StreamTableEnvironment streamEnv = (StreamTableEnvironment) tableEnv;
            functions.forEach((name, function) -> {
                if (environment.getExecution().isBlinkPlanner()) {
                    if (function instanceof ScalarFunction || function instanceof TableFunction) {
                        streamEnv.createTemporarySystemFunction(name, (UserDefinedFunction) function);
                    } else if (function instanceof AggregateFunction) {
                        streamEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
                    } else {
                        throw new SqlExecutionException("Unsupported function type: " + function.getClass().getName());
                    }
                } else {
                    if (function instanceof ScalarFunction) {
                        streamEnv.registerFunction(name, (ScalarFunction) function);
                    } else if (function instanceof AggregateFunction) {
                        streamEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
                    } else if (function instanceof TableFunction) {
                        streamEnv.registerFunction(name, (TableFunction<?>) function);
                    } else {
                        throw new SqlExecutionException("Unsupported function type: " + function.getClass().getName());
                    }
                }
            });
            return;
        }

        final BatchTableEnvironment batchEnv = (BatchTableEnvironment) tableEnv;
        functions.forEach((name, function) -> {
            if (function instanceof ScalarFunction) {
                batchEnv.registerFunction(name, (ScalarFunction) function);
            } else if (function instanceof AggregateFunction) {
                batchEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
            } else if (function instanceof TableFunction) {
                batchEnv.registerFunction(name, (TableFunction<?>) function);
            } else {
                throw new SqlExecutionException("Unsupported function type: " + function.getClass().getName());
            }
        });
    }

    /**
     * Registers the view as a temporary view backed by its SQL query.
     *
     * @param viewEntry view name and query
     * @throws SqlExecutionException if the query cannot be turned into a table or the view cannot
     *     be created; the original exception is attached as the cause
     */
    private void registerTemporaryView(ViewEntry viewEntry) {
        try {
            this.tableEnv.createTemporaryView(viewEntry.getName(), this.tableEnv.sqlQuery(viewEntry.getQuery()));
        }
        catch (Exception e) {
            // Keep the original exception as cause so the stack trace of the failure is not lost.
            throw new SqlExecutionException("Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery() + "\nCause: " + e.getMessage(), e);
        }
    }

    /**
     * Creates a temporal table function over the entry's history table and registers it under
     * the entry's name.
     *
     * <p>Exactly one primary key field is required: the function is created from the time
     * attribute and that single key field.
     *
     * @param temporalTableEntry name, history table, time attribute and primary key fields
     * @throws SqlExecutionException if the entry is invalid or registration fails; the original
     *     exception is attached as the cause
     */
    private void registerTemporalTable(TemporalTableEntry temporalTableEntry) {
        try {
            final Table table = this.tableEnv.from(temporalTableEntry.getHistoryTable());
            final List<String> primaryKeyFields = temporalTableEntry.getPrimaryKeyFields();
            if (primaryKeyFields.isEmpty()) {
                // Report a missing key explicitly instead of failing on get(0) below.
                throw new ValidationException("Temporal tables require a primary key field.");
            }
            if (primaryKeyFields.size() > 1) {
                throw new ValidationException("Temporal tables over a composite primary key are not supported yet.");
            }
            final TemporalTableFunction function = table.createTemporalTableFunction(
                    Expressions.$(temporalTableEntry.getTimeAttribute()),
                    Expressions.$(primaryKeyFields.get(0)));
            if (this.tableEnv instanceof StreamTableEnvironment) {
                final StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) this.tableEnv;
                streamTableEnvironment.registerFunction(temporalTableEntry.getName(), function);
            } else {
                final BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) this.tableEnv;
                batchTableEnvironment.registerFunction(temporalTableEntry.getName(), function);
            }
        }
        catch (Exception e) {
            // Keep the original exception as cause; the history table name is now properly quoted.
            throw new SqlExecutionException("Invalid temporal table '" + temporalTableEntry.getName() + "' over table '" + temporalTableEntry.getHistoryTable() + "'.\nCause: " + e.getMessage(), e);
        }
    }

    /**
     * Tells whether any function declared in the environment has the descriptor property
     * {@code from} set to {@code python}.
     */
    private boolean containsPythonFunction(Environment environment) {
        return environment.getFunctions().values().stream()
                .map(function -> function.getDescriptor().toProperties().get("from"))
                .anyMatch("python"::equals);
    }

    /**
     * Returns a copy of the dependencies, extended by the code-source location of
     * {@code org.apache.flink.python.PythonFunctionRunner} when that location is a regular file.
     *
     * @param dependencies original dependencies (not modified)
     * @return a new list with the original dependencies and possibly the extra location
     * @throws SqlExecutionException if the class cannot be found or its location is not a valid URI
     */
    private List<URL> addPythonDependency(List<URL> dependencies) {
        final List<URL> extended = new ArrayList<>(dependencies);
        try {
            // Load without initializing; only the location of the defining jar is of interest.
            final Class<?> runnerClass = Class.forName(
                    "org.apache.flink.python.PythonFunctionRunner",
                    false,
                    Thread.currentThread().getContextClassLoader());
            final URL location = runnerClass.getProtectionDomain().getCodeSource().getLocation();
            final boolean isRegularFile = Paths.get(location.toURI()).toFile().isFile();
            if (isRegularFile) {
                extended.add(location);
            }
        } catch (URISyntaxException | ClassNotFoundException e) {
            throw new SqlExecutionException("Python UDF detected but flink-python jar not found. If you starts SQL-Client via `sql-client.sh`, please add the flink-python jar via `-j` command option manually.", e);
        }
        return extended;
    }

    /**
     * Holder for the catalog manager, module manager and function catalog that back a table
     * environment, so they can be handed from one execution context to another.
     */
    public static class SessionState {
        /** Catalog manager of the session. */
        public final CatalogManager catalogManager;
        /** Module manager of the session. */
        public final ModuleManager moduleManager;
        /** Function catalog of the session. */
        public final FunctionCatalog functionCatalog;

        /** Use {@link #of(CatalogManager, ModuleManager, FunctionCatalog)} to create instances. */
        private SessionState(
                CatalogManager catalogManager,
                ModuleManager moduleManager,
                FunctionCatalog functionCatalog) {
            this.catalogManager = catalogManager;
            this.moduleManager = moduleManager;
            this.functionCatalog = functionCatalog;
        }

        /** Creates a session state from the three given components. */
        public static SessionState of(
                CatalogManager catalogManager,
                ModuleManager moduleManager,
                FunctionCatalog functionCatalog) {
            return new SessionState(catalogManager, moduleManager, functionCatalog);
        }
    }

    /**
     * Builder for {@link ExecutionContext}.
     *
     * <p>Unless an explicit environment is set via {@link #env(Environment)}, the default
     * environment is merged with the session environment when {@link #build()} is called.
     */
    public static class Builder {
        private final Environment defaultEnv;
        private final SessionContext sessionContext;
        private final List<URL> dependencies;
        private final Configuration configuration;
        private final ClusterClientServiceLoader serviceLoader;
        private final Options commandLineOptions;
        private final List<CustomCommandLine> commandLines;
        // Explicit environment; when null, build() merges defaultEnv with the session environment.
        private Environment currentEnv;
        // Optional state to inherit; when null, the context creates fresh state.
        @Nullable
        private SessionState sessionState;

        private Builder(
                Environment defaultEnv,
                @Nullable SessionContext sessionContext,
                List<URL> dependencies,
                Configuration configuration,
                ClusterClientServiceLoader serviceLoader,
                Options commandLineOptions,
                List<CustomCommandLine> commandLines) {
            this.defaultEnv = defaultEnv;
            this.sessionContext = sessionContext;
            this.dependencies = dependencies;
            this.configuration = configuration;
            this.serviceLoader = serviceLoader;
            this.commandLineOptions = commandLineOptions;
            this.commandLines = commandLines;
        }

        /** Sets the environment to use instead of the merged default/session environment. */
        public Builder env(Environment environment) {
            currentEnv = environment;
            return this;
        }

        /** Sets the session state the new context should reuse. */
        public Builder sessionState(SessionState sessionState) {
            this.sessionState = sessionState;
            return this;
        }

        /**
         * Creates the execution context.
         *
         * @throws SqlExecutionException wrapping any throwable raised while merging the
         *     environment or constructing the context
         */
        public ExecutionContext<?> build() {
            try {
                final Environment effectiveEnv;
                if (currentEnv == null) {
                    effectiveEnv = Environment.merge(defaultEnv, sessionContext.getSessionEnv());
                } else {
                    effectiveEnv = currentEnv;
                }
                return new ExecutionContext<>(
                        effectiveEnv,
                        sessionContext,
                        sessionState,
                        dependencies,
                        configuration,
                        serviceLoader,
                        commandLineOptions,
                        commandLines);
            } catch (Throwable t) {
                throw new SqlExecutionException("Could not create execution context.", t);
            }
        }
    }
}

