/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.File;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.security.Permission;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.RemoteMiniCluster;
import org.apache.beam.runners.flink.RemoteMiniClusterImpl;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class FlinkSubmissionTest {
    @ClassRule
    public static final @UnknownKeyFor @NonNull @Initialized TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> ENV = System.getenv();
    private static final @UnknownKeyFor @NonNull @Initialized SecurityManager SECURITY_MANAGER = System.getSecurityManager();
    private static transient @UnknownKeyFor @NonNull @Initialized RemoteMiniCluster flinkCluster;
    @Rule
    public @UnknownKeyFor @NonNull @Initialized Timeout timeout = new Timeout(60L, TimeUnit.SECONDS);
    private static @UnknownKeyFor @NonNull @Initialized boolean streaming;
    private static @UnknownKeyFor @NonNull @Initialized int expectedNumberOfJobs;

    @BeforeClass
    public static void beforeClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT.key(), 0);
        MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder().setConfiguration(config).setNumTaskManagers(1).setNumSlotsPerTaskManager(1).setRpcServiceSharing(RpcServiceSharing.SHARED).build();
        flinkCluster = new RemoteMiniClusterImpl(clusterConfig);
        flinkCluster.start();
        FlinkSubmissionTest.prepareEnvironment();
    }

    @AfterClass
    public static void afterClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkSubmissionTest.restoreEnvironment();
        flinkCluster.close();
        flinkCluster = null;
    }

    @Test
    public void testSubmissionBatch() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.runSubmission(false, false);
    }

    @Test
    public void testSubmissionStreaming() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.runSubmission(false, true);
    }

    @Test
    public void testDetachedSubmissionBatch() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.runSubmission(true, false);
    }

    @Test
    public void testDetachedSubmissionStreaming() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.runSubmission(true, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runSubmission(@UnknownKeyFor @NonNull @Initialized boolean isDetached, @UnknownKeyFor @NonNull @Initialized boolean isStreaming) throws @UnknownKeyFor @NonNull @Initialized Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setTempLocation(TEMP_FOLDER.getRoot().getPath());
        String jarPath = (String)Iterables.getFirst((Iterable)PipelineResources.detectClassPathResourcesToStage((ClassLoader)this.getClass().getClassLoader(), (PipelineOptions)options), null);
        try {
            FlinkSubmissionTest.throwExceptionOnSystemExit();
            ImmutableList.Builder argsBuilder = ImmutableList.builder();
            argsBuilder.add((Object)"run").add((Object)"-c").add((Object)this.getClass().getName());
            if (isDetached) {
                argsBuilder.add((Object)"-d");
            }
            argsBuilder.add((Object)jarPath);
            streaming = isStreaming;
            ++expectedNumberOfJobs;
            CliFrontend.main((String[])((String[])argsBuilder.build().toArray((Object[])new String[0])));
        }
        catch (SystemExitException systemExitException) {
        }
        finally {
            FlinkSubmissionTest.restoreDefaultSystemExitBehavior();
        }
        this.waitUntilJobIsCompleted();
    }

    private void waitUntilJobIsCompleted() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Collection allJobsStates;
        while ((allJobsStates = (Collection)flinkCluster.listJobs().get()).size() != expectedNumberOfJobs || !allJobsStates.stream().allMatch(jobStatus -> jobStatus.getJobState().name().equals("FINISHED"))) {
            Thread.sleep(50L);
        }
        return;
    }

    public static void main(@UnknownKeyFor @NonNull @Initialized String @UnknownKeyFor @NonNull @Initialized [] args) {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setRunner(FlinkRunner.class);
        options.setStreaming(streaming);
        options.setParallelism(Integer.valueOf(1));
        Pipeline p = Pipeline.create((PipelineOptions)options);
        p.apply((PTransform)GenerateSequence.from((long)0L).to(1L));
        p.run();
    }

    private static void prepareEnvironment() throws @UnknownKeyFor @NonNull @Initialized Exception {
        File file = TEMP_FOLDER.newFile("flink-conf.yaml");
        String config = String.format("%s: %s\n%s: %s\n%s: %s", JobManagerOptions.ADDRESS.key(), "localhost", JobManagerOptions.PORT.key(), flinkCluster.getClusterPort(), RestOptions.PORT.key(), flinkCluster.getRestPort());
        Files.write(file.toPath(), config.getBytes(Charsets.UTF_8), new OpenOption[0]);
        ImmutableMap newEnv = ImmutableMap.builder().putAll(ENV.entrySet()).put((Object)"FLINK_CONF_DIR", (Object)file.getParent()).build();
        FlinkSubmissionTest.modifyEnv((Map<String, String>)newEnv);
    }

    private static void restoreEnvironment() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkSubmissionTest.modifyEnv(ENV);
    }

    private static void modifyEnv(@UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> env) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Class<?> processEnv = Class.forName("java.lang.ProcessEnvironment");
        Field envField = processEnv.getDeclaredField("theUnmodifiableEnvironment");
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(envField, envField.getModifiers() & 0xFFFFFFEF);
        envField.setAccessible(true);
        envField.set(null, env);
        envField.setAccessible(false);
        modifiersField.setInt(envField, envField.getModifiers() & 0x10);
        modifiersField.setAccessible(false);
    }

    private static void throwExceptionOnSystemExit() {
        System.setSecurityManager(new SecurityManager(){

            @Override
            public void checkPermission(@UnknownKeyFor @NonNull @Initialized Permission permission) {
                if (permission.getName().startsWith("exitVM")) {
                    throw new SystemExitException();
                }
                if (SECURITY_MANAGER != null) {
                    SECURITY_MANAGER.checkPermission(permission);
                }
            }
        });
    }

    private static void restoreDefaultSystemExitBehavior() {
        System.setSecurityManager(SECURITY_MANAGER);
    }

    private static class SystemExitException
    extends SecurityException {
        private SystemExitException() {
        }
    }
}

