package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource.class */
public class TestCountingSource extends UnboundedSource<KV<Integer, Integer>, CounterMark> {
    private static List<Integer> finalizeTracker;
    private final int numMessagesPerShard;
    private final int shardNumber;
    private final boolean dedup;
    private final boolean throwOnFirstSnapshot;
    private final int fixedNumSplits;
    private volatile transient boolean haltEmission;
    private static final Logger LOG = LoggerFactory.getLogger(TestCountingSource.class);
    private static boolean thrown = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource$CounterMark.class */
    public static class CounterMark implements UnboundedSource.CheckpointMark {
        int current;

        public CounterMark(int i) {
            this.current = i;
        }

        public void finalizeCheckpoint() {
            if (TestCountingSource.finalizeTracker != null) {
                TestCountingSource.finalizeTracker.add(Integer.valueOf(this.current));
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource$CountingSourceReader.class */
    public class CountingSourceReader extends UnboundedSource.UnboundedReader<KV<Integer, Integer>> {
        private int current;

        public CountingSourceReader(int i) {
            this.current = i;
        }

        public boolean start() {
            return advance();
        }

        public boolean advance() {
            if (this.current >= TestCountingSource.this.numMessagesPerShard - 1 || TestCountingSource.this.haltEmission) {
                return false;
            }
            if (this.current >= 0 && TestCountingSource.this.dedup && ThreadLocalRandom.current().nextInt(5) == 0) {
                return true;
            }
            this.current++;
            return true;
        }

        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public KV<Integer, Integer> m24getCurrent() {
            return KV.of(Integer.valueOf(TestCountingSource.this.shardNumber), Integer.valueOf(this.current));
        }

        public Instant getCurrentTimestamp() {
            return new Instant(this.current);
        }

        public byte[] getCurrentRecordId() {
            try {
                return CoderUtils.encodeToByteArray(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), m24getCurrent());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public void close() {
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public TestCountingSource m23getCurrentSource() {
            return TestCountingSource.this;
        }

        public Instant getWatermark() {
            return this.current >= TestCountingSource.this.numMessagesPerShard - 1 ? BoundedWindow.TIMESTAMP_MAX_VALUE : new Instant(this.current + 1);
        }

        /* renamed from: getCheckpointMark, reason: merged with bridge method [inline-methods] */
        public CounterMark m22getCheckpointMark() {
            if (!TestCountingSource.this.throwOnFirstSnapshot || TestCountingSource.thrown) {
                return new CounterMark(this.current);
            }
            boolean unused = TestCountingSource.thrown = true;
            TestCountingSource.LOG.error("Throwing exception while checkpointing counter");
            throw new RuntimeException("failed during checkpoint");
        }

        public long getSplitBacklogBytes() {
            return 7L;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource$FromCounterMark.class */
    private static class FromCounterMark implements DelegateCoder.CodingFunction<CounterMark, Integer> {
        private FromCounterMark() {
        }

        public Integer apply(CounterMark counterMark) {
            return Integer.valueOf(counterMark.current);
        }

        public int hashCode() {
            return FromCounterMark.class.hashCode();
        }

        public boolean equals(Object obj) {
            return obj instanceof FromCounterMark;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/TestCountingSource$ToCounterMark.class */
    private static class ToCounterMark implements DelegateCoder.CodingFunction<Integer, CounterMark> {
        private ToCounterMark() {
        }

        public CounterMark apply(Integer num) {
            return new CounterMark(num.intValue());
        }

        public int hashCode() {
            return ToCounterMark.class.hashCode();
        }

        public boolean equals(Object obj) {
            return obj instanceof ToCounterMark;
        }
    }

    public static void setFinalizeTracker(List<Integer> list) {
        finalizeTracker = list;
    }

    public TestCountingSource(int i) {
        this(i, 0, false, false, -1);
    }

    public TestCountingSource withDedup() {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, true, this.throwOnFirstSnapshot, -1);
    }

    private TestCountingSource withShardNumber(int i) {
        return new TestCountingSource(this.numMessagesPerShard, i, this.dedup, this.throwOnFirstSnapshot, -1);
    }

    public TestCountingSource withThrowOnFirstSnapshot(boolean z) {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, z, -1);
    }

    public TestCountingSource withoutSplitting() {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, this.throwOnFirstSnapshot, 1);
    }

    public TestCountingSource withFixedNumSplits(int i) {
        return new TestCountingSource(this.numMessagesPerShard, this.shardNumber, this.dedup, this.throwOnFirstSnapshot, i);
    }

    private TestCountingSource(int i, int i2, boolean z, boolean z2, int i3) {
        this.numMessagesPerShard = i;
        this.shardNumber = i2;
        this.dedup = z;
        this.throwOnFirstSnapshot = z2;
        this.fixedNumSplits = i3;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void haltEmission() {
        this.haltEmission = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void continueEmission() {
        this.haltEmission = false;
    }

    public List<TestCountingSource> split(int i, PipelineOptions pipelineOptions) {
        ArrayList arrayList = new ArrayList();
        int i2 = this.fixedNumSplits == -1 ? i : this.fixedNumSplits;
        for (int i3 = 0; i3 < i2; i3++) {
            arrayList.add(withShardNumber(i3));
        }
        return arrayList;
    }

    public Coder<CounterMark> getCheckpointMarkCoder() {
        return DelegateCoder.of(VarIntCoder.of(), new FromCounterMark(), new ToCounterMark());
    }

    public boolean requiresDeduping() {
        return this.dedup;
    }

    public CountingSourceReader createReader(PipelineOptions pipelineOptions, CounterMark counterMark) {
        if (counterMark == null) {
            LOG.debug("creating reader");
        } else {
            LOG.debug("restoring reader from checkpoint with current = {}", Integer.valueOf(counterMark.current));
        }
        return new CountingSourceReader(counterMark != null ? counterMark.current : -1);
    }

    public Coder<KV<Integer, Integer>> getOutputCoder() {
        return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
    }
}
