package org.apache.seatunnel.connectors.doris.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/doris/source/reader/DorisSourceReader.class */
/**
 * Source reader that drains Doris splits into a SeaTunnel {@link Collector}.
 *
 * <p>Splits handed over through {@link #addSplits(List)} are buffered in a queue. Each call to
 * {@link #pollNext(Collector)} takes at most one split from the queue, reads every row of that
 * split through a {@code DorisValueReader}, and then releases the reader. Once the source is
 * bounded, {@link #handleNoMoreSplits()} has been called, and the queue is empty, the reader
 * signals the context that no more elements will be produced.
 */
public class DorisSourceReader implements SourceReader<SeaTunnelRow, DorisSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(DorisSourceReader.class);

    private final SourceReader.Context context;
    private final DorisConfig dorisConfig;
    private final SeaTunnelRowType seaTunnelRowType;

    /** Splits that have been assigned to this reader but not yet read. */
    private final Queue<DorisSourceSplit> splitsQueue = new ArrayDeque<>();

    /** Set once {@link #handleNoMoreSplits()} has been invoked. */
    private volatile boolean noMoreSplits;

    /** Reader for the split currently being consumed; {@code null} when no split is in flight. */
    private DorisValueReader valueReader;

    /**
     * Creates a reader.
     *
     * @param context reader context, queried for boundedness and notified when reading is done
     * @param dorisConfig configuration passed to every {@code DorisValueReader} that is created
     * @param seaTunnelRowType row type passed to every {@code DorisValueReader} that is created
     */
    public DorisSourceReader(SourceReader.Context context, DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
        this.context = context;
        this.dorisConfig = dorisConfig;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /** No-op: readers are created lazily, one per split, inside {@link #pollNext(Collector)}. */
    public void open() throws Exception {
    }

    /**
     * Closes the value reader of the split that is currently in flight, if there is one.
     *
     * @throws IOException if closing the underlying value reader fails
     */
    public void close() throws IOException {
        closeValueReader();
    }

    /**
     * Reads at most one queued split completely and emits its rows to {@code collector}.
     *
     * <p>The whole call runs while holding the collector's checkpoint lock. The value reader
     * opened for the split is closed before this method returns, whether reading finished
     * normally or failed, so that readers do not accumulate across splits.
     *
     * @param collector sink for the rows read from the split
     * @throws Exception if creating, reading from, or closing the value reader fails
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        synchronized (collector.getCheckpointLock()) {
            DorisSourceSplit split = splitsQueue.poll();
            if (split != null) {
                valueReader = new DorisValueReader(split.getPartitionDefinition(), dorisConfig, seaTunnelRowType);
                try {
                    while (valueReader.hasNext()) {
                        collector.collect(valueReader.next());
                    }
                } finally {
                    // Release the per-split reader right away; previously only the
                    // last reader created was ever closed.
                    closeValueReader();
                }
            }
            if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreSplits && splitsQueue.isEmpty()) {
                log.info("Closed the bounded Doris source");
                context.signalNoMoreElement();
            }
        }
    }

    /**
     * Returns a copy of the splits that are still waiting in the queue.
     *
     * @param checkpointId checkpoint identifier; not used by this implementation
     * @return a new list holding the queued splits in queue order
     */
    public List<DorisSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<>(splitsQueue);
    }

    /**
     * Appends the given splits to the queue of splits to read.
     *
     * @param splits splits to enqueue
     */
    public void addSplits(List<DorisSourceSplit> splits) {
        splitsQueue.addAll(splits);
    }

    /** Records that no further splits will be added to this reader. */
    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        noMoreSplits = true;
    }

    /**
     * No-op: this reader takes no action when a checkpoint completes.
     *
     * @param checkpointId checkpoint identifier; not used by this implementation
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    /** Closes the in-flight value reader, if any, and clears the field so it is closed only once. */
    private void closeValueReader() throws IOException {
        if (valueReader != null) {
            try {
                valueReader.close();
            } finally {
                valueReader = null;
            }
        }
    }
}
