package org.apache.beam.runners.direct;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.construction.SplittableParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ContiguousSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.DiscreteDomain;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Range;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.class */
public class UnboundedReadEvaluatorFactoryTest {
    private PCollection<Long> longs;
    private UnboundedReadEvaluatorFactory factory;
    private EvaluationContext context;
    private UncommittedBundle<Long> output;
    private UnboundedSource<Long, ?> source;
    private DirectGraph graph;
    private BundleFactory bundleFactory = ImmutableListBundleFactory.create();

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Rule
    public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);

    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$LongToInstantFn.class */
    private static class LongToInstantFn implements SerializableFunction<Long, Instant> {
        private LongToInstantFn() {
        }

        @Override // org.apache.beam.sdk.transforms.SerializableFunction, org.apache.beam.sdk.transforms.ProcessFunction
        public Instant apply(Long l) {
            return new Instant(l);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$TestCheckpointMark.class */
    public static class TestCheckpointMark implements UnboundedSource.CheckpointMark {
        final int index;
        private boolean finalized;
        private boolean decoded;

        /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$TestCheckpointMark$Coder.class */
        public static class Coder extends AtomicCoder<TestCheckpointMark> {
            @Override // org.apache.beam.sdk.coders.Coder
            public void encode(TestCheckpointMark testCheckpointMark, OutputStream outputStream) throws IOException {
                VarInt.encode(testCheckpointMark.index, outputStream);
            }

            @Override // org.apache.beam.sdk.coders.Coder
            public TestCheckpointMark decode(InputStream inputStream) throws IOException {
                TestCheckpointMark testCheckpointMark = new TestCheckpointMark(VarInt.decodeInt(inputStream));
                testCheckpointMark.decoded = true;
                return testCheckpointMark;
            }
        }

        private TestCheckpointMark(int i) {
            this.finalized = false;
            this.decoded = false;
            this.index = i;
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource.CheckpointMark
        public void finalizeCheckpoint() throws IOException {
            Preconditions.checkState(!this.finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
            Preconditions.checkState(!this.decoded, "%s was finalized after being decoded", TestCheckpointMark.class.getSimpleName());
            this.finalized = true;
        }

        boolean isFinalized() {
            return this.finalized;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$TestUnboundedSource.class */
    public static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
        private static int getWatermarkCalls = 0;
        static int readerCreatedCount;
        static int readerClosedCount;
        static int readerAdvancedCount;
        private final Coder<T> coder;
        private final List<T> elems;
        private boolean dedupes;
        private boolean advanceWatermarkToInfinity;
        private boolean throwOnClose;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest$TestUnboundedSource$TestUnboundedReader.class */
        public class TestUnboundedReader extends UnboundedSource.UnboundedReader<T> {
            private final List<T> elems;
            private int index;
            private boolean closed = false;

            public TestUnboundedReader(List<T> list, int i) {
                this.elems = list;
                this.index = i;
            }

            @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
            public boolean start() throws IOException {
                return advance();
            }

            @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
            public boolean advance() throws IOException {
                TestUnboundedSource.readerAdvancedCount++;
                if (this.index + 1 >= this.elems.size()) {
                    return false;
                }
                this.index++;
                return true;
            }

            @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
            public Instant getWatermark() {
                TestUnboundedSource.access$508();
                return (this.index + 1 == this.elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) ? BoundedWindow.TIMESTAMP_MAX_VALUE : new Instant(this.index + TestUnboundedSource.getWatermarkCalls);
            }

            @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
            public UnboundedSource.CheckpointMark getCheckpointMark() {
                return new TestCheckpointMark(this.index);
            }

            @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader, org.apache.beam.sdk.io.Source.Reader
            public UnboundedSource<T, ?> getCurrentSource() {
                return TestUnboundedSource.this;
            }

            @Override // org.apache.beam.sdk.io.Source.Reader
            public T getCurrent() throws NoSuchElementException {
                return this.elems.get(this.index);
            }

            @Override // org.apache.beam.sdk.io.Source.Reader
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return new Instant(this.index);
            }

            @Override // org.apache.beam.sdk.io.UnboundedSource.UnboundedReader
            public byte[] getCurrentRecordId() {
                try {
                    return CoderUtils.encodeToByteArray(TestUnboundedSource.this.coder, getCurrent());
                } catch (CoderException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override // org.apache.beam.sdk.io.Source.Reader, java.lang.AutoCloseable
            public void close() throws IOException {
                try {
                    TestUnboundedSource.readerClosedCount++;
                    MatcherAssert.assertThat(Boolean.valueOf(this.closed), Matchers.is(false));
                    if (TestUnboundedSource.this.throwOnClose) {
                        throw new IOException(String.format("%s throws on close", TestUnboundedSource.this));
                    }
                } finally {
                    this.closed = true;
                }
            }
        }

        public TestUnboundedSource(Coder<T> coder, T... tArr) {
            this(coder, false, Arrays.asList(tArr));
        }

        private TestUnboundedSource(Coder<T> coder, boolean z, List<T> list) {
            this.dedupes = false;
            this.advanceWatermarkToInfinity = false;
            readerCreatedCount = 0;
            readerClosedCount = 0;
            readerAdvancedCount = 0;
            this.coder = coder;
            this.elems = list;
            this.throwOnClose = z;
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public List<? extends UnboundedSource<T, TestCheckpointMark>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            return ImmutableList.of(this);
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public UnboundedSource.UnboundedReader<T> createReader(PipelineOptions pipelineOptions, TestCheckpointMark testCheckpointMark) {
            Preconditions.checkState(testCheckpointMark == null || testCheckpointMark.decoded, "Cannot resume from a checkpoint that has not been decoded");
            readerCreatedCount++;
            return new TestUnboundedReader(this.elems, testCheckpointMark == null ? -1 : testCheckpointMark.index);
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
            return new TestCheckpointMark.Coder();
        }

        @Override // org.apache.beam.sdk.io.UnboundedSource
        public boolean requiresDeduping() {
            return this.dedupes;
        }

        @Override // org.apache.beam.sdk.io.Source
        public Coder<T> getOutputCoder() {
            return this.coder;
        }

        public TestUnboundedSource<T> throwsOnClose() {
            return new TestUnboundedSource<>(this.coder, true, this.elems);
        }

        static /* synthetic */ int access$508() {
            int i = getWatermarkCalls;
            getWatermarkCalls = i + 1;
            return i;
        }
    }

    @Before
    public void setup() {
        this.source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
        this.longs = (PCollection) this.p.apply(Read.from(this.source));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(this.p);
        this.context = (EvaluationContext) Mockito.mock(EvaluationContext.class);
        this.factory = new UnboundedReadEvaluatorFactory(this.context, this.p.getOptions());
        this.output = this.bundleFactory.createBundle(this.longs);
        this.graph = DirectGraphs.getGraph(this.p);
        Mockito.when(this.context.createBundle(this.longs)).thenReturn(this.output);
    }

    @Test
    public void generatesInitialSplits() throws Exception {
        Mockito.when(this.context.createRootBundle()).thenAnswer(invocationOnMock -> {
            return this.bundleFactory.createRootBundle();
        });
        Collection initialInputs = new UnboundedReadEvaluatorFactory.InputProvider(this.context, this.p.getOptions()).getInitialInputs(this.graph.getProducer(this.longs), 5);
        MatcherAssert.assertThat(initialInputs, Matchers.hasSize(5));
        int i = 5 * 100;
        ContiguousSet create = ContiguousSet.create(Range.closedOpen(0L, Long.valueOf(i)), DiscreteDomain.longs());
        ArrayList arrayList = new ArrayList(i);
        Iterator it = initialInputs.iterator();
        while (it.hasNext()) {
            WindowedValue windowedValue = (WindowedValue) Iterables.getOnlyElement(((CommittedBundle) it.next()).getElements());
            MatcherAssert.assertThat(windowedValue.getTimestamp(), Matchers.equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
            MatcherAssert.assertThat(windowedValue.getWindows(), Matchers.contains(new BoundedWindow[]{GlobalWindow.INSTANCE}));
            arrayList.addAll(SourceTestUtils.readNItemsFromUnstartedReader(((UnboundedReadEvaluatorFactory.UnboundedSourceShard) windowedValue.getValue()).getSource().createReader(PipelineOptionsFactory.create(), null), 100));
        }
        MatcherAssert.assertThat(arrayList, Matchers.containsInAnyOrder((Long[]) create.toArray(new Long[0])));
    }

    @Test
    public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        CommittedBundle committedBundle = (CommittedBundle) Iterables.getOnlyElement(new UnboundedReadEvaluatorFactory.InputProvider(this.context, this.p.getOptions()).getInitialInputs(this.graph.getProducer(this.longs), 1));
        UnboundedReadEvaluatorFactory.UnboundedSourceShard unboundedSourceShard = (UnboundedReadEvaluatorFactory.UnboundedSourceShard) ((WindowedValue) Iterables.getOnlyElement(committedBundle.getElements())).getValue();
        TransformEvaluator forApplication = this.factory.forApplication(this.graph.getProducer(this.longs), committedBundle);
        forApplication.processElement((WindowedValue) Iterables.getOnlyElement(committedBundle.getElements()));
        WindowedValue windowedValue = (WindowedValue) Iterables.getOnlyElement(forApplication.finishBundle().getUnprocessedElements());
        MatcherAssert.assertThat(windowedValue.getTimestamp(), Matchers.lessThan(DateTime.now().toInstant()));
        UnboundedReadEvaluatorFactory.UnboundedSourceShard unboundedSourceShard2 = (UnboundedReadEvaluatorFactory.UnboundedSourceShard) windowedValue.getValue();
        MatcherAssert.assertThat(unboundedSourceShard2.getSource(), Matchers.equalTo(unboundedSourceShard.getSource()));
        MatcherAssert.assertThat(unboundedSourceShard2.getCheckpoint(), Matchers.not(Matchers.nullValue()));
        MatcherAssert.assertThat(this.output.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), tgw(0L)}));
    }

    @Test
    public void unboundedSourceWithDuplicatesMultipleCalls() throws Exception {
        Long[] lArr = new Long[20];
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 20) {
                break;
            }
            lArr[(int) j2] = Long.valueOf(j2 % 5);
            j = j2 + 1;
        }
        TestUnboundedSource testUnboundedSource = new TestUnboundedSource(BigEndianLongCoder.of(), lArr);
        testUnboundedSource.dedupes = true;
        PCollection pCollection = (PCollection) this.p.apply(Read.from(testUnboundedSource));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(this.p);
        AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(pCollection);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Collection initialInputs = new UnboundedReadEvaluatorFactory.InputProvider(this.context, this.p.getOptions()).getInitialInputs(producer, 1);
        UncommittedBundle createBundle = this.bundleFactory.createBundle(pCollection);
        Mockito.when(this.context.createBundle(pCollection)).thenReturn(createBundle);
        CommittedBundle committedBundle = (CommittedBundle) Iterables.getOnlyElement(initialInputs);
        TransformEvaluator forApplication = this.factory.forApplication(producer, committedBundle);
        Iterator it = committedBundle.getElements().iterator();
        while (it.hasNext()) {
            forApplication.processElement((WindowedValue) it.next());
        }
        TransformResult finishBundle = forApplication.finishBundle();
        MatcherAssert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)}));
        UncommittedBundle createBundle2 = this.bundleFactory.createBundle(this.longs);
        Mockito.when(this.context.createBundle(this.longs)).thenReturn(createBundle2);
        TransformEvaluator forApplication2 = this.factory.forApplication(producer, committedBundle);
        forApplication2.processElement((WindowedValue) Iterables.getOnlyElement(finishBundle.getUnprocessedElements()));
        forApplication2.finishBundle();
        MatcherAssert.assertThat(createBundle2.commit(Instant.now()).getElements(), Matchers.emptyIterable());
    }

    @Test
    public void noElementsAvailableReaderIncludedInResidual() throws Exception {
        PCollection pCollection = (PCollection) this.p.apply(Read.from(new TestUnboundedSource(VarLongCoder.of(), 1L)));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(this.p);
        AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(pCollection);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Collection initialInputs = new UnboundedReadEvaluatorFactory.InputProvider(this.context, this.p.getOptions()).getInitialInputs(producer, 1);
        Mockito.when(this.context.createBundle(pCollection)).thenReturn(this.bundleFactory.createBundle(pCollection));
        CommittedBundle committedBundle = (CommittedBundle) Iterables.getOnlyElement(initialInputs);
        TransformEvaluator forApplication = this.factory.forApplication(producer, committedBundle);
        Iterator it = committedBundle.getElements().iterator();
        while (it.hasNext()) {
            forApplication.processElement((WindowedValue) it.next());
        }
        TransformResult finishBundle = forApplication.finishBundle();
        UncommittedBundle createBundle = this.bundleFactory.createBundle(this.longs);
        Mockito.when(this.context.createBundle(this.longs)).thenReturn(createBundle);
        TransformEvaluator forApplication2 = this.factory.forApplication(producer, committedBundle);
        WindowedValue windowedValue = (WindowedValue) Iterables.getOnlyElement(finishBundle.getUnprocessedElements());
        forApplication2.processElement(windowedValue);
        TransformResult finishBundle2 = forApplication2.finishBundle();
        MatcherAssert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.emptyIterable());
        WindowedValue windowedValue2 = (WindowedValue) Iterables.getOnlyElement(finishBundle2.getUnprocessedElements());
        MatcherAssert.assertThat(windowedValue2.getTimestamp(), Matchers.greaterThan(windowedValue.getTimestamp()));
        MatcherAssert.assertThat(((UnboundedReadEvaluatorFactory.UnboundedSourceShard) windowedValue2.getValue()).getExistingReader(), Matchers.not(Matchers.nullValue()));
    }

    @Test
    public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception {
        TestUnboundedSource testUnboundedSource = new TestUnboundedSource(BigEndianLongCoder.of(), (Long[]) ContiguousSet.create(Range.openClosed(0L, Long.valueOf(1000)), DiscreteDomain.longs()).toArray(new Long[0]));
        testUnboundedSource.advanceWatermarkToInfinity = true;
        PCollection pCollection = (PCollection) this.p.apply(Read.from(testUnboundedSource));
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(this.p);
        AppliedPTransform producer = DirectGraphs.getGraph(this.p).getProducer(pCollection);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        UncommittedBundle uncommittedBundle = (UncommittedBundle) Mockito.mock(UncommittedBundle.class);
        Mockito.when(this.context.createBundle(pCollection)).thenReturn(uncommittedBundle);
        CommittedBundle commit = this.bundleFactory.createRootBundle().add(WindowedValue.valueInGlobalWindow(UnboundedReadEvaluatorFactory.UnboundedSourceShard.unstarted(testUnboundedSource, UnboundedReadDeduplicator.NeverDeduplicator.create()))).commit(Instant.now());
        UnboundedReadEvaluatorFactory unboundedReadEvaluatorFactory = new UnboundedReadEvaluatorFactory(this.context, this.p.getOptions(), 1.0d);
        new UnboundedReadEvaluatorFactory.InputProvider(this.context, this.p.getOptions()).getInitialInputs(producer, 1);
        CommittedBundle committedBundle = commit;
        do {
            TransformEvaluator forApplication = unboundedReadEvaluatorFactory.forApplication(producer, committedBundle);
            forApplication.processElement((WindowedValue) Iterables.getOnlyElement(committedBundle.getElements()));
            committedBundle = commit.withElements(forApplication.finishBundle().getUnprocessedElements());
        } while (!Iterables.isEmpty(committedBundle.getElements()));
        ((UncommittedBundle) Mockito.verify(uncommittedBundle, Mockito.times(1000))).add((WindowedValue) org.mockito.Matchers.any());
        MatcherAssert.assertThat(Integer.valueOf(TestUnboundedSource.readerCreatedCount), Matchers.equalTo(1));
        MatcherAssert.assertThat(Integer.valueOf(TestUnboundedSource.readerClosedCount), Matchers.equalTo(1));
    }

    @Test
    public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception {
        TestUnboundedSource testUnboundedSource = new TestUnboundedSource(BigEndianLongCoder.of(), (Long[]) ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs()).toArray(new Long[0]));
        PCollection pCollection = (PCollection) this.p.apply(Read.from(testUnboundedSource));
        AppliedPTransform producer = DirectGraphs.getGraph(this.p).getProducer(pCollection);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Mockito.when(this.context.createBundle(pCollection)).thenReturn(this.bundleFactory.createBundle(pCollection));
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(UnboundedReadEvaluatorFactory.UnboundedSourceShard.unstarted(testUnboundedSource, UnboundedReadDeduplicator.NeverDeduplicator.create()));
        CommittedBundle commit = this.bundleFactory.createRootBundle().add(valueInGlobalWindow).commit(Instant.now());
        UnboundedReadEvaluatorFactory unboundedReadEvaluatorFactory = new UnboundedReadEvaluatorFactory(this.context, this.p.getOptions(), 0.0d);
        TransformEvaluator forApplication = unboundedReadEvaluatorFactory.forApplication(producer, commit);
        forApplication.processElement(valueInGlobalWindow);
        CommittedBundle withElements = commit.withElements(forApplication.finishBundle().getUnprocessedElements());
        TransformEvaluator forApplication2 = unboundedReadEvaluatorFactory.forApplication(producer, withElements);
        forApplication2.processElement((WindowedValue) Iterables.getOnlyElement(withElements.getElements()));
        forApplication2.finishBundle();
        MatcherAssert.assertThat(Integer.valueOf(TestUnboundedSource.readerClosedCount), Matchers.equalTo(2));
        MatcherAssert.assertThat(Boolean.valueOf(((TestCheckpointMark) ((UnboundedReadEvaluatorFactory.UnboundedSourceShard) ((WindowedValue) Iterables.getOnlyElement(withElements.getElements())).getValue()).getCheckpoint()).isFinalized()), Matchers.is(true));
    }

    @Test
    public void evaluatorThrowsInCloseRethrows() throws Exception {
        TestUnboundedSource throwsOnClose = new TestUnboundedSource(BigEndianLongCoder.of(), (Long[]) ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs()).toArray(new Long[0])).throwsOnClose();
        PCollection pCollection = (PCollection) this.p.apply(Read.from(throwsOnClose));
        AppliedPTransform producer = DirectGraphs.getGraph(this.p).getProducer(pCollection);
        Mockito.when(this.context.createRootBundle()).thenReturn(this.bundleFactory.createRootBundle());
        Mockito.when(this.context.createBundle(pCollection)).thenReturn(this.bundleFactory.createBundle(pCollection));
        WindowedValue valueInGlobalWindow = WindowedValue.valueInGlobalWindow(UnboundedReadEvaluatorFactory.UnboundedSourceShard.unstarted(throwsOnClose, UnboundedReadDeduplicator.NeverDeduplicator.create()));
        TransformEvaluator forApplication = new UnboundedReadEvaluatorFactory(this.context, this.p.getOptions(), 0.0d).forApplication(producer, this.bundleFactory.createRootBundle().add(valueInGlobalWindow).commit(Instant.now()));
        this.thrown.expect(IOException.class);
        this.thrown.expectMessage("throws on close");
        forApplication.processElement(valueInGlobalWindow);
    }

    @Test
    public void emptySource() throws Exception {
        TestUnboundedSource.readerClosedCount = 0;
        TestUnboundedSource<String> testUnboundedSource = new TestUnboundedSource<>(StringUtf8Coder.of(), new String[0]);
        ((TestUnboundedSource) testUnboundedSource).advanceWatermarkToInfinity = true;
        processElement(testUnboundedSource);
        Assert.assertEquals(1L, TestUnboundedSource.readerClosedCount);
        TestUnboundedSource.readerClosedCount = 0;
    }

    @Test(expected = IOException.class)
    public void sourceThrowingException() throws Exception {
        TestUnboundedSource<String> testUnboundedSource = new TestUnboundedSource<>(StringUtf8Coder.of(), new String[0]);
        ((TestUnboundedSource) testUnboundedSource).advanceWatermarkToInfinity = true;
        ((TestUnboundedSource) testUnboundedSource).throwOnClose = true;
        processElement(testUnboundedSource);
    }

    private void processElement(TestUnboundedSource<String> testUnboundedSource) throws Exception {
        UnboundedReadEvaluatorFactory unboundedReadEvaluatorFactory = new UnboundedReadEvaluatorFactory(EvaluationContext.create(MockClock.fromInstant(Instant.now()), CloningBundleFactory.create(), DirectGraph.create(Collections.emptyMap(), Collections.emptyMap(), LinkedListMultimap.create(), Collections.emptySet(), Collections.emptyMap()), Collections.emptySet(), Executors.newCachedThreadPool()), this.p.getOptions());
        SplittableParDo.PrimitiveUnboundedRead primitiveUnboundedRead = new SplittableParDo.PrimitiveUnboundedRead(Read.from(testUnboundedSource));
        Pipeline create = Pipeline.create(this.p.getOptions());
        TransformEvaluator forApplication = unboundedReadEvaluatorFactory.forApplication(AppliedPTransform.of("test", new HashMap(), Collections.singletonMap(new TupleTag(), (PCollection) create.apply(primitiveUnboundedRead)), primitiveUnboundedRead, ResourceHints.create(), create), (CommittedBundle) null);
        WindowedValue of = WindowedValue.of(UnboundedReadEvaluatorFactory.UnboundedSourceShard.of(testUnboundedSource, new UnboundedReadDeduplicator.NeverDeduplicator(), testUnboundedSource.createReader(this.p.getOptions(), (TestCheckpointMark) null), (UnboundedSource.CheckpointMark) null), BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
        TestUnboundedSource.readerClosedCount = 0;
        forApplication.processElement(of);
    }

    private static WindowedValue<Long> tgw(Long l) {
        return WindowedValue.timestampedValueInGlobalWindow(l, new Instant(l));
    }
}
