package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/ReadSourcePortableTest.class */
public class ReadSourcePortableTest implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(PortableExecutionTest.class);

    @Parameterized.Parameter
    public boolean isStreaming;
    private static ListeningExecutorService flinkJobExecutor;

    @Parameterized.Parameters(name = "streaming: {0}")
    public static Object[] data() {
        return new Object[]{true, false};
    }

    @BeforeClass
    public static void setup() {
        flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
    }

    @AfterClass
    public static void tearDown() throws InterruptedException {
        flinkJobExecutor.shutdown();
        flinkJobExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        if (!flinkJobExecutor.isShutdown()) {
            LOG.warn("Could not shutdown Flink job executor");
        }
        flinkJobExecutor = null;
    }

    @Test(timeout = 120000)
    public void testExecution() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.fromArgs(new String[]{"--experiments=beam_fn_api"}).create();
        create.setRunner(CrashingRunner.class);
        create.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
        create.as(FlinkPipelineOptions.class).setStreaming(this.isStreaming);
        create.as(FlinkPipelineOptions.class).setParallelism(2);
        create.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        Pipeline create2 = Pipeline.create(create);
        PAssert.that(create2.apply(GenerateSequence.from(0L).to(10L))).containsInAnyOrder(ImmutableList.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
        JobInvocation createJobInvocation = FlinkJobInvoker.create((FlinkJobServerDriver.FlinkServerConfiguration) null).createJobInvocation("fakeId", "fakeRetrievalToken", flinkJobExecutor, PipelineTranslation.toProto(create2), create.as(FlinkPipelineOptions.class), new FlinkPipelineRunner(create.as(FlinkPipelineOptions.class), (String) null, Collections.emptyList()));
        createJobInvocation.start();
        while (createJobInvocation.getState() != JobApi.JobState.Enum.DONE) {
            Thread.sleep(100L);
        }
    }
}
