package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.class */
public class BoundedReadEvaluatorFactoryTest {
    private BoundedSource<Long> source;
    private PCollection<Long> longs;
    private TransformEvaluatorFactory factory;

    @Mock
    private InProcessEvaluationContext context;
    private BundleFactory bundleFactory;

    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest$TestReader.class */
    private static class TestReader<T> extends BoundedSource.BoundedReader<T> {
        private final BoundedSource<T> source;
        private final List<T> elems;
        private int index = -1;

        public TestReader(BoundedSource<T> boundedSource, T... tArr) {
            this.source = boundedSource;
            this.elems = Arrays.asList(tArr);
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public BoundedSource<T> m0getCurrentSource() {
            return this.source;
        }

        public boolean start() throws IOException {
            return advance();
        }

        public boolean advance() throws IOException {
            if (this.elems.size() <= this.index + 1) {
                return false;
            }
            this.index++;
            return true;
        }

        public T getCurrent() throws NoSuchElementException {
            return this.elems.get(this.index);
        }

        public void close() throws IOException {
            boolean unused = TestSource.readerClosed = true;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest$TestSource.class */
    private static class TestSource<T> extends BoundedSource<T> {
        private static boolean readerClosed;
        private final Coder<T> coder;
        private final T[] elems;

        public TestSource(Coder<T> coder, T... tArr) {
            this.elems = tArr;
            this.coder = coder;
            readerClosed = false;
        }

        public List<? extends BoundedSource<T>> splitIntoBundles(long j, PipelineOptions pipelineOptions) throws Exception {
            return ImmutableList.of(this);
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            return 0L;
        }

        public boolean producesSortedKeys(PipelineOptions pipelineOptions) throws Exception {
            return false;
        }

        public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
            return new TestReader(this, this.elems);
        }

        public void validate() {
        }

        public Coder<T> getDefaultOutputCoder() {
            return this.coder;
        }
    }

    @Before
    public void setup() {
        MockitoAnnotations.initMocks(this);
        this.source = CountingSource.upTo(10L);
        this.longs = TestPipeline.create().apply(Read.from(this.source));
        this.factory = new BoundedReadEvaluatorFactory();
        this.bundleFactory = InProcessBundleFactory.create();
    }

    @Test
    public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
        InProcessPipelineRunner.UncommittedBundle createRootBundle = this.bundleFactory.createRootBundle(this.longs);
        Mockito.when(this.context.createRootBundle(this.longs)).thenReturn(createRootBundle);
        Assert.assertThat(this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, this.context).finishBundle().getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        Assert.assertThat(createRootBundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)}));
    }

    @Test
    public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception {
        InProcessPipelineRunner.UncommittedBundle createRootBundle = this.bundleFactory.createRootBundle(this.longs);
        Mockito.when(this.context.createRootBundle(this.longs)).thenReturn(createRootBundle);
        Assert.assertThat(this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, this.context).finishBundle().getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        Assert.assertThat(createRootBundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)}));
        Mockito.when(this.context.createRootBundle(this.longs)).thenReturn(this.bundleFactory.createRootBundle(this.longs));
        Assert.assertThat(this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, this.context), Matchers.nullValue());
    }

    @Test
    public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
        InProcessPipelineRunner.UncommittedBundle createRootBundle = this.bundleFactory.createRootBundle(this.longs);
        InProcessPipelineRunner.UncommittedBundle createRootBundle2 = this.bundleFactory.createRootBundle(this.longs);
        Mockito.when(this.context.createRootBundle(this.longs)).thenReturn(createRootBundle).thenReturn(createRootBundle2);
        TransformEvaluator forApplication = this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, this.context);
        Assert.assertThat(this.factory.forApplication(this.longs.getProducingTransformInternal(), (InProcessPipelineRunner.CommittedBundle) null, this.context), Matchers.nullValue());
        Assert.assertThat(forApplication.finishBundle().getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
        Iterable elements = createRootBundle.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
        Assert.assertThat(elements, Matchers.containsInAnyOrder(new WindowedValue[]{gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)}));
        Assert.assertThat(createRootBundle2.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), Matchers.emptyIterable());
        Assert.assertThat(elements, Matchers.containsInAnyOrder(new WindowedValue[]{gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)}));
    }

    @Test
    public void boundedSourceEvaluatorClosesReader() throws Exception {
        PCollection apply = TestPipeline.create().apply(Read.from(new TestSource(BigEndianLongCoder.of(), 1L, 2L, 3L)));
        AppliedPTransform producingTransformInternal = apply.getProducingTransformInternal();
        InProcessPipelineRunner.UncommittedBundle createRootBundle = this.bundleFactory.createRootBundle(apply);
        Mockito.when(this.context.createRootBundle(apply)).thenReturn(createRootBundle);
        this.factory.forApplication(producingTransformInternal, (InProcessPipelineRunner.CommittedBundle) null, this.context).finishBundle();
        Assert.assertThat(createRootBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{gw(2L), gw(3L), gw(1L)}));
        Assert.assertThat(Boolean.valueOf(TestSource.readerClosed), Matchers.is(true));
    }

    @Test
    public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
        PCollection apply = TestPipeline.create().apply(Read.from(new TestSource(BigEndianLongCoder.of(), new Long[0])));
        AppliedPTransform producingTransformInternal = apply.getProducingTransformInternal();
        InProcessPipelineRunner.UncommittedBundle createRootBundle = this.bundleFactory.createRootBundle(apply);
        Mockito.when(this.context.createRootBundle(apply)).thenReturn(createRootBundle);
        this.factory.forApplication(producingTransformInternal, (InProcessPipelineRunner.CommittedBundle) null, this.context).finishBundle();
        Assert.assertThat(createRootBundle.commit(Instant.now()).getElements(), Matchers.emptyIterable());
        Assert.assertThat(Boolean.valueOf(TestSource.readerClosed), Matchers.is(true));
    }

    private static WindowedValue<Long> gw(Long l) {
        return WindowedValue.valueInGlobalWindow(l);
    }
}
