/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class BoundedReadFromUnboundedSourceTest
implements Serializable {
    private static final int NUM_RECORDS = 100;
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    @Test
    @Category(value={ValidatesRunner.class})
    public void testNoDedup() throws Exception {
        this.test(false, false);
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testDedup() throws Exception {
        this.test(true, false);
    }

    @Test
    @Category(value={ValidatesRunner.class})
    public void testTimeBound() throws Exception {
        this.test(false, true);
    }

    @Test
    public void testForwardsDisplayData() {
        TestCountingSource src = new TestCountingSource(1234){

            public void populateDisplayData(DisplayData.Builder builder) {
                builder.add(DisplayData.item((String)"foo", (String)"bar"));
            }
        };
        BoundedReadFromUnboundedSource read = Read.from((UnboundedSource)src).withMaxNumRecords(5L);
        Assert.assertThat((Object)DisplayData.from((HasDisplayData)read), DisplayDataMatchers.includesDisplayDataFor("source", (HasDisplayData)src));
    }

    private void test(boolean dedup, boolean timeBound) throws Exception {
        TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
        if (dedup) {
            source = source.withDedup();
        }
        PCollection output = timeBound ? (PCollection)this.p.apply((PTransform)Read.from((UnboundedSource)source).withMaxReadTime(Duration.millis((long)200L))) : (PCollection)this.p.apply((PTransform)Read.from((UnboundedSource)source).withMaxNumRecords(100L));
        PAssert.that((PCollection)output).satisfies((SerializableFunction)new Checker(dedup, timeBound));
        this.p.run();
    }

    @Test
    public void testRespectsCheckpointContract() throws IOException {
        TestCountingSource source = new TestCountingSource(3);
        PipelineOptions options = PipelineOptionsFactory.create();
        TestCountingSource.CountingSourceReader reader = source.createReader(options, (TestCountingSource.CounterMark)null);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((long)0L, (long)((Integer)reader.getCurrent().getValue()).intValue());
        Assert.assertTrue((boolean)reader.advance());
        Assert.assertEquals((long)1L, (long)((Integer)reader.getCurrent().getValue()).intValue());
        TestCountingSource.CounterMark checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        reader = source.createReader(options, checkpoint);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((long)2L, (long)((Integer)reader.getCurrent().getValue()).intValue());
        Assert.assertFalse((boolean)reader.advance());
    }

    @Test
    public void testCanResumeWithExpandedCount() throws IOException {
        TestCountingSource source = new TestCountingSource(1);
        PipelineOptions options = PipelineOptionsFactory.create();
        TestCountingSource.CountingSourceReader reader = source.createReader(options, (TestCountingSource.CounterMark)null);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((long)0L, (long)((Integer)reader.getCurrent().getValue()).intValue());
        Assert.assertFalse((boolean)reader.advance());
        TestCountingSource.CounterMark checkpoint = reader.getCheckpointMark();
        checkpoint.finalizeCheckpoint();
        source = new TestCountingSource(2);
        reader = source.createReader(options, checkpoint);
        Assert.assertTrue((boolean)reader.start());
        Assert.assertEquals((long)1L, (long)((Integer)reader.getCurrent().getValue()).intValue());
        Assert.assertFalse((boolean)reader.advance());
    }

    public static class TestCountingSource
    extends UnboundedSource<KV<Integer, Integer>, CounterMark> {
        private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
        private static List<Integer> finalizeTracker;
        private final int numMessagesPerShard;
        private final int shardNumber;
        private final boolean dedup;
        private final boolean throwOnFirstSnapshot;
        private final boolean allowSplitting;
        private static boolean thrown;

        public static void setFinalizeTracker(List<Integer> finalizeTracker) {
            TestCountingSource.finalizeTracker = finalizeTracker;
        }

        public TestCountingSource(int numMessagesPerShard) {
            this(numMessagesPerShard, 0, false, false, true);
        }

        public TestCountingSource withDedup() {
            return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, true, this.throwOnFirstSnapshot, true);
        }

        private TestCountingSource withShardNumber(int shardNumber) {
            return new TestCountingSource(this.numMessagesPerShard, shardNumber, this.dedup, this.throwOnFirstSnapshot, true);
        }

        public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) {
            return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, throwOnFirstSnapshot, true);
        }

        public TestCountingSource withoutSplitting() {
            return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, this.throwOnFirstSnapshot, false);
        }

        private TestCountingSource(int numMessagesPerShard, int shardNumber, boolean dedup, boolean throwOnFirstSnapshot, boolean allowSplitting) {
            this.numMessagesPerShard = numMessagesPerShard;
            this.shardNumber = shardNumber;
            this.dedup = dedup;
            this.throwOnFirstSnapshot = throwOnFirstSnapshot;
            this.allowSplitting = allowSplitting;
        }

        public int getShardNumber() {
            return this.shardNumber;
        }

        public List<TestCountingSource> split(int desiredNumSplits, PipelineOptions options) {
            ArrayList<TestCountingSource> splits = new ArrayList<TestCountingSource>();
            int numSplits = this.allowSplitting ? desiredNumSplits : 1;
            for (int i = 0; i < numSplits; ++i) {
                splits.add(this.withShardNumber(i));
            }
            return splits;
        }

        public Coder<CounterMark> getCheckpointMarkCoder() {
            return DelegateCoder.of((Coder)VarIntCoder.of(), (DelegateCoder.CodingFunction & Serializable)input -> input.current, CounterMark::new);
        }

        public boolean requiresDeduping() {
            return this.dedup;
        }

        public CountingSourceReader createReader(PipelineOptions options, @Nullable CounterMark checkpointMark) {
            if (checkpointMark == null) {
                LOG.debug("creating reader");
            } else {
                LOG.debug("restoring reader from checkpoint with current = {}", (Object)checkpointMark.current);
            }
            return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
        }

        public Coder<KV<Integer, Integer>> getOutputCoder() {
            return KvCoder.of((Coder)VarIntCoder.of(), (Coder)VarIntCoder.of());
        }

        static {
            thrown = false;
        }

        public class CountingSourceReader
        extends UnboundedSource.UnboundedReader<KV<Integer, Integer>> {
            private int current;

            public CountingSourceReader(int startingPoint) {
                this.current = startingPoint;
            }

            public boolean start() {
                return this.advance();
            }

            public boolean advance() {
                if (this.current >= TestCountingSource.this.numMessagesPerShard - 1) {
                    return false;
                }
                if (this.current >= 0 && TestCountingSource.this.dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
                    return true;
                }
                ++this.current;
                return true;
            }

            public KV<Integer, Integer> getCurrent() {
                return KV.of((Object)TestCountingSource.this.shardNumber, (Object)this.current);
            }

            public Instant getCurrentTimestamp() {
                return new Instant((long)this.current);
            }

            public byte[] getCurrentRecordId() {
                try {
                    return CoderUtils.encodeToByteArray((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)VarIntCoder.of()), this.getCurrent());
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            public void close() {
            }

            public TestCountingSource getCurrentSource() {
                return TestCountingSource.this;
            }

            public Instant getWatermark() {
                return new Instant((long)(this.current + 1));
            }

            public CounterMark getCheckpointMark() {
                if (TestCountingSource.this.throwOnFirstSnapshot && !thrown) {
                    thrown = true;
                    LOG.error("Throwing exception while checkpointing counter");
                    throw new RuntimeException("failed during checkpoint");
                }
                return new CounterMark(this.current);
            }

            public long getSplitBacklogBytes() {
                return 7L;
            }
        }

        static class CounterMark
        implements UnboundedSource.CheckpointMark {
            int current;

            public CounterMark(int current) {
                this.current = current;
            }

            public void finalizeCheckpoint() {
                if (finalizeTracker != null) {
                    finalizeTracker.add(this.current);
                }
            }
        }
    }

    private static class Checker
    implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void> {
        private final boolean dedup;
        private final boolean timeBound;

        Checker(boolean dedup, boolean timeBound) {
            this.dedup = dedup;
            this.timeBound = timeBound;
        }

        public Void apply(Iterable<KV<Integer, Integer>> input) {
            ArrayList<Integer> values = new ArrayList<Integer>();
            for (KV<Integer, Integer> kv : input) {
                Assert.assertEquals((long)0L, (long)((Integer)kv.getKey()).intValue());
                values.add((Integer)kv.getValue());
            }
            if (this.timeBound) {
                Assert.assertTrue((values.size() >= 1 ? 1 : 0) != 0);
            } else if (this.dedup) {
                Assert.assertTrue((values.size() > 10 && values.size() <= 100 ? 1 : 0) != 0);
            } else {
                Assert.assertEquals((long)100L, (long)values.size());
            }
            Collections.sort(values);
            for (int i = 0; i < values.size(); ++i) {
                Assert.assertEquals((long)i, (long)((Integer)values.get(i)).intValue());
            }
            return null;
        }
    }
}

