package org.apache.flink.runtime.webmonitor.handlers;

import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import junit.framework.TestCase;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.runtime.webmonitor.handlers.JarMessageParameters;
import org.apache.flink.runtime.webmonitor.handlers.JarRequestBody;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.class */
public abstract class JarHandlerParameterTest<REQB extends JarRequestBody, M extends JarMessageParameters> extends TestLogger {
    static final int PARALLELISM = 4;
    static TestingDispatcherGateway restfulGateway;
    static Path jarDir;
    private static Path jarWithManifest;
    private static Path jarWithoutManifest;
    static final String[] PROG_ARGS = {"--host", "localhost", "--port", "1234"};

    @ClassRule
    public static final TemporaryFolder TMP = new TemporaryFolder();

    @ClassRule
    public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource();
    static final AtomicReference<JobGraph> LAST_SUBMITTED_JOB_GRAPH_REFERENCE = new AtomicReference<>();
    static GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> {
        return CompletableFuture.completedFuture(restfulGateway);
    };
    static CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
    static Time timeout = Time.seconds(10);
    static Map<String, String> responseHeaders = Collections.emptyMap();
    static Executor executor = TestingUtils.defaultExecutor();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest$ProgramArgsParType.class */
    public enum ProgramArgsParType {
        String,
        List,
        Both
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void init() throws Exception {
        jarDir = TMP.newFolder().toPath();
        String str = System.getProperty("parameterJarName") + ".jar";
        String str2 = System.getProperty("parameterJarWithoutManifestName") + ".jar";
        Path path = Paths.get(System.getProperty("targetDir"), new String[0]);
        jarWithManifest = Files.copy(path.resolve(str), jarDir.resolve("program-with-manifest.jar"), new CopyOption[0]);
        jarWithoutManifest = Files.copy(path.resolve(str2), jarDir.resolve("program-without-manifest.jar"), new CopyOption[0]);
        restfulGateway = new TestingDispatcherGateway.Builder().setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort()).setSubmitFunction(jobGraph -> {
            LAST_SUBMITTED_JOB_GRAPH_REFERENCE.set(jobGraph);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        gatewayRetriever = () -> {
            return CompletableFuture.completedFuture(restfulGateway);
        };
        localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
        timeout = Time.seconds(10L);
        responseHeaders = Collections.emptyMap();
        executor = TestingUtils.defaultExecutor();
    }

    @Before
    public void reset() {
        ParameterProgram.actualArguments = null;
    }

    @Test
    public void testDefaultParameters() throws Exception {
        handleRequest(createRequest(getDefaultJarRequestBody(), getUnresolvedJarMessageParameters(), getUnresolvedJarMessageParameters(), jarWithManifest));
        validateDefaultGraph();
    }

    @Test
    public void testConfigurationViaQueryParametersWithProgArgsAsString() throws Exception {
        testConfigurationViaQueryParameters(ProgramArgsParType.String);
    }

    @Test
    public void testConfigurationViaQueryParametersWithProgArgsAsList() throws Exception {
        testConfigurationViaQueryParameters(ProgramArgsParType.List);
    }

    @Test
    public void testConfigurationViaQueryParametersFailWithProgArgsAsStringAndList() throws Exception {
        try {
            testConfigurationViaQueryParameters(ProgramArgsParType.Both);
            TestCase.fail("RestHandlerException is excepted");
        } catch (RestHandlerException e) {
            TestCase.assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
        }
    }

    private void testConfigurationViaQueryParameters(ProgramArgsParType programArgsParType) throws Exception {
        handleRequest(createRequest(getDefaultJarRequestBody(), getJarMessageParameters(programArgsParType), getUnresolvedJarMessageParameters(), jarWithoutManifest));
        validateGraph();
    }

    @Test
    public void testConfigurationViaJsonRequestWithProgArgsAsString() throws Exception {
        testConfigurationViaJsonRequest(ProgramArgsParType.String);
    }

    @Test
    public void testConfigurationViaJsonRequestWithProgArgsAsList() throws Exception {
        testConfigurationViaJsonRequest(ProgramArgsParType.List);
    }

    @Test
    public void testConfigurationViaJsonRequestFailWithProgArgsAsStringAndList() throws Exception {
        try {
            testConfigurationViaJsonRequest(ProgramArgsParType.Both);
            TestCase.fail("RestHandlerException is excepted");
        } catch (RestHandlerException e) {
            TestCase.assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
        }
    }

    @Test
    public void testProvideJobId() throws Exception {
        JobID jobID = new JobID();
        handleRequest(createRequest(getJarRequestBodyWithJobId(jobID), getUnresolvedJarMessageParameters(), getUnresolvedJarMessageParameters(), jarWithManifest));
        Optional<JobGraph> lastSubmittedJobGraphAndReset = getLastSubmittedJobGraphAndReset();
        MatcherAssert.assertThat(Boolean.valueOf(lastSubmittedJobGraphAndReset.isPresent()), CoreMatchers.is(true));
        MatcherAssert.assertThat(lastSubmittedJobGraphAndReset.get().getJobID(), CoreMatchers.is(CoreMatchers.equalTo(jobID)));
    }

    private void testConfigurationViaJsonRequest(ProgramArgsParType programArgsParType) throws Exception {
        handleRequest(createRequest(getJarRequestBody(programArgsParType), getUnresolvedJarMessageParameters(), getUnresolvedJarMessageParameters(), jarWithoutManifest));
        validateGraph();
    }

    @Test
    public void testParameterPrioritizationWithProgArgsAsString() throws Exception {
        testParameterPrioritization(ProgramArgsParType.String);
    }

    @Test
    public void testParameterPrioritizationWithProgArgsAsList() throws Exception {
        testParameterPrioritization(ProgramArgsParType.List);
    }

    @Test
    public void testFailIfProgArgsAreAsStringAndAsList() throws Exception {
        try {
            testParameterPrioritization(ProgramArgsParType.Both);
            TestCase.fail("RestHandlerException is excepted");
        } catch (RestHandlerException e) {
            TestCase.assertEquals(HttpResponseStatus.BAD_REQUEST, e.getHttpResponseStatus());
        }
    }

    private void testParameterPrioritization(ProgramArgsParType programArgsParType) throws Exception {
        handleRequest(createRequest(getJarRequestBody(programArgsParType), getWrongJarMessageParameters(programArgsParType), getUnresolvedJarMessageParameters(), jarWithoutManifest));
        validateGraph();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String getProgramArgsString(ProgramArgsParType programArgsParType) {
        if (programArgsParType == ProgramArgsParType.String || programArgsParType == ProgramArgsParType.Both) {
            return String.join(" ", PROG_ARGS);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static List<String> getProgramArgsList(ProgramArgsParType programArgsParType) {
        if (programArgsParType == ProgramArgsParType.List || programArgsParType == ProgramArgsParType.Both) {
            return Arrays.asList(PROG_ARGS);
        }
        return null;
    }

    private static <REQB extends JarRequestBody, M extends JarMessageParameters> HandlerRequest<REQB, M> createRequest(REQB reqb, M m, M m2, Path path) throws HandlerRequestException {
        return new HandlerRequest<>(reqb, m2, Collections.singletonMap("jarid", path.getFileName().toString()), (Map) m.getQueryParameters().stream().filter((v0) -> {
            return v0.isResolved();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, JarHandlerParameterTest::getValuesAsString)), Collections.emptyList());
    }

    private static <X> List<String> getValuesAsString(MessageQueryParameter<X> messageQueryParameter) {
        Stream stream = ((List) messageQueryParameter.getValue()).stream();
        messageQueryParameter.getClass();
        return (List) stream.map(messageQueryParameter::convertValueToString).collect(Collectors.toList());
    }

    abstract M getUnresolvedJarMessageParameters();

    abstract M getJarMessageParameters(ProgramArgsParType programArgsParType);

    abstract M getWrongJarMessageParameters(ProgramArgsParType programArgsParType);

    abstract REQB getDefaultJarRequestBody();

    abstract REQB getJarRequestBody(ProgramArgsParType programArgsParType);

    abstract REQB getJarRequestBodyWithJobId(JobID jobID);

    abstract void handleRequest(HandlerRequest<REQB, M> handlerRequest) throws Exception;

    /* JADX INFO: Access modifiers changed from: package-private */
    public JobGraph validateDefaultGraph() {
        JobGraph andSet = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
        Assert.assertEquals(0L, ParameterProgram.actualArguments.length);
        Assert.assertEquals(((Integer) CoreOptions.DEFAULT_PARALLELISM.defaultValue()).intValue(), getExecutionConfig(andSet).getParallelism());
        return andSet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JobGraph validateGraph() {
        JobGraph andSet = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
        Assert.assertArrayEquals(PROG_ARGS, ParameterProgram.actualArguments);
        Assert.assertEquals(4L, getExecutionConfig(andSet).getParallelism());
        return andSet;
    }

    private static Optional<JobGraph> getLastSubmittedJobGraphAndReset() {
        return Optional.ofNullable(LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null));
    }

    private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
        try {
            return (ExecutionConfig) jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
        } catch (Exception e) {
            throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
        }
    }
}
