package org.apache.flink.table.gateway.service;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.session.Session;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.class */
/**
 * {@link SqlGatewayService} implementation that resolves the {@link Session} for a
 * {@link SessionHandle} through the {@link SessionManager} and delegates the call to that
 * session's {@link OperationManager} or to an executor created from the session.
 *
 * <p>Every service method except {@link #getGatewayInfo()} catches any {@link Throwable}, logs it
 * at error level, and rethrows it wrapped in a {@link SqlGatewayException} whose cause is the
 * original failure.
 */
public class SqlGatewayServiceImpl implements SqlGatewayService {
    private static final Logger LOG = LoggerFactory.getLogger(SqlGatewayServiceImpl.class);

    /** Registry used to open, close and look up sessions. */
    private final SessionManager sessionManager;

    /**
     * Creates a service backed by the given session manager.
     *
     * @param sessionManager manager used to open, close and look up sessions
     */
    public SqlGatewayServiceImpl(SessionManager sessionManager) {
        this.sessionManager = sessionManager;
    }

    /**
     * Opens a new session through the session manager.
     *
     * @param sessionEnvironment environment handed to the session manager
     * @return handle of the newly opened session
     * @throws SqlGatewayException if the session manager fails to open the session
     */
    @Override
    public SessionHandle openSession(SessionEnvironment sessionEnvironment) throws SqlGatewayException {
        try {
            return this.sessionManager.openSession(sessionEnvironment).getSessionHandle();
        } catch (Throwable th) {
            LOG.error("Failed to openSession.", th);
            throw new SqlGatewayException("Failed to openSession.", th);
        }
    }

    /**
     * Closes the session identified by the handle.
     *
     * @param sessionHandle handle of the session to close
     * @throws SqlGatewayException if the session manager fails to close the session
     */
    @Override
    public void closeSession(SessionHandle sessionHandle) throws SqlGatewayException {
        try {
            this.sessionManager.closeSession(sessionHandle);
        } catch (Throwable th) {
            LOG.error("Failed to closeSession.", th);
            throw new SqlGatewayException("Failed to closeSession.", th);
        }
    }

    /**
     * Runs a configuration statement against the session and blocks until it has terminated.
     *
     * <p>The statement is submitted as an internal operation whose handle is never exposed to the
     * caller, so it is always closed here, whether or not the operation succeeded.
     *
     * @param sessionHandle handle of the session to configure
     * @param str statement passed to the executor's {@code configureSession}
     * @param j execution timeout; any value greater than zero is rejected because timeouts are
     *     not supported
     * @throws SqlGatewayException if a timeout is requested or the operation fails
     */
    @Override
    public void configureSession(SessionHandle sessionHandle, String str, long j) throws SqlGatewayException {
        try {
            if (j > 0) {
                throw new UnsupportedOperationException("SqlGatewayService doesn't support timeout mechanism now.");
            }
            OperationManager operationManager = getSession(sessionHandle).getOperationManager();
            OperationHandle configureHandle = operationManager.submitOperation(
                    operationHandle -> getSession(sessionHandle).createExecutor().configureSession(operationHandle, str));
            try {
                operationManager.awaitOperationTermination(configureHandle);
            } finally {
                // Nobody else knows this handle; close it even when the operation failed.
                operationManager.closeOperation(configureHandle);
            }
        } catch (Throwable th) {
            LOG.error("Failed to configure session.", th);
            throw new SqlGatewayException("Failed to configure session.", th);
        }
    }

    /**
     * Returns the configuration of the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @return the session configuration as reported by the session
     * @throws SqlGatewayException if the session lookup or the read fails
     */
    @Override
    public Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).getSessionConfig();
        } catch (Throwable th) {
            LOG.error("Failed to getSessionConfig.", th);
            throw new SqlGatewayException("Failed to getSessionConfig.", th);
        }
    }

    /**
     * Returns the endpoint version recorded on the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @return the endpoint version reported by the session
     * @throws SqlGatewayException if the session lookup or the read fails
     */
    @Override
    public EndpointVersion getSessionEndpointVersion(SessionHandle sessionHandle) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).getEndpointVersion();
        } catch (Throwable th) {
            // Log the same message that is thrown so the log entry names the right method.
            LOG.error("Failed to getSessionEndpointVersion.", th);
            throw new SqlGatewayException("Failed to getSessionEndpointVersion.", th);
        }
    }

    /**
     * Submits a caller-provided operation to the session's operation manager.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param callable work that produces the operation's result
     * @return handle of the submitted operation
     * @throws SqlGatewayException if the session lookup or the submission fails
     */
    @Override
    public OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> callable) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).getOperationManager().submitOperation(callable);
        } catch (Throwable th) {
            LOG.error("Failed to submitOperation.", th);
            throw new SqlGatewayException("Failed to submitOperation.", th);
        }
    }

    /**
     * Cancels an operation of the session.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param operationHandle handle of the operation to cancel
     * @throws SqlGatewayException if the session lookup or the cancellation fails
     */
    @Override
    public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) {
        try {
            getSession(sessionHandle).getOperationManager().cancelOperation(operationHandle);
        } catch (Throwable th) {
            LOG.error("Failed to cancelOperation.", th);
            throw new SqlGatewayException("Failed to cancelOperation.", th);
        }
    }

    /**
     * Closes an operation of the session.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param operationHandle handle of the operation to close
     * @throws SqlGatewayException if the session lookup or the close fails
     */
    @Override
    public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) {
        try {
            getSession(sessionHandle).getOperationManager().closeOperation(operationHandle);
        } catch (Throwable th) {
            LOG.error("Failed to closeOperation.", th);
            throw new SqlGatewayException("Failed to closeOperation.", th);
        }
    }

    /**
     * Returns the operation manager's info for an operation.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param operationHandle handle of the operation to inspect
     * @return info reported by the operation manager
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) {
        try {
            return getSession(sessionHandle).getOperationManager().getOperationInfo(operationHandle);
        } catch (Throwable th) {
            LOG.error("Failed to getOperationInfo.", th);
            throw new SqlGatewayException("Failed to getOperationInfo.", th);
        }
    }

    /**
     * Returns the result schema of an operation.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param operationHandle handle of the operation to inspect
     * @return schema reported by the operation manager
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).getOperationManager().getOperationResultSchema(operationHandle);
        } catch (Throwable th) {
            LOG.error("Failed to getOperationResultSchema.", th);
            throw new SqlGatewayException("Failed to getOperationResultSchema.", th);
        }
    }

    /**
     * Submits a statement for execution and returns its operation handle without waiting for the
     * operation to finish.
     *
     * @param sessionHandle handle of the session to execute in
     * @param str statement passed to the executor's {@code executeStatement}
     * @param j execution timeout; any value greater than zero is rejected because timeouts are
     *     not supported
     * @param configuration configuration passed to {@code createExecutor} for this statement
     * @return handle of the submitted operation
     * @throws SqlGatewayException if a timeout is requested or the submission fails
     */
    @Override
    public OperationHandle executeStatement(SessionHandle sessionHandle, String str, long j, Configuration configuration) throws SqlGatewayException {
        try {
            if (j > 0) {
                throw new UnsupportedOperationException("SqlGatewayService doesn't support timeout mechanism now.");
            }
            return getSession(sessionHandle).getOperationManager().submitOperation(
                    operationHandle -> getSession(sessionHandle).createExecutor(configuration).executeStatement(operationHandle, str));
        } catch (Throwable th) {
            LOG.error("Failed to execute statement.", th);
            throw new SqlGatewayException("Failed to execute statement.", th);
        }
    }

    /**
     * Fetches results of an operation; the token and row limit are forwarded unchanged to the
     * operation manager.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param operationHandle handle of the operation to read from
     * @param j token forwarded to the operation manager
     * @param i maximum number of rows forwarded to the operation manager
     * @return the result set returned by the operation manager
     * @throws SqlGatewayException if the session lookup or the fetch fails
     */
    @Override
    public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, long j, int i) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).getOperationManager().fetchResults(operationHandle, j, i);
        } catch (Throwable th) {
            LOG.error("Failed to fetchResults.", th);
            throw new SqlGatewayException("Failed to fetchResults.", th);
        }
    }

    /**
     * Fetches results of an operation; the orientation and row limit are forwarded unchanged to
     * the operation manager.
     *
     * @param sessionHandle handle of the session that owns the operation
     * @param operationHandle handle of the operation to read from
     * @param fetchOrientation orientation forwarded to the operation manager
     * @param i maximum number of rows forwarded to the operation manager
     * @return the result set returned by the operation manager
     * @throws SqlGatewayException if the session lookup or the fetch fails
     */
    @Override
    public ResultSet fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, FetchOrientation fetchOrientation, int i) {
        try {
            return getSession(sessionHandle).getOperationManager().fetchResults(operationHandle, fetchOrientation, i);
        } catch (Throwable th) {
            LOG.error("Failed to fetchResults.", th);
            throw new SqlGatewayException("Failed to fetchResults.", th);
        }
    }

    /**
     * Returns the current catalog reported by an executor created from the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @return the current catalog name
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public String getCurrentCatalog(SessionHandle sessionHandle) {
        try {
            return getSession(sessionHandle).createExecutor().getCurrentCatalog();
        } catch (Throwable th) {
            // Same log-and-wrap handling as every other session-delegating method in this class.
            LOG.error("Failed to getCurrentCatalog.", th);
            throw new SqlGatewayException("Failed to getCurrentCatalog.", th);
        }
    }

    /**
     * Lists the catalogs reported by an executor created from the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @return catalog names
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public Set<String> listCatalogs(SessionHandle sessionHandle) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).createExecutor().listCatalogs();
        } catch (Throwable th) {
            LOG.error("Failed to listCatalogs.", th);
            throw new SqlGatewayException("Failed to listCatalogs.", th);
        }
    }

    /**
     * Lists the databases the executor reports for the given argument.
     *
     * @param sessionHandle handle of the session to inspect
     * @param str argument forwarded to the executor's {@code listDatabases} (presumably the
     *     catalog name; confirm against the interface)
     * @return database names
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public Set<String> listDatabases(SessionHandle sessionHandle, String str) {
        try {
            return getSession(sessionHandle).createExecutor().listDatabases(str);
        } catch (Throwable th) {
            LOG.error("Failed to listDatabases.", th);
            throw new SqlGatewayException("Failed to listDatabases.", th);
        }
    }

    /**
     * Lists the tables the executor reports for the given arguments.
     *
     * @param sessionHandle handle of the session to inspect
     * @param str first argument forwarded to the executor's {@code listTables} (presumably the
     *     catalog name; confirm against the interface)
     * @param str2 second argument forwarded to the executor's {@code listTables} (presumably the
     *     database name; confirm against the interface)
     * @param set table kinds forwarded to the executor
     * @return table infos returned by the executor
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public Set<TableInfo> listTables(SessionHandle sessionHandle, String str, String str2, Set<CatalogBaseTable.TableKind> set) {
        try {
            return getSession(sessionHandle).createExecutor().listTables(str, str2, set);
        } catch (Throwable th) {
            LOG.error("Failed to listTables.", th);
            throw new SqlGatewayException("Failed to listTables.", th);
        }
    }

    /**
     * Looks up a table through an executor created from the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @param objectIdentifier identifier of the table
     * @return the table returned by the executor
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public ResolvedCatalogBaseTable<?> getTable(SessionHandle sessionHandle, ObjectIdentifier objectIdentifier) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).createExecutor().getTable(objectIdentifier);
        } catch (Throwable th) {
            LOG.error("Failed to getTable.", th);
            throw new SqlGatewayException("Failed to getTable.", th);
        }
    }

    /**
     * Lists the user-defined functions the executor reports for the given arguments.
     *
     * @param sessionHandle handle of the session to inspect
     * @param str first argument forwarded to the executor's {@code listUserDefinedFunctions}
     *     (presumably the catalog name; confirm against the interface)
     * @param str2 second argument forwarded to the executor's {@code listUserDefinedFunctions}
     *     (presumably the database name; confirm against the interface)
     * @return function infos returned by the executor
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public Set<FunctionInfo> listUserDefinedFunctions(SessionHandle sessionHandle, String str, String str2) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).createExecutor().listUserDefinedFunctions(str, str2);
        } catch (Throwable th) {
            LOG.error("Failed to listUserDefinedFunctions.", th);
            throw new SqlGatewayException("Failed to listUserDefinedFunctions.", th);
        }
    }

    /**
     * Lists the system functions reported by an executor created from the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @return function infos returned by the executor
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public Set<FunctionInfo> listSystemFunctions(SessionHandle sessionHandle) {
        try {
            return getSession(sessionHandle).createExecutor().listSystemFunctions();
        } catch (Throwable th) {
            LOG.error("Failed to listSystemFunctions.", th);
            throw new SqlGatewayException("Failed to listSystemFunctions.", th);
        }
    }

    /**
     * Resolves a function definition through an executor created from the session.
     *
     * @param sessionHandle handle of the session to inspect
     * @param unresolvedIdentifier identifier of the function
     * @return the function definition returned by the executor
     * @throws SqlGatewayException if the session lookup or the query fails
     */
    @Override
    public FunctionDefinition getFunctionDefinition(SessionHandle sessionHandle, UnresolvedIdentifier unresolvedIdentifier) throws SqlGatewayException {
        try {
            return getSession(sessionHandle).createExecutor().getFunctionDefinition(unresolvedIdentifier);
        } catch (Throwable th) {
            LOG.error("Failed to getFunctionDefinition.", th);
            throw new SqlGatewayException("Failed to getFunctionDefinition.", th);
        }
    }

    /**
     * Returns the shared {@link GatewayInfo} instance.
     *
     * @return {@code GatewayInfo.INSTANCE}
     */
    @Override
    public GatewayInfo getGatewayInfo() {
        return GatewayInfo.INSTANCE;
    }

    /**
     * Computes completion candidates for a statement and returns them as strings.
     *
     * <p>The hints are produced by an internal operation; this method waits for it, reads its
     * results with token {@code 0} and limit {@code Integer.MAX_VALUE}, and converts column 0 of
     * every row to a string. The internal operation's handle is never exposed to the caller, so
     * it is always closed here once the candidates have been collected or the attempt has failed.
     *
     * @param sessionHandle handle of the session to complete in
     * @param str statement forwarded to the executor's {@code getCompletionHints}
     * @param i position forwarded to the executor's {@code getCompletionHints}
     * @return completion candidates, one per result row
     * @throws SqlGatewayException if the operation or the result fetch fails
     */
    @Override
    public List<String> completeStatement(SessionHandle sessionHandle, String str, int i) throws SqlGatewayException {
        try {
            OperationManager operationManager = getSession(sessionHandle).getOperationManager();
            OperationHandle completionHandle = operationManager.submitOperation(
                    operationHandle -> getSession(sessionHandle).createExecutor().getCompletionHints(operationHandle, str, i));
            try {
                operationManager.awaitOperationTermination(completionHandle);
                // The list is fully materialized before the finally block closes the operation.
                return fetchResults(sessionHandle, completionHandle, 0L, Integer.MAX_VALUE).getData().stream()
                        .map(rowData -> rowData.getString(0).toString())
                        .collect(Collectors.toList());
            } finally {
                // Without this the internal operation would stay registered in the manager.
                operationManager.closeOperation(completionHandle);
            }
        } catch (Throwable th) {
            LOG.error("Failed to get statement completion candidates.", th);
            throw new SqlGatewayException("Failed to get statement completion candidates.", th);
        }
    }

    /**
     * Looks up the session for a handle in the session manager.
     *
     * @param sessionHandle handle of the session to look up
     * @return the session returned by the session manager
     */
    @VisibleForTesting
    public Session getSession(SessionHandle sessionHandle) {
        return this.sessionManager.getSession(sessionHandle);
    }
}
