package org.apache.flink.client.program.rest;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest.class */
public class RestClusterClientTest extends TestLogger {
    // Gateway retriever handed to every TestHandler's super constructor; assigned in setUp().
    private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;
    // Server endpoint configuration built from restConfig in setUp().
    private RestServerEndpointConfiguration restServerEndpointConfiguration;
    // Single-thread executor passed to the RestClient; created in setUp(), shut down in tearDown().
    private ExecutorService executor;
    // Job graph named "testjob", recreated in setUp() for every test.
    private JobGraph jobGraph;
    // Id of jobGraph, cached in setUp().
    private JobID jobId;
    // Shared REST configuration. NOTE(review): its initialisation is not part of the visible section of this file.
    private static final Configuration restConfig;
    // Gateway instance that mockGatewayRetriever resolves to.
    private final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build();
    // Consulted by the RestClient from createRestClient(); a request it matches is failed with an IOException.
    private volatile FailHttpRequestPredicate failHttpRequest = FailHttpRequestPredicate.never();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.client.program.rest.RestClusterClientTest$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        /**
         * Lookup table indexed by {@code TerminationMode.ordinal()}: CANCEL is mapped to 1 and
         * STOP to 2; every other slot keeps the array default of 0.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$rest$messages$TerminationModeQueryParameter$TerminationMode = new int[TerminationModeQueryParameter.TerminationMode.values().length];

        static {
            final int[] switchMap = $SwitchMap$org$apache$flink$runtime$rest$messages$TerminationModeQueryParameter$TerminationMode;
            try {
                switchMap[TerminationModeQueryParameter.TerminationMode.CANCEL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not resolvable at runtime: its slot simply stays 0.
            }
            try {
                switchMap[TerminationModeQueryParameter.TerminationMode.STOP.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not resolvable at runtime: its slot simply stays 0.
            }
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$FailHttpRequestPredicate.class */
    private interface FailHttpRequestPredicate {
        /**
         * Evaluates the predicate for one outgoing request.
         *
         * @param messageHeaders headers of the request
         * @param messageParameters parameters of the request
         * @param requestBody body of the request
         * @return the predicate's verdict for this request
         */
        boolean test(MessageHeaders<?, ?, ?> messageHeaders, MessageParameters messageParameters, RequestBody requestBody);

        /**
         * @return a predicate that answers {@code false} for every request
         */
        static FailHttpRequestPredicate never() {
            return (headers, parameters, body) -> false;
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$PingRestHandler.class */
    private class PingRestHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private final Queue<CompletableFuture<EmptyResponseBody>> responseQueue;

        private PingRestHandler(CompletableFuture<EmptyResponseBody>... completableFutureArr) {
            super(PingRestHandlerHeaders.INSTANCE);
            this.responseQueue = new ArrayDeque(Arrays.asList(completableFutureArr));
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            CompletableFuture<EmptyResponseBody> poll = this.responseQueue.poll();
            return poll != null ? poll : CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }

        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<EmptyRequestBody, EmptyMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$PingRestHandlerHeaders.class */
    public static final class PingRestHandlerHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        static final PingRestHandlerHeaders INSTANCE = new PingRestHandlerHeaders();

        private PingRestHandlerHeaders() {
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "foobar";
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        /* renamed from: getUnresolvedMessageParameters, reason: merged with bridge method [inline-methods] */
        public EmptyMessageParameters m8getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/foobar";
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$SubmissionFailingHandler.class */
    /** Job submission handler that rejects every request with an internal server error. */
    private final class SubmissionFailingHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
        private SubmissionFailingHandler() {
            super(JobSubmitHeaders.getInstance());
        }

        /** Always fails with a {@link RestHandlerException} carrying the message "expected". */
        protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            final RestHandlerException failure = new RestHandlerException("expected", HttpResponseStatus.INTERNAL_SERVER_ERROR);
            throw failure;
        }

        // Decompiler-emitted bridge overload; it only casts and delegates to the typed variant.
        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestAccumulatorHandler.class */
    private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
        public TestAccumulatorHandler() {
            super(JobAccumulatorsHeaders.getInstance());
        }

        protected CompletableFuture<JobAccumulatorsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            JobAccumulatorsInfo jobAccumulatorsInfo;
            List queryParameter = handlerRequest.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
            boolean booleanValue = !queryParameter.isEmpty() ? ((Boolean) queryParameter.get(0)).booleanValue() : false;
            ArrayList arrayList = new ArrayList(1);
            arrayList.add(new JobAccumulatorsInfo.UserTaskAccumulator("testName", "testType", "testValue"));
            if (booleanValue) {
                HashMap hashMap = new HashMap(1);
                try {
                    hashMap.put("testKey", new SerializedValue(OptionalFailure.of("testValue")));
                    jobAccumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), arrayList, hashMap);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            } else {
                jobAccumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), arrayList, Collections.emptyMap());
            }
            return CompletableFuture.completedFuture(jobAccumulatorsInfo);
        }

        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestHandler.class */
    /**
     * Common base of the test handlers: wires each handler to the enclosing test's gateway
     * retriever, an infinite RPC timeout and an empty map as third super-constructor argument.
     */
    private abstract class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
        /**
         * @param messageHeaders headers describing the endpoint served by the concrete handler
         */
        private TestHandler(MessageHeaders<R, P, M> messageHeaders) {
            super(
                    RestClusterClientTest.this.mockGatewayRetriever,
                    RpcUtils.INF_TIMEOUT,
                    Collections.emptyMap(),
                    messageHeaders);
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestJobExecutionResultHandler.class */
    /**
     * Job execution result handler that replays a scripted sequence of outcomes. Once the script
     * is exhausted, the last outcome is repeated for every further request.
     */
    private class TestJobExecutionResultHandler extends TestHandler<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
        /** Remaining scripted outcomes. */
        private final Iterator<Object> jobExecutionResults;
        /** Outcome served most recently; {@code null} until the first outcome was taken. */
        private Object lastJobExecutionResult;

        /**
         * @param objArr scripted outcomes; each one must be a {@link JobExecutionResultResponseBody}
         *     or a {@link RestHandlerException}
         */
        private TestJobExecutionResultHandler(Object... objArr) {
            super(JobExecutionResultHeaders.getInstance());
            boolean onlySupportedOutcomes = true;
            for (Object outcome : objArr) {
                boolean supported = outcome instanceof JobExecutionResultResponseBody || outcome instanceof RestHandlerException;
                if (!supported) {
                    onlySupportedOutcomes = false;
                    break;
                }
            }
            Preconditions.checkArgument(onlySupportedOutcomes);
            this.jobExecutionResults = Arrays.asList(objArr).iterator();
        }

        /**
         * Advances the script if possible and turns the current outcome into a future: a response
         * body completes normally, an exception completes exceptionally.
         */
        protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            if (this.jobExecutionResults.hasNext()) {
                this.lastJobExecutionResult = this.jobExecutionResults.next();
            }
            final Object current = this.lastJobExecutionResult;
            Preconditions.checkState(current != null);

            if (current instanceof RestHandlerException) {
                return FutureUtils.completedExceptionally((RestHandlerException) current);
            }
            if (current instanceof JobExecutionResultResponseBody) {
                return CompletableFuture.completedFuture((JobExecutionResultResponseBody) current);
            }
            // Not reachable for outcomes that passed the constructor check.
            throw new AssertionError();
        }

        // Decompiler-emitted bridge overload; it only casts and delegates to the typed variant.
        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<EmptyRequestBody, JobMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestJobSubmitHandler.class */
    /** Job submission handler that records that a submission arrived and acknowledges it. */
    private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
        /** Becomes {@code true} once a submit request has been handled. */
        private volatile boolean jobSubmitted = false;

        private TestJobSubmitHandler() {
            super(JobSubmitHeaders.getInstance());
        }

        /** Marks the job as submitted and replies with a response body for "/url". */
        protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            this.jobSubmitted = true;
            final JobSubmitResponseBody acknowledgement = new JobSubmitResponseBody("/url");
            return CompletableFuture.completedFuture(acknowledgement);
        }

        // Decompiler-emitted bridge overload; it only casts and delegates to the typed variant.
        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestJobTerminationHandler.class */
    private class TestJobTerminationHandler extends TestHandler<EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
        private volatile boolean jobCanceled;
        private volatile boolean jobStopped;

        private TestJobTerminationHandler() {
            super(JobTerminationHeaders.getInstance());
            this.jobCanceled = false;
            this.jobStopped = false;
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobTerminationMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            switch (AnonymousClass2.$SwitchMap$org$apache$flink$runtime$rest$messages$TerminationModeQueryParameter$TerminationMode[((TerminationModeQueryParameter.TerminationMode) handlerRequest.getQueryParameter(TerminationModeQueryParameter.class).get(0)).ordinal()]) {
                case 1:
                    this.jobCanceled = true;
                    break;
                case 2:
                    this.jobStopped = true;
                    break;
            }
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }

        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<EmptyRequestBody, JobTerminationMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestListJobsHandler.class */
    /** Jobs overview handler that always reports the same two jobs: "job1" RUNNING and "job2" FINISHED. */
    private class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
        private TestListJobsHandler() {
            super(JobsOverviewHeaders.getInstance());
        }

        /** Builds the two job detail entries, each with a fresh job id, and returns them as completed future. */
        protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            final JobDetails runningJob = new JobDetails(new JobID(), "job1", 0L, 0L, 0L, JobStatus.RUNNING, 0L, new int[9], 0);
            final JobDetails finishedJob = new JobDetails(new JobID(), "job2", 0L, 0L, 0L, JobStatus.FINISHED, 0L, new int[9], 0);
            final MultipleJobsDetails overview = new MultipleJobsDetails(Arrays.asList(runningJob, finishedJob));
            return CompletableFuture.completedFuture(overview);
        }

        // Decompiler-emitted bridge overload; it only casts and delegates to the typed variant.
        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<EmptyRequestBody, EmptyMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestSavepointDisposalHandlers.class */
    /**
     * Pair of handlers simulating savepoint disposal: a trigger handler that checks the requested
     * savepoint path and hands out a trigger id, and a status handler that replays scripted
     * results for that trigger id.
     */
    public class TestSavepointDisposalHandlers {
        /** Trigger id returned by the trigger handler and expected by the status handler. */
        private final TriggerId triggerId;
        /** Savepoint path the trigger handler expects in the request body. */
        private final String savepointPath;

        /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestSavepointDisposalHandlers$TestSavepointDisposalStatusHandler.class */
        /** Status handler replaying a fixed sequence of results, one per request. */
        private class TestSavepointDisposalStatusHandler extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<AsynchronousOperationInfo>, SavepointDisposalStatusMessageParameters> {
            /** Scripted results still to be served. */
            private final Queue<OptionalFailure<AsynchronousOperationInfo>> responses;

            /**
             * @param optionalFailureArr results to serve, in order
             */
            @SafeVarargs // the array is only read while copying it into the queue
            private TestSavepointDisposalStatusHandler(OptionalFailure<AsynchronousOperationInfo>... optionalFailureArr) {
                super(SavepointDisposalStatusHeaders.getInstance());
                this.responses = new ArrayDeque<>(Arrays.asList(optionalFailureArr));
            }

            /**
             * Serves the next scripted result.
             *
             * @throws AssertionError if the trigger id does not match or no scripted result is left
             * @throws RestHandlerException with status BAD_REQUEST if the scripted result is a failure
             */
            @Override
            protected CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
                if (!((TriggerId) handlerRequest.getPathParameter(TriggerIdPathParameter.class)).equals(TestSavepointDisposalHandlers.this.triggerId)) {
                    throw new AssertionError();
                }
                OptionalFailure<AsynchronousOperationInfo> nextResponse = this.responses.poll();
                if (nextResponse == null) {
                    throw new AssertionError();
                }
                if (nextResponse.isFailure()) {
                    throw new RestHandlerException("Failure", HttpResponseStatus.BAD_REQUEST, nextResponse.getFailureCause());
                }
                return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(nextResponse.getUnchecked()));
            }

            // Decompiler-emitted bridge overload; it only casts and delegates to the typed variant.
            protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
                return handleRequest((HandlerRequest<EmptyRequestBody, SavepointDisposalStatusMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
            }
        }

        /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientTest$TestSavepointDisposalHandlers$TestSavepointDisposalTriggerHandler.class */
        /** Trigger handler asserting the savepoint path and answering with the fixed trigger id. */
        private class TestSavepointDisposalTriggerHandler extends TestHandler<SavepointDisposalRequest, TriggerResponse, EmptyMessageParameters> {
            private TestSavepointDisposalTriggerHandler() {
                super(SavepointDisposalTriggerHeaders.getInstance());
            }

            /** Asserts that the request names the expected savepoint path, then returns the trigger id. */
            @Override
            protected CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<SavepointDisposalRequest, EmptyMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) {
                Assert.assertThat(handlerRequest.getRequestBody().getSavepointPath(), Matchers.is(TestSavepointDisposalHandlers.this.savepointPath));
                return CompletableFuture.completedFuture(new TriggerResponse(TestSavepointDisposalHandlers.this.triggerId));
            }

            // Decompiler-emitted bridge overload; it only casts and delegates to the typed variant.
            protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
                return handleRequest((HandlerRequest<SavepointDisposalRequest, EmptyMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
            }
        }

        /**
         * @param str savepoint path the trigger handler should expect; must not be {@code null}
         */
        private TestSavepointDisposalHandlers(String str) {
            this.triggerId = new TriggerId();
            this.savepointPath = (String) Preconditions.checkNotNull(str);
        }
    }

    /**
     * Prepares the per-test fixtures: endpoint configuration, gateway retriever, client executor
     * and a fresh job graph with its id.
     */
    @Before
    public void setUp() throws Exception {
        this.restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
        // The retriever always resolves immediately to the fixed gateway instance.
        this.mockGatewayRetriever = () -> CompletableFuture.completedFuture(this.mockRestfulGateway);

        final ExecutorThreadFactory threadFactory = new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName());
        this.executor = Executors.newSingleThreadExecutor(threadFactory);

        final JobGraph freshJobGraph = new JobGraph("testjob");
        this.jobGraph = freshJobGraph;
        this.jobId = freshJobGraph.getJobID();
    }

    /** Shuts the client executor down, if one was created. */
    @After
    public void tearDown() {
        final ExecutorService clientExecutor = this.executor;
        if (clientExecutor == null) {
            return;
        }
        clientExecutor.shutdown();
    }

    /**
     * Creates a cluster client whose REST port option is set to the given port.
     *
     * @param i REST port written into a copy of the shared configuration
     * @return client built with the test RestClient, a lambda that always returns 0 and no
     *     leader retrieval service
     */
    private RestClusterClient<StandaloneClusterId> createRestClusterClient(int i) throws Exception {
        final Configuration clientConfig = new Configuration(restConfig);
        clientConfig.setInteger(RestOptions.PORT, i);
        final RestClient restClient = createRestClient();
        return new RestClusterClient<>(
                clientConfig,
                restClient,
                StandaloneClusterId.getInstance(),
                attempt -> 0L,
                (LeaderRetrievalService) null);
    }

    /**
     * Creates a RestClient that consults {@code failHttpRequest} before every request: a matching
     * request yields a future failed with an IOException("expected"), any other request is sent
     * normally.
     */
    @Nonnull
    private RestClient createRestClient() throws ConfigurationException {
        final RestClientConfiguration clientConfiguration = RestClientConfiguration.fromConfiguration(restConfig);
        return new RestClient(clientConfiguration, this.executor) {
            public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request, Collection<FileUpload> fileUploads) throws IOException {
                if (RestClusterClientTest.this.failHttpRequest.test(messageHeaders, messageParameters, request)) {
                    return FutureUtils.completedExceptionally(new IOException("expected"));
                }
                return super.sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, fileUploads);
            }
        };
    }

    /**
     * Submits a job, then cancels and stops it, checking after each call that the matching
     * handler flag flipped from {@code false} to {@code true}.
     *
     * <p>The server endpoint is closed by try-with-resources and the client is shut down in a
     * {@code finally} block, so each is released exactly once on both the success and the failure
     * path.
     */
    @Test
    public void testJobSubmitCancelStop() throws Exception {
        TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
        TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
        TestJobExecutionResultHandler executionResultHandler = new TestJobExecutionResultHandler(
                JobExecutionResultResponseBody.created(new JobResult.Builder()
                        .applicationStatus(ApplicationStatus.SUCCEEDED)
                        .jobId(this.jobId)
                        .netRuntime(Long.MAX_VALUE)
                        .build()));

        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(submitHandler, terminationHandler, executionResultHandler)) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                Assert.assertFalse(submitHandler.jobSubmitted);
                restClusterClient.submitJob(this.jobGraph, ClassLoader.getSystemClassLoader());
                Assert.assertTrue(submitHandler.jobSubmitted);

                Assert.assertFalse(terminationHandler.jobCanceled);
                restClusterClient.cancel(this.jobId);
                Assert.assertTrue(terminationHandler.jobCanceled);

                Assert.assertFalse(terminationHandler.jobStopped);
                restClusterClient.stop(this.jobId);
                Assert.assertTrue(terminationHandler.jobStopped);
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Submits a job in detached mode and checks that the result is a plain submission result (not
     * a {@link JobExecutionResult}) carrying the id of the submitted job.
     *
     * <p>The server endpoint is closed by try-with-resources and the client is shut down in a
     * {@code finally} block, so each is released exactly once on both the success and the failure
     * path.
     */
    @Test
    public void testDetachedJobSubmission() throws Exception {
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new TestJobSubmitHandler())) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                restClusterClient.setDetached(true);
                JobSubmissionResult submissionResult = restClusterClient.submitJob(this.jobGraph, ClassLoader.getSystemClassLoader());

                Assert.assertThat(submissionResult, Matchers.is(Matchers.not(Matchers.instanceOf(JobExecutionResult.class))));
                Assert.assertThat(submissionResult.getJobID(), Matchers.is(this.jobId));
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Submits the same job twice and checks both outcomes.
     *
     * <p>The execution-result handler is constructed with, in order: a SERVICE_UNAVAILABLE
     * {@link RestHandlerException}, an in-progress body, a SUCCEEDED {@link JobResult} and a
     * FAILED {@link JobResult}. The first submission must return the successful result; the
     * second must fail with a {@link ProgramInvocationException} that wraps a
     * {@link RuntimeException} with the message "expected".
     */
    @Test
    public void testSubmitJobAndWaitForExecutionResult() throws Exception {
        JobResult succeededResult = new JobResult.Builder()
            .applicationStatus(ApplicationStatus.SUCCEEDED)
            .jobId(this.jobId)
            .netRuntime(Long.MAX_VALUE)
            .accumulatorResults(Collections.singletonMap("testName", new SerializedValue<>(OptionalFailure.of(1.0))))
            .build();
        JobResult failedResult = new JobResult.Builder()
            .applicationStatus(ApplicationStatus.FAILED)
            .jobId(this.jobId)
            .netRuntime(Long.MAX_VALUE)
            .serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
            .build();
        TestJobExecutionResultHandler executionResultHandler = new TestJobExecutionResultHandler(new Object[]{
            new RestHandlerException("should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
            JobExecutionResultResponseBody.inProgress(),
            JobExecutionResultResponseBody.created(succeededResult),
            JobExecutionResultResponseBody.created(failedResult)});

        // The predicate is true only for the very first JobExecutionResultHeaders request.
        AtomicBoolean firstPollFailed = new AtomicBoolean();
        this.failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
            (messageHeaders instanceof JobExecutionResultHeaders) && !firstPollFailed.getAndSet(true);

        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(executionResultHandler, new TestJobSubmitHandler())) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                JobExecutionResult jobExecutionResult = restClusterClient.submitJob(this.jobGraph, ClassLoader.getSystemClassLoader());
                Assert.assertThat(jobExecutionResult.getJobID(), Matchers.equalTo(this.jobId));
                Assert.assertThat(jobExecutionResult.getNetRuntime(), Matchers.equalTo(Long.MAX_VALUE));
                Assert.assertThat(jobExecutionResult.getAllAccumulatorResults(), Matchers.equalTo(Collections.singletonMap("testName", Double.valueOf(1.0d))));

                try {
                    restClusterClient.submitJob(this.jobGraph, ClassLoader.getSystemClassLoader());
                    Assert.fail("Expected exception not thrown.");
                } catch (ProgramInvocationException e) {
                    Optional<RuntimeException> cause = ExceptionUtils.findThrowable(e, RuntimeException.class);
                    Assert.assertThat(cause.isPresent(), Matchers.is(true));
                    Assert.assertThat(cause.get().getMessage(), Matchers.equalTo("expected"));
                }
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Disposes the same savepoint three times against a status handler constructed with three
     * outcomes, in order: a completed operation, an operation completed exceptionally with
     * the test exception, and a failure carrying the test exception.
     *
     * <p>The first call must be acknowledged, the second must fail with a cause carrying the
     * test exception's message, and the third must fail with a {@link RestClientException}
     * in its cause chain.
     */
    @Test
    public void testDisposeSavepoint() throws Exception {
        String savepointPath = "foobar";
        String exceptionMessage = "Test exception.";
        FlinkException testException = new FlinkException(exceptionMessage);

        TestSavepointDisposalHandlers testSavepointDisposalHandlers = new TestSavepointDisposalHandlers(savepointPath);
        // The handlers are created against their enclosing TestSavepointDisposalHandlers
        // instance; the decompiler had reduced that binding to discarded getClass() calls.
        TestSavepointDisposalHandlers.TestSavepointDisposalTriggerHandler triggerHandler =
            testSavepointDisposalHandlers.new TestSavepointDisposalTriggerHandler();
        TestSavepointDisposalHandlers.TestSavepointDisposalStatusHandler statusHandler =
            testSavepointDisposalHandlers.new TestSavepointDisposalStatusHandler(new OptionalFailure[]{
                OptionalFailure.of(AsynchronousOperationInfo.complete()),
                OptionalFailure.of(AsynchronousOperationInfo.completeExceptional(new SerializedThrowable(testException))),
                OptionalFailure.ofFailure(testException)});

        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(statusHandler, triggerHandler)) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                Assert.assertThat(restClusterClient.disposeSavepoint(savepointPath).get(), Matchers.is(Acknowledge.get()));

                try {
                    restClusterClient.disposeSavepoint(savepointPath).get();
                    Assert.fail("Expected an exception");
                } catch (ExecutionException e) {
                    Assert.assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), Matchers.is(true));
                }

                try {
                    restClusterClient.disposeSavepoint(savepointPath).get();
                    Assert.fail("Expected an exception.");
                } catch (ExecutionException e) {
                    Assert.assertThat(ExceptionUtils.findThrowable(e, RestClientException.class).isPresent(), Matchers.is(true));
                }
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Lists jobs through the client and checks that the first two returned
     * {@link JobStatusMessage}s report different job states.
     */
    @Test
    public void testListJobs() throws Exception {
        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new TestListJobsHandler())) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                Collection<JobStatusMessage> jobDetails = restClusterClient.listJobs().get();
                Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
                JobStatusMessage firstJob = jobDetailsIterator.next();
                JobStatusMessage secondJob = jobDetailsIterator.next();
                Assert.assertNotEquals("The job status should not be equal.", firstJob.getJobState(), secondJob.getJobState());
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Fetches the accumulators of a fresh {@link JobID} and checks that exactly one entry,
     * keyed "testKey" with string form "testValue", is returned.
     */
    @Test
    public void testGetAccumulators() throws Exception {
        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new TestAccumulatorHandler())) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(new JobID());
                Assert.assertNotNull(accumulators);
                Assert.assertEquals(1, accumulators.size());
                Assert.assertTrue(accumulators.containsKey("testKey"));
                Assert.assertEquals("testValue", accumulators.get("testKey").get().toString());
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Checks that a host and port given on the command line via {@code -m} are the ones
     * reported by the retrieved client's web monitor base URL, rather than the values set
     * in the {@link Configuration}.
     */
    @Test
    public void testRESTManualConfigurationOverride() throws Exception {
        final String configuredHostname = "localhost";
        final int configuredPort = 1234;
        final String manualHostname = "123.123.123.123";
        final int manualPort = 4321;

        final Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, configuredHostname);
        config.setInteger(JobManagerOptions.PORT, configuredPort);
        config.setString(RestOptions.ADDRESS, configuredHostname);
        config.setInteger(RestOptions.PORT, configuredPort);

        final DefaultCLI cli = new DefaultCLI(config);
        final String[] args = {"-m", manualHostname + ':' + manualPort};
        final CommandLine commandLine = cli.parseCommandLineOptions(args, false);

        final URL webMonitorBaseUrl = (URL) cli
            .createClusterDescriptor(commandLine)
            .retrieve(cli.getClusterId(commandLine))
            .getWebMonitorBaseUrl()
            .get();

        Assert.assertThat(webMonitorBaseUrl.getHost(), Matchers.equalTo(manualHostname));
        Assert.assertThat(webMonitorBaseUrl.getPort(), Matchers.equalTo(manualPort));
    }

    /**
     * Sends a ping request that must eventually succeed.
     *
     * <p>The ping handler is constructed with two futures, in order: one completed
     * exceptionally with SERVICE_UNAVAILABLE and one completed with an empty response body.
     * In addition, the {@code failHttpRequest} predicate is true only for the first
     * {@link PingRestHandlerHeaders} request. The test passes when
     * {@code sendRequest(...).get()} returns without throwing.
     */
    @Test
    public void testRetriableSendOperationIfConnectionErrorOrServiceUnavailable() throws Exception {
        PingRestHandler pingRestHandler = new PingRestHandler(new CompletableFuture[]{
            FutureUtils.completedExceptionally(new RestHandlerException("test exception", HttpResponseStatus.SERVICE_UNAVAILABLE)),
            CompletableFuture.completedFuture(EmptyResponseBody.getInstance())});

        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                AtomicBoolean firstPingFailed = new AtomicBoolean();
                this.failHttpRequest = (messageHeaders, messageParameters, requestBody) ->
                    (messageHeaders instanceof PingRestHandlerHeaders) && !firstPingFailed.getAndSet(true);

                restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Submits a job to an endpoint served by {@code SubmissionFailingHandler} and requires
     * the submission to fail with a {@link ProgramInvocationException}.
     */
    @Test
    public void testJobSubmissionFailureThrowsProgramInvocationException() throws Exception {
        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(new SubmissionFailingHandler())) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                restClusterClient.submitJob(this.jobGraph, ClassLoader.getSystemClassLoader());
                // Without this the test would pass even when the submission succeeds.
                // Assert.fail throws an AssertionError, which the catch below does not catch.
                Assert.fail("Expected exception not thrown.");
            } catch (ProgramInvocationException expected) {
                // This is the outcome under test.
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Sends a ping request to a handler constructed with a single future that is completed
     * exceptionally with NOT_FOUND, and requires the request to fail with a cause chain
     * containing the message "test exception".
     */
    @Test
    public void testSendIsNotRetriableIfHttpNotFound() throws Exception {
        String exceptionMessage = "test exception";
        PingRestHandler pingRestHandler = new PingRestHandler(new CompletableFuture[]{
            FutureUtils.completedExceptionally(new RestHandlerException(exceptionMessage, HttpResponseStatus.NOT_FOUND))});

        // try-with-resources closes the endpoint exactly once and keeps the primary failure;
        // the finally block shuts the client down exactly once, before the endpoint is closed.
        try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(pingRestHandler)) {
            RestClusterClient<StandaloneClusterId> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
            try {
                restClusterClient.sendRequest(PingRestHandlerHeaders.INSTANCE).get();
                // Assert.fail throws an AssertionError, which the Exception catch below does not catch.
                Assert.fail("The rest request should have failed.");
            } catch (Exception e) {
                Assert.assertThat(ExceptionUtils.findThrowableWithMessage(e, exceptionMessage).isPresent(), Matchers.is(true));
            } finally {
                restClusterClient.shutdown();
            }
        }
    }

    /**
     * Delegates to {@code TestRestServerEndpoint.createAndStartRestServerEndpoint}, passing
     * this test's {@code restServerEndpointConfiguration} together with the given handlers.
     *
     * @param abstractRestHandlerArr handlers passed through to the endpoint factory
     * @return the endpoint returned by the factory
     * @throws Exception if the factory call fails
     */
    private TestRestServerEndpoint createRestServerEndpoint(AbstractRestHandler<?, ?, ?, ?>... abstractRestHandlerArr) throws Exception {
        final TestRestServerEndpoint endpoint = TestRestServerEndpoint.createAndStartRestServerEndpoint(
            this.restServerEndpointConfiguration,
            abstractRestHandlerArr);
        return endpoint;
    }

    static {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "localhost");
        configuration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
        configuration.setLong(RestOptions.RETRY_DELAY, 0L);
        configuration.setInteger(RestOptions.PORT, 0);
        restConfig = configuration;
    }
}
