package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/PortableExecutionTest.class */
public class PortableExecutionTest implements Serializable {

    @Parameterized.Parameter
    public boolean isStreaming;
    private transient ListeningExecutorService flinkJobExecutor;
    private static ArrayList<KV<String, Iterable<Long>>> outputValues = new ArrayList<>();

    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{true, false};
    }

    @Before
    public void setup() {
        this.flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    @After
    public void tearDown() {
        this.flinkJobExecutor.shutdown();
    }

    @Test
    public void testExecution() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(CrashingRunner.class);
        create.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
        create.as(FlinkPipelineOptions.class).setStreaming(this.isStreaming);
        create.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        Pipeline create2 = Pipeline.create(create);
        create2.apply("impulse", Impulse.create()).apply("create", ParDo.of(new DoFn<byte[], String>() { // from class: org.apache.beam.runners.flink.PortableExecutionTest.1
            @DoFn.ProcessElement
            public void process(DoFn<byte[], String>.ProcessContext processContext) {
                processContext.output("zero");
                processContext.output("one");
                processContext.output("two");
            }
        })).apply("len", ParDo.of(new DoFn<String, Long>() { // from class: org.apache.beam.runners.flink.PortableExecutionTest.2
            @DoFn.ProcessElement
            public void process(DoFn<String, Long>.ProcessContext processContext) {
                processContext.output(Long.valueOf(((String) processContext.element()).length()));
            }
        })).apply("addKeys", WithKeys.of("foo")).setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())).apply("gbk", GroupByKey.create()).apply("collect", ParDo.of(new DoFn<KV<String, Iterable<Long>>, Void>() { // from class: org.apache.beam.runners.flink.PortableExecutionTest.3
            @DoFn.ProcessElement
            public void process(DoFn<KV<String, Iterable<Long>>, Void>.ProcessContext processContext) {
                PortableExecutionTest.outputValues.add((KV) processContext.element());
            }
        }));
        RunnerApi.Pipeline proto = PipelineTranslation.toProto(create2);
        outputValues.clear();
        FlinkJobInvocation create3 = FlinkJobInvocation.create("fakeId", "fakeRetrievalToken", this.flinkJobExecutor, proto, create.as(FlinkPipelineOptions.class), Collections.EMPTY_LIST);
        create3.start();
        long currentTimeMillis = System.currentTimeMillis() + 60000;
        while (create3.getState() != JobApi.JobState.Enum.DONE && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals("job state", JobApi.JobState.Enum.DONE, create3.getState());
        Assert.assertEquals(1L, outputValues.size());
        Assert.assertEquals("foo", outputValues.get(0).getKey());
        Assert.assertThat((Iterable) outputValues.get(0).getValue(), Matchers.containsInAnyOrder(new Long[]{4L, 3L, 3L}));
    }
}
