package org.apache.beam.sdk.io;

import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/CountingSourceTest.class */
public class CountingSourceTest {

    /**
     * Pipeline shared by the tests in this class, registered as a JUnit rule and created through
     * {@link TestPipeline#create()}.
     */
    @Rule
    public TestPipeline p = TestPipeline.create();

    /**
     * A {@link DoFn} that outputs, for each input element, the element's value minus the
     * millisecond value of the element's timestamp.
     *
     * <p>The tests in this class use it to check that values and timestamps agree: the output is
     * {@code 0} whenever an element's timestamp equals its value.
     */
    private static class ElementValueDiff extends DoFn<Long, Long> {

        /**
         * Outputs {@code element - timestampMillis} for the current element.
         *
         * @param processContext context giving access to the element, its timestamp and the output
         */
        @ProcessElement
        public void processElement(ProcessContext processContext) throws Exception {
            long value = processContext.element();
            long timestampMillis = processContext.timestamp().getMillis();
            processContext.output(value - timestampMillis);
        }
    }

    /**
     * A timestamp function that turns a {@link Long} value into a Joda {@link Instant} built
     * from that value.
     */
    private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {

        /**
         * Builds the {@link Instant} for the given value.
         *
         * @param l the value to convert
         * @return a new {@link Instant} constructed from {@code l}
         */
        @Override
        public Instant apply(Long l) {
            // Deliberately passes the boxed value straight through, exactly as before.
            return new Instant(l);
        }
    }

    /**
     * Registers {@link PAssert} checks verifying that the given collection looks like the
     * sequence {@code 0 .. j-1}.
     *
     * <p>Four properties are asserted: the total count is {@code j}, the count of distinct
     * elements is {@code j}, the minimum is {@code 0} and the maximum is {@code j - 1}.
     *
     * @param pCollection the collection of longs to check
     * @param j the expected number of elements
     */
    public static void addCountingAsserts(PCollection<Long> pCollection, long j) {
        // Total number of elements.
        PAssert.thatSingleton(pCollection.apply("Count", Count.<Long>globally())).isEqualTo(j);

        // Number of elements once duplicates are removed; equal to the total means no duplicates.
        PCollection<Long> distinctValues = pCollection.apply(Distinct.<Long>create());
        PAssert.thatSingleton(distinctValues.apply("UniqueCount", Count.<Long>globally())).isEqualTo(j);

        // Smallest and largest value.
        PAssert.thatSingleton(pCollection.apply("Min", Min.<Long>globally())).isEqualTo(0L);
        PAssert.thatSingleton(pCollection.apply("Max", Max.<Long>globally())).isEqualTo(j - 1);
    }

    /**
     * Reads from {@code CountingSource.upTo(1000)} and applies the counting assertions for 1000
     * elements.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testBoundedSource() {
        final long numElements = 1000L;
        PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(numElements)));

        addCountingAsserts(input, numElements);
        p.run();
    }

    /** Reads from {@code CountingSource.upTo(0)} and asserts that the output is empty. */
    @Test
    @Category({NeedsRunner.class})
    public void testEmptyBoundedSource() {
        PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0L)));

        PAssert.that(input).empty();
        p.run();
    }

    /**
     * Splits a 1000-element bounded source with a desired bundle size chosen to yield ten splits,
     * then checks that exactly ten splits are produced, that each split reports the requested
     * estimated size, and that reading all splits together still yields {@code 0 .. 999}.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testBoundedSourceSplits() throws Exception {
        final long numElements = 1000L;
        final long numSplits = 10L;
        // Desired bundle size; the factor 8 is presumably the size in bytes of one long element.
        final long splitSizeBytes = (numElements * 8) / numSplits;

        List<? extends BoundedSource<Long>> splits =
                CountingSource.upTo(numElements).split(splitSizeBytes, p.getOptions());
        Assert.assertEquals("Expected exact splitting", numSplits, splits.size());

        // Read every split on its own, collecting the outputs so they can be flattened below,
        // and verify the size each split reports.
        PCollectionList<Long> splitOutputs = PCollectionList.empty(p);
        for (int i = 0; i < splits.size(); i++) {
            BoundedSource<Long> split = splits.get(i);
            PCollection<Long> splitOutput = p.apply("split" + i, Read.from(split));
            splitOutputs = splitOutputs.and(splitOutput);
            Assert.assertEquals("Expected even splitting", splitSizeBytes, split.getEstimatedSizeBytes(p.getOptions()));
        }

        addCountingAsserts(splitOutputs.apply(Flatten.<Long>pCollections()), numElements);
        p.run();
    }

    /**
     * Checks the progress values reported by a bounded reader over five elements: fraction
     * consumed, split points consumed and split points remaining, before reading, after each
     * record, and once the reader is exhausted.
     *
     * @throws IOException if creating, advancing or closing the reader fails
     */
    @Test
    public void testProgress() throws IOException {
        final long numRecords = 5L;
        BoundedSource<Long> source = CountingSource.upTo(numRecords);

        // try-with-resources closes the reader on every path and attaches a close() failure as a
        // suppressed exception when the body has already thrown.
        try (BoundedSource.BoundedReader<Long> reader = source.createReader(PipelineOptionsFactory.create())) {
            // Before start(): nothing consumed, everything remaining.
            Assert.assertEquals(0.0d, reader.getFractionConsumed().doubleValue(), 1.0E-6d);
            Assert.assertEquals(0L, reader.getSplitPointsConsumed());
            Assert.assertEquals(numRecords, reader.getSplitPointsRemaining());

            Assert.assertTrue(reader.start());
            int recordsRead = 0;
            do {
                // While positioned on record i, i split points are consumed and the rest remain.
                Assert.assertEquals(recordsRead, reader.getSplitPointsConsumed());
                Assert.assertEquals(numRecords - recordsRead, reader.getSplitPointsRemaining());
                recordsRead++;
            } while (reader.advance());

            // After advance() returned false: everything consumed, nothing remaining.
            Assert.assertEquals(numRecords, recordsRead);
            Assert.assertEquals(1.0d, reader.getFractionConsumed().doubleValue(), 1.0E-6d);
            Assert.assertEquals(numRecords, reader.getSplitPointsConsumed());
            Assert.assertEquals(0L, reader.getSplitPointsRemaining());
        }
    }

    /**
     * Reads from {@code CountingSource.unbounded()} capped at 1000 records and applies the
     * counting assertions for 1000 elements.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSource() {
        final long numElements = 1000L;
        PCollection<Long> input =
                p.apply(Read.from(CountingSource.unbounded()).withMaxNumRecords(numElements));

        addCountingAsserts(input, numElements);
        p.run();
    }

    /**
     * Reads 1000 records from an unbounded source that uses {@link ValueAsTimestampFn} as its
     * timestamp function, then checks both the counting assertions and that the difference
     * between each element and its timestamp collapses to the single value {@code 0}.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSourceTimestamps() {
        final long numElements = 1000L;
        PCollection<Long> input = p.apply(
                Read.from(CountingSource.unboundedWithTimestampFn(new ValueAsTimestampFn()))
                        .withMaxNumRecords(numElements));
        addCountingAsserts(input, numElements);

        // Every element minus its timestamp, de-duplicated: a singleton 0 means all match.
        PCollection<Long> distinctDiffs = input
                .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
                .apply("DistinctTimestamps", Distinct.<Long>create());
        PAssert.thatSingleton(distinctDiffs).isEqualTo(0L);

        p.run();
    }

    /**
     * Reads 1000 records from an unbounded source configured with {@code withRate(1, 5 ms)} and
     * {@link ValueAsTimestampFn}, checks the counting and timestamp assertions, and finally
     * asserts that running the pipeline took longer than {@code 1000 * 5 ms}.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSourceWithRate() {
        final Duration period = Duration.millis(5L);
        final long numElements = 1000L;

        PCollection<Long> input = p.apply(
                Read.from(CountingSource.createUnboundedFrom(0L)
                                .withTimestampFn(new ValueAsTimestampFn())
                                .withRate(1L, period))
                        .withMaxNumRecords(numElements));
        addCountingAsserts(input, numElements);

        // Every element minus its timestamp, de-duplicated: a singleton 0 means all match.
        PCollection<Long> distinctDiffs = input
                .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
                .apply("DistinctTimestamps", Distinct.<Long>create());
        PAssert.thatSingleton(distinctDiffs).isEqualTo(0L);

        Instant started = Instant.now();
        p.run();

        // The run must not finish before started + (period * numElements).
        Instant earliestExpectedFinish = started.plus(period.multipliedBy(numElements));
        Assert.assertThat(earliestExpectedFinish.isBefore(Instant.now()), Matchers.is(true));
    }

    /**
     * Splits {@code CountingSource.unbounded()} into ten sources, reads 100 records from each,
     * and checks that the flattened result satisfies the counting assertions for 1000 elements.
     */
    @Test
    @Category({ValidatesRunner.class})
    public void testUnboundedSourceSplits() throws Exception {
        final long numElements = 1000L;
        final int numSplits = 10;

        List<? extends UnboundedSource<Long, CountingSource.CounterMark>> splits =
                CountingSource.unbounded().split(numSplits, p.getOptions());
        Assert.assertEquals("Expected exact splitting", numSplits, splits.size());

        // Sanity check on the test's own constants: the per-split quota must divide evenly.
        long elementsPerSplit = numElements / numSplits;
        Assert.assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);

        // Read the per-split quota from every split, collecting the outputs for flattening.
        PCollectionList<Long> splitOutputs = PCollectionList.empty(p);
        for (int i = 0; i < splits.size(); i++) {
            PCollection<Long> splitOutput =
                    p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit));
            splitOutputs = splitOutputs.and(splitOutput);
        }

        addCountingAsserts(splitOutputs.apply(Flatten.<Long>pCollections()), numElements);
        p.run();
    }

    /**
     * Splits an unbounded source configured with {@code withRate(10, 5 ms)} into ten sources,
     * reads 100 records from each, checks the counting assertions on the flattened result, and
     * asserts that the run took longer than {@code 1000 * 5 ms / 10}.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testUnboundedSourceRateSplits() throws Exception {
        final long numElements = 1000L;
        final int numSplits = 10;
        final long elementsPerPeriod = 10L;
        final Duration period = Duration.millis(5L);

        List<? extends UnboundedSource<Long, CountingSource.CounterMark>> splits =
                CountingSource.createUnboundedFrom(0L)
                        .withRate(elementsPerPeriod, period)
                        .split(numSplits, p.getOptions());
        Assert.assertEquals("Expected exact splitting", numSplits, splits.size());

        // Sanity check on the test's own constants: the per-split quota must divide evenly.
        long elementsPerSplit = numElements / numSplits;
        Assert.assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);

        // Read the per-split quota from every split, collecting the outputs for flattening.
        PCollectionList<Long> splitOutputs = PCollectionList.empty(p);
        for (int i = 0; i < splits.size(); i++) {
            PCollection<Long> splitOutput =
                    p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit));
            splitOutputs = splitOutputs.and(splitOutput);
        }
        addCountingAsserts(splitOutputs.apply(Flatten.<Long>pCollections()), numElements);

        Instant started = Instant.now();
        p.run();

        // Lower bound on the run time in milliseconds. The divisor is read as the configured
        // elements-per-period (NOTE(review): it equals the split count here; confirm intent).
        long expectedMinimumMillis = (numElements * period.getMillis()) / elementsPerPeriod;
        long elapsedMillis = Instant.now().getMillis() - started.getMillis();
        Assert.assertThat(expectedMinimumMillis, Matchers.lessThan(elapsedMillis));
    }

    /**
     * Checks that an unbounded counting reader can be resumed from a checkpoint mark.
     *
     * <p>The first reader is started and advanced three times, so it is positioned on element
     * {@code 3} with timestamp {@code 3}. Its checkpoint mark is round-tripped through the
     * source's checkpoint mark coder, and a second reader created from that mark is expected to
     * start on element {@code 4} with timestamp {@code 4}.
     */
    @Test
    public void testUnboundedSourceCheckpointMark() throws Exception {
        final long numToSkip = 3L;
        UnboundedSource<Long, CountingSource.CounterMark> source =
                CountingSource.unboundedWithTimestampFn(new ValueAsTimestampFn());

        // Both readers are closed via try-with-resources; previously neither was ever closed.
        CountingSource.CounterMark restoredMark;
        try (UnboundedSource.UnboundedReader<Long> reader = source.createReader(null, null)) {
            Assert.assertTrue(reader.start());
            for (long skipped = 0; skipped < numToSkip; skipped++) {
                reader.advance();
            }

            // The reader must be on the expected element before the checkpoint is taken.
            Assert.assertEquals(numToSkip, reader.getCurrent().longValue());
            Assert.assertEquals(numToSkip, reader.getCurrentTimestamp().getMillis());

            // Clone through the coder so the mark is exercised in its encoded form.
            restoredMark = CoderUtils.clone(
                    source.getCheckpointMarkCoder(),
                    (CountingSource.CounterMark) reader.getCheckpointMark());
        }

        try (UnboundedSource.UnboundedReader<Long> resumedReader = source.createReader(null, restoredMark)) {
            Assert.assertTrue(resumedReader.start());
            // The resumed reader continues with the element after the checkpointed one.
            Assert.assertEquals(numToSkip + 1, resumedReader.getCurrent().longValue());
            Assert.assertEquals(numToSkip + 1, resumedReader.getCurrentTimestamp().getMillis());
        }
    }
}
