package org.apache.flink.table.gateway.rest;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.table.api.internal.StaticResultProvider;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointUtils;
import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.class */
/**
 * Runs the statement test cases inherited from {@link AbstractSqlGatewayStatementITCase} through
 * the SQL Gateway REST endpoint. Every SQL test file is executed once per {@link RowFormat}
 * (see {@link #parameters()}).
 */
public class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
    private static final Logger LOG = LoggerFactory.getLogger(SqlGatewayRestEndpointStatementITCase.class);

    /** REST endpoint under test, wired to the service of the inherited service extension. */
    @Order(3)
    @RegisterExtension
    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
            new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);

    /** Client used for all REST calls; created in {@link #setup()}, closed in {@link #cleanUp()}. */
    private static TestingRestClient restClient;

    private static final ExecuteStatementHeaders executeStatementHeaders =
            ExecuteStatementHeaders.getInstance();

    /** Path parameters addressing the session of the current test; reassigned in {@code before}. */
    private static SessionMessageParameters sessionMessageParameters;

    private static final FetchResultsHeaders fetchResultsHeaders =
            FetchResultsHeaders.getDefaultInstance();

    /** Upper bound, in seconds, for waiting until a submitted operation reaches a terminal status. */
    private static final int OPERATION_WAIT_SECONDS = 100;

    /** Marker preceding a nested cause in the error text handled by {@code stringifyException}. */
    private static final String PATTERN1 = "Caused by: ";

    /** Marker preceding a stack frame in the error text handled by {@code stringifyException}. */
    private static final String PATTERN2 = "\tat ";

    private final SessionEnvironment defaultSessionEnvironment =
            SessionEnvironment.newBuilder()
                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
                    .build();

    /** Handle of the session opened for the current test in {@code before}. */
    private SessionHandle sessionHandle;

    /** Test parameters that add the {@link RowFormat} requested when fetching results. */
    public static class RestTestParameters extends AbstractSqlGatewayStatementITCase.TestParameters {
        private final RowFormat rowFormat;

        /**
         * @param str value handed to the parent constructor (printed as {@code sqlPath})
         * @param rowFormat row format to request from the REST endpoint
         */
        public RestTestParameters(String str, RowFormat rowFormat) {
            super(str);
            this.rowFormat = rowFormat;
        }

        /** Returns the row format to request from the REST endpoint. */
        public RowFormat getRowFormat() {
            return this.rowFormat;
        }

        @Override
        public String toString() {
            return "RestTestParameters{sqlPath='" + this.sqlPath + "', rowFormat=" + this.rowFormat + '}';
        }
    }

    /**
     * Iterates over the rows of one operation, fetching them batch by batch through
     * {@link #fetchResults}. Fetching stops once the token parsed from the next-result URI is
     * {@code null} (presumably the end of the result stream).
     */
    private class RowDataIterator implements Iterator<RowData> {
        private final SessionHandle sessionHandle;
        private final OperationHandle operationHandle;
        /** Token of the next batch to request; {@code null} once no further batch is announced. */
        private Long token = 0L;
        private Iterator<RowData> fetchedRows = Collections.emptyIterator();

        /**
         * Creates the iterator and eagerly fetches the first batch (token {@code 0}).
         *
         * @throws Exception if the first fetch fails
         */
        public RowDataIterator(SessionHandle sessionHandle, OperationHandle operationHandle) throws Exception {
            this.sessionHandle = sessionHandle;
            this.operationHandle = operationHandle;
            fetch();
        }

        /**
         * Fetches further batches until a row is available or no next token is left.
         *
         * @throws IllegalStateException if fetching a batch fails; retrying with the unchanged
         *     token would otherwise loop forever on a persistent failure
         */
        @Override
        public boolean hasNext() {
            while (this.token != null && !this.fetchedRows.hasNext()) {
                try {
                    fetch();
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Keep the interrupt visible to the code further up the stack.
                        Thread.currentThread().interrupt();
                    }
                    SqlGatewayRestEndpointStatementITCase.LOG.error("Failed to fetch results.", e);
                    throw new IllegalStateException("Failed to fetch results.", e);
                }
            }
            return this.fetchedRows.hasNext();
        }

        /**
         * Returns the next row, fetching the following batch first when the current one is used up.
         *
         * @throws java.util.NoSuchElementException if no row is left
         */
        @Override
        public RowData next() {
            // Go through hasNext() so that next() also works at a batch boundary.
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            return this.fetchedRows.next();
        }

        /** Requests the batch for the current token and advances the token. */
        private void fetch() throws Exception {
            FetchResultsResponseBody response =
                    SqlGatewayRestEndpointStatementITCase.this.fetchResults(
                            this.sessionHandle, this.operationHandle, this.token);
            this.token = SqlGatewayRestEndpointUtils.parseToken(response.getNextResultUri());
            this.fetchedRows = response.getResults().getData().iterator();
        }
    }

    /** Creates the REST client shared by all tests of this class. */
    @BeforeAll
    static void setup() throws Exception {
        restClient = TestingRestClient.getTestingRestClient();
    }

    /** Shuts the shared REST client down, if it was created. */
    @AfterAll
    static void cleanUp() throws Exception {
        // restClient is only assigned in setup(); guard so that a failed setup is not masked by an NPE.
        if (restClient != null) {
            restClient.shutdown();
        }
    }

    /**
     * Builds the test parameters: one {@link RestTestParameters} per SQL test returned by
     * {@code listFlinkSqlTests()} and per row format ({@code JSON}, then {@code PLAIN_TEXT}).
     */
    @Parameters(name = "parameters={0}")
    public static List<AbstractSqlGatewayStatementITCase.TestParameters> parameters() throws Exception {
        return listFlinkSqlTests().stream()
                .flatMap(
                        str ->
                                Stream.<AbstractSqlGatewayStatementITCase.TestParameters>of(
                                        new RestTestParameters(str, RowFormat.JSON),
                                        new RestTestParameters(str, RowFormat.PLAIN_TEXT)))
                .collect(Collectors.toList());
    }

    /**
     * Runs the parent setup, then opens a fresh session and points the session path parameters
     * at it.
     *
     * @param path temporary directory forwarded to the parent setup
     */
    @Override
    @BeforeEach
    public void before(@TempDir Path path) throws Exception {
        super.before(path);
        this.sessionHandle = service.openSession(this.defaultSessionEnvironment);
        sessionMessageParameters = new SessionMessageParameters(this.sessionHandle);
    }

    /**
     * Submits {@code str} through the REST endpoint, waits until the operation reaches a terminal
     * status, checks the first fetch response and renders all rows via the parent's
     * {@code toString}.
     *
     * @param str the statement to execute
     * @return the rendered result of the statement
     * @throws Exception if a REST call fails or the operation does not finish in time
     */
    @Override
    protected String runSingleStatement(String str) throws Exception {
        ExecuteStatementResponseBody executeResponse =
                restClient
                        .sendRequest(
                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                                SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                                executeStatementHeaders,
                                sessionMessageParameters,
                                new ExecuteStatementRequestBody(str, 0L, new HashMap<>()))
                        .get();
        String rawOperationHandle = executeResponse.getOperationHandle();
        Assertions.assertNotNull(rawOperationHandle);
        OperationHandle operationHandle = new OperationHandle(UUID.fromString(rawOperationHandle));

        // The operation returned by the endpoint must be known to the session's operation manager.
        Assertions.assertDoesNotThrow(() -> {
            return SQL_GATEWAY_SERVICE_EXTENSION
                    .getSessionManager()
                    .getSession(this.sessionHandle)
                    .getOperationManager()
                    .getOperation(operationHandle);
        });
        CommonTestUtils.waitUtil(
                () ->
                        SQL_GATEWAY_SERVICE_EXTENSION
                                .getService()
                                .getOperationInfo(this.sessionHandle, operationHandle)
                                .getStatus()
                                .isTerminalStatus(),
                Duration.ofSeconds(OPERATION_WAIT_SECONDS),
                "Failed to wait operation finish.");

        FetchResultsResponseBody firstResponse = fetchResults(this.sessionHandle, operationHandle, 0L);
        ResultInfo results = firstResponse.getResults();
        org.assertj.core.api.Assertions.assertThat(results).isNotNull();
        org.assertj.core.api.Assertions.assertThat(
                        Arrays.asList(ResultSet.ResultType.PAYLOAD, ResultSet.ResultType.EOS))
                .contains(firstResponse.getResultType());

        ResolvedSchema resultSchema = results.getResultSchema();
        // JSON rows are rendered with a converter built from the result schema; any other format
        // uses the static simple converter.
        return toString(
                AbstractSqlGatewayStatementITCase.StatementType.match(str),
                resultSchema,
                rowFormat() == RowFormat.JSON
                        ? new RowDataToStringConverterImpl(
                                resultSchema.toPhysicalRowDataType(),
                                DateTimeUtils.UTC_ZONE.toZoneId(),
                                SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
                                false,
                                new CodeGeneratorContext(
                                        new Configuration(),
                                        SqlGatewayRestEndpointStatementITCase.class.getClassLoader()))
                        : StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
                new RowDataIterator(this.sessionHandle, operationHandle));
    }

    /**
     * Fetches one batch of results of an operation through the REST endpoint, using the row
     * format of the current test parameters.
     *
     * @param sessionHandle session owning the operation
     * @param operationHandle operation whose results are fetched
     * @param l token of the batch to fetch
     * @return the response body of the fetch request
     * @throws Exception if the request fails
     */
    FetchResultsResponseBody fetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, Long l) throws Exception {
        FetchResultsMessageParameters fetchParameters =
                new FetchResultsMessageParameters(sessionHandle, operationHandle, l, rowFormat());
        return restClient
                .sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        fetchResultsHeaders,
                        fetchParameters,
                        EmptyRequestBody.getInstance())
                .get();
    }

    /**
     * Reduces an error to a short text: the part of the message after the last
     * {@code "Caused by: "} and before the first {@code "\tat "} that follows it. If
     * {@code StringUtils.isNullOrWhitespaceOnly} reports the message as blank, the canonical
     * class name of the throwable is returned instead.
     *
     * @param th the error to render
     * @return the shortened message; empty when nothing remains after cutting
     */
    @Override
    protected String stringifyException(Throwable th) {
        if (StringUtils.isNullOrWhitespaceOnly(th.getMessage())) {
            return th.getClass().getCanonicalName();
        }
        // String.split drops trailing empty strings, so a message consisting only of a marker
        // yields an empty array; guard both lookups instead of indexing blindly.
        String[] causes = th.getMessage().split(PATTERN1);
        if (causes.length == 0) {
            return "";
        }
        String[] beforeStackTrace = causes[causes.length - 1].split(PATTERN2);
        return beforeStackTrace.length == 0 ? "" : beforeStackTrace[0];
    }

    /** Returns whether the runtime mode in the current session's configuration is STREAMING. */
    @Override
    protected boolean isStreaming() {
        RuntimeExecutionMode runtimeMode =
                Configuration.fromMap(service.getSessionConfig(this.sessionHandle))
                        .get(ExecutionOptions.RUNTIME_MODE);
        return runtimeMode.equals(RuntimeExecutionMode.STREAMING);
    }

    /** Row format of the current test parameters, which are expected to be {@link RestTestParameters}. */
    private RowFormat rowFormat() {
        return ((RestTestParameters) this.parameters).getRowFormat();
    }
}
