package org.apache.flink.runtime.webmonitor.handlers;

import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.class */
/**
 * Tests how {@link JarRunHandler} picks up its run parameters (entry class, program arguments,
 * parallelism and savepoint restore settings).
 *
 * <p>Each test sends one request to the handler, captures the {@link JobGraph} that the handler
 * submits to a {@link TestingDispatcherGateway}, and validates both the captured graph and the
 * arguments recorded by {@link ParameterProgram}.
 */
public class JarRunHandlerParameterTest extends TestLogger {

    @ClassRule
    public static final TemporaryFolder TMP = new TemporaryFolder();

    @ClassRule
    public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();

    /** Holds the job graph most recently passed to the gateway's submit function. */
    private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();

    private static JarRunHandler handler;
    private static Path jarWithManifest;
    private static Path jarWithoutManifest;
    private static TestingDispatcherGateway restfulGateway;

    /**
     * Copies the two test jars into a fresh jar directory and wires a {@link JarRunHandler} to a
     * gateway that records every submitted job graph.
     *
     * <p>The jar names and their location are read from the system properties
     * {@code parameterJarName}, {@code parameterJarWithoutManifestName} and {@code targetDir}
     * (presumably supplied by the build; confirm against the build configuration).
     *
     * @throws Exception if a temporary folder cannot be created or a jar cannot be copied
     */
    @BeforeClass
    public static void setup() throws Exception {
        final Path jarDir = TMP.newFolder().toPath();

        final String jarWithManifestName = System.getProperty("parameterJarName") + ".jar";
        final String jarWithoutManifestName = System.getProperty("parameterJarWithoutManifestName") + ".jar";
        final Path jarLocation = Paths.get(System.getProperty("targetDir"));

        jarWithManifest = Files.copy(
                jarLocation.resolve(jarWithManifestName),
                jarDir.resolve("program-with-manifest.jar"));
        jarWithoutManifest = Files.copy(
                jarLocation.resolve(jarWithoutManifestName),
                jarDir.resolve("program-without-manifest.jar"));

        // The submit function only records the graph and acknowledges; nothing is executed.
        restfulGateway = new TestingDispatcherGateway.Builder()
                .setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
                .setSubmitFunction(jobGraph -> {
                    lastSubmittedJobGraphReference.set(jobGraph);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                })
                .build();

        handler = new JarRunHandler(
                CompletableFuture.completedFuture("shazam://localhost:12345"),
                () -> CompletableFuture.completedFuture(restfulGateway),
                Time.seconds(10L),
                Collections.emptyMap(),
                JarRunHeaders.getInstance(),
                jarDir,
                new Configuration(),
                TestingUtils.defaultExecutor());
    }

    /** Clears state shared between tests so that one test cannot observe another's results. */
    @Before
    public void reset() {
        ParameterProgram.actualArguments = null;
        // A stale graph from an earlier test must not satisfy the submission check of a later one.
        lastSubmittedJobGraphReference.set(null);
    }

    /** An empty body and no query parameters must yield the default settings. */
    @Test
    public void testDefaultParameters() throws Exception {
        sendRequestAndValidateGraph(
                handler,
                restfulGateway,
                () -> createRequest(
                        new JarRunRequestBody(),
                        JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
                        jarWithManifest),
                JarRunHandlerParameterTest::validateDefaultGraph);
    }

    /** All settings are supplied as query parameters; the request body is empty. */
    @Test
    public void testConfigurationViaQueryParameters() throws Exception {
        sendRequestAndValidateGraph(
                handler,
                restfulGateway,
                () -> {
                    final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
                    parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
                    parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
                    parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
                    parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
                    parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host localhost --port 1234"));
                    return createRequest(new JarRunRequestBody(), parameters, jarWithoutManifest);
                },
                JarRunHandlerParameterTest::validateCustomGraph);
    }

    /** All settings are supplied in the request body; no query parameter is resolved. */
    @Test
    public void testConfigurationViaJsonRequest() throws Exception {
        sendRequestAndValidateGraph(
                handler,
                restfulGateway,
                () -> createRequest(
                        createCustomRequestBody(),
                        JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
                        jarWithoutManifest),
                JarRunHandlerParameterTest::validateCustomGraph);
    }

    /**
     * Settings are supplied both in the request body and, with conflicting values, as query
     * parameters; the values from the request body are the ones expected in the result.
     */
    @Test
    public void testParameterPrioritization() throws Exception {
        sendRequestAndValidateGraph(
                handler,
                restfulGateway,
                () -> {
                    final JarRunRequestBody requestBody = createCustomRequestBody();
                    final JarRunMessageParameters parameters = JarRunHeaders.getInstance().getUnresolvedMessageParameters();
                    parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
                    parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
                    parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
                    parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
                    parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host wrong --port wrong"));
                    return createRequest(requestBody, parameters, jarWithoutManifest);
                },
                JarRunHandlerParameterTest::validateCustomGraph);
    }

    /**
     * Builds a handler request for the given jar.
     *
     * <p>Only the query parameters of {@code jarRunMessageParameters} that are already resolved
     * are forwarded, converted to their string form. The request itself is constructed with a
     * fresh set of unresolved message parameters together with those string values.
     *
     * <p>Kept {@code public} to preserve the visible signature of this class.
     *
     * @param jarRunRequestBody body of the request
     * @param jarRunMessageParameters parameters whose resolved query parameters are forwarded
     * @param path jar whose file name is used as the {@code jarid} path parameter
     * @return the assembled request
     * @throws HandlerRequestException if the request cannot be constructed
     */
    public static HandlerRequest<JarRunRequestBody, JarRunMessageParameters> createRequest(JarRunRequestBody jarRunRequestBody, JarRunMessageParameters jarRunMessageParameters, Path path) throws HandlerRequestException {
        final Map<String, List<String>> queryParameters = jarRunMessageParameters.getQueryParameters().stream()
                .filter(parameter -> parameter.isResolved())
                .collect(Collectors.toMap(
                        parameter -> parameter.getKey(),
                        JarRunHandlerParameterTest::getValuesAsString));

        return new HandlerRequest<>(
                jarRunRequestBody,
                JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
                Collections.singletonMap("jarid", path.getFileName().toString()),
                queryParameters,
                Collections.emptyList());
    }

    /**
     * Sends the supplied request through the handler, waits for completion, and hands the job
     * graph captured by the gateway to the validator. The captured reference is cleared afterwards.
     *
     * @throws Exception if the request cannot be created or the handler fails
     */
    private static void sendRequestAndValidateGraph(JarRunHandler jarRunHandler, DispatcherGateway dispatcherGateway, SupplierWithException<HandlerRequest<JarRunRequestBody, JarRunMessageParameters>, HandlerRequestException> supplierWithException, ThrowingConsumer<JobGraph, AssertionError> throwingConsumer) throws Exception {
        jarRunHandler.handleRequest(supplierWithException.get(), dispatcherGateway).get();

        final JobGraph submittedJobGraph = lastSubmittedJobGraphReference.getAndSet(null);
        // Fail with a clear message instead of a NullPointerException inside the validator.
        Assert.assertNotNull("No JobGraph was submitted to the dispatcher gateway.", submittedJobGraph);

        throwingConsumer.accept(submittedJobGraph);
    }

    /** Creates the request body carrying the custom settings expected by {@link #validateCustomGraph}. */
    private static JarRunRequestBody createCustomRequestBody() {
        return new JarRunRequestBody(ParameterProgram.class.getCanonicalName(), "--host localhost --port 1234", 4, true, "/foo/bar");
    }

    /** Asserts the outcome of a request that configured nothing. */
    private static void validateDefaultGraph(JobGraph jobGraph) {
        Assert.assertEquals(0, ParameterProgram.actualArguments.length);
        Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());

        final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
        Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
        Assert.assertNull(savepointRestoreSettings.getRestorePath());
    }

    /** Asserts the outcome of a request carrying the custom settings used throughout these tests. */
    private static void validateCustomGraph(JobGraph jobGraph) {
        Assert.assertArrayEquals(
                new String[] {"--host", "localhost", "--port", "1234"},
                ParameterProgram.actualArguments);
        Assert.assertEquals(4, getExecutionConfig(jobGraph).getParallelism());

        final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
        Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
        Assert.assertEquals("/foo/bar", savepointRestoreSettings.getRestorePath());
    }

    /**
     * Deserializes the execution config stored in the job graph, using the class loader of
     * {@link ParameterProgram}.
     *
     * @throws AssertionError if deserialization fails, carrying the original exception as cause
     */
    private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
        try {
            return jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
        } catch (Exception e) {
            throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
        }
    }

    /** Converts every value of a resolved query parameter to its string representation. */
    private static <X> List<String> getValuesAsString(MessageQueryParameter<X> messageQueryParameter) {
        final List<X> values = messageQueryParameter.getValue();
        return values.stream()
                .map(messageQueryParameter::convertValueToString)
                .collect(Collectors.toList());
    }
}
