package org.apache.beam.runners.flink;

import java.io.File;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.security.Permission;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkSubmissionTest.class */
public class FlinkSubmissionTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
    private static final Map<String, String> ENV = System.getenv();
    private static final SecurityManager SECURITY_MANAGER = System.getSecurityManager();
    private static transient RemoteMiniCluster flinkCluster;

    @Rule
    public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
    private static boolean streaming;
    private static int expectedNumberOfJobs;

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkSubmissionTest$SystemExitException.class */
    private static class SystemExitException extends SecurityException {
        private SystemExitException() {
        }
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT.key(), 0);
        flinkCluster = new RemoteMiniClusterImpl(new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumTaskManagers(1).setNumSlotsPerTaskManager(1).setRpcServiceSharing(RpcServiceSharing.SHARED).build());
        flinkCluster.start();
        prepareEnvironment();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        restoreEnvironment();
        flinkCluster.close();
        flinkCluster = null;
    }

    @Test
    public void testSubmissionBatch() throws Exception {
        runSubmission(false, false);
    }

    @Test
    public void testSubmissionStreaming() throws Exception {
        runSubmission(false, true);
    }

    @Test
    public void testDetachedSubmissionBatch() throws Exception {
        runSubmission(true, false);
    }

    @Test
    public void testDetachedSubmissionStreaming() throws Exception {
        runSubmission(true, true);
    }

    private void runSubmission(boolean z, boolean z2) throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setTempLocation(TEMP_FOLDER.getRoot().getPath());
        String str = (String) Iterables.getFirst(PipelineResources.detectClassPathResourcesToStage(getClass().getClassLoader(), create), (Object) null);
        try {
            try {
                throwExceptionOnSystemExit();
                ImmutableList.Builder builder = ImmutableList.builder();
                builder.add("run").add("-c").add(getClass().getName());
                if (z) {
                    builder.add("-d");
                }
                builder.add(str);
                streaming = z2;
                expectedNumberOfJobs++;
                CliFrontend.main((String[]) builder.build().toArray(new String[0]));
                restoreDefaultSystemExitBehavior();
            } catch (SystemExitException e) {
                restoreDefaultSystemExitBehavior();
            }
            waitUntilJobIsCompleted();
        } catch (Throwable th) {
            restoreDefaultSystemExitBehavior();
            throw th;
        }
    }

    private void waitUntilJobIsCompleted() throws Exception {
        while (true) {
            Collection collection = (Collection) flinkCluster.listJobs().get();
            if (collection.size() == expectedNumberOfJobs && collection.stream().allMatch(jobStatusMessage -> {
                return jobStatusMessage.getJobState().name().equals("FINISHED");
            })) {
                return;
            } else {
                Thread.sleep(50L);
            }
        }
    }

    public static void main(String[] strArr) {
        FlinkPipelineOptions defaults = FlinkPipelineOptions.defaults();
        defaults.setRunner(FlinkRunner.class);
        defaults.setStreaming(streaming);
        defaults.setParallelism(1);
        Pipeline create = Pipeline.create(defaults);
        create.apply(GenerateSequence.from(0L).to(1L));
        create.run();
    }

    private static void prepareEnvironment() throws Exception {
        File newFile = TEMP_FOLDER.newFile("flink-conf.yaml");
        Files.write(newFile.toPath(), String.format("%s: %s\n%s: %s\n%s: %s", JobManagerOptions.ADDRESS.key(), "localhost", JobManagerOptions.PORT.key(), Integer.valueOf(flinkCluster.getClusterPort()), RestOptions.PORT.key(), Integer.valueOf(flinkCluster.getRestPort())).getBytes(Charsets.UTF_8), new OpenOption[0]);
        modifyEnv(ImmutableMap.builder().putAll(ENV.entrySet()).put("FLINK_CONF_DIR", newFile.getParent()).build());
    }

    private static void restoreEnvironment() throws Exception {
        modifyEnv(ENV);
    }

    private static void modifyEnv(Map<String, String> map) throws Exception {
        Field declaredField = Class.forName("java.lang.ProcessEnvironment").getDeclaredField("theUnmodifiableEnvironment");
        Field declaredField2 = Field.class.getDeclaredField("modifiers");
        declaredField2.setAccessible(true);
        declaredField2.setInt(declaredField, declaredField.getModifiers() & (-17));
        declaredField.setAccessible(true);
        declaredField.set(null, map);
        declaredField.setAccessible(false);
        declaredField2.setInt(declaredField, declaredField.getModifiers() & 16);
        declaredField2.setAccessible(false);
    }

    private static void throwExceptionOnSystemExit() {
        System.setSecurityManager(new SecurityManager() { // from class: org.apache.beam.runners.flink.FlinkSubmissionTest.1
            @Override // java.lang.SecurityManager
            public void checkPermission(Permission permission) {
                if (permission.getName().startsWith("exitVM")) {
                    throw new SystemExitException();
                }
                if (FlinkSubmissionTest.SECURITY_MANAGER != null) {
                    FlinkSubmissionTest.SECURITY_MANAGER.checkPermission(permission);
                }
            }
        });
    }

    private static void restoreDefaultSystemExitBehavior() {
        System.setSecurityManager(SECURITY_MANAGER);
    }
}
