package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceReader.class */
public class MaxcomputeSourceReader implements SourceReader<SeaTunnelRow, MaxcomputeSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(MaxcomputeSourceReader.class);
    private final SourceReader.Context context;
    private final Set<MaxcomputeSourceSplit> sourceSplits = new HashSet();
    private Config pluginConfig;
    boolean noMoreSplit;
    private SeaTunnelRowType seaTunnelRowType;

    public MaxcomputeSourceReader(Config config, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
        this.pluginConfig = config;
        this.context = context;
        this.seaTunnelRowType = seaTunnelRowType;
    }

    public void open() {
    }

    public void close() {
    }

    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        this.sourceSplits.forEach(maxcomputeSourceSplit -> {
            try {
                TunnelRecordReader openRecordReader = MaxcomputeUtil.getDownloadSession(this.pluginConfig).openRecordReader(maxcomputeSourceSplit.getSplitId(), maxcomputeSourceSplit.getRowNum());
                log.info("open record reader success");
                while (true) {
                    Record read = openRecordReader.read();
                    if (read == null) {
                        openRecordReader.close();
                        return;
                    }
                    collector.collect(MaxcomputeTypeMapper.getSeaTunnelRowData(read, this.seaTunnelRowType));
                }
            } catch (Exception e) {
                throw new MaxcomputeConnectorException(CommonErrorCode.READER_OPERATION_FAILED, e);
            }
        });
        if (this.noMoreSplit && Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            log.info("Closed the bounded Maxcompute source");
            this.context.signalNoMoreElement();
        }
    }

    public List<MaxcomputeSourceSplit> snapshotState(long j) throws Exception {
        return new ArrayList(this.sourceSplits);
    }

    public void addSplits(List<MaxcomputeSourceSplit> list) {
        this.sourceSplits.addAll(list);
    }

    public void handleNoMoreSplits() {
        this.noMoreSplit = true;
    }

    public void notifyCheckpointComplete(long j) {
    }
}
