package org.apache.flink.client.program.rest;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.class */
public class RestClusterClientSavepointTriggerTest extends TestLogger {
    private static final DispatcherGateway mockRestfulGateway = new TestingDispatcherGateway.Builder().build();
    private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = () -> {
        return CompletableFuture.completedFuture(mockRestfulGateway);
    };
    private static RestServerEndpointConfiguration restServerEndpointConfiguration;
    private static ExecutorService executor;
    private static final Configuration REST_CONFIG;

    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest$TestHandler.class */
    private static abstract class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
        private TestHandler(MessageHeaders<R, P, M> messageHeaders) {
            super(RestClusterClientSavepointTriggerTest.mockGatewayRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), messageHeaders);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest$TestSavepointHandler.class */
    public static class TestSavepointHandler extends TestHandler<EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> {
        private final FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> savepointHandlerLogic;

        TestSavepointHandler(FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> functionWithException) {
            super(SavepointStatusHeaders.getInstance());
            this.savepointHandlerLogic = functionWithException;
        }

        protected CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(this.savepointHandlerLogic.apply((TriggerId) handlerRequest.getPathParameter(TriggerIdPathParameter.class))));
        }

        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<EmptyRequestBody, SavepointStatusMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest$TestSavepointTriggerHandler.class */
    public static final class TestSavepointTriggerHandler extends TestHandler<SavepointTriggerRequestBody, TriggerResponse, SavepointTriggerMessageParameters> {
        private final FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> triggerHandlerLogic;

        TestSavepointTriggerHandler(FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> functionWithException) {
            super(SavepointTriggerHeaders.getInstance());
            this.triggerHandlerLogic = functionWithException;
        }

        protected CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters> handlerRequest, @Nonnull DispatcherGateway dispatcherGateway) throws RestHandlerException {
            return CompletableFuture.completedFuture(new TriggerResponse((TriggerId) this.triggerHandlerLogic.apply(handlerRequest.getRequestBody())));
        }

        protected /* bridge */ /* synthetic */ CompletableFuture handleRequest(@Nonnull HandlerRequest handlerRequest, @Nonnull RestfulGateway restfulGateway) throws RestHandlerException {
            return handleRequest((HandlerRequest<SavepointTriggerRequestBody, SavepointTriggerMessageParameters>) handlerRequest, (DispatcherGateway) restfulGateway);
        }
    }

    @BeforeClass
    public static void setUp() throws ConfigurationException {
        restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(REST_CONFIG);
        executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(RestClusterClientSavepointTriggerTest.class.getSimpleName()));
    }

    @AfterClass
    public static void tearDown() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    @Test
    public void testTriggerSavepointDefaultDirectory() throws Exception {
        TriggerId triggerId = new TriggerId();
        RestServerEndpoint createRestServerEndpoint = createRestServerEndpoint(savepointTriggerRequestBody -> {
            Assert.assertNull(savepointTriggerRequestBody.getTargetDirectory());
            Assert.assertFalse(savepointTriggerRequestBody.isCancelJob());
            return triggerId;
        }, triggerId2 -> {
            Assert.assertEquals(triggerId, triggerId2);
            return new SavepointInfo("hello", (SerializedThrowable) null);
        });
        Throwable th = null;
        try {
            try {
                Assert.assertEquals("hello", (String) createRestClusterClient(createRestServerEndpoint.getServerAddress().getPort()).triggerSavepoint(new JobID(), (String) null).get());
                if (createRestServerEndpoint != null) {
                    if (0 == 0) {
                        createRestServerEndpoint.close();
                        return;
                    }
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createRestServerEndpoint != null) {
                if (th != null) {
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createRestServerEndpoint.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggerSavepointTargetDirectory() throws Exception {
        TriggerId triggerId = new TriggerId();
        RestServerEndpoint createRestServerEndpoint = createRestServerEndpoint(savepointTriggerRequestBody -> {
            Assert.assertEquals("world", savepointTriggerRequestBody.getTargetDirectory());
            Assert.assertFalse(savepointTriggerRequestBody.isCancelJob());
            return triggerId;
        }, triggerId2 -> {
            Assert.assertEquals(triggerId, triggerId2);
            return new SavepointInfo("hello", (SerializedThrowable) null);
        });
        Throwable th = null;
        try {
            try {
                Assert.assertEquals("hello", (String) createRestClusterClient(createRestServerEndpoint.getServerAddress().getPort()).triggerSavepoint(new JobID(), "world").get());
                if (createRestServerEndpoint != null) {
                    if (0 == 0) {
                        createRestServerEndpoint.close();
                        return;
                    }
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createRestServerEndpoint != null) {
                if (th != null) {
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createRestServerEndpoint.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggerSavepointCancelJob() throws Exception {
        TriggerId triggerId = new TriggerId();
        RestServerEndpoint createRestServerEndpoint = createRestServerEndpoint(savepointTriggerRequestBody -> {
            Assert.assertTrue(savepointTriggerRequestBody.isCancelJob());
            return triggerId;
        }, triggerId2 -> {
            Assert.assertEquals(triggerId, triggerId2);
            return new SavepointInfo("hello", (SerializedThrowable) null);
        });
        Throwable th = null;
        try {
            try {
                Assert.assertEquals("hello", (String) createRestClusterClient(createRestServerEndpoint.getServerAddress().getPort()).cancelWithSavepoint(new JobID(), (String) null).get());
                if (createRestServerEndpoint != null) {
                    if (0 == 0) {
                        createRestServerEndpoint.close();
                        return;
                    }
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createRestServerEndpoint != null) {
                if (th != null) {
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createRestServerEndpoint.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggerSavepointFailure() throws Exception {
        TriggerId triggerId = new TriggerId();
        RestServerEndpoint createRestServerEndpoint = createRestServerEndpoint(savepointTriggerRequestBody -> {
            return triggerId;
        }, triggerId2 -> {
            return new SavepointInfo((String) null, new SerializedThrowable(new RuntimeException("expected")));
        });
        Throwable th = null;
        try {
            try {
                try {
                    createRestClusterClient(createRestServerEndpoint.getServerAddress().getPort()).triggerSavepoint(new JobID(), (String) null).get();
                } catch (ExecutionException e) {
                    SerializedThrowable cause = e.getCause();
                    Assert.assertThat(cause, Matchers.instanceOf(SerializedThrowable.class));
                    Assert.assertThat(cause.deserializeError(ClassLoader.getSystemClassLoader()).getMessage(), Matchers.equalTo("expected"));
                }
                if (createRestServerEndpoint != null) {
                    if (0 == 0) {
                        createRestServerEndpoint.close();
                        return;
                    }
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createRestServerEndpoint != null) {
                if (th != null) {
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createRestServerEndpoint.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testTriggerSavepointRetry() throws Exception {
        TriggerId triggerId = new TriggerId();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        RestServerEndpoint createRestServerEndpoint = createRestServerEndpoint(savepointTriggerRequestBody -> {
            return triggerId;
        }, triggerId2 -> {
            if (atomicBoolean.compareAndSet(true, false)) {
                throw new RestHandlerException("expected", HttpResponseStatus.SERVICE_UNAVAILABLE);
            }
            return new SavepointInfo("hello", (SerializedThrowable) null);
        });
        Throwable th = null;
        try {
            Assert.assertEquals("hello", (String) createRestClusterClient(createRestServerEndpoint.getServerAddress().getPort()).triggerSavepoint(new JobID(), (String) null).get());
            if (createRestServerEndpoint != null) {
                if (0 == 0) {
                    createRestServerEndpoint.close();
                    return;
                }
                try {
                    createRestServerEndpoint.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createRestServerEndpoint != null) {
                if (0 != 0) {
                    try {
                        createRestServerEndpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createRestServerEndpoint.close();
                }
            }
            throw th3;
        }
    }

    private static RestServerEndpoint createRestServerEndpoint(FunctionWithException<SavepointTriggerRequestBody, TriggerId, RestHandlerException> functionWithException, FunctionWithException<TriggerId, SavepointInfo, RestHandlerException> functionWithException2) throws Exception {
        return TestRestServerEndpoint.builder(restServerEndpointConfiguration).withHandler(new TestSavepointTriggerHandler(functionWithException)).withHandler(new TestSavepointHandler(functionWithException2)).buildAndStart();
    }

    private RestClusterClient<StandaloneClusterId> createRestClusterClient(int i) throws Exception {
        Configuration configuration = new Configuration(REST_CONFIG);
        configuration.setInteger(RestOptions.PORT, i);
        return new RestClusterClient<>(configuration, new RestClient(RestClientConfiguration.fromConfiguration(REST_CONFIG), executor), StandaloneClusterId.getInstance(), j -> {
            return 0L;
        });
    }

    static {
        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.ADDRESS, "localhost");
        configuration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
        configuration.setLong(RestOptions.RETRY_DELAY, 0L);
        configuration.setInteger(RestOptions.PORT, 0);
        REST_CONFIG = new UnmodifiableConfiguration(configuration);
    }
}
