package org.apache.seatunnel.translation.spark.source.partition.batch;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.ParallelSource;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.serialization.InternalRowCollector;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.class */
public class ParallelBatchPartitionReader {
    private static final Logger log = LoggerFactory.getLogger(ParallelBatchPartitionReader.class);
    protected static final Integer INTERVAL = 100;
    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
    protected final Integer parallelism;
    protected final String jobId;
    protected final Integer subtaskId;
    protected volatile BaseSourceFunction<SeaTunnelRow> internalSource;
    protected volatile InternalRowCollector internalRowCollector;
    private final Map<String, String> envOptions;
    private final MultiTableManager multiTableManager;
    protected final Object checkpointLock = new Object();
    protected volatile boolean running = true;
    protected volatile boolean prepare = true;
    protected final ExecutorService executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, getEnumeratorThreadName());
    protected final Handover<InternalRow> handover = new Handover<>();

    /* loaded from: input_file:org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader$InternalParallelSource.class */
    public class InternalParallelSource<SplitT extends SourceSplit, StateT extends Serializable> extends ParallelSource<SeaTunnelRow, SplitT, StateT> {
        public InternalParallelSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> seaTunnelSource, Map<Integer, List<byte[]>> map, int i, String str, int i2) {
            super(seaTunnelSource, map, i, str, i2);
        }

        protected void handleNoMoreElement() {
            super.handleNoMoreElement();
            ParallelBatchPartitionReader.this.running = false;
        }
    }

    public ParallelBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource, Integer num, String str, Integer num2, Map<String, String> map, MultiTableManager multiTableManager) {
        this.source = seaTunnelSource;
        this.parallelism = num;
        this.jobId = str;
        this.subtaskId = num2;
        this.envOptions = map;
        this.multiTableManager = multiTableManager;
    }

    protected String getEnumeratorThreadName() {
        return String.format("parallel-split-enumerator-executor-%s", this.subtaskId);
    }

    public boolean next() throws Exception {
        prepare();
        while (this.running && this.handover.isEmpty()) {
            try {
                Thread.sleep(INTERVAL.intValue());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return this.running || !this.handover.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void prepare() {
        if (this.prepare) {
            this.internalSource = createInternalSource();
            try {
                this.internalSource.open();
                this.internalRowCollector = this.multiTableManager.getInternalRowCollector(this.handover, this.checkpointLock, this.envOptions);
                this.executorService.execute(() -> {
                    try {
                        this.internalSource.run(this.internalRowCollector);
                    } catch (Exception e) {
                        this.handover.reportError(e);
                        log.error("BatchPartitionReader execute failed.", e);
                        this.running = false;
                    }
                });
                this.prepare = false;
            } catch (Exception e) {
                this.running = false;
                throw new RuntimeException("Failed to open internal source.", e);
            }
        }
    }

    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new InternalParallelSource(this.source, null, this.parallelism.intValue(), this.jobId, this.subtaskId.intValue());
    }

    public InternalRow get() {
        try {
            return (InternalRow) this.handover.pollNext().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() throws IOException {
        this.running = false;
        try {
            if (this.internalSource != null) {
                this.internalSource.close();
            }
            this.executorService.shutdown();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
