package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceReader.class */
/**
 * Source reader that reads assigned {@link JdbcSourceSplit}s through a {@link JdbcInputFormat}
 * and forwards every record to the supplied {@link Collector}.
 *
 * <p>Splits are queued by {@link #addSplits(List)} and consumed one per {@link #pollNext(Collector)}
 * call. Once {@link #handleNoMoreSplits()} has been called and the queue is empty, the reader
 * signals the context that no more elements will be produced.
 */
public class JdbcSourceReader implements SourceReader<SeaTunnelRow, JdbcSourceSplit> {
    // Log under this class's own name so reader messages are attributed correctly.
    protected static final Logger LOG = LoggerFactory.getLogger(JdbcSourceReader.class);

    /** Reader context; used here to signal that no more elements will be emitted. */
    SourceReader.Context context;

    /** Splits that have been assigned to this reader but not yet read. */
    Deque<JdbcSourceSplit> splits = new LinkedList<>();

    /** Input format that performs the actual reading of a split. */
    JdbcInputFormat inputFormat;

    /** Set to {@code true} by {@link #handleNoMoreSplits()}. */
    boolean noMoreSplit;

    /**
     * Creates a reader backed by the given input format.
     *
     * @param jdbcInputFormat input format used to open and read each split
     * @param context reader context used to signal end of input
     */
    public JdbcSourceReader(JdbcInputFormat jdbcInputFormat, SourceReader.Context context) {
        this.inputFormat = jdbcInputFormat;
        this.context = context;
    }

    /**
     * Opens the underlying input format.
     *
     * @throws Exception if the input format fails to open
     */
    public void open() throws Exception {
        this.inputFormat.openInputFormat();
    }

    /**
     * Closes the underlying input format.
     *
     * @throws IOException if the input format fails to close
     */
    public void close() throws IOException {
        this.inputFormat.closeInputFormat();
    }

    /**
     * Reads the next pending split, if any, and emits all of its records to {@code collector}.
     *
     * <p>When no split is pending, the call either sleeps for one second (more splits may still
     * arrive) or, if {@link #handleNoMoreSplits()} was already called, signals the context that
     * no more elements will be produced.
     *
     * @param collector receives every record read from the split
     * @throws Exception if opening, reading or closing the split fails, or if the waiting thread
     *     is interrupted
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        JdbcSourceSplit split = this.splits.poll();
        if (split != null) {
            this.inputFormat.open(split);
            try {
                while (!this.inputFormat.reachedEnd()) {
                    collector.collect(this.inputFormat.nextRecord());
                }
            } finally {
                // Always release what open(split) acquired, even when reading or collecting fails.
                this.inputFormat.close();
            }
            return;
        }
        if (!this.noMoreSplit) {
            // Nothing to read yet; back off before the next poll.
            Thread.sleep(1000L);
        } else {
            LOG.info("Closed the bounded jdbc source");
            this.context.signalNoMoreElement();
        }
    }

    /**
     * Returns a copy of the splits that have not been read yet.
     *
     * @param checkpointId checkpoint identifier (not used by this implementation)
     * @return a new list holding the currently pending splits
     * @throws Exception declared by the method signature; not thrown by this implementation
     */
    public List<JdbcSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<>(this.splits);
    }

    /**
     * Appends the given splits to the pending queue.
     *
     * @param list splits to enqueue
     */
    public void addSplits(List<JdbcSourceSplit> list) {
        this.splits.addAll(list);
    }

    /** Records that no further splits will be added to this reader. */
    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    /**
     * No-op: this reader takes no action when a checkpoint completes.
     *
     * @param checkpointId checkpoint identifier (not used)
     * @throws Exception declared by the method signature; not thrown by this implementation
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}
