package org.apache.seatunnel.connectors.seatunnel.starrocks.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.StarRocksBeReadClient;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.class */
/**
 * Source reader that drains assigned {@link StarRocksSourceSplit}s and emits their rows.
 *
 * <p>Splits handed over through {@link #addSplits(List)} are queued and consumed by
 * {@link #pollNext(Collector)}. For every split the reader opens a scanner on the backend
 * address reported by the split's {@link QueryPartition} and forwards each row to the
 * collector. One {@link StarRocksBeReadClient} is kept per backend address and reused across
 * splits; all of them are closed in {@link #close()}.
 */
public class StarRocksSourceReader implements SourceReader<SeaTunnelRow, StarRocksSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(StarRocksSourceReader.class);

    /** Splits that were assigned to this reader but not read yet. */
    private final Queue<StarRocksSourceSplit> pendingSplits = new LinkedList<>();
    private final SourceReader.Context context;
    private final SourceConfig sourceConfig;
    private final SeaTunnelRowType seaTunnelRowType;

    /**
     * Read clients keyed by backend address. Created eagerly so that {@link #close()} and
     * {@link #read} are safe even if {@link #open()} was never invoked.
     */
    private final Map<String, StarRocksBeReadClient> clientsPools = new HashMap<>();

    /** Set once {@link #handleNoMoreSplits()} has been called. */
    private volatile boolean noMoreSplitsAssignment;

    /**
     * Creates a reader; no client is created until the first split is read.
     *
     * @param context reader context, used for the boundedness check and the end-of-input signal
     * @param seaTunnelRowType row type passed to the scanner when a partition is opened
     * @param sourceConfig configuration passed to every newly created read client
     */
    public StarRocksSourceReader(SourceReader.Context context, SeaTunnelRowType seaTunnelRowType, SourceConfig sourceConfig) {
        this.context = context;
        this.sourceConfig = sourceConfig;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Reads every currently pending split and emits its rows to {@code collector}.
     *
     * <p>Each split is taken from the queue and read while holding the collector's checkpoint
     * lock. When the source is bounded, no more splits will be assigned and the queue is empty,
     * the context is told that no more elements will follow.
     *
     * @param collector receives the rows and supplies the checkpoint lock
     * @throws Exception if reading a split fails
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        while (!this.pendingSplits.isEmpty()) {
            synchronized (collector.getCheckpointLock()) {
                StarRocksSourceSplit split = this.pendingSplits.poll();
                // poll() returns null on an empty queue; never hand that to read().
                if (split != null) {
                    read(split, collector);
                }
            }
        }
        if (Boundedness.BOUNDED.equals(this.context.getBoundedness()) && this.noMoreSplitsAssignment && this.pendingSplits.isEmpty()) {
            log.info("Closed the bounded StarRocks source");
            this.context.signalNoMoreElement();
        }
    }

    /**
     * Returns a copy of the splits that have not been read yet.
     *
     * @param checkpointId identifier of the checkpoint being taken (unused)
     * @return a new list containing the pending splits
     */
    public List<StarRocksSourceSplit> snapshotState(long checkpointId) {
        return new ArrayList<>(this.pendingSplits);
    }

    /**
     * Appends the given splits to the pending queue.
     *
     * @param list splits assigned to this reader
     */
    public void addSplits(List<StarRocksSourceSplit> list) {
        this.pendingSplits.addAll(list);
    }

    /** Records that no further splits will be assigned to this reader. */
    public void handleNoMoreSplits() {
        log.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
    }

    /**
     * Scans one split and forwards all of its rows to the collector.
     *
     * @param starRocksSourceSplit split whose partition is scanned
     * @param collector receives every row returned by the scanner
     */
    private void read(StarRocksSourceSplit starRocksSourceSplit, Collector<SeaTunnelRow> collector) {
        QueryPartition partition = starRocksSourceSplit.getPartition();
        // Reuse the client for this backend address, creating it on first use.
        StarRocksBeReadClient client = this.clientsPools.computeIfAbsent(
                partition.getBeAddress(),
                address -> new StarRocksBeReadClient(address, this.sourceConfig));
        client.openScanner(partition, this.seaTunnelRowType);
        while (client.hasNext()) {
            collector.collect(client.getNext());
        }
    }

    /**
     * Nothing to prepare: the client pool is created together with the reader and filled
     * lazily while splits are read.
     */
    public void open() throws Exception {
    }

    /**
     * Closes every pooled read client and empties the pool.
     *
     * <p>A {@link StarRocksConnectorException} raised by one client is logged and does not
     * prevent the remaining clients from being closed. Calling this method again afterwards
     * is a no-op because the pool has been cleared.
     *
     * @throws IOException declared by the method signature; not thrown by the code in this method
     */
    public void close() throws IOException {
        for (StarRocksBeReadClient client : this.clientsPools.values()) {
            try {
                client.close();
            } catch (StarRocksConnectorException e) {
                log.error("Failed to close reader: ", e);
            }
        }
        // Drop the closed clients so that a repeated close() does not touch them again.
        this.clientsPools.clear();
    }

    /**
     * Checkpoint-completion callback; this reader takes no action.
     *
     * @param checkpointId identifier of the completed checkpoint (unused)
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}
