package org.apache.zeppelin.flink;

import com.google.common.collect.Lists;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.util.SqlSplitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/flink/FlinkSqlInterpreter.class */
public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
    protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterpreter.class);
    protected FlinkInterpreter flinkInterpreter;
    protected TableEnvironment tbenv;
    private SqlCommandParser sqlCommandParser;
    private SqlSplitter sqlSplitter;
    private int defaultSqlParallelism;
    private ReentrantReadWriteLock.WriteLock lock;
    private Map<String, ConfigOption> tableConfigOptions;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.zeppelin.flink.FlinkSqlInterpreter$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/zeppelin/flink/FlinkSqlInterpreter$2.class */
    /**
     * Decompiler-recovered synthetic holder for an enum-switch lookup table.
     *
     * <p>The array is sized to {@code SqlCommandParser.SqlCommand.values().length} and indexed by
     * {@code ordinal()}. Each listed constant is assigned a dense case index from 1 to 29; any
     * constant not listed keeps the array default of 0. Every assignment is wrapped in its own
     * try/catch so that a {@link NoSuchFieldError} for one constant is ignored and does not
     * prevent the remaining entries from being filled in.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand = new int[SqlCommandParser.SqlCommand.values().length];

        // Populate the table one constant at a time; the assigned numbers are the case labels
        // a switch over this table is expected to use.
        static {
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.HELP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SHOW_CATALOGS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SHOW_DATABASES.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SHOW_TABLES.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SOURCE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.CREATE_FUNCTION.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DROP_FUNCTION.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.ALTER_FUNCTION.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SHOW_FUNCTIONS.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SHOW_MODULES.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.USE_CATALOG.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.USE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.CREATE_CATALOG.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DROP_CATALOG.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DESC.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DESCRIBE.ordinal()] = 16;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.EXPLAIN.ordinal()] = 17;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SELECT.ordinal()] = 18;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.SET.ordinal()] = 19;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.INSERT_INTO.ordinal()] = 20;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.INSERT_OVERWRITE.ordinal()] = 21;
            } catch (NoSuchFieldError e21) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.CREATE_TABLE.ordinal()] = 22;
            } catch (NoSuchFieldError e22) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DROP_TABLE.ordinal()] = 23;
            } catch (NoSuchFieldError e23) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.CREATE_VIEW.ordinal()] = 24;
            } catch (NoSuchFieldError e24) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DROP_VIEW.ordinal()] = 25;
            } catch (NoSuchFieldError e25) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.CREATE_DATABASE.ordinal()] = 26;
            } catch (NoSuchFieldError e26) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.DROP_DATABASE.ordinal()] = 27;
            } catch (NoSuchFieldError e27) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.ALTER_DATABASE.ordinal()] = 28;
            } catch (NoSuchFieldError e28) {
            }
            try {
                $SwitchMap$org$apache$zeppelin$flink$sql$SqlCommandParser$SqlCommand[SqlCommandParser.SqlCommand.ALTER_TABLE.ordinal()] = 29;
            } catch (NoSuchFieldError e29) {
            }
        }
    }

    public FlinkSqlInterpreter(Properties properties) {
        super(properties);
        this.lock = new ReentrantReadWriteLock().writeLock();
    }

    /**
     * Reports whether this interpreter runs in batch mode.
     *
     * <p>Within this class the result is consulted by {@code callInsertInto}: when it is
     * {@code false}, the local property {@code flink.streaming.insert_into} is set to
     * {@code "true"} before the insert is executed.
     *
     * @return {@code true} for a batch interpreter, {@code false} otherwise
     */
    protected abstract boolean isBatch();

    /**
     * Initializes the SQL parser and splitter, registers a job listener on both the batch and
     * the streaming Java execution environments, and caches the default SQL parallelism and the
     * table config options.
     *
     * <p>The listener releases {@code lock} as soon as a job has been submitted, provided the
     * submitting thread is the one holding it.
     *
     * @throws InterpreterException declared by the interpreter contract
     */
    public void open() throws InterpreterException {
        this.sqlCommandParser = new SqlCommandParser(this.flinkInterpreter.getFlinkShims(), this.tbenv);
        this.sqlSplitter = new SqlSplitter();

        JobListener releaseLockOnSubmit = new JobListener() { // from class: org.apache.zeppelin.flink.FlinkSqlInterpreter.1
            public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
                // Only the thread that currently owns the lock may release it.
                if (!FlinkSqlInterpreter.this.lock.isHeldByCurrentThread()) {
                    return;
                }
                FlinkSqlInterpreter.this.lock.unlock();
                FlinkSqlInterpreter.LOGGER.info("UnLock JobSubmitLock");
            }

            public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
                // Nothing to do once the job has finished.
            }
        };
        this.flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(releaseLockOnSubmit);
        this.flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(releaseLockOnSubmit);

        this.defaultSqlParallelism = this.flinkInterpreter.getDefaultSqlParallelism();
        this.tableConfigOptions = this.flinkInterpreter.getFlinkShims().extractTableConfigOptions();
    }

    /**
     * Runs the SQL text of a paragraph with the Flink Scala shell class loader installed as the
     * thread context class loader.
     *
     * <p>The previous context class loader is always restored, whether the run completes
     * normally or throws.
     *
     * @param str the SQL text to interpret
     * @param interpreterContext the context of the paragraph being run
     * @return the result produced by {@code runSqlList}
     * @throws InterpreterException if preparing the Flink interpreter fails
     */
    protected InterpreterResult internalInterpret(String str, InterpreterContext interpreterContext) throws InterpreterException {
        LOGGER.debug("Interpret code: " + str);
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousLoader = currentThread.getContextClassLoader();
        try {
            currentThread.setContextClassLoader(this.flinkInterpreter.getFlinkScalaShellLoader());
            this.flinkInterpreter.createPlannerAgain();
            this.flinkInterpreter.setParallelismIfNecessary(interpreterContext);
            this.flinkInterpreter.setSavepointPathIfNecessary(interpreterContext);
            return runSqlList(str, interpreterContext);
        } finally {
            currentThread.setContextClassLoader(previousLoader);
        }
    }

    /**
     * Returns the Zeppelin context of the underlying Flink interpreter.
     *
     * @return the delegate's context, or {@code null} when no Flink interpreter is set
     */
    public ZeppelinContext getZeppelinContext() {
        return this.flinkInterpreter == null ? null : this.flinkInterpreter.getZeppelinContext();
    }

    /**
     * Splits {@code str} into individual SQL statements and runs them in order.
     *
     * <p>Execution stops at the first statement that cannot be parsed or that fails; the problem
     * is written to the paragraph output and an {@code ERROR} result is returned. When the local
     * property {@code runAsOne} is {@code true} and at least one INSERT statement was seen, the
     * collected inserts are executed as a single job after all statements have been processed.
     *
     * <p>Whatever the outcome, the table configuration is reset afterwards (see
     * {@link #resetTableConfig()}).
     *
     * @param str the SQL text, possibly containing several statements
     * @param interpreterContext the context of the paragraph being run
     * @return {@code SUCCESS} when every statement ran, otherwise an {@code ERROR} result
     */
    private InterpreterResult runSqlList(String str, InterpreterContext interpreterContext) {
        try {
            boolean runAsOne = Boolean.parseBoolean(interpreterContext.getStringLocalProperty("runAsOne", "false"));
            List<String> statements = this.sqlSplitter.splitSql(str).stream()
                    .map(String::trim)
                    .collect(Collectors.toList());
            boolean firstInsert = true;
            boolean hasInsert = false;
            for (String statement : statements) {
                Optional<SqlCommandParser.SqlCommandCall> parsed = this.sqlCommandParser.parse(statement);
                if (!parsed.isPresent()) {
                    return reportInvalidStatement(statement, interpreterContext);
                }
                SqlCommandParser.SqlCommandCall call = parsed.get();
                try {
                    if (call.command == SqlCommandParser.SqlCommand.INSERT_INTO
                            || call.command == SqlCommandParser.SqlCommand.INSERT_OVERWRITE) {
                        hasInsert = true;
                        // The multi-insert session is opened once, right before the first insert.
                        if (firstInsert && runAsOne) {
                            this.flinkInterpreter.getFlinkShims().startMultipleInsert(this.tbenv, interpreterContext);
                            firstInsert = false;
                        }
                    }
                    callCommand(call, interpreterContext);
                    interpreterContext.out.flush();
                } catch (Throwable th) {
                    return reportFailedStatement(statement, th, interpreterContext);
                }
            }
            if (runAsOne && hasInsert) {
                try {
                    this.lock.lock();
                    String jobName = interpreterContext.getStringLocalProperty("jobName", str);
                    if (this.flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, interpreterContext)) {
                        interpreterContext.out.write("Insertion successfully.\n");
                    }
                } catch (Exception e) {
                    LOGGER.error("Fail to execute sql as one job", e);
                    return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
                } finally {
                    // The job listener may already have released the lock on job submission.
                    if (this.lock.isHeldByCurrentThread()) {
                        this.lock.unlock();
                    }
                }
            }
            return new InterpreterResult(InterpreterResult.Code.SUCCESS);
        } catch (Exception e) {
            LOGGER.error("Fail to execute sql", e);
            return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
        } finally {
            resetTableConfig();
        }
    }

    /** Writes the "invalid statement" message plus the SQL help text and builds the ERROR result. */
    private InterpreterResult reportInvalidStatement(String statement, InterpreterContext interpreterContext) {
        try {
            interpreterContext.out.write("%text Invalid Sql statement: " + statement + "\n");
            interpreterContext.out.write(this.flinkInterpreter.getFlinkShims().sqlHelp());
            return new InterpreterResult(InterpreterResult.Code.ERROR);
        } catch (IOException e) {
            return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
        }
    }

    /** Logs a failed statement, writes its stack trace to the output and builds the ERROR result. */
    private InterpreterResult reportFailedStatement(String statement, Throwable failure, InterpreterContext interpreterContext) {
        LOGGER.error("Fail to run sql:" + statement, failure);
        try {
            interpreterContext.out.write("%text Fail to run sql command: " + statement + "\n" + ExceptionUtils.getStackTrace(failure) + "\n");
            return new InterpreterResult(InterpreterResult.Code.ERROR);
        } catch (IOException e) {
            // The output itself is broken, so carry the original failure in the result instead.
            LOGGER.warn("Unexpected exception:", e);
            return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(failure));
        }
    }

    /**
     * Restores the table configuration after a run: default SQL parallelism first, then the
     * default value of every known table option, and finally the Flink interpreter's own
     * configuration on top.
     */
    private void resetTableConfig() {
        this.tbenv.getConfig().getConfiguration().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Integer.valueOf(this.defaultSqlParallelism));
        for (ConfigOption configOption : this.tableConfigOptions.values()) {
            // Options without a default value are left untouched.
            if (configOption.defaultValue() != null) {
                this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
            }
        }
        this.tbenv.getConfig().getConfiguration().addAll(this.flinkInterpreter.getFlinkConfiguration());
    }

    /**
     * Dispatches a parsed SQL command to the handler for its command type.
     *
     * <p>The switch is written directly over the enum constant rather than over a
     * compiler-synthesized ordinal lookup table, so each case names the command it handles.
     *
     * @param sqlCommandCall the parsed command together with its operands
     * @param interpreterContext the context of the paragraph being run
     * @throws Exception if the handler fails or the command type is not supported
     */
    private void callCommand(SqlCommandParser.SqlCommandCall sqlCommandCall, InterpreterContext interpreterContext) throws Exception {
        switch (sqlCommandCall.command) {
            case HELP:
                callHelp(interpreterContext);
                break;
            case SHOW_CATALOGS:
                callShowCatalogs(interpreterContext);
                break;
            case SHOW_DATABASES:
                callShowDatabases(interpreterContext);
                break;
            case SHOW_TABLES:
                callShowTables(interpreterContext);
                break;
            case SOURCE:
                callSource(sqlCommandCall.operands[0], interpreterContext);
                break;
            case CREATE_FUNCTION:
                callCreateFunction(sqlCommandCall.operands[0], interpreterContext);
                break;
            case DROP_FUNCTION:
                callDropFunction(sqlCommandCall.operands[0], interpreterContext);
                break;
            case ALTER_FUNCTION:
                callAlterFunction(sqlCommandCall.operands[0], interpreterContext);
                break;
            case SHOW_FUNCTIONS:
                callShowFunctions(interpreterContext);
                break;
            case SHOW_MODULES:
                callShowModules(interpreterContext);
                break;
            case USE_CATALOG:
                callUseCatalog(sqlCommandCall.operands[0], interpreterContext);
                break;
            case USE:
                callUseDatabase(sqlCommandCall.operands[0], interpreterContext);
                break;
            case CREATE_CATALOG:
                callCreateCatalog(sqlCommandCall.operands[0], interpreterContext);
                break;
            case DROP_CATALOG:
                callDropCatalog(sqlCommandCall.operands[0], interpreterContext);
                break;
            case DESC:
            case DESCRIBE:
                callDescribe(sqlCommandCall.operands[0], interpreterContext);
                break;
            case EXPLAIN:
                callExplain(sqlCommandCall.operands[0], interpreterContext);
                break;
            case SELECT:
                callSelect(sqlCommandCall.operands[0], interpreterContext);
                break;
            case SET:
                callSet(sqlCommandCall.operands[0], sqlCommandCall.operands[1], interpreterContext);
                break;
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                callInsertInto(sqlCommandCall.operands[0], interpreterContext);
                break;
            case CREATE_TABLE:
                callCreateTable(sqlCommandCall.operands[0], interpreterContext);
                break;
            case DROP_TABLE:
                callDropTable(sqlCommandCall.operands[0], interpreterContext);
                break;
            case CREATE_VIEW:
                // View handlers need the whole call: operands on Flink 1.10, raw SQL otherwise.
                callCreateView(sqlCommandCall, interpreterContext);
                break;
            case DROP_VIEW:
                callDropView(sqlCommandCall, interpreterContext);
                break;
            case CREATE_DATABASE:
                callCreateDatabase(sqlCommandCall.operands[0], interpreterContext);
                break;
            case DROP_DATABASE:
                callDropDatabase(sqlCommandCall.operands[0], interpreterContext);
                break;
            case ALTER_DATABASE:
                callAlterDatabase(sqlCommandCall.operands[0], interpreterContext);
                break;
            case ALTER_TABLE:
                callAlterTable(sqlCommandCall.operands[0], interpreterContext);
                break;
            default:
                throw new Exception("Unsupported command: " + sqlCommandCall.command);
        }
    }

    /**
     * Executes an ALTER TABLE statement while holding the lock and reports success.
     *
     * @param str the ALTER TABLE statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callAlterTable(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            this.tbenv.sqlUpdate(str);
            interpreterContext.out.write("Table has been modified.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Executes an ALTER DATABASE statement while holding the lock and reports success.
     *
     * @param str the ALTER DATABASE statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callAlterDatabase(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            this.tbenv.sqlUpdate(str);
            interpreterContext.out.write("Database has been modified.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Executes a DROP DATABASE statement while holding the lock and reports success.
     *
     * <p>The lock is acquired before the statement runs, matching the other DDL handlers in
     * this class; previously this method only released the lock without ever taking it.
     *
     * @param str the DROP DATABASE statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callDropDatabase(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            this.tbenv.sqlUpdate(str);
            interpreterContext.out.write("Database has been dropped.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Executes a CREATE DATABASE statement while holding the lock and reports success.
     *
     * <p>The lock is acquired before the statement runs, matching the other DDL handlers in
     * this class; previously this method only released the lock without ever taking it.
     *
     * @param str the CREATE DATABASE statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callCreateDatabase(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            this.tbenv.sqlUpdate(str);
            interpreterContext.out.write("Database has been created.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Drops a view while holding the lock and reports success.
     *
     * <p>On Flink 1.10 the view named by the first operand is dropped as a temporary view;
     * on other versions the original SQL is executed through the shims.
     *
     * @param sqlCommandCall the parsed DROP VIEW command
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callDropView(SqlCommandParser.SqlCommandCall sqlCommandCall, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            boolean flink110 = this.flinkInterpreter.getFlinkVersion().isFlink110();
            if (!flink110) {
                this.flinkInterpreter.getFlinkShims().executeSql(this.tbenv, sqlCommandCall.sql);
            } else {
                this.tbenv.dropTemporaryView(sqlCommandCall.operands[0]);
            }
            interpreterContext.out.write("View has been dropped.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Creates a view while holding the lock and reports success.
     *
     * <p>On Flink 1.10 a temporary view is registered under the first operand from the query
     * in the second operand; on other versions the original SQL is executed through the shims.
     *
     * @param sqlCommandCall the parsed CREATE VIEW command
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommandCall, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            boolean flink110 = this.flinkInterpreter.getFlinkVersion().isFlink110();
            if (!flink110) {
                this.flinkInterpreter.getFlinkShims().executeSql(this.tbenv, sqlCommandCall.sql);
            } else {
                this.tbenv.createTemporaryView(sqlCommandCall.operands[0], this.tbenv.sqlQuery(sqlCommandCall.operands[1]));
            }
            interpreterContext.out.write("View has been created.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Executes a CREATE TABLE statement while holding the lock and reports success.
     *
     * @param str the CREATE TABLE statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callCreateTable(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            this.tbenv.sqlUpdate(str);
            interpreterContext.out.write("Table has been created.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Executes a DROP TABLE statement while holding the lock and reports success.
     *
     * @param str the DROP TABLE statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callDropTable(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            this.tbenv.sqlUpdate(str);
            interpreterContext.out.write("Table has been dropped.\n");
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Switches the table environment to the given catalog. Nothing is written to the output.
     *
     * @param str the catalog name
     * @param interpreterContext unused
     * @throws IOException declared for symmetry with the other handlers
     */
    private void callUseCatalog(String str, InterpreterContext interpreterContext) throws IOException {
        final TableEnvironment tableEnv = this.tbenv;
        tableEnv.useCatalog(str);
    }

    /**
     * Executes a CREATE CATALOG statement through the shims and reports success.
     *
     * @param str the CREATE CATALOG statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callCreateCatalog(String str, InterpreterContext interpreterContext) throws IOException {
        final FlinkInterpreter interpreter = this.flinkInterpreter;
        interpreter.getFlinkShims().executeSql(this.tbenv, str);
        interpreterContext.out.write("Catalog has been created.\n");
    }

    /**
     * Executes a DROP CATALOG statement through the shims and reports success.
     *
     * @param str the DROP CATALOG statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callDropCatalog(String str, InterpreterContext interpreterContext) throws IOException {
        final FlinkInterpreter interpreter = this.flinkInterpreter;
        interpreter.getFlinkShims().executeSql(this.tbenv, str);
        interpreterContext.out.write("Catalog has been dropped.\n");
    }

    /**
     * Writes the modules of the table environment as a one-column {@code %table}.
     *
     * @param interpreterContext the context whose output receives the table
     * @throws IOException if writing to the output fails
     */
    private void callShowModules(InterpreterContext interpreterContext) throws IOException {
        String rows = StringUtils.join(this.tbenv.listModules(), "\n");
        interpreterContext.out.write("%table module\n" + rows + "\n");
    }

    /**
     * Writes the SQL help text provided by the shims to the output.
     *
     * @param interpreterContext the context whose output receives the help text
     * @throws IOException if writing to the output fails
     */
    private void callHelp(InterpreterContext interpreterContext) throws IOException {
        final FlinkInterpreter interpreter = this.flinkInterpreter;
        interpreterContext.out.write(interpreter.getFlinkShims().sqlHelp());
    }

    /**
     * Writes the catalogs of the table environment as a one-column {@code %table}.
     *
     * @param interpreterContext the context whose output receives the table
     * @throws IOException if writing to the output fails
     */
    private void callShowCatalogs(InterpreterContext interpreterContext) throws IOException {
        String rows = StringUtils.join(this.tbenv.listCatalogs(), "\n");
        interpreterContext.out.write("%table catalog\n" + rows + "\n");
    }

    /**
     * Writes the databases of the table environment as a one-column {@code %table}.
     *
     * @param interpreterContext the context whose output receives the table
     * @throws IOException if writing to the output fails
     */
    private void callShowDatabases(InterpreterContext interpreterContext) throws IOException {
        String rows = StringUtils.join(this.tbenv.listDatabases(), "\n");
        interpreterContext.out.write("%table database\n" + rows + "\n");
    }

    /**
     * Writes the tables of the table environment as a one-column {@code %table}, leaving out
     * every table whose name starts with {@code UnnamedTable}.
     *
     * @param interpreterContext the context whose output receives the table
     * @throws IOException if writing to the output fails
     */
    private void callShowTables(InterpreterContext interpreterContext) throws IOException {
        List<String> visibleTables = Lists.newArrayList();
        for (String tableName : this.tbenv.listTables()) {
            if (tableName.startsWith("UnnamedTable")) {
                continue;
            }
            visibleTables.add(tableName);
        }
        String rows = StringUtils.join(visibleTables, "\n");
        interpreterContext.out.write("%table table\n" + rows + "\n");
    }

    /**
     * Reads the SQL script at the given path and runs its statements.
     *
     * <p>The file stream is closed as soon as the script has been read, before any statement
     * runs. The script is decoded exactly as before, through {@code IOUtils.toString(InputStream)}.
     * The result returned by {@code runSqlList} is not inspected here; failures inside the
     * script are reported through the paragraph output by {@code runSqlList} itself.
     *
     * @param str path of the SQL script file
     * @param interpreterContext the context of the paragraph being run
     * @throws IOException if the file cannot be opened or read
     */
    private void callSource(String str, InterpreterContext interpreterContext) throws IOException {
        String script;
        try (FileInputStream scriptStream = new FileInputStream(str)) {
            script = IOUtils.toString(scriptStream);
        }
        runSqlList(script, interpreterContext);
    }

    /**
     * Executes a CREATE FUNCTION statement through the shims and reports success.
     *
     * @param str the CREATE FUNCTION statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callCreateFunction(String str, InterpreterContext interpreterContext) throws IOException {
        final FlinkInterpreter interpreter = this.flinkInterpreter;
        interpreter.getFlinkShims().executeSql(this.tbenv, str);
        interpreterContext.out.write("Function has been created.\n");
    }

    /**
     * Executes a DROP FUNCTION statement through the shims and reports success.
     *
     * @param str the DROP FUNCTION statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callDropFunction(String str, InterpreterContext interpreterContext) throws IOException {
        final FlinkInterpreter interpreter = this.flinkInterpreter;
        interpreter.getFlinkShims().executeSql(this.tbenv, str);
        interpreterContext.out.write("Function has been dropped.\n");
    }

    /**
     * Executes an ALTER FUNCTION statement through the shims and reports success.
     *
     * @param str the ALTER FUNCTION statement
     * @param interpreterContext the context whose output receives the confirmation
     * @throws IOException if writing to the output fails
     */
    private void callAlterFunction(String str, InterpreterContext interpreterContext) throws IOException {
        final FlinkInterpreter interpreter = this.flinkInterpreter;
        interpreter.getFlinkShims().executeSql(this.tbenv, str);
        interpreterContext.out.write("Function has been modified.\n");
    }

    /**
     * Writes the user-defined functions of the table environment as a one-column {@code %table}.
     *
     * @param interpreterContext the context whose output receives the table
     * @throws IOException if writing to the output fails
     */
    private void callShowFunctions(InterpreterContext interpreterContext) throws IOException {
        String rows = StringUtils.join(this.tbenv.listUserDefinedFunctions(), "\n");
        interpreterContext.out.write("%table function\n" + rows + "\n");
    }

    /**
     * Switches the table environment to the given database. Nothing is written to the output.
     *
     * @param str the database name
     * @param interpreterContext unused
     * @throws IOException declared for symmetry with the other handlers
     */
    private void callUseDatabase(String str, InterpreterContext interpreterContext) throws IOException {
        final TableEnvironment tableEnv = this.tbenv;
        tableEnv.useDatabase(str);
    }

    /**
     * Writes the schema of a table as a two-column {@code %table} (column name and type).
     *
     * @param str the table path; its dot-separated parts are passed to {@code scan}
     * @param interpreterContext the context whose output receives the table
     * @throws IOException if writing to the output fails
     */
    private void callDescribe(String str, InterpreterContext interpreterContext) throws IOException {
        String[] tablePath = str.split("\\.");
        TableSchema schema = this.tbenv.scan(tablePath).getSchema();
        StringBuilder table = new StringBuilder("%table\n");
        table.append("Column\tType\n");
        int column = 0;
        while (column < schema.getFieldCount()) {
            String fieldName = (String) schema.getFieldName(column).get();
            Object fieldType = schema.getFieldDataType(column).get();
            table.append(fieldName).append("\t").append(fieldType).append("\n");
            column++;
        }
        interpreterContext.out.write(table.toString());
    }

    /**
     * Writes the plan explanation for a statement, produced by the shims, while holding the lock.
     *
     * @param str the statement to explain
     * @param interpreterContext the context whose output receives the explanation
     * @throws IOException if writing to the output fails
     */
    private void callExplain(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            String plan = this.flinkInterpreter.getFlinkShims().explain(this.tbenv, str) + "\n";
            interpreterContext.out.write(plan);
        } finally {
            // Release only if this thread still owns the lock.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Runs a SELECT statement by delegating to {@link #callInnerSelect} after taking the lock.
     *
     * <p>The lock is released afterwards only if the current thread still holds it.
     *
     * @param str the SELECT statement
     * @param interpreterContext the context of the paragraph being run
     * @throws IOException if the delegate fails with an I/O error
     */
    public void callSelect(String str, InterpreterContext interpreterContext) throws IOException {
        this.lock.lock();
        try {
            callInnerSelect(str, interpreterContext);
        } finally {
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Performs the actual work of a SELECT statement; supplied by concrete subclasses.
     *
     * <p>{@code callSelect} invokes this method after acquiring {@code lock} and releases the
     * lock afterwards if the calling thread still holds it.
     *
     * @param str the SELECT statement
     * @param interpreterContext the context of the paragraph being run
     * @throws IOException if the implementation fails with an I/O error
     */
    public abstract void callInnerSelect(String str, InterpreterContext interpreterContext) throws IOException;

    /**
     * Trims the input and strips one leading and one trailing single quote, if present.
     *
     * <p>The two quotes are handled independently, so an unbalanced quote is removed as well.
     *
     * @param str the raw text; must not be {@code null}
     * @return the trimmed text without its surrounding single quotes
     */
    private String removeSingleQuote(String str) {
        String candidate = str.trim();
        String withoutLeading = candidate.startsWith("'") ? candidate.substring(1) : candidate;
        return withoutLeading.endsWith("'")
                ? withoutLeading.substring(0, withoutLeading.length() - 1)
                : withoutLeading;
    }

    /**
     * Applies a {@code SET key=value} statement to the table configuration.
     *
     * <p>Key and value are unquoted first. The key {@code execution.runtime-mode} is rejected,
     * as is any key that is not one of the known table config options.
     *
     * @param str the config key, optionally wrapped in single quotes
     * @param str2 the config value, optionally wrapped in single quotes
     * @param interpreterContext unused
     * @throws UnsupportedOperationException if the key is {@code execution.runtime-mode}
     * @throws IOException if the key is not a known table/sql config option
     */
    public void callSet(String str, String str2, InterpreterContext interpreterContext) throws Exception {
        final String key = removeSingleQuote(str);
        final String value = removeSingleQuote(str2);
        if ("execution.runtime-mode".equals(key)) {
            throw new UnsupportedOperationException("execution.runtime-mode is not supported to set, you can use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode");
        }
        boolean knownOption = this.tableConfigOptions.containsKey(key);
        if (!knownOption) {
            throw new IOException(key + " is not a valid table/sql config, please check link: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html");
        }
        LOGGER.info("Set table config: {}={}", key, value);
        this.tbenv.getConfig().getConfiguration().setString(key, value);
    }

    /**
     * Handles an INSERT statement.
     *
     * <p>For a non-batch interpreter the local property {@code flink.streaming.insert_into} is
     * set to {@code "true"} first. With {@code runAsOne} enabled the statement is only added to
     * the pending multi-insert; otherwise it is executed immediately as its own job, named by
     * the {@code jobName} local property (defaulting to the statement text).
     *
     * @param str the INSERT statement
     * @param interpreterContext the context of the paragraph being run
     * @throws IOException wrapping any exception raised while running the statement
     */
    public void callInsertInto(String str, InterpreterContext interpreterContext) throws IOException {
        if (!isBatch()) {
            interpreterContext.getLocalProperties().put("flink.streaming.insert_into", "true");
        }
        try {
            this.lock.lock();
            boolean runAsOne = Boolean.parseBoolean(interpreterContext.getStringLocalProperty("runAsOne", "false"));
            if (!runAsOne) {
                this.tbenv.sqlUpdate(str);
                this.tbenv.execute(interpreterContext.getStringLocalProperty("jobName", str));
                interpreterContext.out.write("Insertion successfully.\n");
            } else {
                this.flinkInterpreter.getFlinkShims().addInsertStatement(str, this.tbenv, interpreterContext);
            }
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            // The job listener may already have released the lock on job submission.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /**
     * Cancels the paragraph by delegating to the underlying Flink interpreter.
     *
     * @param interpreterContext the context of the paragraph to cancel
     * @throws InterpreterException if the delegate fails to cancel
     */
    public void cancel(InterpreterContext interpreterContext) throws InterpreterException {
        final FlinkInterpreter delegate = this.flinkInterpreter;
        delegate.cancel(interpreterContext);
    }
}
