package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/ReadSourcePortableTest.class */
public class ReadSourcePortableTest implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ReadSourcePortableTest.class);

    @Parameterized.Parameter
    public boolean isStreaming;
    private static ListeningExecutorService flinkJobExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/ReadSourcePortableTest$Source.class */
    public static class Source extends UnboundedSource<Long, Checkpoint> {
        private final int count;
        private final Instant now = Instant.now();

        /* loaded from: input_file:org/apache/beam/runners/flink/ReadSourcePortableTest$Source$Checkpoint.class */
        private static class Checkpoint implements UnboundedSource.CheckpointMark, Serializable {
            final int pos;

            Checkpoint(int i) {
                this.pos = i;
            }

            public void finalizeCheckpoint() {
            }
        }

        Source(int i) {
            this.count = i;
        }

        public List<? extends UnboundedSource<Long, Checkpoint>> split(int i, PipelineOptions pipelineOptions) {
            return Collections.singletonList(this);
        }

        public UnboundedSource.UnboundedReader<Long> createReader(PipelineOptions pipelineOptions, Checkpoint checkpoint) {
            return new UnboundedSource.UnboundedReader<Long>() { // from class: org.apache.beam.runners.flink.ReadSourcePortableTest.Source.1
                int pos = -1;

                public boolean start() {
                    return advance();
                }

                public boolean advance() {
                    int i = this.pos + 1;
                    this.pos = i;
                    return i < Source.this.count;
                }

                public Instant getWatermark() {
                    return this.pos < Source.this.count ? BoundedWindow.TIMESTAMP_MIN_VALUE : BoundedWindow.TIMESTAMP_MAX_VALUE;
                }

                public UnboundedSource.CheckpointMark getCheckpointMark() {
                    return new Checkpoint(this.pos);
                }

                /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
                public UnboundedSource<Long, ?> m11getCurrentSource() {
                    return Source.this;
                }

                /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
                public Long m12getCurrent() throws NoSuchElementException {
                    return Long.valueOf(this.pos);
                }

                public Instant getCurrentTimestamp() throws NoSuchElementException {
                    return Source.this.now;
                }

                public void close() {
                }
            };
        }

        public boolean requiresDeduping() {
            return false;
        }

        public Coder<Long> getOutputCoder() {
            return SerializableCoder.of(Long.class);
        }

        public Coder<Checkpoint> getCheckpointMarkCoder() {
            return SerializableCoder.of(Checkpoint.class);
        }
    }

    /** Supplies the two values of {@link #isStreaming}; the runner executes the test once per value. */
    @Parameterized.Parameters(name = "streaming: {0}")
    public static Object[] data() {
        final Object[] streamingModes = new Object[2];
        streamingModes[0] = Boolean.TRUE;
        streamingModes[1] = Boolean.FALSE;
        return streamingModes;
    }

    /** Creates the shared executor on which the Flink job invocation is run. */
    @BeforeClass
    public static void setup() {
        final int workerThreads = 1;
        flinkJobExecutor =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerThreads));
    }

    /**
     * Shuts down the shared executor, waiting up to ten seconds for running tasks to finish, and
     * logs a warning if it has not terminated by then.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @AfterClass
    public static void tearDown() throws InterruptedException {
        flinkJobExecutor.shutdown();
        // isShutdown() is true as soon as shutdown() has been called, so it cannot reveal a
        // timeout; awaitTermination's return value tells whether the executor really terminated.
        boolean terminated = flinkJobExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        if (!terminated) {
            LOG.warn("Could not shutdown Flink job executor");
        }
        flinkJobExecutor = null;
    }

    /**
     * Builds a pipeline that reads ten longs from {@link Source}, translates it to its portable
     * proto form, submits that proto through a {@link FlinkJobInvoker} against a local Flink
     * master, and polls until the job reports {@code DONE}, failing if it ever reports
     * {@code FAILED}.
     */
    @Test(timeout = 120000)
    public void testExecution() throws Exception {
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();
        // The pipeline is never run via Pipeline.run(); the job is submitted through the invoker below.
        options.setRunner(CrashingRunner.class);

        FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
        flinkOptions.setFlinkMaster("[local]");
        flinkOptions.setStreaming(this.isStreaming);
        flinkOptions.setParallelism(2);
        options.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");

        Pipeline pipeline = Pipeline.create(options);
        List<Long> expectedElements = ImmutableList.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
        PAssert.that(
                pipeline
                    .apply(Read.from(new Source(10)))
                    .apply(Window.into(FixedWindows.of(Duration.millis(1L)))))
            .containsInAnyOrder(expectedElements);

        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);

        // The translated proto must contain at least one transform with the read URN.
        List<RunnerApi.PTransform> readTransforms =
            pipelineProto.getComponents().getTransformsMap().values().stream()
                .filter(transform -> "beam:transform:read:v1".equals(transform.getSpec().getUrn()))
                .collect(Collectors.toList());
        MatcherAssert.assertThat(readTransforms, Matchers.not(Matchers.empty()));

        FlinkJobInvoker invoker =
            FlinkJobInvoker.create((FlinkJobServerDriver.FlinkServerConfiguration) null);
        FlinkPipelineRunner runner =
            new FlinkPipelineRunner(flinkOptions, (String) null, Collections.emptyList());
        JobInvocation jobInvocation =
            invoker.createJobInvocation(
                "fakeId", "fakeRetrievalToken", flinkJobExecutor, pipelineProto, flinkOptions, runner);

        jobInvocation.start();
        // Poll every 100 ms; the @Test timeout bounds the total wait.
        while (jobInvocation.getState() != JobApi.JobState.Enum.DONE) {
            MatcherAssert.assertThat(
                jobInvocation.getState(), Matchers.not(JobApi.JobState.Enum.FAILED));
            Thread.sleep(100L);
        }
    }
}
