package org.apache.flink.runtime.rest.handler.job;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.class */
public class JobSubmitHandlerTest extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private final Configuration configuration;
    private BlobServer blobServer;

    @Parameterized.Parameters(name = "SSL enabled: {0}")
    public static Iterable<Boolean> data() {
        return Arrays.asList(true, false);
    }

    public JobSubmitHandlerTest(boolean z) {
        this.configuration = z ? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores() : new Configuration();
    }

    @Before
    public void setup() throws IOException {
        Configuration configuration = new Configuration(this.configuration);
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
        this.blobServer = new BlobServer(configuration, new VoidBlobStore());
        this.blobServer.start();
    }

    @After
    public void teardown() throws IOException {
        if (this.blobServer != null) {
            this.blobServer.close();
        }
    }

    @Test
    public void testSerializationFailureHandling() throws Exception {
        Path path = TEMPORARY_FOLDER.newFile().toPath();
        TestingDispatcherGateway build = new TestingDispatcherGateway.Builder().setSubmitFunction(jobGraph -> {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        try {
            new JobSubmitHandler(CompletableFuture.completedFuture("http://localhost:1234"), () -> {
                return CompletableFuture.completedFuture(build);
            }, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestingUtils.defaultExecutor(), this.configuration).handleRequest(new HandlerRequest(new JobSubmitRequestBody(path.toString(), Collections.emptyList(), Collections.emptyList()), EmptyMessageParameters.getInstance()), build);
            Assert.fail();
        } catch (RestHandlerException e) {
            Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
        }
    }

    @Test
    public void testSuccessfulJobSubmission() throws Exception {
        Path path = TEMPORARY_FOLDER.newFile().toPath();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(path, new OpenOption[0]));
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(new JobGraph("testjob"));
                if (objectOutputStream != null) {
                    if (0 != 0) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
                builder.setBlobServerPort(this.blobServer.getPort()).setSubmitFunction(jobGraph -> {
                    return CompletableFuture.completedFuture(Acknowledge.get());
                }).setHostname("localhost");
                TestingDispatcherGateway build = builder.build();
                new JobSubmitHandler(CompletableFuture.completedFuture("http://localhost:1234"), () -> {
                    return CompletableFuture.completedFuture(build);
                }, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestingUtils.defaultExecutor(), this.configuration).handleRequest(new HandlerRequest(new JobSubmitRequestBody(path.getFileName().toString(), Collections.emptyList(), Collections.emptyList()), EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(path.toFile())), build).get();
            } finally {
            }
        } catch (Throwable th3) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRejectionOnCountMismatch() throws Exception {
        Path path = TEMPORARY_FOLDER.newFile().toPath();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(path, new OpenOption[0]));
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(new JobGraph("testjob"));
                if (objectOutputStream != null) {
                    if (0 != 0) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                Path path2 = TEMPORARY_FOLDER.newFile().toPath();
                TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
                builder.setBlobServerPort(this.blobServer.getPort()).setSubmitFunction(jobGraph -> {
                    return CompletableFuture.completedFuture(Acknowledge.get());
                }).setHostname("localhost");
                TestingDispatcherGateway build = builder.build();
                try {
                    new JobSubmitHandler(CompletableFuture.completedFuture("http://localhost:1234"), () -> {
                        return CompletableFuture.completedFuture(build);
                    }, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestingUtils.defaultExecutor(), this.configuration).handleRequest(new HandlerRequest(new JobSubmitRequestBody(path.getFileName().toString(), Collections.emptyList(), Collections.emptyList()), EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(path.toFile(), path2.toFile())), build).get();
                } catch (Exception e) {
                    ExceptionUtils.findThrowable(e, th3 -> {
                        return (th3 instanceof RestHandlerException) && th3.getMessage().contains("count");
                    });
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Throwable th5) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th5;
        }
    }

    @Test
    public void testFileHandling() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        TestingDispatcherGateway build = new TestingDispatcherGateway.Builder().setBlobServerPort(this.blobServer.getPort()).setSubmitFunction(jobGraph -> {
            completableFuture.complete(jobGraph);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(CompletableFuture.completedFuture("http://localhost:1234"), () -> {
            return CompletableFuture.completedFuture(build);
        }, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestingUtils.defaultExecutor(), this.configuration);
        Path path = TEMPORARY_FOLDER.newFile().toPath();
        Path path2 = TEMPORARY_FOLDER.newFile().toPath();
        Path path3 = TEMPORARY_FOLDER.newFile().toPath();
        JobGraph jobGraph2 = new JobGraph(new JobVertex[0]);
        jobGraph2.addUserArtifact("entry", new DistributedCache.DistributedCacheEntry("random", false));
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(path, new OpenOption[0]));
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(jobGraph2);
                if (objectOutputStream != null) {
                    if (0 != 0) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                jobSubmitHandler.handleRequest(new HandlerRequest(new JobSubmitRequestBody(path.getFileName().toString(), Collections.singletonList(path2.getFileName().toString()), Collections.singleton(new JobSubmitRequestBody.DistributedCacheFile("entry", path3.getFileName().toString()))), EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(path.toFile(), path2.toFile(), path3.toFile())), build).get();
                Assert.assertTrue("No JobGraph was submitted.", completableFuture.isDone());
                JobGraph jobGraph3 = (JobGraph) completableFuture.get();
                Assert.assertEquals(1L, jobGraph3.getUserJarBlobKeys().size());
                Assert.assertEquals(1L, jobGraph3.getUserArtifacts().size());
                Assert.assertNotNull(((DistributedCache.DistributedCacheEntry) jobGraph3.getUserArtifacts().get("entry")).blobKey);
            } finally {
            }
        } catch (Throwable th3) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFailedJobSubmission() throws Exception {
        TestingDispatcherGateway build = new TestingDispatcherGateway.Builder().setSubmitFunction(jobGraph -> {
            return FutureUtils.completedExceptionally(new Exception("test"));
        }).build();
        JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(CompletableFuture.completedFuture("http://localhost:1234"), () -> {
            return CompletableFuture.completedFuture(build);
        }, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestingUtils.defaultExecutor(), this.configuration);
        Path path = TEMPORARY_FOLDER.newFile().toPath();
        JobGraph jobGraph2 = new JobGraph("testjob");
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(Files.newOutputStream(path, new OpenOption[0]));
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(jobGraph2);
                if (objectOutputStream != null) {
                    if (0 != 0) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                try {
                    jobSubmitHandler.handleRequest(new HandlerRequest(new JobSubmitRequestBody(path.getFileName().toString(), Collections.emptyList(), Collections.emptyList()), EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonList(path.toFile())), build).get();
                } catch (Exception e) {
                    Assert.assertEquals("test", ExceptionUtils.stripExecutionException(e).getMessage());
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th4;
        }
    }
}
