package org.apache.flink.connector.testframe.source;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.testframe.source.split.FromElementsSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/testframe/source/FromElementsSourceReader.class */
public class FromElementsSourceReader<T> implements SourceReader<T, FromElementsSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(FromElementsSourceReader.class);
    private SourceReaderContext context;
    private Integer limitedNum;
    private Boundedness boundedness;
    private List<T> elements;
    private Counter numRecordInCounter;
    private volatile boolean isRunning = true;
    private volatile boolean checkpointAtLimitedNum = false;
    private volatile int emittedNum = 0;

    public FromElementsSourceReader(Integer num, List<T> list, Boundedness boundedness, SourceReaderContext sourceReaderContext) {
        this.context = sourceReaderContext;
        this.elements = list;
        this.limitedNum = num;
        this.boundedness = boundedness;
        this.numRecordInCounter = sourceReaderContext.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    public void start() {
    }

    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        if (!this.isRunning || this.emittedNum >= this.elements.size()) {
            return Boundedness.CONTINUOUS_UNBOUNDED.equals(this.boundedness) ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        }
        if (this.limitedNum == null || (this.limitedNum != null && (this.emittedNum < this.limitedNum.intValue() || this.checkpointAtLimitedNum))) {
            readerOutput.collect(this.elements.get(this.emittedNum));
            this.emittedNum++;
            this.numRecordInCounter.inc();
        }
        return InputStatus.MORE_AVAILABLE;
    }

    public List<FromElementsSplit> snapshotState(long j) {
        if (this.limitedNum != null && !this.checkpointAtLimitedNum && this.emittedNum == this.limitedNum.intValue()) {
            this.checkpointAtLimitedNum = true;
            LOG.info("checkpoint {} is the target checkpoint to be used.", Long.valueOf(j));
        }
        return Arrays.asList(new FromElementsSplit(this.emittedNum));
    }

    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null);
    }

    public void addSplits(List<FromElementsSplit> list) {
        this.emittedNum = list.get(0).getEmitNum();
        LOG.info("FromElementsSourceReader restores from {}.", Integer.valueOf(this.emittedNum));
    }

    public void notifyNoMoreSplits() {
    }

    public void close() throws Exception {
        this.isRunning = false;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        LOG.info("checkpoint {} finished.", Long.valueOf(j));
    }
}
