package org.apache.beam.runners.flink.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/streaming/TestCountingSource.class */
public class TestCountingSource extends UnboundedSource<KV<Integer, Integer>, CounterMark> {
    private static List<Integer> finalizeTracker;
    private final int numMessagesPerShard;
    private final int shardNumber;
    private final boolean dedup;
    private final boolean throwOnFirstSnapshot;
    private final boolean allowSplitting;
    private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
    private static boolean thrown = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/TestCountingSource$CounterMark.class */
    public class CounterMark implements UnboundedSource.CheckpointMark {
        int current;

        public CounterMark(int i) {
            this.current = i;
        }

        public void finalizeCheckpoint() {
            if (TestCountingSource.finalizeTracker != null) {
                TestCountingSource.finalizeTracker.add(Integer.valueOf(this.current));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/streaming/TestCountingSource$CountingSourceReader.class */
    public class CountingSourceReader extends UnboundedSource.UnboundedReader<KV<Integer, Integer>> {
        private int current;

        public CountingSourceReader(int i) {
            this.current = i;
        }

        public boolean start() {
            return advance();
        }

        public boolean advance() {
            if (this.current >= TestCountingSource.this.numMessagesPerShard - 1) {
                return false;
            }
            if (this.current >= 0 && TestCountingSource.this.dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
                return true;
            }
            this.current++;
            return true;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public KV<Integer, Integer> m14getCurrent() {
            return KV.of(Integer.valueOf(TestCountingSource.this.shardNumber), Integer.valueOf(this.current));
        }

        public Instant getCurrentTimestamp() {
            return new Instant(this.current);
        }

        public byte[] getCurrentRecordId() {
            try {
                return CoderUtils.encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), m14getCurrent());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public void close() {
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public TestCountingSource m13getCurrentSource() {
            return TestCountingSource.this;
        }

        public Instant getWatermark() {
            return new Instant(this.current + 1);
        }

        /* renamed from: getCheckpointMark, reason: merged with bridge method [inline-methods] */
        public CounterMark m12getCheckpointMark() {
            if (!TestCountingSource.this.throwOnFirstSnapshot || TestCountingSource.thrown) {
                return new CounterMark(this.current);
            }
            boolean unused = TestCountingSource.thrown = true;
            TestCountingSource.LOG.error("Throwing exception while checkpointing counter");
            throw new RuntimeException("failed during checkpoint");
        }

        public long getSplitBacklogBytes() {
            return 7L;
        }
    }

    public static void setFinalizeTracker(List<Integer> list) {
        finalizeTracker = list;
    }

    public TestCountingSource(int i) {
        this(i, 0, false, false, true);
    }

    public TestCountingSource withDedup() {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, true, this.throwOnFirstSnapshot, true);
    }

    private TestCountingSource withShardNumber(int i) {
        return new TestCountingSource(this.numMessagesPerShard, i, this.dedup, this.throwOnFirstSnapshot, true);
    }

    public TestCountingSource withThrowOnFirstSnapshot(boolean z) {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, z, true);
    }

    public TestCountingSource withoutSplitting() {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, this.throwOnFirstSnapshot, false);
    }

    private TestCountingSource(int i, int i2, boolean z, boolean z2, boolean z3) {
        this.numMessagesPerShard = i;
        this.shardNumber = i2;
        this.dedup = z;
        this.throwOnFirstSnapshot = z2;
        this.allowSplitting = z3;
    }

    public int getShardNumber() {
        return this.shardNumber;
    }

    public List<TestCountingSource> generateInitialSplits(int i, PipelineOptions pipelineOptions) {
        ArrayList arrayList = new ArrayList();
        int i2 = this.allowSplitting ? i : 1;
        for (int i3 = 0; i3 < i2; i3++) {
            arrayList.add(withShardNumber(i3));
        }
        return arrayList;
    }

    public Coder<CounterMark> getCheckpointMarkCoder() {
        return DelegateCoder.of(VarIntCoder.of(), new DelegateCoder.CodingFunction<CounterMark, Integer>() { // from class: org.apache.beam.runners.flink.streaming.TestCountingSource.1
            public Integer apply(CounterMark counterMark) {
                return Integer.valueOf(counterMark.current);
            }
        }, new DelegateCoder.CodingFunction<Integer, CounterMark>() { // from class: org.apache.beam.runners.flink.streaming.TestCountingSource.2
            public CounterMark apply(Integer num) {
                return new CounterMark(num.intValue());
            }
        });
    }

    public boolean requiresDeduping() {
        return this.dedup;
    }

    public CountingSourceReader createReader(PipelineOptions pipelineOptions, @Nullable CounterMark counterMark) {
        if (counterMark == null) {
            LOG.debug("creating reader");
        } else {
            LOG.debug("restoring reader from checkpoint with current = {}", Integer.valueOf(counterMark.current));
        }
        return new CountingSourceReader(counterMark != null ? counterMark.current : -1);
    }

    public void validate() {
    }

    public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
        return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
    }
}
