package org.apache.flink.runtime.rest;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameter;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase.class */
public class RestEndpointITCase extends TestLogger {
    private static final String JOB_ID_KEY = "jobid";
    private RestServerEndpoint serverEndpoint;
    private RestClient clientEndpoint;
    private static final JobID PATH_JOB_ID = new JobID();
    private static final JobID QUERY_JOB_ID = new JobID();
    private static final Time timeout = Time.seconds(10);

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$FaultyJobIDPathParameter.class */
    static class FaultyJobIDPathParameter extends MessagePathParameter<JobID> {
        FaultyJobIDPathParameter() {
            super(RestEndpointITCase.JOB_ID_KEY);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: convertFromString, reason: merged with bridge method [inline-methods] */
        public JobID m309convertFromString(String str) throws ConversionException {
            return JobID.fromHexString(str);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String convertToString(JobID jobID) {
            return "foobar";
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$FaultyTestParameters.class */
    private static class FaultyTestParameters extends TestParameters {
        private final FaultyJobIDPathParameter faultyJobIDPathParameter;

        private FaultyTestParameters() {
            super();
            this.faultyJobIDPathParameter = new FaultyJobIDPathParameter();
        }

        @Override // org.apache.flink.runtime.rest.RestEndpointITCase.TestParameters
        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.faultyJobIDPathParameter);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$JobIDPathParameter.class */
    static class JobIDPathParameter extends MessagePathParameter<JobID> {
        JobIDPathParameter() {
            super(RestEndpointITCase.JOB_ID_KEY);
        }

        /* renamed from: convertFromString, reason: merged with bridge method [inline-methods] */
        public JobID m310convertFromString(String str) {
            return JobID.fromHexString(str);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String convertToString(JobID jobID) {
            return jobID.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$JobIDQueryParameter.class */
    static class JobIDQueryParameter extends MessageQueryParameter<JobID> {
        JobIDQueryParameter() {
            super(RestEndpointITCase.JOB_ID_KEY, MessageParameter.MessageParameterRequisiteness.MANDATORY);
        }

        /* renamed from: convertValueFromString, reason: merged with bridge method [inline-methods] */
        public JobID m311convertValueFromString(String str) {
            return JobID.fromHexString(str);
        }

        public String convertStringToValue(JobID jobID) {
            return jobID.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestHandler.class */
    private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
        public static final Object LOCK = new Object();

        TestHandler(CompletableFuture<String> completableFuture, GatewayRetriever<RestfulGateway> gatewayRetriever, Time time) {
            super(completableFuture, gatewayRetriever, time, new TestHeaders());
        }

        protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> handlerRequest, RestfulGateway restfulGateway) throws RestHandlerException {
            Assert.assertEquals(handlerRequest.getPathParameter(JobIDPathParameter.class), RestEndpointITCase.PATH_JOB_ID);
            Assert.assertEquals(handlerRequest.getQueryParameter(JobIDQueryParameter.class).get(0), RestEndpointITCase.QUERY_JOB_ID);
            if (((TestRequest) handlerRequest.getRequestBody()).id == 1) {
                synchronized (LOCK) {
                    try {
                        LOCK.notifyAll();
                        LOCK.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
            return CompletableFuture.completedFuture(new TestResponse(((TestRequest) handlerRequest.getRequestBody()).id));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestHeaders.class */
    private static class TestHeaders implements MessageHeaders<TestRequest, TestResponse, TestParameters> {
        private TestHeaders() {
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "/test/:jobid";
        }

        public Class<TestRequest> getRequestClass() {
            return TestRequest.class;
        }

        public Class<TestResponse> getResponseClass() {
            return TestResponse.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public TestParameters m313getUnresolvedMessageParameters() {
            return new TestParameters();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestParameters.class */
    public static class TestParameters extends MessageParameters {
        private final JobIDPathParameter jobIDPathParameter;
        private final JobIDQueryParameter jobIDQueryParameter;

        private TestParameters() {
            this.jobIDPathParameter = new JobIDPathParameter();
            this.jobIDQueryParameter = new JobIDQueryParameter();
        }

        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.jobIDPathParameter);
        }

        public Collection<MessageQueryParameter<?>> getQueryParameters() {
            return Collections.singleton(this.jobIDQueryParameter);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestRequest.class */
    private static class TestRequest implements RequestBody {
        public final int id;

        @JsonCreator
        public TestRequest(@JsonProperty("id") int i) {
            this.id = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestResponse.class */
    private static class TestResponse implements ResponseBody {
        public final int id;

        @JsonCreator
        public TestResponse(@JsonProperty("id") int i) {
            this.id = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestRestClient.class */
    private static class TestRestClient extends RestClient {
        TestRestClient(RestClientConfiguration restClientConfiguration) {
            super(restClientConfiguration, TestingUtils.defaultExecutor());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestEndpointITCase$TestRestServerEndpoint.class */
    private static class TestRestServerEndpoint extends RestServerEndpoint {
        private final TestHandler testHandler;

        TestRestServerEndpoint(RestServerEndpointConfiguration restServerEndpointConfiguration, TestHandler testHandler) {
            super(restServerEndpointConfiguration);
            this.testHandler = (TestHandler) Preconditions.checkNotNull(testHandler);
        }

        protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> completableFuture) {
            return Collections.singleton(Tuple2.of(new TestHeaders(), this.testHandler));
        }
    }

    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        RestServerEndpointConfiguration fromConfiguration = RestServerEndpointConfiguration.fromConfiguration(configuration);
        RestClientConfiguration fromConfiguration2 = RestClientConfiguration.fromConfiguration(configuration);
        RestfulGateway restfulGateway = (RestfulGateway) Mockito.mock(RestfulGateway.class);
        Mockito.when(restfulGateway.requestRestAddress((Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture("http://localhost:1234"));
        GatewayRetriever gatewayRetriever = (GatewayRetriever) Mockito.mock(GatewayRetriever.class);
        Mockito.when(gatewayRetriever.getNow()).thenReturn(Optional.of(restfulGateway));
        this.serverEndpoint = new TestRestServerEndpoint(fromConfiguration, new TestHandler(CompletableFuture.completedFuture("http://localhost:1234"), gatewayRetriever, RpcUtils.INF_TIMEOUT));
        this.clientEndpoint = new TestRestClient(fromConfiguration2);
        this.serverEndpoint.start();
    }

    @After
    public void teardown() {
        if (this.clientEndpoint != null) {
            this.clientEndpoint.shutdown(timeout);
            this.clientEndpoint = null;
        }
        if (this.serverEndpoint != null) {
            this.serverEndpoint.shutdown(timeout);
            this.serverEndpoint = null;
        }
    }

    @Test
    public void testRequestInterleaving() throws Exception {
        CompletableFuture sendRequest;
        TestParameters testParameters = new TestParameters();
        testParameters.jobIDPathParameter.resolve(PATH_JOB_ID);
        testParameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
        InetSocketAddress serverAddress = this.serverEndpoint.getServerAddress();
        synchronized (TestHandler.LOCK) {
            sendRequest = this.clientEndpoint.sendRequest(serverAddress.getHostName(), serverAddress.getPort(), new TestHeaders(), testParameters, new TestRequest(1));
            TestHandler.LOCK.wait();
        }
        Assert.assertEquals(2L, ((TestResponse) this.clientEndpoint.sendRequest(serverAddress.getHostName(), serverAddress.getPort(), new TestHeaders(), testParameters, new TestRequest(2)).get()).id);
        synchronized (TestHandler.LOCK) {
            TestHandler.LOCK.notifyAll();
        }
        Assert.assertEquals(1L, ((TestResponse) sendRequest.get()).id);
    }

    @Test
    public void testBadHandlerRequest() throws Exception {
        InetSocketAddress serverAddress = this.serverEndpoint.getServerAddress();
        FaultyTestParameters faultyTestParameters = new FaultyTestParameters();
        faultyTestParameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
        ((TestParameters) faultyTestParameters).jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
        try {
            this.clientEndpoint.sendRequest(serverAddress.getHostName(), serverAddress.getPort(), new TestHeaders(), faultyTestParameters, new TestRequest(2)).get();
            Assert.fail("The request should fail with a bad request return code.");
        } catch (ExecutionException e) {
            RestClientException stripExecutionException = ExceptionUtils.stripExecutionException(e);
            Assert.assertTrue(stripExecutionException instanceof RestClientException);
            Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, stripExecutionException.getHttpResponseStatus());
        }
    }
}
