/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CountingSourceTest {
    @Rule
    public TestPipeline p = TestPipeline.create();

    public static void addCountingAsserts(PCollection<Long> input, long numElements) {
        PAssert.thatSingleton((PCollection)((PCollection)input.apply("Count", Count.globally()))).isEqualTo((Object)numElements);
        PAssert.thatSingleton((PCollection)((PCollection)((PCollection)input.apply((PTransform)Distinct.create())).apply("UniqueCount", Count.globally()))).isEqualTo((Object)numElements);
        PAssert.thatSingleton((PCollection)((PCollection)input.apply("Min", (PTransform)Min.globally()))).isEqualTo((Object)0L);
        PAssert.thatSingleton((PCollection)((PCollection)input.apply("Max", (PTransform)Max.globally()))).isEqualTo((Object)(numElements - 1L));
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testBoundedSource() {
        long numElements = 1000L;
        PCollection input = (PCollection)this.p.apply((PTransform)Read.from((BoundedSource)CountingSource.upTo((long)numElements)));
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testEmptyBoundedSource() {
        PCollection input = (PCollection)this.p.apply((PTransform)Read.from((BoundedSource)CountingSource.upTo((long)0L)));
        PAssert.that((PCollection)input).empty();
        this.p.run();
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testBoundedSourceSplits() throws Exception {
        long numElements = 1000L;
        long numSplits = 10L;
        long splitSizeBytes = numElements * 8L / numSplits;
        BoundedSource initial = CountingSource.upTo((long)numElements);
        List splits = initial.split(splitSizeBytes, this.p.getOptions());
        Assert.assertEquals((String)"Expected exact splitting", (long)numSplits, (long)splits.size());
        PCollectionList pcollections = PCollectionList.empty((Pipeline)this.p);
        for (int i = 0; i < splits.size(); ++i) {
            BoundedSource split = (BoundedSource)splits.get(i);
            pcollections = pcollections.and((PCollection)this.p.apply("split" + i, (PTransform)Read.from((BoundedSource)split)));
            Assert.assertEquals((String)"Expected even splitting", (long)splitSizeBytes, (long)split.getEstimatedSizeBytes(this.p.getOptions()));
        }
        PCollection input = (PCollection)pcollections.apply((PTransform)Flatten.pCollections());
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    public void testProgress() throws IOException {
        int numRecords = 5;
        BoundedSource source = CountingSource.upTo((long)5L);
        try (BoundedSource.BoundedReader reader = source.createReader(PipelineOptionsFactory.create());){
            Assert.assertEquals((double)0.0, (double)reader.getFractionConsumed(), (double)1.0E-6);
            Assert.assertEquals((long)0L, (long)reader.getSplitPointsConsumed());
            Assert.assertEquals((long)5L, (long)reader.getSplitPointsRemaining());
            Assert.assertTrue((boolean)reader.start());
            int i = 0;
            do {
                Assert.assertEquals((long)i, (long)reader.getSplitPointsConsumed());
                Assert.assertEquals((long)(5 - i), (long)reader.getSplitPointsRemaining());
                ++i;
            } while (reader.advance());
            Assert.assertEquals((long)5L, (long)i);
            Assert.assertEquals((double)1.0, (double)reader.getFractionConsumed(), (double)1.0E-6);
            Assert.assertEquals((long)5L, (long)reader.getSplitPointsConsumed());
            Assert.assertEquals((long)0L, (long)reader.getSplitPointsRemaining());
        }
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testUnboundedSource() {
        long numElements = 1000L;
        PCollection input = (PCollection)this.p.apply((PTransform)Read.from((UnboundedSource)CountingSource.unbounded()).withMaxNumRecords(numElements));
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testUnboundedSourceTimestamps() {
        long numElements = 1000L;
        PCollection input = (PCollection)this.p.apply((PTransform)Read.from((UnboundedSource)CountingSource.unboundedWithTimestampFn((SerializableFunction)new ValueAsTimestampFn())).withMaxNumRecords(numElements));
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        PCollection diffs = (PCollection)((PCollection)input.apply("TimestampDiff", (PTransform)ParDo.of((DoFn)new ElementValueDiff()))).apply("DistinctTimestamps", (PTransform)Distinct.create());
        PAssert.thatSingleton((PCollection)diffs).isEqualTo((Object)0L);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testUnboundedSourceWithRate() {
        Duration period = Duration.millis((long)5L);
        long numElements = 1000L;
        PCollection input = (PCollection)this.p.apply((PTransform)Read.from((UnboundedSource)CountingSource.createUnboundedFrom((long)0L).withTimestampFn((SerializableFunction)new ValueAsTimestampFn()).withRate(1L, period)).withMaxNumRecords(numElements));
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        PCollection diffs = (PCollection)((PCollection)input.apply("TimestampDiff", (PTransform)ParDo.of((DoFn)new ElementValueDiff()))).apply("DistinctTimestamps", (PTransform)Distinct.create());
        PAssert.thatSingleton((PCollection)diffs).isEqualTo((Object)0L);
        Instant started = Instant.now();
        this.p.run();
        Instant finished = Instant.now();
        Duration expectedDuration = period.multipliedBy((long)((int)numElements));
        Assert.assertThat((Object)started.plus((ReadableDuration)expectedDuration).isBefore((ReadableInstant)finished), (Matcher)Matchers.is((Object)true));
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testUnboundedSourceSplits() throws Exception {
        long numElements = 1000L;
        int numSplits = 10;
        UnboundedSource initial = CountingSource.unbounded();
        List splits = initial.split(numSplits, this.p.getOptions());
        Assert.assertEquals((String)"Expected exact splitting", (long)numSplits, (long)splits.size());
        long elementsPerSplit = numElements / (long)numSplits;
        Assert.assertEquals((String)"Expected even splits", (long)numElements, (long)(elementsPerSplit * (long)numSplits));
        PCollectionList pcollections = PCollectionList.empty((Pipeline)this.p);
        for (int i = 0; i < splits.size(); ++i) {
            pcollections = pcollections.and((PCollection)this.p.apply("split" + i, (PTransform)Read.from((UnboundedSource)((UnboundedSource)splits.get(i))).withMaxNumRecords(elementsPerSplit)));
        }
        PCollection input = (PCollection)pcollections.apply((PTransform)Flatten.pCollections());
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        this.p.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testUnboundedSourceRateSplits() throws Exception {
        int elementsPerPeriod = 10;
        Duration period = Duration.millis((long)5L);
        long numElements = 1000L;
        int numSplits = 10;
        CountingSource.UnboundedCountingSource initial = CountingSource.createUnboundedFrom((long)0L).withRate((long)elementsPerPeriod, period);
        List splits = initial.split(numSplits, this.p.getOptions());
        Assert.assertEquals((String)"Expected exact splitting", (long)numSplits, (long)splits.size());
        long elementsPerSplit = numElements / (long)numSplits;
        Assert.assertEquals((String)"Expected even splits", (long)numElements, (long)(elementsPerSplit * (long)numSplits));
        PCollectionList pcollections = PCollectionList.empty((Pipeline)this.p);
        for (int i = 0; i < splits.size(); ++i) {
            pcollections = pcollections.and((PCollection)this.p.apply("split" + i, (PTransform)Read.from((UnboundedSource)((UnboundedSource)splits.get(i))).withMaxNumRecords(elementsPerSplit)));
        }
        PCollection input = (PCollection)pcollections.apply((PTransform)Flatten.pCollections());
        CountingSourceTest.addCountingAsserts((PCollection<Long>)input, numElements);
        Instant startTime = Instant.now();
        this.p.run();
        Instant endTime = Instant.now();
        long expectedMinimumMillis = numElements * period.getMillis() / (long)elementsPerPeriod;
        Assert.assertThat((Object)expectedMinimumMillis, (Matcher)Matchers.lessThan((Comparable)Long.valueOf(endTime.getMillis() - startTime.getMillis())));
    }

    @Test
    public void testUnboundedSourceCheckpointMark() throws Exception {
        UnboundedSource source = CountingSource.unboundedWithTimestampFn((SerializableFunction)new ValueAsTimestampFn());
        UnboundedSource.UnboundedReader reader = source.createReader(null, null);
        long numToSkip = 3L;
        Assert.assertTrue((boolean)reader.start());
        for (long l = 0L; l < 3L; ++l) {
            reader.advance();
        }
        Assert.assertEquals((long)3L, (long)((Long)reader.getCurrent()));
        Assert.assertEquals((long)3L, (long)reader.getCurrentTimestamp().getMillis());
        CountingSource.CounterMark mark = (CountingSource.CounterMark)CoderUtils.clone((Coder)source.getCheckpointMarkCoder(), (Object)((CountingSource.CounterMark)reader.getCheckpointMark()));
        reader = source.createReader(null, (UnboundedSource.CheckpointMark)mark);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((long)4L, (long)((Long)reader.getCurrent()));
        Assert.assertEquals((long)4L, (long)reader.getCurrentTimestamp().getMillis());
    }

    private static class ValueAsTimestampFn
    implements SerializableFunction<Long, Instant> {
        private ValueAsTimestampFn() {
        }

        public Instant apply(Long input) {
            return new Instant((Object)input);
        }
    }

    private static class ElementValueDiff
    extends DoFn<Long, Long> {
        private ElementValueDiff() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws Exception {
            c.output((Object)((Long)c.element() - c.timestamp().getMillis()));
        }
    }
}

