package org.apache.flink.client.program.rest;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.NewClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.CheckedSupplier;

/* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClient.class */
public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {
    // Settings obtained via RestClusterClientConfiguration.fromConfiguration(...); this class reads the
    // REST client configuration, the retry delay, the maximum retry attempts and the await-leader timeout from it.
    private final RestClusterClientConfiguration restClusterClientConfiguration;
    // Client through which all REST requests are sent; either injected through the constructor or created there.
    private final RestClient restClient;
    // Fixed pool of 4 threads (factory name "Flink-RestClusterClient-IO"); handed to the RestClient created by
    // the constructor and used for job graph serialization and for parsing the web monitor URL.
    private final ExecutorService executorService;
    // Supplies the delay (in milliseconds) between two polls of an asynchronously created resource.
    private final WaitStrategy waitStrategy;
    // Identifier returned by getClusterId(); checked to be non-null in the constructor.
    private final T clusterId;
    // Retrieval service that is started with webMonitorLeaderRetriever as its listener and stopped in shutdown().
    private final LeaderRetrievalService webMonitorRetrievalService;
    // Retrieval service that is started with dispatcherLeaderRetriever as its listener and stopped in shutdown().
    private final LeaderRetrievalService dispatcherRetrievalService;
    // Provides the leader future from which the web monitor base URL is derived.
    private final LeaderRetriever webMonitorLeaderRetriever;
    // Listener registered with dispatcherRetrievalService; not read anywhere else in this class.
    private final LeaderRetriever dispatcherLeaderRetriever;
    // Single-threaded scheduler (factory name "Flink-RestClusterClient-Retry") used for request retries and
    // for scheduling follow-up polls. Assigned exactly once, in the constructor.
    private ScheduledExecutorService retryExecutorService;

    /**
     * Creates a client that builds its own {@link RestClient}, polls with
     * {@code new ExponentialWaitStrategy(10L, 2000L)} and takes the web monitor leader retrieval service
     * from the high-availability services.
     *
     * @param configuration Flink configuration the client settings are read from
     * @param t identifier of the cluster; must not be {@code null}
     * @throws Exception if the client cannot be set up or the leader retrieval services cannot be started
     */
    public RestClusterClient(Configuration configuration, T t) throws Exception {
        // null RestClient / null retrieval service => the delegate constructor creates/looks up its own.
        this(configuration, null, t, new ExponentialWaitStrategy(10L, 2000L), null);
    }

    /**
     * Creates a client that builds its own {@link RestClient}, polls with
     * {@code new ExponentialWaitStrategy(10L, 2000L)} and uses the supplied service to locate the web monitor leader.
     *
     * @param configuration Flink configuration the client settings are read from
     * @param t identifier of the cluster; must not be {@code null}
     * @param leaderRetrievalService service used to retrieve the web monitor leader; when {@code null} the
     *     delegate constructor falls back to the high-availability services
     * @throws Exception if the client cannot be set up or the leader retrieval services cannot be started
     */
    public RestClusterClient(Configuration configuration, T t, LeaderRetrievalService leaderRetrievalService) throws Exception {
        this(configuration, null, t, new ExponentialWaitStrategy(10L, 2000L), leaderRetrievalService);
    }

    /**
     * Full constructor; allows tests to inject the REST client, the polling wait strategy and the
     * web monitor leader retrieval service.
     *
     * @param configuration Flink configuration the client settings are read from
     * @param restClient REST client to use, or {@code null} to create one backed by this client's IO executor
     * @param t identifier of the cluster; must not be {@code null}
     * @param waitStrategy strategy that yields the delay between polls; must not be {@code null}
     * @param leaderRetrievalService web monitor leader retrieval service, or {@code null} to obtain it from
     *     the high-availability services
     * @throws Exception if the client cannot be set up or the leader retrieval services cannot be started
     */
    @VisibleForTesting
    RestClusterClient(Configuration configuration, @Nullable RestClient restClient, T t, WaitStrategy waitStrategy, @Nullable LeaderRetrievalService leaderRetrievalService) throws Exception {
        super(configuration);

        // The IO executor must exist before a RestClient is created because it is handed to that client.
        this.executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
        this.webMonitorLeaderRetriever = new LeaderRetriever();
        this.dispatcherLeaderRetriever = new LeaderRetriever();
        this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);

        this.restClient = restClient != null
                ? restClient
                : new RestClient(this.restClusterClientConfiguration.getRestClientConfiguration(), this.executorService);

        this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
        this.clusterId = Preconditions.checkNotNull(t);

        this.webMonitorRetrievalService = leaderRetrievalService != null
                ? leaderRetrievalService
                : this.highAvailabilityServices.getWebMonitorLeaderRetriever();
        this.dispatcherRetrievalService = this.highAvailabilityServices.getDispatcherLeaderRetriever();

        this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));

        startLeaderRetrievers();
    }

    /**
     * Starts both leader retrieval services, registering the matching {@link LeaderRetriever} as listener.
     *
     * @throws Exception if either retrieval service fails to start
     */
    private void startLeaderRetrievers() throws Exception {
        this.webMonitorRetrievalService.start(this.webMonitorLeaderRetriever);
        this.dispatcherRetrievalService.start(this.dispatcherLeaderRetriever);
    }

    /**
     * Shuts this client down: first the retry scheduler, then the REST client and the IO executor,
     * then both leader retrieval services and finally the parent client. Failures while stopping the
     * retrieval services or the parent are logged and do not abort the remaining steps.
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public void shutdown() {
        // Executors and REST client first, so that no new requests or polls are started.
        final long retryDelayMillis = this.restClusterClientConfiguration.getRetryDelay();
        ExecutorUtils.gracefulShutdown(retryDelayMillis, TimeUnit.MILLISECONDS, this.retryExecutorService);
        this.restClient.shutdown(Time.seconds(5L));
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, this.executorService);

        try {
            this.webMonitorRetrievalService.stop();
        } catch (Exception webMonitorFailure) {
            this.log.error("An error occurred during stopping the webMonitorRetrievalService", webMonitorFailure);
        }

        try {
            this.dispatcherRetrievalService.stop();
        } catch (Exception dispatcherFailure) {
            this.log.error("An error occurred during stopping the dispatcherLeaderRetriever", dispatcherFailure);
        }

        try {
            super.shutdown();
        } catch (Exception parentFailure) {
            this.log.error("An error occurred during the client shutdown.", parentFailure);
        }
    }

    /**
     * Submits the job graph and, unless the client is detached, blocks until the job result is available.
     *
     * <p>In detached mode only the submission itself is awaited. Otherwise the job result is requested
     * after the submission, converted with the given class loader and remembered as the last job
     * execution result.
     *
     * @param jobGraph job to submit
     * @param classLoader class loader used to convert the job result into a job execution result
     * @return the submission result (detached) or the job execution result (attached)
     * @throws ProgramInvocationException if the submission fails, the result cannot be retrieved, the
     *     result cannot be converted, or the job itself failed
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        this.log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());

        final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(jobGraph);

        if (isDetached()) {
            try {
                return submissionFuture.get();
            } catch (Exception e) {
                // Keep the interrupt flag set if the wait was interrupted.
                ExceptionUtils.checkInterrupted(e);
                throw new ProgramInvocationException("Could not submit job " + jobGraph.getJobID() + '.', ExceptionUtils.stripExecutionException(e));
            }
        }

        // Step 1: wait for the job result. Only failures of this wait are reported as
        // "Could not retrieve the execution result.".
        final JobResult jobResult;
        try {
            jobResult = submissionFuture
                    .thenCompose(ignored -> requestJobResult(jobGraph.getJobID()))
                    .get();
        } catch (Exception e) {
            ExceptionUtils.checkInterrupted(e);
            throw new ProgramInvocationException("Could not retrieve the execution result.", ExceptionUtils.stripExecutionException(e));
        }

        // Step 2: convert the result. This is deliberately a separate try block: the exceptions thrown
        // here must reach the caller as-is instead of being re-wrapped by the handler of step 1.
        try {
            this.lastJobExecutionResult = jobResult.toJobExecutionResult(classLoader);
            return this.lastJobExecutionResult;
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException(String.format("Job %s failed.", jobGraph.getJobID()), e);
        } catch (IOException | ClassNotFoundException e) {
            throw new ProgramInvocationException(e);
        }
    }

    /**
     * Requests the job details of the given job and extracts its status.
     *
     * @param jobID id of the job to query
     * @return future holding the job status reported in the job details response
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<JobStatus> getJobStatus(JobID jobID) {
        final JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
        final JobMessageParameters parameters = new JobMessageParameters();
        parameters.jobPathParameter.resolve(jobID);
        // No casts needed: the generic bounds of sendRequest select the right overload.
        return sendRequest(headers, parameters).thenApply(jobDetails -> jobDetails.getJobStatus());
    }

    /**
     * Polls the job execution result endpoint until the result of the given job is available.
     *
     * @param jobID id of the job whose result is requested
     * @return future completed with the job result once the polled resource reports completion
     */
    @Override // org.apache.flink.client.program.NewClusterClient
    public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobID) {
        return pollResourceAsync(() -> {
            // Fresh parameters for every poll.
            final JobMessageParameters parameters = new JobMessageParameters();
            parameters.jobPathParameter.resolve(jobID);
            return sendRequest(JobExecutionResultHeaders.getInstance(), parameters);
        });
    }

    /**
     * Submits the given job graph to the cluster.
     *
     * <p>The job graph is serialized into a temporary file, which is uploaded together with the job's
     * user jars. The temporary file is removed once the submission has finished, whether it succeeded
     * or failed, and also when serialization itself fails.
     *
     * @param jobGraph job to submit; queued scheduling is enabled on it before submission
     * @return future completed with the submission result, or completed exceptionally with a
     *     {@link JobSubmissionException} wrapped in a {@link CompletionException}
     */
    @Override // org.apache.flink.client.program.NewClusterClient
    public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
        jobGraph.setAllowQueuedScheduling(true);

        // 1) Serialize the job graph into a temporary file on the IO executor.
        final CompletableFuture<Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
            Path jobGraphFile = null;
            try {
                jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
                try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
                    out.writeObject(jobGraph);
                }
                return jobGraphFile;
            } catch (IOException e) {
                // Do not leave a partially written file behind.
                deleteTemporaryJobGraphFile(jobGraphFile);
                throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
            }
        }, this.executorService);

        // 2) Build the request body and the list of files to upload (job graph first, then the user jars).
        final CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
            final List<String> jarFileNames = new ArrayList<>(8);
            final Collection<FileUpload> filesToUpload = new ArrayList<>(8);
            filesToUpload.add(new FileUpload(jobGraphFile, "application/octet-stream"));
            for (org.apache.flink.core.fs.Path userJar : jobGraph.getUserJars()) {
                jarFileNames.add(userJar.getName());
                filesToUpload.add(new FileUpload(Paths.get(userJar.toUri()), RestConstants.CONTENT_TYPE_JAR));
            }
            final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), jarFileNames);
            return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
        });

        // 3) Send the request; connection problems and "service unavailable" responses are retried.
        final CompletableFuture<? extends ResponseBody> submissionFuture = requestFuture.thenCompose(request ->
                sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), request.f0, request.f1, isConnectionProblemOrServiceUnavailable()));

        // 4) Clean up after the final outcome (i.e. after all retries). submissionFuture can only complete
        // after jobGraphFileFuture, so the file future is already done here. If serialization failed,
        // step 1 has already cleaned up and there is nothing left to delete.
        submissionFuture.whenComplete((ignoredResponse, ignoredFailure) -> {
            if (!jobGraphFileFuture.isCompletedExceptionally()) {
                deleteTemporaryJobGraphFile(jobGraphFileFuture.getNow(null));
            }
        });

        return submissionFuture
                .thenApply(ignored -> new JobSubmissionResult(jobGraph.getJobID()))
                .exceptionally(failure -> {
                    throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(failure)));
                });
    }

    /** Deletes the temporary job graph file, logging (not propagating) a failure; {@code null} is ignored. */
    private void deleteTemporaryJobGraphFile(@Nullable Path jobGraphFile) {
        if (jobGraphFile == null) {
            return;
        }
        try {
            Files.delete(jobGraphFile);
        } catch (IOException e) {
            this.log.warn("Could not delete temporary file {}.", jobGraphFile, e);
        }
    }

    /**
     * Sends a job termination request with termination mode {@code STOP} and waits for the response.
     *
     * @param jobID id of the job to stop
     * @throws Exception if the request fails, the wait is interrupted, or no response arrives within
     *     this client's timeout
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public void stop(JobID jobID) throws Exception {
        final JobTerminationMessageParameters parameters = new JobTerminationMessageParameters();
        parameters.jobPathParameter.resolve(jobID);
        parameters.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
        // No casts needed: the generic bounds of sendRequest select the right overload.
        sendRequest(JobTerminationHeaders.getInstance(), parameters).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Sends a job termination request with termination mode {@code CANCEL} and waits for the response.
     *
     * @param jobID id of the job to cancel
     * @throws Exception if the request fails, the wait is interrupted, or no response arrives within
     *     this client's timeout
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public void cancel(JobID jobID) throws Exception {
        final JobTerminationMessageParameters parameters = new JobTerminationMessageParameters();
        parameters.jobPathParameter.resolve(jobID);
        parameters.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
        // No casts needed: the generic bounds of sendRequest select the right overload.
        sendRequest(JobTerminationHeaders.getInstance(), parameters).get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Triggers a savepoint with the cancel flag set and blocks (without timeout) until it has finished.
     *
     * @param jobID id of the job
     * @param str savepoint target passed on to the trigger request; may be {@code null}
     * @return the savepoint location reported by the cluster
     * @throws Exception if the savepoint fails or the wait is interrupted
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public String cancelWithSavepoint(JobID jobID, @Nullable String str) throws Exception {
        final CompletableFuture<String> savepointLocation = triggerSavepoint(jobID, str, true);
        return savepointLocation.get();
    }

    /**
     * Triggers a savepoint for the given job with the cancel flag unset.
     *
     * @param jobID id of the job
     * @param str savepoint target passed on to the trigger request; may be {@code null}
     * @return future holding the savepoint location
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<String> triggerSavepoint(JobID jobID, @Nullable String str) {
        final boolean cancelJob = false;
        return triggerSavepoint(jobID, str, cancelJob);
    }

    /**
     * Sends the savepoint trigger request and polls the savepoint status until it has completed.
     *
     * @param jobID id of the job
     * @param str first argument of the trigger request body (savepoint target); may be {@code null}
     * @param z second argument of the trigger request body; {@code true} when called from
     *     {@link #cancelWithSavepoint}, {@code false} when called from the public {@code triggerSavepoint}
     * @return future holding the savepoint location; completed exceptionally with the reported failure
     *     cause if the savepoint failed
     */
    private CompletableFuture<String> triggerSavepoint(JobID jobID, @Nullable String str, boolean z) {
        final SavepointTriggerHeaders headers = SavepointTriggerHeaders.getInstance();
        final SavepointTriggerMessageParameters parameters = headers.getUnresolvedMessageParameters();
        parameters.jobID.resolve(jobID);

        return sendRequest(headers, parameters, new SavepointTriggerRequestBody(str, z))
                .thenCompose(triggerResponse -> pollSavepointAsync(jobID, triggerResponse.getTriggerId()))
                .thenApply(savepointInfo -> {
                    if (savepointInfo.getFailureCause() != null) {
                        throw new CompletionException(savepointInfo.getFailureCause());
                    }
                    return savepointInfo.getLocation();
                });
    }

    /**
     * Requests the serialized user accumulators of a job and deserializes them with the given class loader.
     *
     * @param jobID id of the job
     * @param classLoader class loader used to deserialize the accumulators
     * @return the deserialized accumulators keyed by name
     * @throws Exception the stripped cause if the request or the deserialization fails; also thrown when
     *     the wait is interrupted or exceeds this client's timeout
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public Map<String, OptionalFailure<Object>> getAccumulators(JobID jobID, ClassLoader classLoader) throws Exception {
        final JobAccumulatorsHeaders headers = JobAccumulatorsHeaders.getInstance();
        final JobAccumulatorsMessageParameters parameters = headers.getUnresolvedMessageParameters();
        parameters.jobPathParameter.resolve(jobID);
        parameters.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));

        final CompletableFuture<Map<String, OptionalFailure<Object>>> accumulatorsFuture =
                sendRequest(headers, parameters).thenApply(accumulatorsInfo -> {
                    try {
                        return AccumulatorHelper.deserializeAccumulators(accumulatorsInfo.getSerializedUserAccumulators(), classLoader);
                    } catch (Exception e) {
                        throw new CompletionException(new FlinkException(String.format("Deserialization of accumulators for job %s failed.", jobID), e));
                    }
                });

        Map<String, OptionalFailure<Object>> accumulators = Collections.emptyMap();
        try {
            accumulators = accumulatorsFuture.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Surface the real cause instead of the ExecutionException wrapper.
            ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(e));
        }
        return accumulators;
    }

    /**
     * Polls the savepoint status endpoint for the given trigger until the savepoint has completed.
     *
     * @param jobID id of the job the savepoint belongs to
     * @param triggerId id returned by the savepoint trigger request
     * @return future holding the savepoint info once the polled resource reports completion
     */
    private CompletableFuture<SavepointInfo> pollSavepointAsync(JobID jobID, TriggerId triggerId) {
        return pollResourceAsync(() -> {
            final SavepointStatusHeaders headers = SavepointStatusHeaders.getInstance();
            final SavepointStatusMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jobIdPathParameter.resolve(jobID);
            parameters.triggerIdPathParameter.resolve(triggerId);
            return sendRequest(headers, parameters);
        });
    }

    /**
     * Requests the jobs overview and converts every entry into a {@link JobStatusMessage}.
     *
     * @return future holding one status message (id, name, status, start time) per reported job
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return sendRequest(JobsOverviewHeaders.getInstance()).thenApply(jobsOverview ->
                jobsOverview.getJobs().stream()
                        .map(job -> new JobStatusMessage(job.getJobId(), job.getJobName(), job.getStatus(), job.getStartTime()))
                        .collect(Collectors.toList()));
    }

    /**
     * Returns the cluster identifier this client was constructed with.
     *
     * @return the cluster id; never {@code null} because the constructor rejects a null id
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public T getClusterId() {
        return this.clusterId;
    }

    /**
     * Retrieves the connection info of the current dispatcher leader, using a retrieval service
     * freshly obtained from the high-availability services and this client's timeout.
     *
     * @return connection info of the dispatcher leader
     * @throws LeaderRetrievalException if the leader cannot be retrieved
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException {
        final LeaderRetrievalService dispatcherRetriever = this.highAvailabilityServices.getDispatcherLeaderRetriever();
        return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(dispatcherRetriever, this.timeout);
    }

    /**
     * Triggers a rescaling of the given job and polls the rescaling status until the operation has completed.
     *
     * @param jobID id of the job to rescale
     * @param i parallelism passed as the rescaling parallelism query parameter
     * @return future completed with an acknowledgement, or completed exceptionally with the failure
     *     cause reported by the cluster
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<Acknowledge> rescaleJob(JobID jobID, int i) {
        final RescalingTriggerHeaders triggerHeaders = RescalingTriggerHeaders.getInstance();
        final RescalingTriggerMessageParameters triggerParameters = triggerHeaders.getUnresolvedMessageParameters();
        triggerParameters.jobPathParameter.resolve(jobID);
        triggerParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(i));

        return sendRequest(triggerHeaders, triggerParameters)
                .thenCompose(triggerResponse -> {
                    // Resolve the status parameters once; every poll reuses them.
                    final TriggerId triggerId = triggerResponse.getTriggerId();
                    final RescalingStatusHeaders statusHeaders = RescalingStatusHeaders.getInstance();
                    final RescalingStatusMessageParameters statusParameters = statusHeaders.getUnresolvedMessageParameters();
                    statusParameters.jobPathParameter.resolve(jobID);
                    statusParameters.triggerIdPathParameter.resolve(triggerId);
                    return pollResourceAsync(() -> sendRequest(statusHeaders, statusParameters));
                })
                .thenApply(operationInfo -> {
                    if (operationInfo.getFailureCause() == null) {
                        return Acknowledge.get();
                    }
                    throw new CompletionException(operationInfo.getFailureCause());
                });
    }

    /**
     * Triggers the disposal of a savepoint and polls the disposal status until the operation has completed.
     *
     * @param str savepoint path placed in the disposal request
     * @return future completed with an acknowledgement, or completed exceptionally with the failure
     *     cause reported by the cluster
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public CompletableFuture<Acknowledge> disposeSavepoint(String str) {
        return sendRequest(SavepointDisposalTriggerHeaders.getInstance(), new SavepointDisposalRequest(str))
                .thenCompose(triggerResponse -> {
                    // Resolve the status parameters once; every poll reuses them.
                    final TriggerId triggerId = triggerResponse.getTriggerId();
                    final SavepointDisposalStatusHeaders statusHeaders = SavepointDisposalStatusHeaders.getInstance();
                    final SavepointDisposalStatusMessageParameters statusParameters = statusHeaders.getUnresolvedMessageParameters();
                    statusParameters.triggerIdPathParameter.resolve(triggerId);
                    return pollResourceAsync(() -> sendRequest(statusHeaders, statusParameters));
                })
                .thenApply(operationInfo -> {
                    if (operationInfo.getFailureCause() == null) {
                        return Acknowledge.get();
                    }
                    throw new CompletionException(operationInfo.getFailureCause());
                });
    }

    /**
     * Sends the cluster shutdown request and blocks until it has been answered. A failed request is
     * logged; an interruption re-sets the thread's interrupt flag and returns.
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public void shutDownCluster() {
        final CompletableFuture<?> shutdownRequest = sendRequest(ShutdownHeaders.getInstance());
        try {
            shutdownRequest.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException requestFailure) {
            this.log.error("Error while shutting down cluster", requestFailure);
        }
    }

    /**
     * Starts polling an asynchronously created resource, beginning with attempt {@code 0}.
     *
     * @param supplier produces one status request per poll
     * @return future completed with the resource once a poll reports completion
     */
    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(Supplier<CompletableFuture<A>> supplier) {
        final CompletableFuture<R> resultFuture = new CompletableFuture<>();
        return pollResourceAsync(supplier, resultFuture, 0L);
    }

    /**
     * Performs one poll and, if the resource is not yet completed, schedules the next poll on the retry
     * executor after the delay given by the wait strategy for the current attempt.
     *
     * <p>Any failure is forwarded to {@code completableFuture}: a failed poll, but also a failure to
     * schedule the next poll (e.g. because the retry executor has been shut down) or an exception thrown
     * while issuing the next poll. Without that forwarding the future would never complete and callers
     * blocking on it would hang.
     *
     * @param supplier produces one status request per poll
     * @param completableFuture future to complete with the resource or with the failure
     * @param j number of the current attempt, passed to the wait strategy
     * @return {@code completableFuture}
     */
    private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> pollResourceAsync(Supplier<CompletableFuture<A>> supplier, CompletableFuture<R> completableFuture, long j) {
        supplier.get().whenComplete((polledResource, failure) -> {
            if (failure != null) {
                completableFuture.completeExceptionally(failure);
                return;
            }
            try {
                if (polledResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
                    completableFuture.complete(polledResource.resource());
                } else {
                    this.retryExecutorService.schedule(() -> {
                        try {
                            pollResourceAsync(supplier, completableFuture, j + 1);
                        } catch (Throwable t) {
                            // An exception inside a scheduled task would otherwise vanish silently.
                            completableFuture.completeExceptionally(t);
                        }
                    }, this.waitStrategy.sleepTime(j), TimeUnit.MILLISECONDS);
                }
            } catch (Throwable t) {
                // E.g. RejectedExecutionException after shutdown(): fail the result instead of hanging.
                completableFuture.completeExceptionally(t);
            }
        });
        return completableFuture;
    }

    /**
     * Always reports {@code false}; the given list is not inspected.
     *
     * @param list user jar URLs (unused)
     * @return {@code false}
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public boolean hasUserJarsInClassPath(List<URL> list) {
        return false;
    }

    /** Intentionally a no-op in this client: the method returns immediately without waiting. */
    @Override // org.apache.flink.client.program.ClusterClient
    public void waitForClusterToBeReady() {
    }

    /**
     * Returns the web monitor base URL as a string, blocking until it has been resolved.
     *
     * @return the URL, or the literal {@code "Unknown address."} if it could not be retrieved
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public String getWebInterfaceURL() {
        final CompletableFuture<URL> baseUrlFuture = getWebMonitorBaseUrl();
        try {
            final URL baseUrl = baseUrlFuture.get();
            return baseUrl.toString();
        } catch (InterruptedException | ExecutionException retrievalFailure) {
            // Re-sets the interrupt flag when the wait was interrupted.
            ExceptionUtils.checkInterrupted(retrievalFailure);
            this.log.warn("Could not retrieve the web interface URL for the cluster.", retrievalFailure);
            return "Unknown address.";
        }
    }

    /**
     * Not provided by this client.
     *
     * @return always {@code null}; callers must be prepared for that
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public GetClusterStatusResponse getClusterStatus() {
        return null;
    }

    /**
     * This client never has messages to report.
     *
     * @return always an empty, immutable list
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public List<String> getNewMessages() {
        return Collections.emptyList();
    }

    /**
     * This client does not compute a slot count.
     *
     * @return always {@code -1}
     */
    @Override // org.apache.flink.client.program.ClusterClient
    public int getMaxSlots() {
        return -1;
    }

    /**
     * Sends a request that has message parameters but no body.
     *
     * @param m headers describing the REST call
     * @param u resolved message parameters
     * @return future holding the response
     */
    private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u) {
        final EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(m, u, noBody);
    }

    /**
     * Sends a request that has a body but no message parameters.
     *
     * @param m headers describing the REST call
     * @param r request body
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, R r) {
        final EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        return sendRequest(m, noParameters, r);
    }

    /**
     * Sends a request that has neither message parameters nor a body.
     *
     * @param m headers describing the REST call
     * @return future holding the response
     */
    @VisibleForTesting
    <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(M m) {
        final EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        final EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(m, noParameters, noBody);
    }

    /**
     * Sends a request that is retried on connection problems and "service unavailable" responses.
     *
     * @param m headers describing the REST call
     * @param u resolved message parameters
     * @param r request body
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u, R r) {
        final Predicate<Throwable> retryCondition = isConnectionProblemOrServiceUnavailable();
        return sendRetriableRequest(m, u, r, retryCondition);
    }

    /**
     * Sends a retriable request without any file uploads.
     *
     * @param m headers describing the REST call
     * @param u resolved message parameters
     * @param r request body
     * @param predicate decides for a failure whether the request is retried
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M m, U u, R r, Predicate<Throwable> predicate) {
        final Collection<FileUpload> noUploads = Collections.emptyList();
        return sendRetriableRequest(m, u, r, noUploads, predicate);
    }

    /**
     * Sends a request to the current web monitor leader, retrying it when the predicate accepts the failure.
     * The web monitor base URL is looked up anew for every attempt.
     *
     * @param m headers describing the REST call
     * @param u resolved message parameters
     * @param r request body
     * @param collection files to upload with the request
     * @param predicate decides for a failure whether the request is retried
     * @return future holding the response
     */
    private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M m, U u, R r, Collection<FileUpload> collection, Predicate<Throwable> predicate) {
        final CheckedSupplier<CompletableFuture<P>> singleAttempt = () -> getWebMonitorBaseUrl().thenCompose(baseUrl -> {
            try {
                return this.restClient.sendRequest(baseUrl.getHost(), baseUrl.getPort(), m, u, r, collection);
            } catch (IOException sendFailure) {
                // Checked exceptions cannot leave the lambda; wrap so the future fails with the cause.
                throw new CompletionException(sendFailure);
            }
        });
        return retry(singleAttempt, predicate);
    }

    /**
     * Runs the operation with {@code FutureUtils.retryWithDelay}, using the configured maximum number of
     * attempts and retry delay, and the retry executor for scheduling.
     *
     * @param checkedSupplier operation producing the future of one attempt
     * @param predicate decides for a failure whether another attempt is made
     * @return future holding the operation's result
     */
    private <C> CompletableFuture<C> retry(CheckedSupplier<CompletableFuture<C>> checkedSupplier, Predicate<Throwable> predicate) {
        final Supplier<CompletableFuture<C>> operation = CheckedSupplier.unchecked(checkedSupplier);
        final int maxAttempts = this.restClusterClientConfiguration.getRetryMaxAttempts();
        final Time retryDelay = Time.milliseconds(this.restClusterClientConfiguration.getRetryDelay());
        return FutureUtils.retryWithDelay(operation, maxAttempts, retryDelay, predicate, new ScheduledExecutorServiceAdapter(this.retryExecutorService));
    }

    /**
     * Builds the default retry condition.
     *
     * @return predicate accepting failures that are a connection problem or a "service unavailable" response
     */
    private static Predicate<Throwable> isConnectionProblemOrServiceUnavailable() {
        final Predicate<Throwable> connectionProblem = isConnectionProblemException();
        final Predicate<Throwable> serviceUnavailable = isServiceUnavailable();
        return connectionProblem.or(serviceUnavailable);
    }

    /**
     * Builds a predicate for connection-related failures.
     *
     * @return predicate that is {@code true} when {@code ExceptionUtils.findThrowable} locates a
     *     {@link ConnectException}, {@link SocketTimeoutException}, {@link ConnectTimeoutException} or
     *     {@link IOException} for the tested failure
     */
    private static Predicate<Throwable> isConnectionProblemException() {
        return failure -> {
            if (ExceptionUtils.findThrowable(failure, ConnectException.class).isPresent()) {
                return true;
            }
            if (ExceptionUtils.findThrowable(failure, SocketTimeoutException.class).isPresent()) {
                return true;
            }
            if (ExceptionUtils.findThrowable(failure, ConnectTimeoutException.class).isPresent()) {
                return true;
            }
            return ExceptionUtils.findThrowable(failure, IOException.class).isPresent();
        };
    }

    /**
     * Builds a predicate for "service unavailable" responses.
     *
     * @return predicate that is {@code true} for a REST client failure whose HTTP status code equals
     *     {@code HttpResponseStatus.SERVICE_UNAVAILABLE}
     */
    private static Predicate<Throwable> isServiceUnavailable() {
        final Predicate<Integer> isUnavailableCode = statusCode -> statusCode == HttpResponseStatus.SERVICE_UNAVAILABLE.code();
        return httpExceptionCodePredicate(isUnavailableCode);
    }

    /**
     * Lifts a predicate on HTTP status codes to a predicate on failures.
     *
     * @param predicate condition on the HTTP status code of a {@link RestClientException}
     * @return predicate that is {@code false} when no {@link RestClientException} is found for the failure,
     *     and otherwise the outcome of {@code predicate} for that exception's status code
     */
    private static Predicate<Throwable> httpExceptionCodePredicate(Predicate<Integer> predicate) {
        return failure -> ExceptionUtils.findThrowable(failure, RestClientException.class)
                .map(restFailure -> predicate.test(restFailure.getHttpResponseStatus().code()))
                .orElse(false);
    }

    /**
     * Resolves the base URL of the current web monitor leader.
     *
     * <p>The leader future is bounded by the configured await-leader timeout; the leader address is then
     * parsed into a {@link URL} on the IO executor.
     *
     * @return future holding the base URL; fails with an {@link IllegalArgumentException} as cause if the
     *     leader address is not a valid URL
     */
    @VisibleForTesting
    CompletableFuture<URL> getWebMonitorBaseUrl() {
        return FutureUtils
                .orTimeout(this.webMonitorLeaderRetriever.getLeaderFuture(), this.restClusterClientConfiguration.getAwaitLeaderTimeout(), TimeUnit.MILLISECONDS)
                .thenApplyAsync(leaderInfo -> {
                    final String leaderAddress = leaderInfo.f0;
                    try {
                        return new URL(leaderAddress);
                    } catch (MalformedURLException malformed) {
                        throw new IllegalArgumentException("Could not parse URL from " + leaderAddress, malformed);
                    }
                }, this.executorService);
    }
}
