package org.apache.seatunnel.translation.source;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/source/ParallelSource.class */
public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serializable> implements BaseSourceFunction<T> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ParallelSource.class);
    protected final SeaTunnelSource<T, SplitT, StateT> source;
    protected final ParallelEnumeratorContext<SplitT> parallelEnumeratorContext;
    protected final ParallelReaderContext readerContext;
    protected final Integer subtaskId;
    protected final Integer parallelism;
    protected final Serializer<SplitT> splitSerializer;
    protected final Serializer<StateT> enumeratorStateSerializer;
    protected final List<SplitT> restoredSplitState;
    protected final SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
    protected final SourceReader<T, SplitT> reader;
    protected volatile transient ScheduledThreadPoolExecutor executorService;
    private volatile boolean running = true;

    public ParallelSource(SeaTunnelSource<T, SplitT, StateT> seaTunnelSource, Map<Integer, List<byte[]>> map, int i, int i2) {
        this.source = seaTunnelSource;
        this.subtaskId = Integer.valueOf(i2);
        this.parallelism = Integer.valueOf(i);
        this.splitSerializer = seaTunnelSource.getSplitSerializer();
        this.enumeratorStateSerializer = seaTunnelSource.getEnumeratorStateSerializer();
        this.parallelEnumeratorContext = new ParallelEnumeratorContext<>(this, i, i2);
        this.readerContext = new ParallelReaderContext(this, seaTunnelSource.getBoundedness(), Integer.valueOf(i2));
        if (map != null) {
            try {
                if (map.size() > 0) {
                    StateT deserialize = map.containsKey(-1) ? this.enumeratorStateSerializer.deserialize(map.get(-1).get(0)) : null;
                    this.restoredSplitState = new ArrayList(map.get(Integer.valueOf(i2)).size());
                    Iterator<byte[]> it = map.get(Integer.valueOf(i2)).iterator();
                    while (it.hasNext()) {
                        this.restoredSplitState.add(this.splitSerializer.deserialize(it.next()));
                    }
                    this.splitEnumerator = seaTunnelSource.restoreEnumerator(this.parallelEnumeratorContext, deserialize);
                    this.reader = seaTunnelSource.createReader(this.readerContext);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.restoredSplitState = Collections.emptyList();
        this.splitEnumerator = seaTunnelSource.createEnumerator(this.parallelEnumeratorContext);
        this.reader = seaTunnelSource.createReader(this.readerContext);
    }

    @Override // org.apache.seatunnel.translation.source.BaseSourceFunction
    public void open() throws Exception {
        this.executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(1, String.format("parallel-split-enumerator-executor-%s", this.subtaskId));
        this.splitEnumerator.open();
        if (this.restoredSplitState.size() > 0) {
            this.splitEnumerator.addSplitsBack(this.restoredSplitState, this.subtaskId.intValue());
        }
        this.reader.open();
        this.parallelEnumeratorContext.register();
        this.splitEnumerator.registerReader(this.subtaskId.intValue());
    }

    @Override // org.apache.seatunnel.translation.source.BaseSourceFunction
    public void run(Collector<T> collector) throws Exception {
        Future<?> submit = this.executorService.submit(() -> {
            try {
                this.splitEnumerator.run();
            } catch (Exception e) {
                throw new RuntimeException("SourceSplitEnumerator run failed.", e);
            }
        });
        while (this.running) {
            if (submit.isDone()) {
                submit.get();
            }
            this.reader.pollNext(collector);
            Thread.sleep(5L);
        }
        LOG.debug("Parallel source runs complete.");
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this.running = false;
        if (this.executorService != null) {
            LOG.debug("Close the thread pool resource.");
            this.executorService.shutdown();
        }
        if (this.splitEnumerator != null) {
            LOG.debug("Close the split enumerator for the Apache SeaTunnel source.");
            this.splitEnumerator.close();
        }
        if (this.reader != null) {
            LOG.debug("Close the data reader for the Apache SeaTunnel source.");
            this.reader.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleNoMoreElement() {
        this.running = false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleSplitRequest(int i) {
        this.splitEnumerator.handleSplitRequest(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addSplits(List<SplitT> list) {
        this.reader.addSplits(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleNoMoreSplits() {
        this.reader.handleNoMoreSplits();
    }

    @Override // org.apache.seatunnel.translation.source.BaseSourceFunction
    public Map<Integer, List<byte[]>> snapshotState(long j) throws Exception {
        HashMap hashMap = new HashMap(2);
        StateT snapshotState = this.splitEnumerator.snapshotState(j);
        if (snapshotState != null) {
            hashMap.put(-1, Collections.singletonList(this.enumeratorStateSerializer.serialize(snapshotState)));
        }
        List<SplitT> snapshotState2 = this.reader.snapshotState(j);
        if (snapshotState2 != null) {
            ArrayList arrayList = new ArrayList(snapshotState2.size());
            Iterator<SplitT> it = snapshotState2.iterator();
            while (it.hasNext()) {
                arrayList.add(this.splitSerializer.serialize(it.next()));
            }
            hashMap.put(this.subtaskId, arrayList);
        }
        return hashMap;
    }

    @Override // org.apache.seatunnel.api.state.CheckpointListener
    public void notifyCheckpointComplete(long j) throws Exception {
        this.splitEnumerator.notifyCheckpointComplete(j);
        this.reader.notifyCheckpointComplete(j);
    }

    @Override // org.apache.seatunnel.api.state.CheckpointListener
    public void notifyCheckpointAborted(long j) throws Exception {
        this.splitEnumerator.notifyCheckpointAborted(j);
        this.reader.notifyCheckpointAborted(j);
    }
}
