package org.apache.seatunnel.translation.spark.source.partition.batch;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.seatunnel.translation.source.CoordinatedSource;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.serialization.InternalRowCollector;

/* loaded from: input_file:org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.class */
public class CoordinatedBatchPartitionReader extends ParallelBatchPartitionReader {
    protected final Map<Integer, InternalRowCollector> collectorMap;

    /* loaded from: input_file:org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader$InternalCoordinatedSource.class */
    public class InternalCoordinatedSource<SplitT extends SourceSplit, StateT extends Serializable> extends CoordinatedSource<SeaTunnelRow, SplitT, StateT> {
        public InternalCoordinatedSource(SeaTunnelSource<SeaTunnelRow, SplitT, StateT> seaTunnelSource, Map<Integer, List<byte[]>> map, int i, String str) {
            super(seaTunnelSource, map, i, str);
        }

        public void run(Collector<SeaTunnelRow> collector) throws Exception {
            this.readerMap.entrySet().parallelStream().forEach(entry -> {
                AtomicBoolean atomicBoolean = (AtomicBoolean) this.readerRunningMap.get(entry.getKey());
                SourceReader sourceReader = (SourceReader) entry.getValue();
                Collector collector2 = CoordinatedBatchPartitionReader.this.collectorMap.get(entry.getKey());
                this.executorService.execute(() -> {
                    while (atomicBoolean.get()) {
                        try {
                            sourceReader.pollNext(collector2);
                            if (collector2.isEmptyThisPollNext()) {
                                Thread.sleep(100L);
                            } else {
                                collector2.resetEmptyThisPollNext();
                                Thread.sleep(0L);
                            }
                        } catch (Exception e) {
                            this.running = false;
                            atomicBoolean.set(false);
                            throw new RuntimeException(e);
                        }
                    }
                });
            });
            this.splitEnumerator.run();
            while (this.running) {
                Thread.sleep(5L);
            }
        }

        protected void handleNoMoreElement(int i) {
            super.handleNoMoreElement(i);
            if (this.running) {
                return;
            }
            CoordinatedBatchPartitionReader.this.running = false;
        }
    }

    public CoordinatedBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource, Integer num, String str, Integer num2, Map<String, String> map, MultiTableManager multiTableManager) {
        super(seaTunnelSource, num, str, num2, map, multiTableManager);
        this.collectorMap = new HashMap(num.intValue());
        for (int i = 0; i < num.intValue(); i++) {
            this.collectorMap.put(Integer.valueOf(i), multiTableManager.getInternalRowCollector(this.handover, new Object(), map));
        }
    }

    @Override // org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader
    protected String getEnumeratorThreadName() {
        return "coordinated-split-enumerator-executor";
    }

    @Override // org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader
    protected BaseSourceFunction<SeaTunnelRow> createInternalSource() {
        return new InternalCoordinatedSource(this.source, null, this.parallelism.intValue(), this.jobId);
    }
}
