package org.apache.flink.table.gateway.rest;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils;
import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.gateway.rest.util.TestingSqlGatewayRestEndpoint;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.class */
class SqlGatewayRestEndpointITCase {
    // Service handed to every handler and endpoint builder in this test; assigned in the static initializer.
    private static final SqlGatewayService SERVICE;
    // Endpoint under test; created in setup() and stopped/cleared in stop().
    private static SqlGatewayRestEndpoint serverEndpoint;
    // Client used to send requests to the endpoint; created in setup() and shut down in stop().
    private static TestingRestClient restClient;
    // Address reported by serverEndpoint after it has been started in setup().
    private static InetSocketAddress serverAddress;
    // Headers of the "/test/" POST route; read by the TestBadCaseHandler constructor.
    private static TestBadCaseHeaders badCaseHeader;
    // Handler registered for badCaseHeader; tests swap its handlerBody/closeFuture.
    private static TestBadCaseHandler testHandler;
    // Headers for "/test/select-version" that support only V0.
    private static TestVersionSelectionHeaders0 header0;
    // Headers for "/test/select-version" that support every version except V0.
    private static TestVersionSelectionHeadersNot0 headerNot0;
    // Handler registered for header0.
    private static TestVersionHandler testVersionHandler0;
    // Handler registered for headerNot0.
    private static TestVersionHandler testVersionHandlerNot0;
    // Configuration used to build the endpoint; rebuilt in setup().
    private static Configuration config;
    // Upper bound used when waiting on response/close futures; assigned in the static initializer.
    private static final Time timeout;
    // Decompiler-materialized assertion flag; assigned in the static initializer and read by testDefaultVersionRouting().
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestBadCaseHandler.class */
    public static class TestBadCaseHandler extends AbstractSqlGatewayRestHandler<TestRequest, TestResponse, EmptyMessageParameters> {
        private final OneShotLatch closeLatch;
        private CompletableFuture<Void> closeFuture;
        private Function<Integer, CompletableFuture<TestResponse>> handlerBody;

        TestBadCaseHandler(SqlGatewayService sqlGatewayService) {
            super(sqlGatewayService, Collections.emptyMap(), SqlGatewayRestEndpointITCase.badCaseHeader);
            this.closeLatch = new OneShotLatch();
            this.closeFuture = CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> closeHandlerAsync() {
            this.closeLatch.trigger();
            return this.closeFuture;
        }

        protected CompletableFuture<TestResponse> handleRequest(@Nullable SqlGatewayRestAPIVersion sqlGatewayRestAPIVersion, @Nonnull HandlerRequest<TestRequest> handlerRequest) {
            int i = ((TestRequest) handlerRequest.getRequestBody()).id;
            return i == 3 ? FutureUtils.completedExceptionally(new EndpointNotStartedException("test exception")) : this.handlerBody.apply(Integer.valueOf(i));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestBadCaseHeaders.class */
    public static class TestBadCaseHeaders implements SqlGatewayMessageHeaders<TestRequest, TestResponse, EmptyMessageParameters> {
        private TestBadCaseHeaders() {
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "/test/";
        }

        public Class<TestRequest> getRequestClass() {
            return TestRequest.class;
        }

        public Class<TestResponse> getResponseClass() {
            return TestResponse.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public EmptyMessageParameters m7getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }
    }

    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestRequest.class */
    /** Request body carrying a single integer {@code id}, bound to the JSON property {@code "id"}. */
    private static class TestRequest implements RequestBody {
        /** Identifier that {@link TestBadCaseHandler} dispatches on. */
        public final int id;

        /**
         * @param id value of the {@code "id"} JSON property
         */
        @JsonCreator
        public TestRequest(@JsonProperty("id") int id) {
            this.id = id;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestResponse.class */
    /** Response body carrying a single string {@code status}, bound to the JSON property {@code "status"}. */
    public static class TestResponse implements ResponseBody {
        private final String status;

        /**
         * @param status value of the {@code "status"} JSON property
         */
        @JsonCreator
        public TestResponse(@JsonProperty("status") String status) {
            this.status = status;
        }

        /** @return the status string this response was created with */
        public String getStatus() {
            return status;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestVersionHandler.class */
    public static class TestVersionHandler extends AbstractSqlGatewayRestHandler<EmptyRequestBody, TestResponse, EmptyMessageParameters> {
        static final /* synthetic */ boolean $assertionsDisabled;

        TestVersionHandler(SqlGatewayService sqlGatewayService, TestVersionSelectionHeadersBase testVersionSelectionHeadersBase) {
            super(sqlGatewayService, Collections.emptyMap(), testVersionSelectionHeadersBase);
        }

        protected CompletableFuture<TestResponse> handleRequest(@Nullable SqlGatewayRestAPIVersion sqlGatewayRestAPIVersion, @Nonnull HandlerRequest<EmptyRequestBody> handlerRequest) {
            if ($assertionsDisabled || sqlGatewayRestAPIVersion != null) {
                return CompletableFuture.completedFuture(new TestResponse(sqlGatewayRestAPIVersion.name()));
            }
            throw new AssertionError();
        }

        static {
            $assertionsDisabled = !SqlGatewayRestEndpointITCase.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestVersionSelectionHeaders0.class */
    public static class TestVersionSelectionHeaders0 extends TestVersionSelectionHeadersBase {
        private TestVersionSelectionHeaders0() {
            super();
        }

        public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(SqlGatewayRestAPIVersion.V0);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestVersionSelectionHeadersBase.class */
    private static class TestVersionSelectionHeadersBase implements SqlGatewayMessageHeaders<EmptyRequestBody, TestResponse, EmptyMessageParameters> {
        private TestVersionSelectionHeadersBase() {
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/test/select-version";
        }

        public Class<TestResponse> getResponseClass() {
            return TestResponse.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return null;
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public EmptyMessageParameters m9getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase$TestVersionSelectionHeadersNot0.class */
    public static class TestVersionSelectionHeadersNot0 extends TestVersionSelectionHeadersBase {
        private TestVersionSelectionHeadersNot0() {
            super();
        }

        public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() {
            ArrayList arrayList = new ArrayList(Arrays.asList(SqlGatewayRestAPIVersion.values()));
            arrayList.remove(SqlGatewayRestAPIVersion.V0);
            return arrayList;
        }
    }

    /** Package-private no-arg constructor; all fixtures are created in {@code setup()}. */
    SqlGatewayRestEndpointITCase() {}

    /**
     * Creates fresh headers and handlers, starts an endpoint bound to the loopback address with
     * port setting {@code "0"}, and records the address the endpoint reports.
     */
    @BeforeEach
    void setup() throws Exception {
        // Version-selection route: one header/handler pair per supported-version set.
        header0 = new TestVersionSelectionHeaders0();
        testVersionHandler0 = new TestVersionHandler(SERVICE, header0);
        headerNot0 = new TestVersionSelectionHeadersNot0();
        testVersionHandlerNot0 = new TestVersionHandler(SERVICE, headerNot0);

        // The bad-case handler's constructor reads badCaseHeader, so the header must be set first.
        badCaseHeader = new TestBadCaseHeaders();
        testHandler = new TestBadCaseHandler(SERVICE);

        final String loopback = InetAddress.getLoopbackAddress().getHostAddress();
        config =
                SqlGatewayRestEndpointTestUtils.getBaseConfig(
                        SqlGatewayRestEndpointTestUtils.getFlinkConfig(loopback, loopback, "0"));

        serverEndpoint =
                TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
                        .withHandler(badCaseHeader, testHandler)
                        .withHandler(header0, testVersionHandler0)
                        .withHandler(headerNot0, testVersionHandlerNot0)
                        .buildAndStart();
        restClient = TestingRestClient.getTestingRestClient();
        serverAddress = serverEndpoint.getServerAddress();
    }

    /**
     * Shuts down the REST client and stops the server endpoint, clearing both static references.
     *
     * <p>The endpoint is stopped in a {@code finally} block so that a failure while shutting down
     * the client cannot leave a running endpoint behind for the next test.
     */
    @AfterEach
    void stop() throws Exception {
        try {
            if (restClient != null) {
                restClient.shutdown();
                restClient = null;
            }
        } finally {
            if (serverEndpoint != null) {
                serverEndpoint.stop();
                serverEndpoint = null;
            }
        }
    }

    /**
     * Checks version handling on the client side: requesting V0 against headers that do not support
     * it fails with an {@link IllegalArgumentException}, an explicit V0 request against the V0-only
     * headers returns "V0", and requests without an explicit version return "V0" for the V0-only
     * headers and the latest supported version for the other headers.
     */
    @Test
    void testSqlGatewayMessageHeaders() throws Exception {
        final String supportedPrefixes =
                headerNot0.getSupportedAPIVersions().stream()
                        .map(version -> version.getURLVersionPrefix())
                        .collect(Collectors.joining(","));
        final String expectedMessage =
                String.format(
                        "The requested version V0 is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
                        headerNot0.getHttpMethod(),
                        headerNot0.getTargetRestEndpointURL(),
                        supportedPrefixes);

        // Unsupported explicit version is rejected.
        Assertions.assertThatThrownBy(
                        () ->
                                restClient.sendRequest(
                                        serverAddress.getHostName(),
                                        serverAddress.getPort(),
                                        headerNot0,
                                        EmptyMessageParameters.getInstance(),
                                        EmptyRequestBody.getInstance(),
                                        Collections.emptyList(),
                                        SqlGatewayRestAPIVersion.V0))
                .satisfies(FlinkAssertions.anyCauseMatches(IllegalArgumentException.class, expectedMessage));

        // Supported explicit version is echoed back.
        final TestResponse explicitV0 =
                (TestResponse)
                        restClient.sendRequest(
                                        serverAddress.getHostName(),
                                        serverAddress.getPort(),
                                        header0,
                                        EmptyMessageParameters.getInstance(),
                                        EmptyRequestBody.getInstance(),
                                        Collections.emptyList(),
                                        SqlGatewayRestAPIVersion.V0)
                                .get(timeout.getSize(), timeout.getUnit());
        Assertions.assertThat(explicitV0.getStatus()).isEqualTo("V0");

        // No explicit version, V0-only headers.
        final TestResponse implicitV0 =
                (TestResponse)
                        restClient.sendRequest(
                                        serverAddress.getHostName(),
                                        serverAddress.getPort(),
                                        header0,
                                        EmptyMessageParameters.getInstance(),
                                        EmptyRequestBody.getInstance(),
                                        Collections.emptyList())
                                .get(timeout.getSize(), timeout.getUnit());
        Assertions.assertThat(implicitV0.getStatus()).isEqualTo("V0");

        // No explicit version, headers without V0: the latest supported version is used.
        final TestResponse implicitLatest =
                (TestResponse)
                        restClient.sendRequest(
                                        serverAddress.getHostName(),
                                        serverAddress.getPort(),
                                        headerNot0,
                                        EmptyMessageParameters.getInstance(),
                                        EmptyRequestBody.getInstance(),
                                        Collections.emptyList())
                                .get(timeout.getSize(), timeout.getUnit());
        Assertions.assertThat(implicitLatest.getStatus())
                .isEqualTo(RestAPIVersion.getLatestVersion(headerNot0.getSupportedAPIVersions()).name());
    }

    /**
     * For every API version other than V0, sends an explicitly versioned request through the
     * headers that exclude V0 and expects the handler to echo that version's name.
     */
    @Test
    void testVersionSelection() throws Exception {
        // Iterate with the concrete enum type: name() is an enum method and the raw
        // RestAPIVersion type used by the decompiler also discards generic type information.
        for (SqlGatewayRestAPIVersion version : SqlGatewayRestAPIVersion.values()) {
            if (version == SqlGatewayRestAPIVersion.V0) {
                continue;
            }
            final TestResponse response =
                    (TestResponse)
                            restClient.sendRequest(
                                            serverAddress.getHostName(),
                                            serverAddress.getPort(),
                                            headerNot0,
                                            EmptyMessageParameters.getInstance(),
                                            EmptyRequestBody.getInstance(),
                                            Collections.emptyList(),
                                            version)
                                    .get(timeout.getSize(), timeout.getUnit());
            Assertions.assertThat(response.getStatus()).isEqualTo(version.name());
        }
    }

    /**
     * Calls the version-selection route over plain HTTP without a version prefix and expects the
     * response body to contain the name of the default API version.
     */
    @Test
    void testDefaultVersionRouting() throws Exception {
        Assertions.assertThat(config.getBoolean(SecurityOptions.SSL_REST_ENABLED)).isFalse();

        final Request request =
                new Request.Builder()
                        .url(serverEndpoint.getRestBaseUrl() + header0.getTargetRestEndpointURL())
                        .build();

        // Close the response deterministically, even when an assertion below fails.
        try (Response response = new OkHttpClient().newCall(request).execute()) {
            // Checked unconditionally rather than only when JVM assertions are enabled.
            Assertions.assertThat(response.body()).isNotNull();
            Assertions.assertThat(response.body().string())
                    .contains(SqlGatewayRestAPIVersion.getDefaultVersion().name());
        }
    }

    /**
     * Verifies that a second request is answered while the first one is still blocked inside the
     * handler, and that the first one completes once it is released.
     */
    @Test
    void testRequestInterleaving() throws Exception {
        final BlockerSync sync = new BlockerSync();

        // Request 1 parks on the blocker; every other request answers immediately with its id.
        testHandler.handlerBody =
                id -> {
                    if (id == 1) {
                        try {
                            sync.block();
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    return CompletableFuture.completedFuture(new TestResponse(id.toString()));
                };

        final CompletableFuture<TestResponse> blockedResponse = sendRequestToTestHandler(new TestRequest(1));
        sync.awaitBlocker();

        // The second request must get through while the first is still blocked.
        final TestResponse secondResponse = sendRequestToTestHandler(new TestRequest(2)).get();
        Assertions.assertThat(secondResponse.getStatus()).isEqualTo("2");

        sync.releaseBlocker();
        final TestResponse firstResponse = blockedResponse.get();
        Assertions.assertThat(firstResponse.getStatus()).isEqualTo("1");
    }

    /**
     * Registering the same handler instance under two different headers must make the endpoint
     * fail on start with a {@link FlinkRuntimeException}.
     */
    @Test
    void testDuplicateHandlerRegistrationIsForbidden() {
        Assertions.assertThatThrownBy(
                        () -> {
                            // try-with-resources closes the endpoint whether or not start() throws and
                            // replaces the decompiled expansion, whose dead branches called
                            // addSuppressed on a null Throwable.
                            try (TestingSqlGatewayRestEndpoint endpoint =
                                    TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
                                            .withHandler(header0, testHandler)
                                            .withHandler(badCaseHeader, testHandler)
                                            .build()) {
                                endpoint.start();
                            }
                        })
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class,
                                "Duplicate REST handler instance found. Please ensure each instance is registered only once."));
    }

    /**
     * Registering two different handlers under the same headers must make the endpoint fail on
     * start with a {@link FlinkRuntimeException}.
     */
    @Test
    void testHandlerRegistrationOverlappingIsForbidden() {
        Assertions.assertThatThrownBy(
                        () -> {
                            // try-with-resources closes the endpoint whether or not start() throws and
                            // replaces the decompiled expansion, whose dead branches called
                            // addSuppressed on a null Throwable.
                            try (TestingSqlGatewayRestEndpoint endpoint =
                                    TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
                                            .withHandler(badCaseHeader, testHandler)
                                            .withHandler(badCaseHeader, testVersionHandler0)
                                            .build()) {
                                endpoint.start();
                            }
                        })
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class,
                                "REST handler registration overlaps with another registration for"));
    }

    /**
     * Verifies that closing the endpoint does not complete while the handler's close future is
     * pending or while a request is still being processed, and that it completes once both are done.
     */
    @Test
    void testShouldWaitForHandlersWhenClosing() throws Exception {
        // Typed futures instead of the raw CompletableFuture produced by the decompiler.
        testHandler.closeFuture = new CompletableFuture<>();
        final BlockerSync sync = new BlockerSync();

        // Every request is processed asynchronously and parks on the blocker before answering.
        testHandler.handlerBody =
                id ->
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        sync.block();
                                    } catch (InterruptedException interrupted) {
                                        Thread.currentThread().interrupt();
                                    }
                                    return new TestResponse(id.toString());
                                });

        // The handler's close future is still pending, so the endpoint cannot finish closing.
        final CompletableFuture<Void> endpointClosed = serverEndpoint.closeAsync();
        Assertions.assertThat(endpointClosed).isNotDone();

        final CompletableFuture<TestResponse> inFlightRequest = sendRequestToTestHandler(new TestRequest(1));
        sync.awaitBlocker();

        // Handler is closed now, but a request is still in flight.
        testHandler.closeFuture.complete(null);
        Assertions.assertThat(endpointClosed).isNotDone();

        sync.releaseBlocker();
        inFlightRequest.get(timeout.getSize(), timeout.getUnit());
        endpointClosed.get(timeout.getSize(), timeout.getUnit());
    }

    /**
     * A handler failure with {@code EndpointNotStartedException} (triggered by request id 3) must
     * surface on the client as a {@link RestClientException} with status 503.
     */
    @Test
    void testOnUnavailableRpcEndpointReturns503() {
        final CompletableFuture<TestResponse> response = sendRequestToTestHandler(new TestRequest(3));

        Assertions.assertThatThrownBy(response::get)
                .extracting(failure -> ExceptionUtils.findThrowable(failure, RestClientException.class))
                .extracting(found -> found.get())
                .extracting(restFailure -> restFailure.getHttpResponseStatus())
                .isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE);
    }

    /**
     * Posts the given request to the {@code badCaseHeader} route of the running endpoint.
     *
     * @param testRequest the request body to send
     * @return the future response from the REST client
     * @throws RuntimeException wrapping any {@link IOException} raised while sending
     */
    private CompletableFuture<TestResponse> sendRequestToTestHandler(TestRequest testRequest) {
        final CompletableFuture<TestResponse> pendingResponse;
        try {
            pendingResponse =
                    restClient.sendRequest(
                            serverAddress.getHostName(),
                            serverAddress.getPort(),
                            badCaseHeader,
                            EmptyMessageParameters.getInstance(),
                            testRequest);
        } catch (IOException sendFailure) {
            throw new RuntimeException(sendFailure);
        }
        return pendingResponse;
    }

    // Initializes the class-level constants declared as blank finals above.
    static {
        // Mirrors the flag the compiler generates for `assert`: true when assertions are disabled for this class.
        $assertionsDisabled = !SqlGatewayRestEndpointITCase.class.desiredAssertionStatus();
        // No service instance is supplied; handlers and builders in this test receive null.
        SERVICE = null;
        // Ten-second bound used for the timed future waits in the tests.
        timeout = Time.seconds(10L);
    }
}
