/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestCountingSource
extends UnboundedSource<KV<Integer, Integer>, CounterMark> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
    private static @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Integer> finalizeTracker;
    private final @UnknownKeyFor @NonNull @Initialized int numMessagesPerShard;
    private final @UnknownKeyFor @NonNull @Initialized int shardNumber;
    private final @UnknownKeyFor @NonNull @Initialized boolean dedup;
    private final @UnknownKeyFor @NonNull @Initialized boolean throwOnFirstSnapshot;
    private final @UnknownKeyFor @NonNull @Initialized int fixedNumSplits;
    private volatile transient @UnknownKeyFor @NonNull @Initialized boolean haltEmission;
    private static @UnknownKeyFor @NonNull @Initialized boolean thrown;

    public static void setFinalizeTracker(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Integer> finalizeTracker) {
        TestCountingSource.finalizeTracker = finalizeTracker;
    }

    public TestCountingSource(@UnknownKeyFor @NonNull @Initialized int numMessagesPerShard) {
        this(numMessagesPerShard, 0, false, false, -1);
    }

    public @UnknownKeyFor @NonNull @Initialized TestCountingSource withDedup() {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, true, this.throwOnFirstSnapshot, -1);
    }

    private @UnknownKeyFor @NonNull @Initialized TestCountingSource withShardNumber(@UnknownKeyFor @NonNull @Initialized int shardNumber) {
        return new TestCountingSource(this.numMessagesPerShard, shardNumber, this.dedup, this.throwOnFirstSnapshot, -1);
    }

    public @UnknownKeyFor @NonNull @Initialized TestCountingSource withThrowOnFirstSnapshot(@UnknownKeyFor @NonNull @Initialized boolean throwOnFirstSnapshot) {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, throwOnFirstSnapshot, -1);
    }

    public @UnknownKeyFor @NonNull @Initialized TestCountingSource withoutSplitting() {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, this.throwOnFirstSnapshot, 1);
    }

    public @UnknownKeyFor @NonNull @Initialized TestCountingSource withFixedNumSplits(@UnknownKeyFor @NonNull @Initialized int maxNumSplits) {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, this.throwOnFirstSnapshot, maxNumSplits);
    }

    private TestCountingSource(@UnknownKeyFor @NonNull @Initialized int numMessagesPerShard, @UnknownKeyFor @NonNull @Initialized int shardNumber, @UnknownKeyFor @NonNull @Initialized boolean dedup, @UnknownKeyFor @NonNull @Initialized boolean throwOnFirstSnapshot, @UnknownKeyFor @NonNull @Initialized int fixedNumSplits) {
        this.numMessagesPerShard = numMessagesPerShard;
        this.shardNumber = shardNumber;
        this.dedup = dedup;
        this.throwOnFirstSnapshot = throwOnFirstSnapshot;
        this.fixedNumSplits = fixedNumSplits;
    }

    void haltEmission() {
        this.haltEmission = true;
    }

    void continueEmission() {
        this.haltEmission = false;
    }

    public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized TestCountingSource> split(@UnknownKeyFor @NonNull @Initialized int desiredNumSplits, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        ArrayList<TestCountingSource> splits = new ArrayList<TestCountingSource>();
        int actualNumSplits = this.fixedNumSplits == -1 ? desiredNumSplits : this.fixedNumSplits;
        for (int i = 0; i < actualNumSplits; ++i) {
            splits.add(this.withShardNumber(i));
        }
        return splits;
    }

    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized CounterMark> getCheckpointMarkCoder() {
        return DelegateCoder.of((Coder)VarIntCoder.of(), (DelegateCoder.CodingFunction)new FromCounterMark(), (DelegateCoder.CodingFunction)new ToCounterMark());
    }

    public @UnknownKeyFor @NonNull @Initialized boolean requiresDeduping() {
        return this.dedup;
    }

    public @UnknownKeyFor @NonNull @Initialized TestCountingSource. @UnknownKeyFor @NonNull @Initialized CountingSourceReader createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @Nullable @UnknownKeyFor @Initialized CounterMark checkpointMark) {
        if (checkpointMark == null) {
            LOG.debug("creating reader");
        } else {
            LOG.debug("restoring reader from checkpoint with current = {}", (Object)checkpointMark.current);
        }
        return new CountingSourceReader(checkpointMark != null ? checkpointMark.current : -1);
    }

    public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized Integer>> getOutputCoder() {
        return KvCoder.of((Coder)VarIntCoder.of(), (Coder)VarIntCoder.of());
    }

    static {
        thrown = false;
    }

    private static class ToCounterMark
    implements DelegateCoder.CodingFunction<Integer, CounterMark> {
        private ToCounterMark() {
        }

        public @UnknownKeyFor @NonNull @Initialized CounterMark apply(@UnknownKeyFor @NonNull @Initialized Integer input) {
            return new CounterMark(input);
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            return ToCounterMark.class.hashCode();
        }

        @EnsuresNonNullIf(expression={"#1"}, result=true)
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean equals(@Nullable @UnknownKeyFor @Initialized Object obj) {
            return obj instanceof ToCounterMark;
        }
    }

    private static class FromCounterMark
    implements DelegateCoder.CodingFunction<CounterMark, Integer> {
        private FromCounterMark() {
        }

        public @UnknownKeyFor @NonNull @Initialized Integer apply(@UnknownKeyFor @NonNull @Initialized CounterMark input) {
            return input.current;
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized int hashCode() {
            return FromCounterMark.class.hashCode();
        }

        @EnsuresNonNullIf(expression={"#1"}, result=true)
        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean equals(@Nullable @UnknownKeyFor @Initialized Object obj) {
            return obj instanceof FromCounterMark;
        }
    }

    public class CountingSourceReader
    extends UnboundedSource.UnboundedReader<KV<Integer, Integer>> {
        private @UnknownKeyFor @NonNull @Initialized int current;

        public CountingSourceReader(int startingPoint) {
            this.current = startingPoint;
        }

        public @UnknownKeyFor @NonNull @Initialized boolean start() {
            return this.advance();
        }

        public @UnknownKeyFor @NonNull @Initialized boolean advance() {
            if (this.current >= TestCountingSource.this.numMessagesPerShard - 1 || TestCountingSource.this.haltEmission) {
                return false;
            }
            if (this.current >= 0 && TestCountingSource.this.dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
                return true;
            }
            ++this.current;
            return true;
        }

        public @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized Integer, @UnknownKeyFor @NonNull @Initialized Integer> getCurrent() {
            return KV.of((Object)TestCountingSource.this.shardNumber, (Object)this.current);
        }

        public @UnknownKeyFor @NonNull @Initialized Instant getCurrentTimestamp() {
            return new Instant((long)this.current);
        }

        public @UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] getCurrentRecordId() {
            try {
                return CoderUtils.encodeToByteArray((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)VarIntCoder.of()), this.getCurrent());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public void close() {
        }

        public @UnknownKeyFor @NonNull @Initialized TestCountingSource getCurrentSource() {
            return TestCountingSource.this;
        }

        public @UnknownKeyFor @NonNull @Initialized Instant getWatermark() {
            if (this.current >= TestCountingSource.this.numMessagesPerShard - 1) {
                return new Instant((Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
            }
            return new Instant((long)(this.current + 1));
        }

        public @UnknownKeyFor @NonNull @Initialized CounterMark getCheckpointMark() {
            if (TestCountingSource.this.throwOnFirstSnapshot && !thrown) {
                thrown = true;
                LOG.error("Throwing exception while checkpointing counter");
                throw new RuntimeException("failed during checkpoint");
            }
            return new CounterMark(this.current);
        }

        public @UnknownKeyFor @NonNull @Initialized long getSplitBacklogBytes() {
            return 7L;
        }
    }

    static class CounterMark
    implements UnboundedSource.CheckpointMark {
        @UnknownKeyFor @NonNull @Initialized int current;

        public CounterMark(@UnknownKeyFor @NonNull @Initialized int current) {
            this.current = current;
        }

        public void finalizeCheckpoint() {
            if (finalizeTracker != null) {
                finalizeTracker.add(this.current);
            }
        }
    }
}

