package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.DefaultSeaTunnelRowDeserializer;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.ElasticsearchRecord;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.source.SeaTunnelRowDeserializer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.class */
/**
 * Source reader that drains the {@link ElasticsearchSourceSplit}s assigned to it by running a
 * scroll search per split and emitting every returned document as a {@link SeaTunnelRow}.
 *
 * <p>Splits are queued by {@link #addSplits(List)} and consumed one per {@link #pollNext(Collector)}
 * call. Once {@link #handleNoMoreSplits()} has been called and the queue is empty, the reader
 * signals the end of input through its {@link SourceReader.Context}.
 */
public class ElasticsearchSourceReader implements SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSourceReader.class);

    /** Reader context; used to signal that no more elements will be produced. */
    SourceReader.Context context;

    /** Plugin configuration handed to {@link EsRestClient#createInstance} in {@link #open()}. */
    private final Config pluginConfig;

    /** Client created in {@link #open()}; {@code null} until then. */
    private EsRestClient esRestClient;

    /** Converts an {@link ElasticsearchRecord} into a {@link SeaTunnelRow}. */
    private final SeaTunnelRowDeserializer deserializer;

    /** Set by {@link #handleNoMoreSplits()}; once true, an empty queue means end of input. */
    boolean noMoreSplit;

    /** Splits that have been assigned but not yet read. */
    Deque<ElasticsearchSourceSplit> splits = new LinkedList<>();

    /** Milliseconds to wait in {@link #pollNext(Collector)} when no split is available yet. */
    private final long pollNextWaitTime = 1000;

    /**
     * Creates a reader.
     *
     * @param context reader context used to signal end of input
     * @param config plugin configuration used to build the Elasticsearch client
     * @param seaTunnelRowType row type passed to the row deserializer
     */
    public ElasticsearchSourceReader(SourceReader.Context context, Config config, SeaTunnelRowType seaTunnelRowType) {
        this.context = context;
        this.pluginConfig = config;
        this.deserializer = new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
    }

    /** Creates the Elasticsearch client from the plugin configuration. */
    public void open() {
        this.esRestClient = EsRestClient.createInstance(this.pluginConfig);
    }

    /**
     * Closes the Elasticsearch client if one was created.
     *
     * @throws IOException declared by the reader contract
     */
    public void close() throws IOException {
        // open() may never have run (or may have failed), so there may be nothing to close.
        if (this.esRestClient != null) {
            this.esRestClient.close();
        }
    }

    /**
     * Reads the next queued split, if any, and emits all of its documents.
     *
     * <p>When the queue is empty: if no more splits will arrive, end of input is signalled;
     * otherwise the method waits {@code pollNextWaitTime} milliseconds before returning.
     *
     * @param collector receives the deserialized rows and provides the checkpoint lock
     * @throws Exception if the search fails or the wait is interrupted
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        boolean waitForSplits = false;
        synchronized (collector.getCheckpointLock()) {
            ElasticsearchSourceSplit split = this.splits.poll();
            if (split != null) {
                readSplit(split, collector);
            } else if (this.noMoreSplit) {
                log.info("Closed the bounded ELasticsearch source");
                this.context.signalNoMoreElement();
            } else {
                waitForSplits = true;
            }
        }
        // Sleep outside the synchronized block so the checkpoint lock is not held while idle.
        if (waitForSplits) {
            Thread.sleep(this.pollNextWaitTime);
        }
    }

    /** Runs the scroll search for one split and emits every page until an empty page is returned. */
    private void readSplit(ElasticsearchSourceSplit split, Collector<SeaTunnelRow> collector) {
        SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
        List<String> source = sourceIndexInfo.getSource();
        ScrollResult scrollResult = this.esRestClient.searchByScroll(
                sourceIndexInfo.getIndex(), source, sourceIndexInfo.getScrollTime(), sourceIndexInfo.getScrollSize());
        outputFromScrollResult(scrollResult, source, collector);
        // Keep following the scroll id for as long as the previous page contained documents.
        while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
            scrollResult = this.esRestClient.searchWithScrollId(scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
            outputFromScrollResult(scrollResult, source, collector);
        }
    }

    /**
     * Deserializes every document of one scroll page and hands it to the collector.
     *
     * @param scrollResult page whose documents are emitted; a page without a document list emits nothing
     * @param list source field names passed along with each document to the deserializer
     * @param collector receives the deserialized rows
     */
    private void outputFromScrollResult(ScrollResult scrollResult, List<String> list, Collector<SeaTunnelRow> collector) {
        // The caller's loop treats a null document list as "no more data"; do the same here
        // instead of failing with a NullPointerException.
        if (scrollResult.getDocs() == null) {
            return;
        }
        for (Map<String, Object> doc : scrollResult.getDocs()) {
            collector.collect(this.deserializer.deserialize(new ElasticsearchRecord(doc, list)));
        }
    }

    /**
     * Returns a copy of the splits that are still waiting to be read.
     *
     * @param j checkpoint id (unused)
     * @return a new list containing the queued splits
     */
    public List<ElasticsearchSourceSplit> snapshotState(long j) throws Exception {
        return new ArrayList<>(this.splits);
    }

    /**
     * Appends the given splits to the queue of splits to read.
     *
     * @param list splits to enqueue
     */
    public void addSplits(List<ElasticsearchSourceSplit> list) {
        this.splits.addAll(list);
    }

    /** Records that no further splits will be added to this reader. */
    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    /**
     * No-op: this reader takes no action when a checkpoint completes.
     *
     * @param j checkpoint id (unused)
     */
    public void notifyCheckpointComplete(long j) throws Exception {
    }
}
