/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.classloading;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/**
 * Integration tests that run pre-built test jars (located under {@code target/}) against a shared
 * {@link MiniClusterResource}. Every test wraps a jar in a {@link PackagedProgram}, registers the
 * mini cluster as the context stream environment with the jar on the job's class path, and invokes
 * the program's entry point.
 */
public class ClassLoaderITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
    private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
    private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "target/streaming-customsplit-test-jar.jar";
    private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
    private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "target/streaming-checkpointed-classloader-test-jar.jar";
    private static final String KMEANS_JAR_PATH = "target/kmeans-test-jar.jar";
    private static final String USERCODETYPE_JAR_PATH = "target/usercodetype-test-jar.jar";
    private static final String CUSTOM_KV_STATE_JAR_PATH = "target/custom_kv_state-test-jar.jar";
    private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "target/checkpointing_custom_kv_state-test-jar.jar";
    private static final String CLASSLOADING_POLICY_JAR_PATH = "target/classloading_policy-test-jar.jar";
    @ClassRule
    public static final TemporaryFolder FOLDER = new TemporaryFolder();
    private static MiniClusterResource miniClusterResource = null;
    /** Parallelism handed to the context environment; equals 2 task managers x 2 slots. */
    private static final int parallelism = 4;

    /**
     * Starts the shared mini cluster with a hashmap state backend, temporary checkpoint, savepoint
     * and changelog directories, and forced RPC invocation serialization.
     *
     * @throws Exception if a temporary folder cannot be created or the cluster fails to start
     */
    @BeforeClass
    public static void setUp() throws Exception {
        Configuration config = new Configuration();
        config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, FOLDER.newFolder().getAbsoluteFile().toURI().toString());
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, FOLDER.newFolder().getAbsoluteFile().toURI().toString());
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("80m"));
        config.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "filesystem");
        config.set(FsStateChangelogOptions.BASE_PATH, FOLDER.newFolder().getAbsolutePath());
        config.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, true);
        miniClusterResource = new MiniClusterResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberTaskManagers(2)
                        .setNumberSlotsPerTaskManager(2)
                        .setConfiguration(config)
                        .build());
        miniClusterResource.before();
    }

    /** Shuts the shared mini cluster down, if it was created. */
    @AfterClass
    public static void tearDownClass() {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    }

    /** Removes the context environment installed by the individual test. */
    @After
    public void tearDown() {
        TestStreamEnvironment.unsetAsContext();
    }

    /** Runs the streaming custom-input-split jar; the test passes if the invocation does not throw. */
    @Test
    public void testStreamingCustomSplitJobWithCustomClassLoader() throws ProgramInvocationException {
        PackagedProgram streamingInputSplitTestProg = PackagedProgram.newBuilder()
                .setJarFile(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE))
                .build();
        setMiniClusterAsContext(STREAMING_INPUT_SPLITS_PROG_JAR_FILE);
        streamingInputSplitTestProg.invokeInteractiveModeForExecution();
    }

    /** Runs the streaming class loader jar; the test passes if the invocation does not throw. */
    @Test
    public void testStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException {
        PackagedProgram streamingProg = PackagedProgram.newBuilder()
                .setJarFile(new File(STREAMING_PROG_JAR_FILE))
                .build();
        setMiniClusterAsContext(STREAMING_PROG_JAR_FILE);
        streamingProg.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the checkpointed streaming jar and expects it to fail with a cause that matches the
     * jar's own {@code SuccessException}, surfaced as a {@link SerializedThrowable}.
     */
    @Test
    public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws ProgramInvocationException {
        PackagedProgram streamingCheckpointedProg = PackagedProgram.newBuilder()
                .setJarFile(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE))
                .build();
        setMiniClusterAsContext(STREAMING_CHECKPOINTED_PROG_JAR_FILE);

        // The exception class must not be loadable from the test's own class loader.
        Assertions.assertThatThrownBy(() -> Class.forName("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException"))
                .isInstanceOf(ClassNotFoundException.class);

        Assertions.assertThatThrownBy(() -> streamingCheckpointedProg.invokeInteractiveModeForExecution())
                .satisfies(FlinkAssertions.anyCauseMatches(
                        SerializedThrowable.class,
                        "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram$SuccessException"));
    }

    /** Runs the user-code-type jar; the test passes if the invocation does not throw. */
    @Test
    public void testUserCodeTypeJobWithCustomClassLoader() throws ProgramInvocationException {
        PackagedProgram userCodeTypeProg = PackagedProgram.newBuilder()
                .setJarFile(new File(USERCODETYPE_JAR_PATH))
                .build();
        setMiniClusterAsContext(USERCODETYPE_JAR_PATH);
        userCodeTypeProg.invokeInteractiveModeForExecution();
    }

    /**
     * Runs the checkpointing custom-KV-state jar and expects the invocation to fail with a
     * {@link ProgramInvocationException} whose cause chain contains a {@link SuccessException}.
     *
     * @throws IOException if the temporary checkpoint or output folder cannot be created
     */
    @Test
    public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
        File checkpointDir = FOLDER.newFolder();
        File outputDir = FOLDER.newFolder();
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH))
                .setArguments(new String[]{checkpointDir.toURI().toString(), outputDir.toURI().toString()})
                .build();
        setMiniClusterAsContext(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH);
        try {
            program.invokeInteractiveModeForExecution();
            Assert.fail("exception should happen");
        }
        catch (ProgramInvocationException e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
        }
    }

    /**
     * Starts the custom-KV-state jar on a background thread, waits for the job to be RUNNING,
     * triggers a savepoint (retrying on failure), disposes that savepoint, cancels the job and
     * verifies that the invoking thread terminates. All waits share one 100 second deadline.
     */
    @Test
    public void testDisposeSavepointWithCustomKvState() throws Exception {
        MiniClusterClient clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster());
        Deadline deadline = new FiniteDuration(100L, TimeUnit.SECONDS).fromNow();
        File checkpointDir = FOLDER.newFolder();
        File outputDir = FOLDER.newFolder();
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File(CUSTOM_KV_STATE_JAR_PATH))
                .setArguments(new String[]{
                        String.valueOf(parallelism),
                        checkpointDir.toURI().toString(),
                        "5000",
                        outputDir.toURI().toString(),
                        "false"})
                .build();
        setMiniClusterAsContext(CUSTOM_KV_STATE_JAR_PATH);

        Thread invokeThread = new Thread(() -> {
            try {
                program.invokeInteractiveModeForExecution();
            }
            catch (ProgramInvocationException ex) {
                // The test cancels the job itself, so a cancellation is the expected outcome.
                if (!(ex.getCause() instanceof JobCancellationException)) {
                    LOG.error("Program invocation failed unexpectedly.", ex);
                }
            }
        });
        LOG.info("Starting program invoke thread");
        invokeThread.start();

        // Poll the job list until a RUNNING job shows up or the deadline expires.
        JobID jobId = null;
        LOG.info("Waiting for job status running.");
        while (jobId == null && deadline.hasTimeLeft()) {
            Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            for (JobStatusMessage job : jobs) {
                if (job.getJobState() == JobStatus.RUNNING) {
                    jobId = job.getJobId();
                    LOG.info("Job running. ID: {}", jobId);
                    break;
                }
            }
            if (jobId == null) {
                Thread.sleep(100L);
            }
        }
        // Fail with a clear message instead of passing a null job id to the cluster client.
        Assert.assertNotNull("Job did not reach RUNNING state before the deadline", jobId);

        // Retry until one savepoint succeeds; stop at the first success so that exactly one
        // savepoint is created and that same savepoint is the one disposed below.
        final int maxSavepointAttempts = 20;
        String savepointPath = null;
        for (int attempt = 1; attempt <= maxSavepointAttempts; ++attempt) {
            LOG.info("Triggering savepoint ({}/{}).", attempt, maxSavepointAttempts);
            try {
                savepointPath = clusterClient.triggerSavepoint(jobId, null, SavepointFormatType.CANONICAL)
                        .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                break;
            }
            catch (Exception cause) {
                LOG.info("Failed to trigger savepoint. Retrying...", cause);
                Thread.sleep(500L);
            }
        }
        Assert.assertNotNull("Failed to trigger savepoint", savepointPath);

        clusterClient.disposeSavepoint(savepointPath).get();
        clusterClient.cancel(jobId).get();

        // Thread.join rejects negative timeouts and treats 0 as "wait forever"; clamp to 1 ms.
        invokeThread.join(Math.max(1L, deadline.timeLeft().toMillis()));
        Assert.assertFalse("Program invoke thread still running", invokeThread.isAlive());
    }

    /**
     * Runs the class loading policy jar with {@code classloader.resolve-order} set to
     * {@code child-first}. A freshly created resource directory is added to the user class path,
     * and the resource name and that directory's name are passed as program arguments.
     *
     * @throws IOException if the temporary resource directory or file cannot be created
     */
    @Test
    public void testProgramWithChildFirstClassLoader() throws IOException, ProgramInvocationException {
        String childResourceDirName = "child0";
        String testResourceName = "test-resource";
        File childResourceDir = FOLDER.newFolder(childResourceDirName);
        File childResource = new File(childResourceDir, testResourceName);
        Assert.assertTrue(childResource.createNewFile());
        setMiniClusterAsContext(CLASSLOADING_POLICY_JAR_PATH);
        Configuration childFirstConf = new Configuration();
        childFirstConf.setString("classloader.resolve-order", "child-first");
        PackagedProgram childFirstProgram = PackagedProgram.newBuilder()
                .setJarFile(new File(CLASSLOADING_POLICY_JAR_PATH))
                .setUserClassPaths(Collections.singletonList(childResourceDir.toURI().toURL()))
                .setConfiguration(childFirstConf)
                .setArguments(new String[]{testResourceName, childResourceDirName})
                .build();
        invokeWithUserCodeClassLoader(childFirstProgram);
    }

    /**
     * Runs the class loading policy jar with {@code classloader.resolve-order} set to
     * {@code parent-first}. A freshly created resource directory is added to the user class path,
     * and the resource name and the literal {@code "test-classes"} are passed as program arguments.
     *
     * @throws IOException if the temporary resource directory or file cannot be created
     */
    @Test
    public void testProgramWithParentFirstClassLoader() throws IOException, ProgramInvocationException {
        String childResourceDirName = "child1";
        String testResourceName = "test-resource";
        File childResourceDir = FOLDER.newFolder(childResourceDirName);
        File childResource = new File(childResourceDir, testResourceName);
        Assert.assertTrue(childResource.createNewFile());
        setMiniClusterAsContext(CLASSLOADING_POLICY_JAR_PATH);
        Configuration parentFirstConf = new Configuration();
        parentFirstConf.setString("classloader.resolve-order", "parent-first");
        PackagedProgram parentFirstProgram = PackagedProgram.newBuilder()
                .setJarFile(new File(CLASSLOADING_POLICY_JAR_PATH))
                .setUserClassPaths(Collections.singletonList(childResourceDir.toURI().toURL()))
                .setConfiguration(parentFirstConf)
                .setArguments(new String[]{testResourceName, "test-classes"})
                .build();
        invokeWithUserCodeClassLoader(parentFirstProgram);
    }

    /** Installs the shared mini cluster as context environment with {@code jarPath} as the only job jar. */
    private static void setMiniClusterAsContext(String jarPath) {
        TestStreamEnvironment.setAsContext(
                miniClusterResource.getMiniCluster(),
                parallelism,
                Collections.singleton(new Path(jarPath)),
                Collections.emptyList());
    }

    /** Invokes {@code program} with its user-code class loader as the thread context class loader, restoring the previous one afterwards. */
    private static void invokeWithUserCodeClassLoader(PackagedProgram program) throws ProgramInvocationException {
        Thread currentThread = Thread.currentThread();
        ClassLoader previousClassLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(program.getUserCodeClassLoader());
        try {
            program.invokeInteractiveModeForExecution();
        }
        finally {
            currentThread.setContextClassLoader(previousClassLoader);
        }
    }
}

