package org.apache.linkis.engineconnplugin.flink.client.context;

import java.net.URL;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.SinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceTableEntry;
import org.apache.flink.table.client.config.entries.TableEntry;
import org.apache.flink.table.client.config.entries.TemporalTableEntry;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.linkis.engineconnplugin.flink.client.config.Environment;
import org.apache.linkis.engineconnplugin.flink.client.factory.LinkisKubernetesClusterClientFactory;
import org.apache.linkis.engineconnplugin.flink.client.factory.LinkisYarnClusterClientFactory;
import org.apache.linkis.engineconnplugin.flink.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext.class */
public class ExecutionContext {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionContext.class);
    private final Environment environment;
    private final ClassLoader classLoader;
    private final Configuration flinkConfig;
    private LinkisYarnClusterClientFactory clusterClientFactory;
    private LinkisKubernetesClusterClientFactory kubernetesClusterClientFactory;
    private TableEnvironmentInternal tableEnv;
    private ExecutionEnvironment execEnv;
    private StreamExecutionEnvironment streamExecEnv;
    private Executor executor;
    private SessionState sessionState;

    /* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext$Builder.class */
    /**
     * Fluent builder for {@link ExecutionContext}.
     *
     * <p>Collects a default environment, an optional session environment, the user dependencies
     * and the Flink configuration, and optionally an explicit environment, session state and
     * YARN cluster client factory.
     */
    public static class Builder {
        private final Environment sessionEnv;
        private final List<URL> dependencies;
        private final Configuration configuration;
        private Environment defaultEnv;
        private Environment currentEnv;
        private LinkisYarnClusterClientFactory clusterClientFactory;

        @Nullable
        private SessionState sessionState;

        private Builder(Environment environment, @Nullable Environment environment2, List<URL> list, Configuration configuration) {
            defaultEnv = environment;
            sessionEnv = environment2;
            dependencies = list;
            this.configuration = configuration;
        }

        /** Sets the environment to use instead of merging the default and session environments. */
        public Builder env(Environment environment) {
            currentEnv = environment;
            return this;
        }

        /** Sets the session state (catalog/module/function managers) the context should reuse. */
        public Builder sessionState(SessionState sessionState) {
            this.sessionState = sessionState;
            return this;
        }

        /** Sets the YARN cluster client factory handed to the built context. */
        Builder clusterClientFactory(LinkisYarnClusterClientFactory linkisYarnClusterClientFactory) {
            clusterClientFactory = linkisYarnClusterClientFactory;
            return this;
        }

        /**
         * Builds the {@link ExecutionContext}.
         *
         * <p>If no session environment was supplied, the default environment is used and replaces
         * any environment previously set through {@link #env(Environment)}. Otherwise an explicitly
         * set environment is used, falling back to the merge of the default and session
         * environments.
         */
        public ExecutionContext build() {
            if (sessionEnv == null) {
                currentEnv = defaultEnv;
            }
            final Environment effectiveEnv;
            if (currentEnv != null) {
                effectiveEnv = currentEnv;
            } else {
                effectiveEnv = Environment.merge(defaultEnv, sessionEnv);
            }
            if (clusterClientFactory != null) {
                return new ExecutionContext(effectiveEnv, sessionState, dependencies, configuration, clusterClientFactory);
            }
            return new ExecutionContext(effectiveEnv, sessionState, dependencies, configuration);
        }
    }

    /* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/context/ExecutionContext$SessionState.class */
    /**
     * Immutable holder for the three managers that make up a table session: the catalog manager,
     * the module manager and the function catalog.
     */
    public static class SessionState {
        /** Catalog manager of the session. */
        public final CatalogManager catalogManager;
        /** Module manager of the session. */
        public final ModuleManager moduleManager;
        /** Function catalog of the session. */
        public final FunctionCatalog functionCatalog;

        /** Creates a session state wrapping the given managers. */
        public static SessionState of(CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
            return new SessionState(catalogManager, moduleManager, functionCatalog);
        }

        private SessionState(CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
            this.functionCatalog = functionCatalog;
            this.moduleManager = moduleManager;
            this.catalogManager = catalogManager;
        }
    }

    private ExecutionContext(Environment environment, @Nullable SessionState sessionState, List<URL> list, Configuration configuration) {
        this(environment, sessionState, list, configuration, new LinkisYarnClusterClientFactory(), new LinkisKubernetesClusterClientFactory());
    }

    private ExecutionContext(Environment environment, @Nullable SessionState sessionState, List<URL> list, Configuration configuration, LinkisYarnClusterClientFactory linkisYarnClusterClientFactory) {
        this(environment, sessionState, list, configuration, linkisYarnClusterClientFactory, new LinkisKubernetesClusterClientFactory());
    }

    private ExecutionContext(Environment environment, @Nullable SessionState sessionState, List<URL> list, Configuration configuration, LinkisYarnClusterClientFactory linkisYarnClusterClientFactory, LinkisKubernetesClusterClientFactory linkisKubernetesClusterClientFactory) {
        this.environment = environment;
        this.flinkConfig = configuration;
        this.sessionState = sessionState;
        this.classLoader = ClientUtils.buildUserCodeClassLoader(list == null ? Collections.emptyList() : list, Collections.emptyList(), getClass().getClassLoader(), configuration);
        LOG.debug("Deployment descriptor: {}", environment.getDeployment());
        LOG.info("flinkConfig config: {}", configuration);
        this.clusterClientFactory = linkisYarnClusterClientFactory;
        this.kubernetesClusterClientFactory = linkisKubernetesClusterClientFactory;
    }

    /**
     * Returns the stream execution environment, initializing the table environment first when no
     * stream environment exists yet.
     *
     * @return the stream execution environment; {@code null} when the table environment was
     *     created for a non-streaming planner
     * @throws SqlExecutionException if the table environment cannot be initialized
     */
    public StreamExecutionEnvironment getStreamExecutionEnvironment() throws SqlExecutionException {
        if (streamExecEnv != null) {
            return streamExecEnv;
        }
        // Creating the table environment also creates the stream environment as a side effect.
        getTableEnvironment();
        return streamExecEnv;
    }

    /**
     * Stores a string value in the Flink configuration held by this context.
     *
     * @param str the configuration key
     * @param str2 the value to store
     */
    public void setString(String str, String str2) {
        flinkConfig.setString(str, str2);
    }

    /**
     * Stores a boolean value in the Flink configuration held by this context.
     *
     * @param str the configuration key
     * @param z the value to store
     */
    public void setBoolean(String str, boolean z) {
        flinkConfig.setBoolean(str, z);
    }

    /** Returns the Flink configuration instance held by this context (not a copy). */
    public Configuration getFlinkConfig() {
        return flinkConfig;
    }

    /** Returns the user-code class loader built when this context was constructed. */
    public ClassLoader getClassLoader() {
        return classLoader;
    }

    /** Returns the environment this context was created with. */
    public Environment getEnvironment() {
        return environment;
    }

    /**
     * Creates a YARN cluster descriptor from the current Flink configuration.
     *
     * @return the descriptor produced by the YARN cluster client factory
     */
    public YarnClusterDescriptor createClusterDescriptor() {
        // "m9createClusterDescriptor" was a decompiler-generated alias for the covariant
        // override; the method's real name is createClusterDescriptor.
        return this.clusterClientFactory.createClusterDescriptor(this.flinkConfig);
    }

    /**
     * Creates a Kubernetes cluster descriptor from the current Flink configuration.
     *
     * @return the descriptor produced by the Kubernetes cluster client factory
     */
    public KubernetesClusterDescriptor createKubernetesClusterDescriptor() {
        // "m7createClusterDescriptor" was a decompiler-generated alias for the covariant
        // override; the method's real name is createClusterDescriptor.
        return this.kubernetesClusterClientFactory.createClusterDescriptor(this.flinkConfig);
    }

    /**
     * Returns the catalogs currently known to the table environment, keyed by catalog name.
     *
     * <p>Reads the {@code tableEnv} field directly, so the table environment must already have
     * been initialized (for example via {@link #getTableEnvironment()}).
     *
     * @return a new mutable map from catalog name to catalog
     */
    public Map<String, Catalog> getCatalogs() {
        Map<String, Catalog> catalogs = new HashMap<>();
        for (String name : this.tableEnv.listCatalogs()) {
            // The lookup is optional; names that do not resolve are simply left out.
            this.tableEnv.getCatalog(name).ifPresent(catalog -> catalogs.put(name, catalog));
        }
        return catalogs;
    }

    /**
     * Returns the session state of this context; may be {@code null} if none was supplied and
     * the table environment has not been initialized yet.
     */
    public SessionState getSessionState() {
        return sessionState;
    }

    /**
     * Runs the supplier inside a {@link TemporaryClassLoaderContext} opened on this context's
     * user-code class loader, closing that context afterwards whether the supplier returns or
     * throws.
     *
     * @param supplier the computation to run
     * @param <R> the result type
     * @return whatever the supplier returns
     */
    public <R> R wrapClassLoader(Supplier<R> supplier) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(this.classLoader)) {
            return supplier.get();
        }
    }

    /**
     * Applies the function to the table environment inside a {@link TemporaryClassLoaderContext}
     * opened on this context's user-code class loader, closing that context afterwards whether
     * the function returns or throws.
     *
     * @param function the computation to run against the table environment
     * @param <R> the result type
     * @return whatever the function returns
     * @throws SqlExecutionException if the table environment cannot be initialized
     */
    public <R> R wrapClassLoader(Function<TableEnvironmentInternal, R> function) throws SqlExecutionException {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(this.classLoader)) {
            return function.apply(getTableEnvironment());
        }
    }

    /**
     * Runs the action inside a {@link TemporaryClassLoaderContext} opened on this context's
     * user-code class loader, closing that context afterwards whether the action completes or
     * throws.
     *
     * @param runnable the action to run
     */
    void wrapClassLoader(Runnable runnable) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(this.classLoader)) {
            runnable.run();
        }
    }

    /**
     * Returns the table environment, creating and initializing it on first use.
     *
     * <p>The whole check runs under this object's monitor. The {@code tableEnv} field is not
     * volatile and is assigned before functions and catalogs are registered, so an unlocked
     * first check could hand another thread an environment that is still being set up.
     *
     * @return the table environment
     * @throws SqlExecutionException if initialization fails
     */
    public TableEnvironmentInternal getTableEnvironment() throws SqlExecutionException {
        synchronized (this) {
            if (this.tableEnv == null) {
                initializeTableEnvironment(this.sessionState);
            }
            return this.tableEnv;
        }
    }

    /**
     * Returns the execution config of the stream environment when one exists, otherwise that of
     * the batch execution environment.
     */
    public ExecutionConfig getExecutionConfig() {
        if (streamExecEnv != null) {
            return streamExecEnv.getConfig();
        }
        return execEnv.getConfig();
    }

    /** Returns the YARN cluster client factory used by this context. */
    public LinkisYarnClusterClientFactory getClusterClientFactory() {
        return clusterClientFactory;
    }

    /**
     * Starts a builder for an {@link ExecutionContext}.
     *
     * @param environment the default environment
     * @param environment2 the session environment
     * @param list the dependency URLs for the user-code class loader
     * @param configuration the Flink configuration
     * @return a new builder
     */
    public static Builder builder(Environment environment, Environment environment2, List<URL> list, Configuration configuration) {
        final Builder contextBuilder = new Builder(environment, environment2, list, configuration);
        return contextBuilder;
    }

    /**
     * Looks up a {@link ModuleFactory} matching the given properties and uses it to create a
     * module from those same properties.
     */
    private Module createModule(Map<String, String> map, ClassLoader classLoader) {
        final ModuleFactory factory = TableFactoryService.find(ModuleFactory.class, map, classLoader);
        return factory.createModule(map);
    }

    /**
     * Looks up a {@link CatalogFactory} matching the given properties and uses it to create a
     * catalog with the given name.
     */
    private Catalog createCatalog(String str, Map<String, String> map, ClassLoader classLoader) {
        final CatalogFactory factory = TableFactoryService.find(CatalogFactory.class, map, classLoader);
        return factory.createCatalog(str, map);
    }

    /**
     * Creates a table source from the given properties, using a streaming or batch source
     * factory depending on the planner selected in the execution entry.
     *
     * @throws SqlExecutionException if the execution entry selects neither planner
     */
    private static TableSource<?> createTableSource(ExecutionEntry executionEntry, Map<String, String> map, ClassLoader classLoader) throws SqlExecutionException {
        if (executionEntry.isStreamingPlanner()) {
            final TableSourceFactory<?> streamingFactory = TableFactoryService.find(TableSourceFactory.class, map, classLoader);
            return streamingFactory.createTableSource(map);
        } else if (executionEntry.isBatchPlanner()) {
            final BatchTableSourceFactory<?> batchFactory = TableFactoryService.find(BatchTableSourceFactory.class, map, classLoader);
            return batchFactory.createBatchTableSource(map);
        } else {
            throw new SqlExecutionException(FlinkErrorCodeSummary.SUPPORTED_SOURCES.getErrorDesc());
        }
    }

    /**
     * Creates a table sink from the given properties, using a streaming or batch sink factory
     * depending on the planner selected in the execution entry.
     *
     * @throws SqlExecutionException if the execution entry selects neither planner
     */
    private static TableSink<?> createTableSink(ExecutionEntry executionEntry, Map<String, String> map, ClassLoader classLoader) throws SqlExecutionException {
        if (executionEntry.isStreamingPlanner()) {
            final TableSinkFactory<?> streamingFactory = TableFactoryService.find(TableSinkFactory.class, map, classLoader);
            return streamingFactory.createTableSink(map);
        } else if (executionEntry.isBatchPlanner()) {
            final BatchTableSinkFactory<?> batchFactory = TableFactoryService.find(BatchTableSinkFactory.class, map, classLoader);
            return batchFactory.createBatchTableSink(map);
        } else {
            throw new SqlExecutionException(FlinkErrorCodeSummary.SUPPORTED_SINKS.getErrorDesc());
        }
    }

    /**
     * Builds a {@link StreamTableEnvironmentImpl} on top of the given stream environment.
     *
     * <p>The planner is obtained from the {@link PlannerFactory} that matches the planner
     * properties derived from the environment settings.
     */
    private TableEnvironmentInternal createStreamTableEnvironment(StreamExecutionEnvironment streamExecutionEnvironment, EnvironmentSettings environmentSettings, TableConfig tableConfig, Executor executor, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
        // Must be a typed map: with a raw Map the generic lookup result is erased and the
        // factory's create(...) method can no longer be resolved.
        Map<String, String> plannerProperties = environmentSettings.toPlannerProperties();
        return new StreamTableEnvironmentImpl(
                catalogManager,
                moduleManager,
                functionCatalog,
                tableConfig,
                streamExecutionEnvironment,
                ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager),
                executor,
                environmentSettings.isStreamingMode(),
                this.classLoader);
    }

    /**
     * Finds the {@link ExecutorFactory} matching the given properties and reflectively invokes
     * its {@code create(Map, StreamExecutionEnvironment)} method.
     *
     * @throws TableException if the factory lookup or the reflective call fails
     */
    private static Executor lookupExecutor(Map<String, String> map, StreamExecutionEnvironment streamExecutionEnvironment) {
        try {
            final ExecutorFactory factory = ComponentFactoryService.find(ExecutorFactory.class, map);
            final Object created =
                    factory.getClass()
                            .getMethod("create", Map.class, StreamExecutionEnvironment.class)
                            .invoke(factory, map, streamExecutionEnvironment);
            return (Executor) created;
        } catch (Exception cause) {
            throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", cause);
        }
    }

    /**
     * Creates the table environment and, when no session state is inherited, populates it with
     * the modules, functions and catalogs declared in the environment.
     *
     * @param sessionState the session state to reuse, or {@code null} to create a new one
     * @throws SqlExecutionException if functions, tables or views cannot be registered
     */
    private void initializeTableEnvironment(@Nullable SessionState sessionState) throws SqlExecutionException {
        EnvironmentSettings environmentSettings = this.environment.getExecution().getEnvironmentSettings();

        // Copy every configured key/value pair into the table configuration.
        TableConfig tableConfig = new TableConfig();
        this.environment.getConfiguration().asMap().forEach((key, value) -> tableConfig.getConfiguration().setString(key, value));

        if (sessionState != null) {
            // Inherited session: reuse its managers and skip module/function/catalog registration.
            this.sessionState = sessionState;
            createTableEnvironment(environmentSettings, tableConfig, sessionState.catalogManager, sessionState.moduleManager, sessionState.functionCatalog);
            return;
        }

        ModuleManager moduleManager = new ModuleManager();
        CatalogManager catalogManager =
                CatalogManager.newBuilder()
                        .classLoader(this.classLoader)
                        .config(tableConfig.getConfiguration())
                        .defaultCatalog(
                                environmentSettings.getBuiltInCatalogName(),
                                new GenericInMemoryCatalog(environmentSettings.getBuiltInCatalogName(), environmentSettings.getBuiltInDatabaseName()))
                        .build();
        FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
        this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);

        // The table environment must exist before modules, functions and catalogs are registered.
        createTableEnvironment(environmentSettings, tableConfig, catalogManager, moduleManager, functionCatalog);

        // Instantiate the configured modules, preserving declaration order.
        Map<String, Module> modules = new LinkedHashMap<>();
        this.environment.getModules().forEach((name, entry) -> modules.put(name, createModule(entry.asMap(), this.classLoader)));
        if (!modules.isEmpty()) {
            // Unload the "core" module first so only the configured modules, in their order, apply.
            this.tableEnv.unloadModule("core");
            modules.forEach(this.tableEnv::loadModule);
        }

        registerFunctions();
        initializeCatalogs();
    }

    /**
     * Creates the execution environment, executor and table environment for the configured
     * planner and stores them in this context's fields.
     *
     * <p>For the streaming planner a stream environment and executor are created and the batch
     * environment is cleared; otherwise a batch environment is created and the stream environment
     * and executor are cleared.
     */
    private void createTableEnvironment(EnvironmentSettings environmentSettings, TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
        final boolean streaming = environment.getExecution().isStreamingPlanner();
        if (!streaming) {
            streamExecEnv = null;
            execEnv = createExecutionEnvironment();
            executor = null;
            tableEnv = new BatchTableEnvironmentImpl(execEnv, tableConfig, catalogManager, moduleManager);
        } else {
            streamExecEnv = createStreamExecutionEnvironment();
            execEnv = null;
            executor = lookupExecutor(environmentSettings.toExecutorProperties(), streamExecEnv);
            tableEnv = createStreamTableEnvironment(streamExecEnv, environmentSettings, tableConfig, executor, catalogManager, moduleManager, functionCatalog);
        }
    }

    /**
     * Registers everything the environment declares with the table environment, in this order:
     * catalogs, table sources and sinks, temporal tables, views, and finally the current catalog
     * and database when configured.
     *
     * @throws SqlExecutionException if a source, sink, temporal table or view cannot be created
     */
    private void initializeCatalogs() throws SqlExecutionException {
        // Catalog factories are looked up and invoked under the user-code class loader.
        wrapClassLoader(() -> this.environment.getCatalogs().forEach((name, entry) ->
                this.tableEnv.registerCatalog(name, createCatalog(name, entry.asMap(), this.classLoader))));

        // Create all sources and sinks first so a failure happens before any is registered.
        Map<String, TableSource<?>> tableSources = new HashMap<>();
        Map<String, TableSink<?>> tableSinks = new HashMap<>();
        for (Map.Entry<String, TableEntry> tableDefinition : this.environment.getTables().entrySet()) {
            String name = tableDefinition.getKey();
            TableEntry entry = tableDefinition.getValue();
            if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
                tableSources.put(name, createTableSource(this.environment.getExecution(), entry.asMap(), this.classLoader));
            }
            if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
                tableSinks.put(name, createTableSink(this.environment.getExecution(), entry.asMap(), this.classLoader));
            }
        }
        tableSources.forEach(this.tableEnv::registerTableSourceInternal);
        tableSinks.forEach(this.tableEnv::registerTableSinkInternal);

        // Temporal tables are registered after the sources and sinks.
        for (TableEntry entry : this.environment.getTables().values()) {
            if (entry instanceof TemporalTableEntry) {
                registerTemporalTable((TemporalTableEntry) entry);
            }
        }

        // Views come last, in the iteration order of the tables map.
        for (TableEntry entry : this.environment.getTables().values()) {
            if (entry instanceof ViewEntry) {
                registerView((ViewEntry) entry);
            }
        }

        Optional<String> currentCatalog = this.environment.getExecution().getCurrentCatalog();
        currentCatalog.ifPresent(this.tableEnv::useCatalog);
        Optional<String> currentDatabase = this.environment.getExecution().getCurrentDatabase();
        currentDatabase.ifPresent(this.tableEnv::useDatabase);
    }

    /**
     * Obtains a batch execution environment and applies the restart strategy and parallelism
     * from the execution entry; parallelism falls back to 1 when not configured.
     */
    private ExecutionEnvironment createExecutionEnvironment() {
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        batchEnv.setRestartStrategy(environment.getExecution().getRestartStrategy());
        final int parallelism = (Integer) environment.getExecution().getParallelism().orElse(1);
        batchEnv.setParallelism(parallelism);
        return batchEnv;
    }

    /**
     * Installs a stream context environment built from the Flink configuration and user-code
     * class loader, then obtains a stream execution environment and applies restart strategy,
     * parallelism (default 1), max parallelism and time characteristic from the execution entry.
     * The periodic watermark interval is only applied for event time.
     */
    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
        StreamContextEnvironment.setAsContext(new DefaultExecutorServiceLoader(), flinkConfig, classLoader, false, false);
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig);

        streamEnv.setRestartStrategy(environment.getExecution().getRestartStrategy());
        final int parallelism = (Integer) environment.getExecution().getParallelism().orElse(1);
        streamEnv.setParallelism(parallelism);
        streamEnv.setMaxParallelism(environment.getExecution().getMaxParallelism());
        streamEnv.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());

        final boolean eventTime = TimeCharacteristic.EventTime == streamEnv.getStreamTimeCharacteristic();
        if (eventTime) {
            streamEnv.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
        }
        return streamEnv;
    }

    /**
     * Instantiates every function declared in the environment (in declaration order, without
     * validation) and registers the results with the table environment.
     *
     * @throws SqlExecutionException if a function has an unsupported type
     */
    private void registerFunctions() throws SqlExecutionException {
        final Map<String, FunctionDefinition> functions = new LinkedHashMap<>();
        environment.getFunctions().forEach((name, entry) ->
                functions.put(name, FunctionService.createFunction(entry.getDescriptor(), classLoader, false)));
        registerFunctions(functions);
    }

    /**
     * Registers the given functions with the table environment, dispatching on the concrete kind
     * of each function (scalar, aggregate or table function).
     *
     * <p>The table environment is treated as a {@link StreamTableEnvironment} when it is one and
     * as a {@link BatchTableEnvironment} otherwise.
     *
     * @param map functions to register, keyed by the name to register them under
     * @throws SqlExecutionException if a function is neither a scalar, aggregate nor table function
     */
    private void registerFunctions(Map<String, FunctionDefinition> map) throws SqlExecutionException {
        if (this.tableEnv instanceof StreamTableEnvironment) {
            StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment) this.tableEnv;
            for (Map.Entry<String, FunctionDefinition> entry : map.entrySet()) {
                String name = entry.getKey();
                FunctionDefinition function = entry.getValue();
                if (function instanceof ScalarFunction) {
                    streamTableEnvironment.registerFunction(name, (ScalarFunction) function);
                } else if (function instanceof AggregateFunction) {
                    streamTableEnvironment.registerFunction(name, (AggregateFunction<?, ?>) function);
                } else if (function instanceof TableFunction) {
                    streamTableEnvironment.registerFunction(name, (TableFunction<?>) function);
                } else {
                    throw new SqlExecutionException(MessageFormat.format(FlinkErrorCodeSummary.SUPPORTED_FUNCTION_TYPE.getErrorDesc(), function.getClass().getName()));
                }
            }
            return;
        }
        BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment) this.tableEnv;
        for (Map.Entry<String, FunctionDefinition> entry : map.entrySet()) {
            String name = entry.getKey();
            FunctionDefinition function = entry.getValue();
            if (function instanceof ScalarFunction) {
                batchTableEnvironment.registerFunction(name, (ScalarFunction) function);
            } else if (function instanceof AggregateFunction) {
                batchTableEnvironment.registerFunction(name, (AggregateFunction<?, ?>) function);
            } else if (function instanceof TableFunction) {
                batchTableEnvironment.registerFunction(name, (TableFunction<?>) function);
            } else {
                throw new SqlExecutionException(MessageFormat.format(FlinkErrorCodeSummary.SUPPORTED_FUNCTION_TYPE.getErrorDesc(), function.getClass().getName()));
            }
        }
    }

    /**
     * Registers the view under its name by running its query against the table environment.
     *
     * @param viewEntry the view definition (name and query)
     * @throws SqlExecutionException if the query cannot be parsed or the view cannot be registered
     */
    private void registerView(ViewEntry viewEntry) throws SqlExecutionException {
        try {
            this.tableEnv.registerTable(viewEntry.getName(), this.tableEnv.sqlQuery(viewEntry.getQuery()));
        } catch (Exception e) {
            SqlExecutionException failure = new SqlExecutionException("Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery() + "\nCause: " + e.getMessage());
            // Keep the original exception and its stack trace reachable for diagnostics;
            // the message alone drops where the failure actually happened.
            failure.addSuppressed(e);
            throw failure;
        }
    }

    /**
     * Creates a temporal table function over the entry's history table and registers it under
     * the entry's name.
     *
     * <p>The primary key fields are joined with commas into a single key expression.
     *
     * @param temporalTableEntry the temporal table definition
     * @throws SqlExecutionException if the history table cannot be scanned or the function
     *     cannot be created or registered
     */
    private void registerTemporalTable(TemporalTableEntry temporalTableEntry) throws SqlExecutionException {
        try {
            TemporalTableFunction temporalFunction =
                    this.tableEnv
                            .scan(new String[] {temporalTableEntry.getHistoryTable()})
                            .createTemporalTableFunction(
                                    temporalTableEntry.getTimeAttribute(),
                                    String.join(",", temporalTableEntry.getPrimaryKeyFields()));
            // registerFunction for table functions is declared on the stream/batch bridge
            // interfaces, so the environment must be cast before calling it.
            if (this.tableEnv instanceof StreamTableEnvironment) {
                ((StreamTableEnvironment) this.tableEnv).registerFunction(temporalTableEntry.getName(), temporalFunction);
            } else {
                ((BatchTableEnvironment) this.tableEnv).registerFunction(temporalTableEntry.getName(), temporalFunction);
            }
        } catch (Exception e) {
            SqlExecutionException failure = new SqlExecutionException("Invalid temporal table '" + temporalTableEntry.getName() + "' over table '" + temporalTableEntry.getHistoryTable() + "'.\nCause: " + e.getMessage());
            // Keep the original exception and its stack trace reachable for diagnostics.
            failure.addSuppressed(e);
            throw failure;
        }
    }

    /**
     * Builds a new context from the given builder, reusing this context's YARN cluster client
     * factory and, when this context already has a table environment, sharing its table
     * environment, execution environments and executor with the new context.
     *
     * @param builder the builder describing the new context
     * @return the newly built context
     */
    public ExecutionContext cloneExecutionContext(Builder builder) {
        final ExecutionContext copy = builder.clusterClientFactory(clusterClientFactory).build();
        if (tableEnv == null) {
            // Nothing initialized yet; the copy will create its own environments lazily.
            return copy;
        }
        copy.tableEnv = tableEnv;
        copy.execEnv = execEnv;
        copy.streamExecEnv = streamExecEnv;
        copy.executor = executor;
        return copy;
    }
}
