package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.OffsetBasedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.class */
public class BoundedReadEvaluatorFactoryTest {
    // Bounded source used by most tests; setup() assigns CountingSource.upTo(10L).
    private BoundedSource<Long> source;
    // Output collection obtained in setup() by applying Read.from(source) to a fresh TestPipeline.
    private PCollection<Long> longs;
    // Factory under test; setup() constructs it with the mocked context and Long.MAX_VALUE.
    private BoundedReadEvaluatorFactory factory;

    // Mockito mock; populated by MockitoAnnotations.initMocks(this) in setup().
    @Mock
    private EvaluationContext context;
    // Real bundle factory (ImmutableListBundleFactory.create()) whose bundles the mocked context is stubbed to return.
    private BundleFactory bundleFactory;
    // Producer of `longs`, looked up via DirectGraphs.getProducer(longs) in setup().
    private AppliedPTransform<?, ?, ?> longsProducer;

    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest$TestReader.class */
    /**
     * Reader that serves the {@code elems} array of its current {@link TestSource} by index.
     *
     * <p>Before moving onto the element at {@code sleepIndex} (and only when that index lies
     * inside the current source's array) the reader blocks on the {@code dynamicallySplit} latch
     * and then spins until its current source is no longer equal to the source it was created
     * with. Closing the reader sets the static {@code TestSource.readerClosed} flag.
     */
    private static class TestReader<T> extends OffsetBasedSource.OffsetBasedReader<T> {
        /** Source this reader was constructed with; compared against the current source. */
        private final Source<T> initialSource;
        /** Index of the element before which the reader blocks. */
        private final int sleepIndex;
        /** Latch awaited before advancing onto {@code sleepIndex}. */
        private final CountDownLatch dynamicallySplit;
        /** Index of the current element; -1 until the first successful advance. */
        private int index;

        /**
         * @param offsetBasedSource source to read; also remembered as the initial source
         * @param i index before which {@link #advanceImpl()} blocks
         * @param countDownLatch latch awaited at the blocking point
         */
        TestReader(OffsetBasedSource<T> offsetBasedSource, int i, CountDownLatch countDownLatch) {
            super(offsetBasedSource);
            this.initialSource = offsetBasedSource;
            this.sleepIndex = i;
            this.dynamicallySplit = countDownLatch;
            this.index = -1;
        }

        /**
         * Returns the reader's current source narrowed to {@link TestSource}.
         *
         * <p>The {@code m3} prefix comes from the decompiler, which renamed this method from
         * {@code getCurrentSource}; the name is kept so existing references stay valid.
         */
        public TestSource<T> m3getCurrentSource() {
            return (TestSource<T>) super.getCurrentSource();
        }

        /** Returns the index of the current element as the reader's offset. */
        protected long getCurrentOffset() throws NoSuchElementException {
            return this.index;
        }

        /** Starting is the same as advancing from the initial index of -1. */
        public boolean startImpl() throws IOException {
            return advanceImpl();
        }

        /**
         * Moves to the next element of the current source, if there is one.
         *
         * @return {@code true} if the reader now points at an element, {@code false} if the
         *     current source has no element at the next index
         * @throws IOException if the thread is interrupted while waiting on the latch; the
         *     interrupt flag is restored before throwing
         */
        public boolean advanceImpl() throws IOException {
            int next = this.index + 1;
            if (next == this.sleepIndex && this.sleepIndex < m3getCurrentSource().elems.length) {
                try {
                    this.dynamicallySplit.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
                // Busy-wait until the current source has been replaced by a different one.
                while (this.initialSource.equals(m3getCurrentSource())) {
                    // intentionally empty
                }
            }
            // Re-read the current source: it may have changed while this reader was blocked.
            if (next >= m3getCurrentSource().elems.length) {
                return false;
            }
            this.index = next;
            return true;
        }

        /** Returns the element of the current source at the current index. */
        public T getCurrent() throws NoSuchElementException {
            return m3getCurrentSource().elems[this.index];
        }

        /** Records the close in the static {@code TestSource.readerClosed} flag. */
        public void close() throws IOException {
            TestSource.readerClosed = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest$TestSource.class */
    /**
     * Array-backed {@link OffsetBasedSource} covering offsets {@code [0, elems.length)}.
     *
     * <p>Readers created by {@link #createReader} block before the element at
     * {@code firstSplitIndex} until {@link #createSourceForSubrange} has been called twice on
     * this source since the reader was created (the latch starts at 2). The static
     * {@code readerClosed} flag is reset to {@code false} by every constructor call, including
     * the ones made by {@link #createSourceForSubrange}, and is set by {@code TestReader.close()}.
     */
    public static class TestSource<T> extends OffsetBasedSource<T> {
        /** Set when any {@code TestReader} is closed; cleared whenever a TestSource is built. */
        private static boolean readerClosed;
        private final Coder<T> coder;
        private final T[] elems;
        /** Index handed to readers as the point at which they block. */
        private final int firstSplitIndex;
        /** Latch shared with the most recently created reader; transient, replaced by createReader. */
        private transient CountDownLatch subrangesCompleted;

        /**
         * Creates a source whose readers never block: the split index equals the element count,
         * which is outside the range the reader checks.
         */
        public TestSource(Coder<T> coder, T... tArr) {
            this(coder, tArr.length, tArr);
        }

        /**
         * @param coder coder reported by {@link #getDefaultOutputCoder()}
         * @param i index before which readers of this source block
         * @param tArr elements served by the source; the array is stored as-is, not copied
         */
        public TestSource(Coder<T> coder, int i, T... tArr) {
            super(0L, tArr.length, 1L);
            this.elems = tArr;
            this.coder = coder;
            this.firstSplitIndex = i;
            readerClosed = false;
            this.subrangesCompleted = new CountDownLatch(2);
        }

        /** Never splits up front: the only bundle is this source itself. */
        public List<? extends OffsetBasedSource<T>> splitIntoBundles(long j, PipelineOptions pipelineOptions) throws Exception {
            return ImmutableList.of(this);
        }

        /** Reports the element count as the estimated size. */
        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            return this.elems.length;
        }

        public boolean producesSortedKeys(PipelineOptions pipelineOptions) throws Exception {
            return false;
        }

        /** Creates a reader together with a fresh latch of count 2 that the reader will wait on. */
        public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
            this.subrangesCompleted = new CountDownLatch(2);
            return new TestReader<T>(this, this.firstSplitIndex, this.subrangesCompleted);
        }

        /** No validation is performed. */
        public void validate() {
        }

        public long getMaxEndOffset(PipelineOptions pipelineOptions) throws Exception {
            return this.elems.length;
        }

        /**
         * Counts down the current latch and returns a new source over a copy of
         * {@code elems[j, j2)}. The new source is built with the single-argument-index
         * constructor, so its own readers never block.
         */
        public OffsetBasedSource<T> createSourceForSubrange(long j, long j2) {
            this.subrangesCompleted.countDown();
            return new TestSource<T>(this.coder, Arrays.copyOfRange(this.elems, (int) j, (int) j2));
        }

        public Coder<T> getDefaultOutputCoder() {
            return this.coder;
        }
    }

    /**
     * Builds the shared fixture: initialises the {@code @Mock} fields, creates a ten-element
     * counting source, reads it into a fresh test pipeline, and wires up the factory under test.
     */
    @Before
    public void setup() {
        // Mocks must exist before the factory captures the context.
        MockitoAnnotations.initMocks(this);
        this.bundleFactory = ImmutableListBundleFactory.create();

        BoundedSource<Long> countingSource = CountingSource.upTo(10L);
        PCollection<Long> readOutput = TestPipeline.create().apply(Read.from(countingSource));
        this.source = countingSource;
        this.longs = readOutput;
        this.longsProducer = DirectGraphs.getProducer(readOutput);

        this.factory = new BoundedReadEvaluatorFactory(this.context, Long.MAX_VALUE);
    }

    /**
     * Feeds every initial input bundle of the counting source through an evaluator and checks
     * that the watermark hold is TIMESTAMP_MAX_VALUE, that one output bundle is produced per
     * input element, and that the values 0..9 are emitted in the global window (in any order).
     */
    @Test
    public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Mockito.when(this.context.createBundle(this.longs)).thenReturn(this.bundleFactory.createBundle(this.longs));
        Collection<DirectRunner.CommittedBundle> shardBundles =
                new BoundedReadEvaluatorFactory.InputProvider(this.context).getInitialInputs(this.longsProducer, 1);

        List produced = new ArrayList();
        for (DirectRunner.CommittedBundle shardBundle : shardBundles) {
            TransformEvaluator evaluator = this.factory.forApplication(this.longsProducer, (DirectRunner.CommittedBundle) null);
            for (Object shard : shardBundle.getElements()) {
                evaluator.processElement((WindowedValue) shard);
            }
            TransformResult result = evaluator.finishBundle();
            Assert.assertThat(result.getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));

            int outputBundleCount = Iterables.size(result.getOutputBundles());
            int inputElementCount = Iterables.size(shardBundle.getElements());
            Assert.assertThat(outputBundleCount, Matchers.equalTo(inputElementCount));

            // Commit each output bundle and collect what it holds.
            for (Object output : result.getOutputBundles()) {
                DirectRunner.CommittedBundle committed =
                        ((DirectRunner.UncommittedBundle) output).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
                for (Object value : committed.getElements()) {
                    produced.add((WindowedValue) value);
                }
            }
        }

        WindowedValue[] expected = {gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)};
        Assert.assertThat(produced, Matchers.containsInAnyOrder(expected));
    }

    /**
     * Reads a ten-element {@code TestSource} whose reader blocks before index 5, using a factory
     * built with {@code 0L} as its second argument (presumably the dynamic-split size threshold;
     * confirm against BoundedReadEvaluatorFactory). Unprocessed elements reported by each result
     * are re-fed in a further round until none remain. The test asserts that more than one
     * round was needed and that the values 0..9 were all produced.
     */
    @Test
    public void boundedSourceEvaluatorProducesDynamicSplits() throws Exception {
        BoundedReadEvaluatorFactory splittingFactory = new BoundedReadEvaluatorFactory(this.context, 0L);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());

        Long[] values = new Long[10];
        for (int k = 0; k < 10; k++) {
            values[k] = Long.valueOf(k);
        }
        PCollection read = TestPipeline.create().apply(Read.from(new TestSource(VarLongCoder.of(), 5, values)));
        AppliedPTransform<?, ?, ?> readTransform = DirectGraphs.getProducer(read);
        Collection<DirectRunner.CommittedBundle> pending =
                new BoundedReadEvaluatorFactory.InputProvider(this.context).getInitialInputs(readTransform, 1);

        List produced = new ArrayList();
        int rounds = 0;
        while (!pending.isEmpty()) {
            rounds++;
            // A fresh output bundle is stubbed for every round.
            Mockito.when(this.context.createBundle(read)).thenReturn(this.bundleFactory.createBundle(read));
            ArrayList residuals = new ArrayList();
            for (DirectRunner.CommittedBundle inputBundle : pending) {
                TransformEvaluator evaluator = splittingFactory.forApplication(readTransform, (DirectRunner.CommittedBundle) null);
                for (Object shard : inputBundle.getElements()) {
                    evaluator.processElement((WindowedValue) shard);
                }
                TransformResult result = evaluator.finishBundle();
                Assert.assertThat(result.getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));

                int outputBundleCount = Iterables.size(result.getOutputBundles());
                int inputElementCount = Iterables.size(inputBundle.getElements());
                Assert.assertThat(outputBundleCount, Matchers.equalTo(inputElementCount));

                for (Object output : result.getOutputBundles()) {
                    DirectRunner.CommittedBundle committed =
                            ((DirectRunner.UncommittedBundle) output).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
                    for (Object value : committed.getElements()) {
                        produced.add((WindowedValue) value);
                    }
                }

                // Whatever the evaluator did not consume becomes input for the next round.
                boolean fullyConsumed = Iterables.isEmpty(result.getUnprocessedElements());
                if (!fullyConsumed) {
                    residuals.add(inputBundle.withElements(result.getUnprocessedElements()));
                }
            }
            pending = residuals;
        }
        Assert.assertThat(Integer.valueOf(rounds), Matchers.greaterThan(1));

        WindowedValue[] expected = new WindowedValue[10];
        for (long v = 0; v < 10; v++) {
            expected[(int) v] = gw(Long.valueOf(v));
        }
        Assert.assertThat(produced, Matchers.containsInAnyOrder(expected));
    }

    /**
     * Reads a counting source wrapped by {@code SourceTestUtils.toUnsplittableSource} with a
     * factory built with {@code 0L} as its second argument, and checks that a single pass over
     * the initial inputs still yields the values 0..9, with one output bundle per input element
     * and a watermark hold of TIMESTAMP_MAX_VALUE.
     */
    @Test
    public void boundedSourceEvaluatorDynamicSplitsUnsplittable() throws Exception {
        BoundedReadEvaluatorFactory splittingFactory = new BoundedReadEvaluatorFactory(this.context, 0L);
        PCollection read = TestPipeline.create().apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
        AppliedPTransform<?, ?, ?> readTransform = DirectGraphs.getProducer(read);

        // Stub the root bundle once; a second identical stubbing only replaced the first one.
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Collection<DirectRunner.CommittedBundle> initialInputs =
                new BoundedReadEvaluatorFactory.InputProvider(this.context).getInitialInputs(readTransform, 1);
        Mockito.when(this.context.createBundle(read)).thenReturn(this.bundleFactory.createBundle(read));

        List produced = new ArrayList();
        for (DirectRunner.CommittedBundle inputBundle : initialInputs) {
            TransformEvaluator evaluator = splittingFactory.forApplication(readTransform, (DirectRunner.CommittedBundle) null);
            for (Object shard : inputBundle.getElements()) {
                evaluator.processElement((WindowedValue) shard);
            }
            TransformResult result = evaluator.finishBundle();
            Assert.assertThat(result.getWatermarkHold(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
            Assert.assertThat(Integer.valueOf(Iterables.size(result.getOutputBundles())), Matchers.equalTo(Integer.valueOf(Iterables.size(inputBundle.getElements()))));

            // Commit each output bundle and collect what it holds.
            for (Object output : result.getOutputBundles()) {
                DirectRunner.CommittedBundle committed =
                        ((DirectRunner.UncommittedBundle) output).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
                for (Object value : committed.getElements()) {
                    produced.add((WindowedValue) value);
                }
            }
        }
        Assert.assertThat(produced, Matchers.containsInAnyOrder(new WindowedValue[]{gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)}));
    }

    /**
     * Requests initial inputs with a target of 3 and checks that 3 or 4 bundles come back, each
     * holding exactly one shard in the global window at TIMESTAMP_MIN_VALUE, and that the shard
     * sources together are equivalent to the reference counting source.
     */
    @Test
    public void getInitialInputsSplitsIntoBundles() throws Exception {
        // Every createRootBundle() call must yield a fresh bundle, hence thenAnswer rather than thenReturn.
        Mockito.when(this.context.createRootBundle()).thenAnswer(new Answer<DirectRunner.UncommittedBundle<?>>() {
            // Must be named `answer` to implement Answer's single abstract method.
            public DirectRunner.UncommittedBundle<?> answer(InvocationOnMock invocationOnMock) throws Throwable {
                return BoundedReadEvaluatorFactoryTest.this.bundleFactory.createRootBundle();
            }
        });
        Collection initialInputs = new BoundedReadEvaluatorFactory.InputProvider(this.context).getInitialInputs(this.longsProducer, 3);
        Assert.assertThat(initialInputs, Matchers.hasSize(Matchers.allOf(Matchers.greaterThanOrEqualTo(3), Matchers.lessThanOrEqualTo(4))));

        List shardSources = new ArrayList();
        for (Object input : initialInputs) {
            WindowedValue shard = (WindowedValue) Iterables.getOnlyElement(((DirectRunner.CommittedBundle) input).getElements());
            Assert.assertThat(shard.getWindows(), Matchers.contains(new BoundedWindow[]{GlobalWindow.INSTANCE}));
            Assert.assertThat(shard.getTimestamp(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
            shardSources.add(((BoundedReadEvaluatorFactory.BoundedSourceShard) shard.getValue()).getSource());
        }
        SourceTestUtils.assertSourcesEqualReferenceSource(this.source, shardSources, PipelineOptionsFactory.create());
    }

    /**
     * Splits the counting source with a desired bundle size of half its estimated size, feeds
     * every resulting shard through one evaluator, and checks that one output bundle is produced
     * per shard and that together they contain the values 0..9.
     */
    @Test
    public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        List shards = this.source.splitIntoBundles(this.source.getEstimatedSizeBytes(options) / 2, options);

        DirectRunner.UncommittedBundle rootBundle = this.bundleFactory.createRootBundle();
        for (Object shard : shards) {
            rootBundle.add(WindowedValue.valueInGlobalWindow(BoundedReadEvaluatorFactory.BoundedSourceShard.of((BoundedSource) shard)));
        }
        DirectRunner.CommittedBundle shardBundle = rootBundle.commit(Instant.now());

        TransformEvaluator evaluator = this.factory.forApplication(this.longsProducer, shardBundle);
        // shardBundle is raw, so its elements are iterated as Object and cast explicitly.
        for (Object element : shardBundle.getElements()) {
            // A fresh output bundle is stubbed before each shard is processed.
            Mockito.when(this.context.createBundle(this.longs)).thenReturn(this.bundleFactory.createBundle(this.longs));
            evaluator.processElement((WindowedValue) element);
        }
        TransformResult result = evaluator.finishBundle();
        Assert.assertThat(Integer.valueOf(Iterables.size(result.getOutputBundles())), Matchers.equalTo(Integer.valueOf(shards.size())));

        List produced = new ArrayList();
        for (Object output : result.getOutputBundles()) {
            DirectRunner.CommittedBundle committed = ((DirectRunner.UncommittedBundle) output).commit(Instant.now());
            for (Object value : committed.getElements()) {
                produced.add((WindowedValue) value);
            }
        }
        Assert.assertThat(produced, Matchers.containsInAnyOrder(new WindowedValue[]{gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)}));
    }

    /**
     * Processes a three-element {@code TestSource} shard and checks that all three values reach
     * the output bundle and that the reader was closed (static {@code TestSource.readerClosed}).
     */
    @Test
    public void boundedSourceEvaluatorClosesReader() throws Exception {
        TestSource threeElementSource = new TestSource(BigEndianLongCoder.of(), 1L, 2L, 3L);
        PCollection read = TestPipeline.create().apply(Read.from(threeElementSource));
        AppliedPTransform<?, ?, ?> readTransform = DirectGraphs.getProducer(read);

        // The same output bundle is kept so it can be inspected after the evaluator finishes.
        DirectRunner.UncommittedBundle outputBundle = this.bundleFactory.createBundle(read);
        Mockito.when(this.context.createBundle(read)).thenReturn(outputBundle);

        DirectRunner.CommittedBundle emptyRoot = this.bundleFactory.createRootBundle().commit(Instant.now());
        TransformEvaluator evaluator = this.factory.forApplication(readTransform, emptyRoot);
        WindowedValue shard = WindowedValue.valueInGlobalWindow(BoundedReadEvaluatorFactory.BoundedSourceShard.of(threeElementSource));
        evaluator.processElement(shard);
        evaluator.finishBundle();

        Iterable emitted = outputBundle.commit(Instant.now()).getElements();
        Assert.assertThat(emitted, Matchers.containsInAnyOrder(new WindowedValue[]{gw(2L), gw(3L), gw(1L)}));
        Assert.assertThat(Boolean.valueOf(TestSource.readerClosed), Matchers.is(true));
    }

    /**
     * Processes an empty {@code TestSource} shard and checks that nothing is emitted and that the
     * reader was still closed (static {@code TestSource.readerClosed}).
     */
    @Test
    public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
        TestSource emptySource = new TestSource(BigEndianLongCoder.of(), new Long[0]);
        PCollection read = TestPipeline.create().apply(Read.from(emptySource));
        AppliedPTransform<?, ?, ?> readTransform = DirectGraphs.getProducer(read);

        // The same output bundle is kept so it can be inspected after the evaluator finishes.
        DirectRunner.UncommittedBundle outputBundle = this.bundleFactory.createBundle(read);
        Mockito.when(this.context.createBundle(read)).thenReturn(outputBundle);

        DirectRunner.CommittedBundle emptyRoot = this.bundleFactory.createRootBundle().commit(Instant.now());
        TransformEvaluator evaluator = this.factory.forApplication(readTransform, emptyRoot);
        WindowedValue shard = WindowedValue.valueInGlobalWindow(BoundedReadEvaluatorFactory.BoundedSourceShard.of(emptySource));
        evaluator.processElement(shard);
        evaluator.finishBundle();

        Iterable emitted = outputBundle.commit(Instant.now()).getElements();
        Assert.assertThat(emitted, Matchers.emptyIterable());
        Assert.assertThat(Boolean.valueOf(TestSource.readerClosed), Matchers.is(true));
    }

    /** After {@code cleanup()}, the factory's {@code executor} must report itself as shut down. */
    @Test
    public void cleanupShutsDownExecutor() {
        this.factory.cleanup();
        boolean executorShutDown = this.factory.executor.isShutdown();
        Assert.assertThat(Boolean.valueOf(executorShutDown), Matchers.is(true));
    }

    /** Shorthand: wraps {@code l} via {@code WindowedValue.valueInGlobalWindow}. */
    private static WindowedValue<Long> gw(Long l) {
        WindowedValue<Long> inGlobalWindow = WindowedValue.valueInGlobalWindow(l);
        return inGlobalWindow;
    }
}
