package org.apache.flink.table.client.gateway;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.table.api.SqlParserEOFException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler;
import org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeaders;
import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders;
import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import org.apache.flink.table.gateway.rest.header.statement.CompleteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
import org.apache.flink.table.gateway.rest.header.util.UrlPrefixDecorator;
import org.apache.flink.table.gateway.rest.message.operation.OperationMessageParameters;
import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.message.util.GetApiVersionResponseBody;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/client/gateway/ExecutorImpl.class */
public class ExecutorImpl implements Executor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorImpl.class);
    private static final long HEARTBEAT_INTERVAL_MILLISECONDS = 60000;
    private final AutoCloseableRegistry registry;
    private final URL gatewayUrl;
    private final ExecutorService executorService;
    private final RestClient restClient;
    private final SqlGatewayRestAPIVersion connectionVersion;
    private final SessionHandle sessionHandle;
    private final RowFormat rowFormat;
    private final Collection<HttpHeader> customHttpHeaders;

    /* loaded from: input_file:org/apache/flink/table/client/gateway/ExecutorImpl$RowDataInfoIterator.class */
    private class RowDataInfoIterator implements CloseableIterator<RowData> {
        private final OperationHandle operationHandle;
        private Iterator<RowData> current;

        @Nullable
        private Long nextToken;

        public RowDataInfoIterator(OperationHandle operationHandle, ResultInfo resultInfo, @Nullable Long l) {
            this.operationHandle = operationHandle;
            this.current = resultInfo.getData().iterator();
            this.nextToken = l;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            ExecutorImpl.getResponse(ExecutorImpl.this.closeOperationAsync(this.operationHandle));
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (!this.current.hasNext()) {
                while (this.nextToken != null && !this.current.hasNext()) {
                    FetchResultsResponseBody fetchResults = fetchResults(this.operationHandle, this.nextToken.longValue());
                    this.nextToken = SqlGatewayRestEndpointUtils.parseToken(fetchResults.getNextResultUri());
                    this.current = fetchResults.getResults().getData().iterator();
                }
            }
            return this.current.hasNext();
        }

        @Override // java.util.Iterator
        public RowData next() {
            return this.current.next();
        }

        private FetchResultsResponseBody fetchResults(OperationHandle operationHandle, long j) {
            return ExecutorImpl.this.getFetchResultResponse(operationHandle, j, false, interruptedException -> {
                ExecutorImpl.this.sendRequest(CancelOperationHeaders.getInstance(), new OperationMessageParameters(ExecutorImpl.this.sessionHandle, operationHandle), EmptyRequestBody.getInstance());
                return new SqlExecutionException("Interrupted to fetch results.", interruptedException);
            });
        }
    }

    public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress inetSocketAddress, String str) {
        this(defaultContext, NetUtils.socketToUrl(inetSocketAddress), str, 60000L, RowFormat.PLAIN_TEXT);
    }

    public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress inetSocketAddress, String str, RowFormat rowFormat) {
        this(defaultContext, NetUtils.socketToUrl(inetSocketAddress), str, 60000L, rowFormat);
    }

    public ExecutorImpl(DefaultContext defaultContext, URL url, String str) {
        this(defaultContext, url, str, 60000L, RowFormat.PLAIN_TEXT);
    }

    @VisibleForTesting
    ExecutorImpl(DefaultContext defaultContext, InetSocketAddress inetSocketAddress, String str, long j) {
        this(defaultContext, NetUtils.socketToUrl(inetSocketAddress), str, j, RowFormat.PLAIN_TEXT);
    }

    @VisibleForTesting
    ExecutorImpl(DefaultContext defaultContext, URL url, String str, long j, RowFormat rowFormat) {
        this.registry = new AutoCloseableRegistry();
        this.gatewayUrl = url;
        this.rowFormat = rowFormat;
        this.customHttpHeaders = readHeadersFromEnvironmentVariable(ConfigConstants.FLINK_REST_CLIENT_HEADERS);
        try {
            this.executorService = Executors.newCachedThreadPool();
            AutoCloseableRegistry autoCloseableRegistry = this.registry;
            ExecutorService executorService = this.executorService;
            executorService.getClass();
            autoCloseableRegistry.registerCloseable(executorService::shutdownNow);
            Configuration flinkConfig = defaultContext.getFlinkConfig();
            this.restClient = RestClient.forUrl(flinkConfig, this.executorService, url);
            this.registry.registerCloseable(this.restClient);
            this.connectionVersion = negotiateVersion();
            LOG.info("Open session to {} with connection version: {}.", url, this.connectionVersion);
            this.sessionHandle = new SessionHandle(UUID.fromString(((OpenSessionResponseBody) sendRequest(OpenSessionHeaders.getInstance(), EmptyMessageParameters.getInstance(), new OpenSessionRequestBody(str, flinkConfig.toMap())).get()).getSessionHandle()));
            this.registry.registerCloseable(this::closeSession);
            ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            AutoCloseableRegistry autoCloseableRegistry2 = this.registry;
            newSingleThreadScheduledExecutor.getClass();
            autoCloseableRegistry2.registerCloseable(newSingleThreadScheduledExecutor::shutdownNow);
            newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> {
            }, j, j, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            try {
                this.registry.close();
            } catch (Throwable th) {
                e.addSuppressed(th);
            }
            throw new SqlClientException("Failed to create the executor.", e);
        }
    }

    @Override // org.apache.flink.table.client.gateway.Executor
    public void configureSession(String str) {
        try {
            sendRequest(ConfigureSessionHeaders.getInstance(), new SessionMessageParameters(this.sessionHandle), new ConfigureSessionRequestBody(str)).get();
        } catch (Exception e) {
            throw new SqlExecutionException(String.format("Failed to open session to %s", this.gatewayUrl), e);
        }
    }

    @Override // org.apache.flink.table.client.gateway.Executor
    public ReadableConfig getSessionConfig() {
        try {
            return Configuration.fromMap(getSessionConfigMap());
        } catch (Exception e) {
            throw new SqlExecutionException("Failed to get the get session config.", e);
        }
    }

    @Override // org.apache.flink.table.client.gateway.Executor
    public Map<String, String> getSessionConfigMap() {
        try {
            return ((GetSessionConfigResponseBody) getResponse(sendRequest(GetSessionConfigHeaders.getInstance(), new SessionMessageParameters(this.sessionHandle), EmptyRequestBody.getInstance()))).getProperties();
        } catch (Exception e) {
            throw new SqlExecutionException("Failed to get the get session config.", e);
        }
    }

    @Override // org.apache.flink.table.client.gateway.Executor
    public StatementResult executeStatement(String str) {
        CompletableFuture sendRequest = sendRequest(ExecuteStatementHeaders.getInstance(), new SessionMessageParameters(this.sessionHandle), new ExecuteStatementRequestBody(str));
        OperationHandle operationHandle = getOperationHandle(() -> {
            return ((ExecuteStatementResponseBody) getResponse(sendRequest, interruptedException -> {
                this.executorService.submit(() -> {
                    try {
                        ExecuteStatementResponseBody executeStatementResponseBody = (ExecuteStatementResponseBody) sendRequest.get();
                        executeStatementResponseBody.getClass();
                        closeOperationAsync(getOperationHandle(executeStatementResponseBody::getOperationHandle));
                    } catch (Exception e) {
                        interruptedException.addSuppressed(e);
                        LOG.error("Failed to cancel the interrupted exception.", interruptedException);
                    }
                });
                return new SqlExecutionException("Interrupted to get response.", interruptedException);
            })).getOperationHandle();
        });
        FetchResultsResponseBody fetchUtilResultsReady = fetchUtilResultsReady(operationHandle);
        return new StatementResult(fetchUtilResultsReady.getResults().getResultSchema(), new RowDataInfoIterator(operationHandle, fetchUtilResultsReady.getResults(), SqlGatewayRestEndpointUtils.parseToken(fetchUtilResultsReady.getNextResultUri())), fetchUtilResultsReady.isQueryResult(), fetchUtilResultsReady.getResultKind(), fetchUtilResultsReady.getJobID());
    }

    @Override // org.apache.flink.table.client.gateway.Executor
    public List<String> completeStatement(String str, int i) {
        return ((CompleteStatementResponseBody) getResponse(sendRequest(CompleteStatementHeaders.getInstance(), new SessionMessageParameters(this.sessionHandle), new CompleteStatementRequestBody(str, i)))).getCandidates();
    }

    @Override // org.apache.flink.table.client.gateway.Executor, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.registry.isClosed()) {
            return;
        }
        try {
            this.registry.close();
        } catch (Throwable th) {
            LOG.error("Exception happens when closing the Executor.", ExceptionUtils.firstOrSuppressed(th instanceof Exception ? (Exception) th : new Exception(th), th));
        }
    }

    @VisibleForTesting
    public SessionHandle getSessionHandle() {
        return this.sessionHandle;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u, R r) {
        Preconditions.checkNotNull(this.connectionVersion, "The connection version should not be null.");
        CustomHeadersDecorator customHeadersDecorator = new CustomHeadersDecorator(new UrlPrefixDecorator(m, this.gatewayUrl.getPath()));
        customHeadersDecorator.setCustomHeaders(this.customHttpHeaders);
        return sendRequest(customHeadersDecorator, u, r, this.connectionVersion);
    }

    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u, R r, SqlGatewayRestAPIVersion sqlGatewayRestAPIVersion) {
        try {
            return this.restClient.sendRequest(this.gatewayUrl.getHost(), this.gatewayUrl.getPort(), m, u, r, Collections.emptyList(), sqlGatewayRestAPIVersion);
        } catch (IOException e) {
            throw new SqlExecutionException("Failed to connect to the SQL Gateway.", e);
        }
    }

    private FetchResultsResponseBody fetchUtilResultsReady(OperationHandle operationHandle) {
        FetchResultsResponseBody fetchResultResponse;
        do {
            fetchResultResponse = getFetchResultResponse(operationHandle, 0L, true, interruptedException -> {
                closeOperationAsync(operationHandle);
                return new SqlExecutionException("Interrupted to fetch results.", interruptedException);
            });
        } while (fetchResultResponse.getResultType().equals(ResultSet.ResultType.NOT_READY));
        return fetchResultResponse;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FetchResultsResponseBody getFetchResultResponse(OperationHandle operationHandle, long j, boolean z, Function<InterruptedException, SqlExecutionException> function) {
        if (z) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                throw function.apply(e);
            } catch (ExecutionException e2) {
                Throwable cause = e2.getCause();
                if ((cause instanceof RestClientException) && cause.getMessage().contains("Encountered \"<EOF>\"")) {
                    throw new SqlExecutionException("The SQL statement is incomplete.", new SqlParserEOFException(cause.getMessage(), cause));
                }
                throw new SqlExecutionException(String.format("Failed to get response for the operation %s.", operationHandle), cause);
            }
        }
        return (FetchResultsResponseBody) sendRequest(FetchResultsHeaders.getDefaultInstance(), new FetchResultsMessageParameters(this.sessionHandle, operationHandle, Long.valueOf(j), this.rowFormat), EmptyRequestBody.getInstance()).get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> T getResponse(CompletableFuture<T> completableFuture) {
        return (T) getResponse(completableFuture, interruptedException -> {
            return new SqlExecutionException("Interrupted to get response.", interruptedException);
        });
    }

    private static <T> T getResponse(CompletableFuture<T> completableFuture, Function<InterruptedException, SqlExecutionException> function) {
        try {
            return completableFuture.get();
        } catch (InterruptedException e) {
            throw function.apply(e);
        } catch (ExecutionException e2) {
            throw new SqlExecutionException("Failed to get response.", e2.getCause());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<OperationStatusResponseBody> closeOperationAsync(OperationHandle operationHandle) {
        return sendRequest(CloseOperationHeaders.getInstance(), new OperationMessageParameters(this.sessionHandle, operationHandle), EmptyRequestBody.getInstance());
    }

    private OperationHandle getOperationHandle(Supplier<String> supplier) {
        return new OperationHandle(UUID.fromString(supplier.get()));
    }

    private SqlGatewayRestAPIVersion negotiateVersion() throws Exception {
        CustomHeadersDecorator customHeadersDecorator = new CustomHeadersDecorator(new UrlPrefixDecorator(GetApiVersionHeaders.getInstance(), this.gatewayUrl.getPath()));
        customHeadersDecorator.setCustomHeaders(this.customHttpHeaders);
        List list = (List) ((GetApiVersionResponseBody) getResponse(this.restClient.sendRequest(this.gatewayUrl.getHost(), this.gatewayUrl.getPort(), customHeadersDecorator, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), Collections.emptyList(), (RestAPIVersion) Collections.min(SqlGatewayRestAPIVersion.getStableVersions())))).getVersions().stream().map(SqlGatewayRestAPIVersion::valueOf).collect(Collectors.toList());
        SqlGatewayRestAPIVersion defaultVersion = SqlGatewayRestAPIVersion.getDefaultVersion();
        if (list.contains(defaultVersion)) {
            return defaultVersion;
        }
        SqlGatewayRestAPIVersion sqlGatewayRestAPIVersion = (SqlGatewayRestAPIVersion) RestAPIVersion.getLatestVersion(list);
        if (sqlGatewayRestAPIVersion.equals(SqlGatewayRestAPIVersion.V1)) {
            throw new SqlExecutionException("Currently, SQL Client only supports to connect to the REST endpoint with API version larger than V1.");
        }
        return sqlGatewayRestAPIVersion;
    }

    private void closeSession() throws SqlExecutionException {
        if (this.sessionHandle == null) {
            return;
        }
        try {
            if (!((CloseSessionResponseBody) sendRequest(CloseSessionHeaders.getInstance(), new SessionMessageParameters(this.sessionHandle), EmptyRequestBody.getInstance()).get()).getStatus().equals(CloseSessionHandler.CLOSE_MESSAGE)) {
                LOG.warn("The status of close session response isn't {}.", CloseSessionHandler.CLOSE_MESSAGE);
            }
        } catch (Exception e) {
            LOG.warn(String.format("Unexpected error occurs when closing session %s.", this.sessionHandle), e);
        }
    }

    private static Collection<HttpHeader> readHeadersFromEnvironmentVariable(String str) {
        ArrayList arrayList = new ArrayList();
        String str2 = System.getenv(str);
        if (str2 != null) {
            for (String str3 : str2.split("\n")) {
                String[] split = str3.split(":", 2);
                if (split.length == 2) {
                    arrayList.add(new HttpHeader(split[0], split[1]));
                } else {
                    LOG.info("Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.", str3);
                }
            }
        }
        return arrayList;
    }

    @VisibleForTesting
    Collection<HttpHeader> getCustomHttpHeaders() {
        return this.customHttpHeaders;
    }
}
