package org.apache.flink.test.util;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.core.io.InputStatus;

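/**
 * A {@link NumberSequenceSource} whose enumerator queues incoming split requests instead of
 * serving them immediately, and whose readers stop polling for records once
 * {@code numAllowedMessageBeforeCheckpoint} polls have been made, until a checkpoint completes.
 */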
public class NumberSequenceSourceWithWaitForCheckpoint extends NumberSequenceSource {
    private static final long serialVersionUID = 1;
    private final int numSplits;
    private final long numAllowedMessageBeforeCheckpoint;

    /* loaded from: input_file:org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint$AssignAfterCheckpointEnumerator.class */
    /**
     * Enumerator that does not answer split requests right away. Requests are queued and only
     * forwarded to the parent {@link IteratorSourceEnumerator} when a state snapshot is taken.
     *
     * @param <SplitT> the split type handed out by this enumerator
     */
    private static final class AssignAfterCheckpointEnumerator<SplitT extends IteratorSourceSplit<?, ?>>
            extends IteratorSourceEnumerator<SplitT> {

        /** Subtask ids of readers whose split requests have not been forwarded yet. */
        private final Queue<Integer> pendingRequests = new ArrayDeque<>();

        /** Enumerator context; used to look up registered readers and to schedule work. */
        private final SplitEnumeratorContext<?> context;

        /**
         * @param context the enumerator context, also passed to the parent enumerator
         * @param splits the splits to hand out, passed to the parent enumerator
         */
        public AssignAfterCheckpointEnumerator(SplitEnumeratorContext<SplitT> context, Collection<SplitT> splits) {
            super(context, splits);
            this.context = context;
        }

        /**
         * Records the request instead of serving it; it is served later from
         * {@link #snapshotState(long)}.
         *
         * @param subtaskId id of the requesting reader subtask
         * @param requesterHostname hostname of the requester; ignored here
         */
        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            pendingRequests.add(subtaskId);
        }

        /**
         * Schedules the queued split requests to be served and returns the parent's snapshot.
         *
         * <p>This method must keep the name {@code snapshotState} so that it replaces the parent
         * implementation; under any other name it is never invoked and no split is ever assigned.
         *
         * @param checkpointId id of the checkpoint being taken
         * @return the snapshot produced by the parent enumerator
         * @throws Exception if the parent snapshot fails
         */
        public Collection<SplitT> snapshotState(long checkpointId) throws Exception {
            // NOTE(review): the runnable is handed to the coordinator thread, so it presumably
            // runs only after this snapshot call has returned - confirm against the context docs.
            context.runInCoordinatorThread(this::fullFillPendingRequests);
            return super.snapshotState(checkpointId);
        }

        /** Forwards every queued request of a still-registered reader to the parent, then empties the queue. */
        private void fullFillPendingRequests() {
            for (int subtaskId : pendingRequests) {
                // Requests from readers that are no longer registered are dropped.
                if (context.registeredReaders().containsKey(subtaskId)) {
                    super.handleSplitRequest(subtaskId, null);
                }
            }
            pendingRequests.clear();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/util/NumberSequenceSourceWithWaitForCheckpoint$CheckpointListeningIteratorSourceReader.class */
    /**
     * Reader that forwards at most {@code numAllowedMessageBeforeCheckpoint} polls to the parent
     * {@link IteratorSourceReader} and then reports {@link InputStatus#NOTHING_AVAILABLE} until
     * a checkpoint completion has been notified; after that, polls are forwarded without limit.
     *
     * @param <E> the element type
     * @param <IterT> the iterator type backing a split
     * @param <SplitT> the split type
     */
    private static class CheckpointListeningIteratorSourceReader<
                    E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
            extends IteratorSourceReader<E, IterT, SplitT> {

        /** Number of polls forwarded before a checkpoint completion is required. */
        private final long numAllowedMessageBeforeCheckpoint;

        /** Counts the polls forwarded to the parent reader so far. */
        private long messagesProduced = 0L;

        /** Set once any checkpoint completion has been notified; never reset. */
        private boolean checkpointed = false;

        /**
         * @param sourceReaderContext the reader context, passed to the parent reader
         * @param j number of polls to forward before waiting for a checkpoint
         */
        public CheckpointListeningIteratorSourceReader(SourceReaderContext sourceReaderContext, long j) {
            super(sourceReaderContext);
            this.numAllowedMessageBeforeCheckpoint = j;
        }

        /**
         * Forwards the poll to the parent while the quota is not used up or once a checkpoint
         * has completed; otherwise reports that nothing is available.
         *
         * @param readerOutput the output handed on to the parent reader
         * @return the parent's status, or {@link InputStatus#NOTHING_AVAILABLE} while gated
         */
        public InputStatus pollNext(ReaderOutput<E> readerOutput) {
            final boolean quotaLeft = messagesProduced < numAllowedMessageBeforeCheckpoint;
            if (checkpointed || quotaLeft) {
                // The counter advances on every forwarded poll, whatever the parent returns.
                messagesProduced++;
                return super.pollNext(readerOutput);
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        /**
         * Lifts the gate for all subsequent polls.
         *
         * @param j id of the completed checkpoint; not used
         */
        public void notifyCheckpointComplete(long j) {
            checkpointed = true;
        }
    }

    /**
     * Creates the source.
     *
     * @param from first argument forwarded to the {@link NumberSequenceSource} constructor
     * @param to second argument forwarded to the {@link NumberSequenceSource} constructor
     * @param numSplits number of splits the range is divided into; must be positive
     * @throws IllegalArgumentException if {@code numSplits} is not positive
     */
    public NumberSequenceSourceWithWaitForCheckpoint(long from, long to, int numSplits) {
        super(from, to);
        // Fail fast with a clear message instead of an ArithmeticException from the division
        // below (zero) or a meaningless negative quota (negative values).
        if (numSplits <= 0) {
            throw new IllegalArgumentException("numSplits must be positive, but was " + numSplits);
        }
        this.numSplits = numSplits;
        // Per-reader quota of polls allowed before the first completed checkpoint.
        this.numAllowedMessageBeforeCheckpoint = (to - from) / numSplits;
    }

    /**
     * Creates an enumerator over the splits of the configured range that holds back split
     * requests rather than answering them immediately.
     *
     * @param splitEnumeratorContext the context passed to the new enumerator
     * @return the enumerator for this source
     */
    public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> splitEnumeratorContext) {
        // Diamond instead of a raw type, so the split type is checked by the compiler.
        return new AssignAfterCheckpointEnumerator<>(
                splitEnumeratorContext, splitNumberRange(getFrom(), getTo(), this.numSplits));
    }

    /**
     * Creates a reader that is limited to {@code numAllowedMessageBeforeCheckpoint} polls until
     * a checkpoint completes.
     *
     * @param sourceReaderContext the context passed to the new reader
     * @return the reader for this source
     */
    public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext sourceReaderContext) {
        // Diamond instead of a raw type; the type arguments are inferred from the return type.
        return new CheckpointListeningIteratorSourceReader<>(
                sourceReaderContext, this.numAllowedMessageBeforeCheckpoint);
    }
}
