/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.util;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.core.io.InputStatus;

public class NumberSequenceSourceWithWaitForCheckpoint
extends NumberSequenceSource {
    private static final long serialVersionUID = 1L;
    private final int numSplits;
    private final long numAllowedMessageBeforeCheckpoint;

    public NumberSequenceSourceWithWaitForCheckpoint(long from, long to, int numSplits) {
        super(from, to);
        this.numSplits = numSplits;
        this.numAllowedMessageBeforeCheckpoint = (to - from) / (long)numSplits;
    }

    public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> enumContext) {
        List splits = this.splitNumberRange(this.getFrom(), this.getTo(), this.numSplits);
        return new AssignAfterCheckpointEnumerator<NumberSequenceSource.NumberSequenceSplit>(enumContext, splits);
    }

    public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext readerContext) {
        return new CheckpointListeningIteratorSourceReader(readerContext, this.numAllowedMessageBeforeCheckpoint);
    }

    private static class CheckpointListeningIteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
    extends IteratorSourceReader<E, IterT, SplitT> {
        private boolean checkpointed = false;
        private long messagesProduced = 0L;
        private final long numAllowedMessageBeforeCheckpoint;

        public CheckpointListeningIteratorSourceReader(SourceReaderContext context, long waitForCheckpointAfterMessages) {
            super(context);
            this.numAllowedMessageBeforeCheckpoint = waitForCheckpointAfterMessages;
        }

        public InputStatus pollNext(ReaderOutput<E> output) {
            if (this.messagesProduced < this.numAllowedMessageBeforeCheckpoint || this.checkpointed) {
                ++this.messagesProduced;
                return super.pollNext(output);
            }
            return InputStatus.NOTHING_AVAILABLE;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            this.checkpointed = true;
        }
    }

    private static final class AssignAfterCheckpointEnumerator<SplitT extends IteratorSourceSplit<?, ?>>
    extends IteratorSourceEnumerator<SplitT> {
        private final Queue<Integer> pendingRequests = new ArrayDeque<Integer>();
        private final SplitEnumeratorContext<?> context;

        public AssignAfterCheckpointEnumerator(SplitEnumeratorContext<SplitT> context, Collection<SplitT> splits) {
            super(context, splits);
            this.context = context;
        }

        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
            this.pendingRequests.add(subtaskId);
        }

        public Collection<SplitT> snapshotState(long checkpointId) throws Exception {
            this.context.runInCoordinatorThread(this::fullFillPendingRequests);
            return super.snapshotState(checkpointId);
        }

        private void fullFillPendingRequests() {
            Iterator iterator = this.pendingRequests.iterator();
            while (iterator.hasNext()) {
                int subtask = (Integer)iterator.next();
                if (!this.context.registeredReaders().containsKey(subtask)) continue;
                super.handleSplitRequest(subtask, null);
            }
            this.pendingRequests.clear();
        }
    }
}

