package org.apache.beam.runners.direct;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.class */
public class UnboundedReadEvaluatorFactoryTest {
    /** Output of the unbounded counting read that {@code setup()} applies; tests evaluate its producing transform. */
    private PCollection<Long> longs;
    /** Factory under test; {@code setup()} assigns a new {@code UnboundedReadEvaluatorFactory} before each test. */
    private TransformEvaluatorFactory factory;
    /** Mockito mock; its {@code createRootBundle} calls are stubbed by {@code setup()} and by individual tests. */
    private InProcessEvaluationContext context;
    /** Root bundle for {@code longs} that {@code setup()} tells the mocked context to return. */
    private InProcessPipelineRunner.UncommittedBundle<Long> output;
    /** Non-mock bundle factory used to create the root bundles handed to the stubbed context. */
    private BundleFactory bundleFactory = InProcessBundleFactory.create();

    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$LongToInstantFn.class */
    /**
     * Timestamp function handed to the counting source in {@code setup()}: every value maps to the
     * {@link Instant} constructed from that same value.
     */
    private static class LongToInstantFn implements SerializableFunction<Long, Instant> {

        /** Returns the {@link Instant} constructed from {@code value}. */
        public Instant apply(Long value) {
            final Instant timestamp = new Instant(value);
            return timestamp;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$TestCheckpointMark.class */
    /**
     * Checkpoint mark without any state, produced by the readers of {@link TestUnboundedSource}.
     * Finalizing it does nothing.
     */
    public static class TestCheckpointMark implements UnboundedSource.CheckpointMark {

        /**
         * Coder for {@link TestCheckpointMark}. Because the mark has no fields, encoding writes no
         * bytes and decoding just creates a new instance.
         */
        public static class Coder extends AtomicCoder<TestCheckpointMark> {

            /** Writes nothing to {@code outputStream}; there is no state to persist. */
            public void encode(TestCheckpointMark testCheckpointMark, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
            }

            /**
             * Creates a new mark without reading from {@code inputStream}.
             *
             * <p>The method has to carry the name {@code decode}; under a decompiler-generated
             * alias it is an unrelated extra method and the coder has no decode implementation.
             *
             * @return a new, empty {@link TestCheckpointMark}
             */
            public TestCheckpointMark decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
                return new TestCheckpointMark();
            }
        }

        /** Instances are created only by the coder and by the test reader in this file. */
        private TestCheckpointMark() {
        }

        /** No-op: nothing needs to be committed when the checkpoint is finalized. */
        public void finalizeCheckpoint() throws IOException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$TestUnboundedSource.class */
    /**
     * Unbounded source over a fixed list of elements. It exists so tests can observe how readers
     * are handled: {@link #readerClosedCount} counts every {@code close()} call made on a reader.
     *
     * @param <T> type of the elements produced by the source
     */
    public static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
        /** Number of reader {@code close()} calls since a source was last constructed. */
        static int readerClosedCount;
        private final Coder<T> coder;
        private final List<T> elems;

        /** Reader that walks the list it was given once, from the first element to the last. */
        public class TestUnboundedReader extends UnboundedSource.UnboundedReader<T> {
            private final List<T> elems;
            // Position of the current element; stays at -1 until the first successful advance.
            private int index = -1;

            public TestUnboundedReader(List<T> list) {
                this.elems = list;
            }

            /** Moves to the first element; returns {@code false} if the list is empty. */
            public boolean start() throws IOException {
                return advance();
            }

            /**
             * Moves to the next element.
             *
             * @return {@code true} if the reader now points at an element, {@code false} if the
             *     list is exhausted (the position is left unchanged in that case)
             */
            public boolean advance() throws IOException {
                if (this.index + 1 >= this.elems.size()) {
                    return false;
                }
                this.index++;
                return true;
            }

            /** Reports the current wall-clock time as the watermark. */
            public Instant getWatermark() {
                return Instant.now();
            }

            /** Returns a new, stateless {@link TestCheckpointMark}. */
            public UnboundedSource.CheckpointMark getCheckpointMark() {
                return new TestCheckpointMark();
            }

            /**
             * Returns the enclosing source. The method must be named {@code getCurrentSource};
             * under a decompiler-generated alias the reader does not provide it at all.
             */
            public UnboundedSource<T, ?> getCurrentSource() {
                return TestUnboundedSource.this;
            }

            /**
             * Returns the element at the current position.
             *
             * @throws NoSuchElementException if the reader has not yet advanced to an element
             */
            public T getCurrent() throws NoSuchElementException {
                if (this.index < 0) {
                    // Surface the declared exception type instead of an index error from the list.
                    throw new NoSuchElementException("No current element: the reader has not advanced to an element");
                }
                return this.elems.get(this.index);
            }

            /** Stamps the current element with the current wall-clock time. */
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return Instant.now();
            }

            /** Records the close in {@link TestUnboundedSource#readerClosedCount}. */
            public void close() throws IOException {
                TestUnboundedSource.readerClosedCount++;
            }
        }

        /**
         * Creates a source over {@code tArr} and resets {@link #readerClosedCount} to zero.
         *
         * @param coder coder reported as the default output coder
         * @param tArr elements the readers will produce, in order
         */
        @SafeVarargs // the array is only wrapped in a list view and read; nothing is stored into it
        public TestUnboundedSource(Coder<T> coder, T... tArr) {
            readerClosedCount = 0;
            this.coder = coder;
            this.elems = Arrays.asList(tArr);
        }

        /** Never splits: the only split is this source itself. */
        public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(int i, PipelineOptions pipelineOptions) throws Exception {
            return ImmutableList.of(this);
        }

        /** Creates a reader positioned before the first element; the checkpoint mark is ignored. */
        public UnboundedSource.UnboundedReader<T> createReader(PipelineOptions pipelineOptions, TestCheckpointMark testCheckpointMark) {
            return new TestUnboundedReader(this.elems);
        }

        /** Returns a new {@link TestCheckpointMark.Coder}. */
        @Nullable
        public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
            return new TestCheckpointMark.Coder();
        }

        /** No validation is performed. */
        public void validate() {
        }

        /** Returns the coder supplied at construction. */
        public Coder<T> getDefaultOutputCoder() {
            return this.coder;
        }
    }

    /**
     * Builds the shared fixture: an unbounded counting read whose timestamps come from
     * {@link LongToInstantFn}, a fresh evaluator factory, and a mocked evaluation context that
     * returns {@code output} as the root bundle for {@code longs}.
     */
    @Before
    public void setup() {
        TestPipeline pipeline = TestPipeline.create();
        longs = pipeline.apply(Read.from(CountingSource.unboundedWithTimestampFn(new LongToInstantFn())));
        factory = new UnboundedReadEvaluatorFactory();
        context = Mockito.mock(InProcessEvaluationContext.class);
        output = bundleFactory.createRootBundle(longs);
        Mockito.when(context.createRootBundle(longs)).thenReturn(output);
    }

    /**
     * Checks that one evaluator over the counting read reports a watermark hold earlier than the
     * current time and fills the root bundle with the values 0 through 9, each timestamped as
     * {@code tgw} describes.
     */
    @Test
    public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
        // A wildcard cast keeps the null argument typed without resorting to a raw type.
        Instant watermarkHold = this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle<?>) null, this.context).finishBundle().getWatermarkHold();
        Assert.assertThat(watermarkHold, Matchers.lessThan(DateTime.now().toInstant()));

        Iterable<WindowedValue<Long>> produced = this.output.commit(Instant.now()).getElements();
        Assert.assertThat(produced, Matchers.containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L)));
    }

    /**
     * Checks that two evaluations in a row continue the same read: the first bundle is expected to
     * hold 0 through 9 and the second 10 through 19, with each watermark hold earlier than the
     * current time.
     */
    @Test
    public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception {
        Instant firstHold = this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle<?>) null, this.context).finishBundle().getWatermarkHold();
        Assert.assertThat(firstHold, Matchers.lessThan(DateTime.now().toInstant()));
        Assert.assertThat(this.output.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L)));

        // Re-stub the context with a fresh, fully typed bundle for the second evaluation.
        InProcessPipelineRunner.UncommittedBundle<Long> secondOutput = this.bundleFactory.createRootBundle(this.longs);
        Mockito.when(this.context.createRootBundle(this.longs)).thenReturn(secondOutput);

        Instant secondHold = this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle<?>) null, this.context).finishBundle().getWatermarkHold();
        Assert.assertThat(secondHold, Matchers.lessThan(DateTime.now().toInstant()));
        Assert.assertThat(secondOutput.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), tgw(17L), tgw(16L), tgw(15L), tgw(13L), tgw(10L)));
    }

    /**
     * Checks that evaluating a three-element {@link TestUnboundedSource} puts three elements in
     * the bundle and closes the reader exactly once.
     */
    @Test
    public void evaluatorClosesReader() throws Exception {
        TestPipeline pipeline = TestPipeline.create();
        // Constructing the source also resets TestUnboundedSource.readerClosedCount to zero.
        TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
        PCollection<Long> pcollection = pipeline.apply(Read.from(source));
        AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
        InProcessPipelineRunner.UncommittedBundle<Long> bundle = this.bundleFactory.createRootBundle(pcollection);
        Mockito.when(this.context.createRootBundle(pcollection)).thenReturn(bundle);

        this.factory.forApplication(sourceTransform, (InProcessPipelineRunner.CommittedBundle<?>) null, this.context).finishBundle();

        Assert.assertThat(ImmutableList.copyOf(bundle.commit(Instant.now()).getElements()), Matchers.hasSize(3));
        Assert.assertThat(TestUnboundedSource.readerClosedCount, Matchers.equalTo(1));
    }

    /**
     * Checks that evaluating an empty {@link TestUnboundedSource} leaves the bundle empty and
     * still closes the reader exactly once.
     */
    @Test
    public void evaluatorNoElementsClosesReader() throws Exception {
        TestPipeline pipeline = TestPipeline.create();
        // Constructing the source also resets TestUnboundedSource.readerClosedCount to zero.
        TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of(), new Long[0]);
        PCollection<Long> pcollection = pipeline.apply(Read.from(source));
        AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
        InProcessPipelineRunner.UncommittedBundle<Long> bundle = this.bundleFactory.createRootBundle(pcollection);
        Mockito.when(this.context.createRootBundle(pcollection)).thenReturn(bundle);

        this.factory.forApplication(sourceTransform, (InProcessPipelineRunner.CommittedBundle<?>) null, this.context).finishBundle();

        Assert.assertThat(bundle.commit(Instant.now()).getElements(), Matchers.emptyIterable());
        Assert.assertThat(TestUnboundedSource.readerClosedCount, Matchers.equalTo(1));
    }

    /**
     * Checks the factory's behaviour when a second evaluator is requested for the same transform
     * before the first one has finished: the second request is expected to yield {@code null},
     * and the first evaluator is still expected to produce the values 0 through 9.
     */
    @Test
    public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception {
        TransformEvaluator<?> evaluator = this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle<?>) null, this.context);

        // Requested while the first evaluator has not yet finished its bundle.
        TransformEvaluator<?> secondEvaluator = this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle<?>) null, this.context);
        Assert.assertThat(secondEvaluator, Matchers.nullValue());

        Instant watermarkHold = evaluator.finishBundle().getWatermarkHold();
        Assert.assertThat(watermarkHold, Matchers.lessThan(DateTime.now().toInstant()));
        Assert.assertThat(this.output.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L)));
    }

    /**
     * Wraps {@code l} as a value in the global window whose timestamp is the {@link Instant}
     * constructed from {@code l} itself.
     */
    private static WindowedValue<Long> tgw(Long l) {
        Instant timestamp = new Instant(l);
        return WindowedValue.timestampedValueInGlobalWindow(l, timestamp);
    }
}
