package org.apache.linkis.engineconnplugin.flink.client.shims;

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.ExecutionEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.SinkTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.SourceSinkTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.SourceTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.TableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.TemporalTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.ViewEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;

/* loaded from: input_file:org/apache/linkis/engineconnplugin/flink/client/shims/Flink1122Shims.class */
/**
 * {@link FlinkShims} implementation written against the Flink 1.12-era Table API (legacy
 * {@code TableSource}/{@code TableSink} factories, {@code BatchTableEnvironment}, and the
 * {@code registerFunction} family).
 *
 * <p>The class forwards savepoint operations to a {@link ClusterClient} and builds a table
 * environment (streaming or batch, depending on the configured planner) from a Linkis
 * {@link Environment} description. The state created by the last call to
 * {@link #initializeTableEnvironment} is kept in instance fields; the class performs no
 * synchronization of its own.
 */
public class Flink1122Shims extends FlinkShims {
    private SessionState sessionState;
    private TableEnvironmentInternal tableEnv;
    private Environment environment;
    private ClassLoader classLoader;
    private StreamExecutionEnvironment streamExecEnv;
    private Executor executor;
    private ExecutionEnvironment execEnv;

    /**
     * Creates the shim.
     *
     * @param str value handed unchanged to the {@link FlinkShims} constructor
     */
    public Flink1122Shims(String str) {
        super(str);
    }

    /**
     * Forwards to {@link ClusterClient#triggerSavepoint}.
     *
     * @param obj the cluster client; must be a {@link ClusterClient}
     * @param obj2 the job to snapshot; must be a {@link JobID}
     * @param str savepoint location argument, passed through unchanged
     * @return the future returned by the cluster client
     * @throws ClassCastException if {@code obj} or {@code obj2} has the wrong type
     */
    public CompletableFuture<String> triggerSavepoint(Object obj, Object obj2, String str) {
        final ClusterClient<?> client = (ClusterClient<?>) obj;
        return client.triggerSavepoint((JobID) obj2, str);
    }

    /**
     * Forwards to {@link ClusterClient#cancelWithSavepoint}.
     *
     * @param obj the cluster client; must be a {@link ClusterClient}
     * @param obj2 the job to cancel; must be a {@link JobID}
     * @param str savepoint location argument, passed through unchanged
     * @return the future returned by the cluster client
     * @throws ClassCastException if {@code obj} or {@code obj2} has the wrong type
     */
    public CompletableFuture<String> cancelWithSavepoint(Object obj, Object obj2, String str) {
        final ClusterClient<?> client = (ClusterClient<?>) obj;
        return client.cancelWithSavepoint((JobID) obj2, str);
    }

    /**
     * Forwards to {@link ClusterClient#stopWithSavepoint}.
     *
     * @param obj the cluster client; must be a {@link ClusterClient}
     * @param obj2 the job to stop; must be a {@link JobID}
     * @param z boolean flag passed through unchanged as the second argument of
     *     {@code stopWithSavepoint}
     * @param str savepoint location argument, passed through unchanged
     * @return the future returned by the cluster client
     * @throws ClassCastException if {@code obj} or {@code obj2} has the wrong type
     */
    public CompletableFuture<String> stopWithSavepoint(Object obj, Object obj2, boolean z, String str) {
        final ClusterClient<?> client = (ClusterClient<?>) obj;
        return client.stopWithSavepoint((JobID) obj2, z, str);
    }

    /**
     * Builds the table environment described by the given Linkis environment and remembers it,
     * together with the session state, in this instance.
     *
     * <p>When no session state is supplied, a fresh catalog manager, module manager and function
     * catalog are created, and modules, functions, catalogs, tables, temporal tables and views
     * from the environment are registered. When a session state is supplied, its managers are
     * reused and nothing is registered again.
     *
     * @param obj the environment description; must be an {@link Environment}
     * @param obj2 not used by this implementation
     * @param obj3 the stream execution environment; must be a {@link StreamExecutionEnvironment}
     *     (it is discarded when the batch planner is selected)
     * @param obj4 an existing {@link SessionState} to reuse, or {@code null} to create a new one
     * @param classLoader class loader used for factory discovery and user code
     * @return the created table environment (a {@link TableEnvironmentInternal})
     * @throws SqlExecutionException if a function, table, temporal table or view cannot be set up
     * @throws ClassCastException if one of the {@code Object} arguments has the wrong type
     */
    public Object initializeTableEnvironment(Object obj, Object obj2, Object obj3, Object obj4, ClassLoader classLoader) throws SqlExecutionException {
        final SessionState existingState = (SessionState) obj4;
        this.streamExecEnv = (StreamExecutionEnvironment) obj3;
        this.environment = (Environment) obj;
        this.classLoader = classLoader;

        final EnvironmentSettings settings = environment.getExecution().getEnvironmentSettings();
        final TableConfig tableConfig = new TableConfig();
        // Copy every configuration entry of the environment into the table config.
        environment.getConfiguration().asMap()
                .forEach((key, value) -> tableConfig.getConfiguration().setString(key, value));

        if (existingState != null) {
            // Reuse the managers of the inherited session; their content is already registered.
            this.sessionState = existingState;
            createTableEnvironment(settings, tableConfig, existingState.catalogManager,
                    existingState.moduleManager, existingState.functionCatalog);
            return tableEnv;
        }

        final ModuleManager moduleManager = new ModuleManager();
        final CatalogManager catalogManager = CatalogManager.newBuilder()
                .classLoader(classLoader)
                .config(tableConfig.getConfiguration())
                .defaultCatalog(
                        settings.getBuiltInCatalogName(),
                        new GenericInMemoryCatalog(
                                settings.getBuiltInCatalogName(),
                                settings.getBuiltInDatabaseName()))
                .build();
        final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager);
        this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);

        createTableEnvironment(settings, tableConfig, catalogManager, moduleManager, functionCatalog);
        loadModules();
        registerFunctions();
        initializeCatalogs();
        return tableEnv;
    }

    /**
     * Instantiates the modules declared in the environment and loads them, in declaration order,
     * into the table environment.
     *
     * <p>If at least one module is declared, the module named {@code "core"} is unloaded first so
     * that only the declared modules remain. With no declared modules this method does nothing.
     * (Earlier, the collecting map was never populated, so declared modules were silently
     * ignored and {@link #createModule} was never invoked.)
     */
    private void loadModules() {
        final Map<String, Module> modules = new LinkedHashMap<>();
        // NOTE(review): relies on the module entry exposing asMap() like the catalog and table
        // entries used elsewhere in this class - confirm against ModuleEntry.
        environment.getModules()
                .forEach((name, entry) -> modules.put(name, createModule(entry.asMap(), classLoader)));
        if (modules.isEmpty()) {
            return;
        }
        tableEnv.unloadModule("core");
        modules.forEach(tableEnv::loadModule);
    }

    /**
     * Creates {@link #tableEnv} for the planner selected in the environment.
     *
     * <p>Streaming planner: an executor is looked up and a stream table environment is built on
     * {@link #streamExecEnv}; {@link #execEnv} is cleared. Otherwise a batch table environment is
     * built on a newly created {@link ExecutionEnvironment}; {@link #streamExecEnv} and
     * {@link #executor} are cleared.
     */
    private void createTableEnvironment(EnvironmentSettings settings, TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
        if (environment.getExecution().isStreamingPlanner()) {
            this.execEnv = null;
            this.executor = lookupExecutor(settings.toExecutorProperties(), streamExecEnv);
            this.tableEnv = createStreamTableEnvironment(streamExecEnv, settings, tableConfig,
                    executor, catalogManager, moduleManager, functionCatalog);
            return;
        }
        this.streamExecEnv = null;
        this.execEnv = createExecutionEnvironment();
        this.executor = null;
        this.tableEnv = new BatchTableEnvironmentImpl(execEnv, tableConfig, catalogManager, moduleManager);
    }

    /**
     * Registers everything catalog-related from the environment, in this order: catalogs, table
     * sources and sinks, temporal tables, views, and finally the current catalog and database
     * (when configured).
     *
     * @throws SqlExecutionException if a source, sink, temporal table or view cannot be created
     */
    private void initializeCatalogs() throws SqlExecutionException {
        // Catalog factories are discovered with the user class loader set as context loader.
        wrapClassLoader(() -> environment.getCatalogs().forEach((name, entry) ->
                tableEnv.registerCatalog(name, createCatalog(name, entry.asMap(), classLoader))));

        // Create all sources and sinks first so that a failing factory aborts before anything
        // is registered.
        final Map<String, TableSource<?>> tableSources = new HashMap<>();
        final Map<String, TableSink<?>> tableSinks = new HashMap<>();
        for (Map.Entry<String, ? extends TableEntry> tableDef : environment.getTables().entrySet()) {
            final String name = tableDef.getKey();
            final TableEntry entry = tableDef.getValue();
            final boolean sourceAndSink = entry instanceof SourceSinkTableEntry;
            if (sourceAndSink || entry instanceof SourceTableEntry) {
                tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
            }
            if (sourceAndSink || entry instanceof SinkTableEntry) {
                tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
            }
        }
        tableSources.forEach(tableEnv::registerTableSourceInternal);
        tableSinks.forEach(tableEnv::registerTableSinkInternal);

        // Temporal tables are registered before views, in two separate passes.
        for (TableEntry entry : environment.getTables().values()) {
            if (entry instanceof TemporalTableEntry) {
                registerTemporalTable((TemporalTableEntry) entry);
            }
        }
        for (TableEntry entry : environment.getTables().values()) {
            if (entry instanceof ViewEntry) {
                registerView((ViewEntry) entry);
            }
        }

        environment.getExecution().getCurrentCatalog().ifPresent(tableEnv::useCatalog);
        environment.getExecution().getCurrentDatabase().ifPresent(tableEnv::useDatabase);
    }

    /**
     * Creates the batch {@link ExecutionEnvironment} and applies the restart strategy and
     * parallelism configured in the environment's execution entry.
     */
    private ExecutionEnvironment createExecutionEnvironment() {
        final ExecutionEntry execution = environment.getExecution();
        final ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        batchEnv.setRestartStrategy(execution.getRestartStrategy());
        batchEnv.setParallelism(execution.getParallelism());
        return batchEnv;
    }

    /**
     * Instantiates every function declared in the environment and registers it in the table
     * environment, preserving declaration order.
     *
     * @throws SqlExecutionException if a function is of an unsupported kind
     */
    private void registerFunctions() throws SqlExecutionException {
        final Map<String, FunctionDefinition> functions = new LinkedHashMap<>();
        environment.getFunctions().forEach((name, entry) ->
                functions.put(name, FunctionService.createFunction(entry.getDescriptor(), classLoader, false)));
        registerFunctions(functions);
    }

    /**
     * Registers the given functions under their map keys. Scalar, aggregate and table functions
     * are supported; anything else is rejected.
     *
     * <p>The stream and batch table environment interfaces declare the aggregate/table
     * {@code registerFunction} overloads independently, hence the two parallel branches.
     *
     * @param functions functions to register, keyed by name
     * @throws SqlExecutionException if a function is neither scalar, aggregate nor table function
     */
    private void registerFunctions(Map<String, FunctionDefinition> functions) throws SqlExecutionException {
        if (tableEnv instanceof StreamTableEnvironment) {
            final StreamTableEnvironment streamEnv = (StreamTableEnvironment) tableEnv;
            for (Map.Entry<String, FunctionDefinition> entry : functions.entrySet()) {
                final String name = entry.getKey();
                final FunctionDefinition function = entry.getValue();
                if (function instanceof ScalarFunction) {
                    streamEnv.registerFunction(name, (ScalarFunction) function);
                } else if (function instanceof AggregateFunction) {
                    streamEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
                } else if (function instanceof TableFunction) {
                    streamEnv.registerFunction(name, (TableFunction<?>) function);
                } else {
                    throw unsupportedFunctionType(function);
                }
            }
            return;
        }

        final BatchTableEnvironment batchEnv = (BatchTableEnvironment) tableEnv;
        for (Map.Entry<String, FunctionDefinition> entry : functions.entrySet()) {
            final String name = entry.getKey();
            final FunctionDefinition function = entry.getValue();
            if (function instanceof ScalarFunction) {
                batchEnv.registerFunction(name, (ScalarFunction) function);
            } else if (function instanceof AggregateFunction) {
                batchEnv.registerFunction(name, (AggregateFunction<?, ?>) function);
            } else if (function instanceof TableFunction) {
                batchEnv.registerFunction(name, (TableFunction<?>) function);
            } else {
                throw unsupportedFunctionType(function);
            }
        }
    }

    /** Builds the exception reported for a function that is not scalar, aggregate or table. */
    private static SqlExecutionException unsupportedFunctionType(FunctionDefinition function) {
        return new SqlExecutionException(MessageFormat.format(
                FlinkErrorCodeSummary.SUPPORTED_FUNCTION_TYPE.getErrorDesc(),
                function.getClass().getName()));
    }

    /**
     * Registers a view by running its query and registering the resulting table under the view
     * name.
     *
     * @throws SqlExecutionException if the query fails or the table cannot be registered
     */
    private void registerView(ViewEntry viewEntry) throws SqlExecutionException {
        try {
            tableEnv.registerTable(viewEntry.getName(), tableEnv.sqlQuery(viewEntry.getQuery()));
        } catch (Exception e) {
            // NOTE(review): only the message of the original exception survives; attach e as the
            // cause if SqlExecutionException offers a (String, Throwable) constructor.
            throw new SqlExecutionException("Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery() + "\nCause: " + e.getMessage());
        }
    }

    /**
     * Creates a temporal table function over the entry's history table (time attribute and
     * comma-joined primary key fields taken from the entry) and registers it under the entry's
     * name.
     *
     * @throws SqlExecutionException if the function cannot be created or registered
     */
    private void registerTemporalTable(TemporalTableEntry temporalTableEntry) throws SqlExecutionException {
        try {
            final TemporalTableFunction function = tableEnv
                    .scan(temporalTableEntry.getHistoryTable())
                    .createTemporalTableFunction(
                            temporalTableEntry.getTimeAttribute(),
                            String.join(",", temporalTableEntry.getPrimaryKeyFields()));
            // registerFunction(String, TableFunction) is declared on the stream and batch
            // interfaces, not on the common table environment type, so a cast is required.
            if (tableEnv instanceof StreamTableEnvironment) {
                ((StreamTableEnvironment) tableEnv).registerFunction(temporalTableEntry.getName(), function);
            } else {
                ((BatchTableEnvironment) tableEnv).registerFunction(temporalTableEntry.getName(), function);
            }
        } catch (Exception e) {
            // NOTE(review): only the message of the original exception survives; attach e as the
            // cause if SqlExecutionException offers a (String, Throwable) constructor.
            throw new SqlExecutionException("Invalid temporal table '" + temporalTableEntry.getName() + "' over table '" + temporalTableEntry.getHistoryTable() + ".\nCause: " + e.getMessage());
        }
    }

    /**
     * Finds the {@link ExecutorFactory} matching the given properties and creates an executor
     * bound to the given stream execution environment.
     *
     * <p>The two-argument {@code create(Map, StreamExecutionEnvironment)} method is invoked
     * reflectively on the concrete factory class.
     *
     * @throws TableException if the factory cannot be found or the reflective call fails
     */
    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
            final ExecutorFactory factory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            return (Executor) factory.getClass()
                    .getMethod("create", Map.class, StreamExecutionEnvironment.class)
                    .invoke(factory, executorProperties, executionEnvironment);
        } catch (Exception e) {
            throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", e);
        }
    }

    /** Creates a module through the {@link ModuleFactory} that matches the given properties. */
    private Module createModule(Map<String, String> moduleProperties, ClassLoader classLoader) {
        final ModuleFactory factory = TableFactoryService.find(ModuleFactory.class, moduleProperties, classLoader);
        return factory.createModule(moduleProperties);
    }

    /** Creates a catalog through the {@link CatalogFactory} that matches the given properties. */
    private Catalog createCatalog(String name, Map<String, String> catalogProperties, ClassLoader classLoader) {
        final CatalogFactory factory = TableFactoryService.find(CatalogFactory.class, catalogProperties, classLoader);
        return factory.createCatalog(name, catalogProperties);
    }

    /**
     * Creates a table source with the factory kind that fits the configured planner.
     *
     * @throws SqlExecutionException if the execution entry selects neither the streaming nor the
     *     batch planner
     */
    private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) throws SqlExecutionException {
        if (execution.isStreamingPlanner()) {
            final TableSourceFactory<?> factory =
                    TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
            return factory.createTableSource(sourceProperties);
        }
        if (execution.isBatchPlanner()) {
            final BatchTableSourceFactory<?> factory =
                    TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
            return factory.createBatchTableSource(sourceProperties);
        }
        throw new SqlExecutionException(FlinkErrorCodeSummary.SUPPORTED_SOURCES.getErrorDesc());
    }

    /**
     * Creates a table sink with the factory kind that fits the configured planner.
     *
     * @throws SqlExecutionException if the execution entry selects neither the streaming nor the
     *     batch planner
     */
    private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) throws SqlExecutionException {
        if (execution.isStreamingPlanner()) {
            final TableSinkFactory<?> factory =
                    TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
            return factory.createTableSink(sinkProperties);
        }
        if (execution.isBatchPlanner()) {
            final BatchTableSinkFactory<?> factory =
                    TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
            return factory.createBatchTableSink(sinkProperties);
        }
        throw new SqlExecutionException(FlinkErrorCodeSummary.SUPPORTED_SINKS.getErrorDesc());
    }

    /**
     * Assembles a stream table environment from the given components, creating the planner via
     * the {@link PlannerFactory} that matches the settings' planner properties.
     */
    private TableEnvironmentInternal createStreamTableEnvironment(StreamExecutionEnvironment env, EnvironmentSettings settings, TableConfig config, Executor executor, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
        final Map<String, String> plannerProperties = settings.toPlannerProperties();
        return new StreamTableEnvironmentImpl(
                catalogManager,
                moduleManager,
                functionCatalog,
                config,
                env,
                ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(plannerProperties, executor, config, functionCatalog, catalogManager),
                executor,
                settings.isStreamingMode(),
                classLoader);
    }

    /**
     * Runs the given action inside a {@link TemporaryClassLoaderContext} created for
     * {@link #classLoader}; the context is closed afterwards, whether or not the action throws.
     *
     * @param runnable the action to run
     */
    void wrapClassLoader(Runnable runnable) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            runnable.run();
        }
    }
}
