package org.apache.seatunnel.translation.source;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/source/CoordinatedSource.class */
public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Serializable> implements BaseSourceFunction<T> {
    protected static final long SLEEP_TIME_INTERVAL = 5;
    protected final SeaTunnelSource<T, SplitT, StateT> source;
    protected final Map<Integer, List<byte[]>> restoredState;
    protected final Integer parallelism;
    protected final Serializer<SplitT> splitSerializer;
    protected final Serializer<StateT> enumeratorStateSerializer;
    protected final Map<Integer, CoordinatedReaderContext> readerContextMap;
    protected volatile transient SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
    protected final Map<Integer, AtomicBoolean> readerRunningMap;
    protected volatile transient ScheduledThreadPoolExecutor executorService;
    protected final Map<Integer, List<SplitT>> restoredSplitStateMap = new HashMap();
    protected transient Map<Integer, SourceReader<T, SplitT>> readerMap = new ConcurrentHashMap();
    protected final AtomicInteger completedReader = new AtomicInteger(0);
    protected volatile boolean running = true;
    protected final CoordinatedEnumeratorContext<SplitT> coordinatedEnumeratorContext = new CoordinatedEnumeratorContext<>(this);

    public CoordinatedSource(SeaTunnelSource<T, SplitT, StateT> seaTunnelSource, Map<Integer, List<byte[]>> map, int i) {
        this.source = seaTunnelSource;
        this.restoredState = map;
        this.parallelism = Integer.valueOf(i);
        this.splitSerializer = seaTunnelSource.getSplitSerializer();
        this.enumeratorStateSerializer = seaTunnelSource.getEnumeratorStateSerializer();
        this.readerContextMap = new ConcurrentHashMap(i);
        this.readerRunningMap = new ConcurrentHashMap(i);
        try {
            createSplitEnumerator();
            createReaders();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void createSplitEnumerator() throws Exception {
        if (this.restoredState == null || this.restoredState.size() <= 0) {
            this.splitEnumerator = this.source.createEnumerator(this.coordinatedEnumeratorContext);
            return;
        }
        this.splitEnumerator = this.source.restoreEnumerator(this.coordinatedEnumeratorContext, (Serializable) this.enumeratorStateSerializer.deserialize(this.restoredState.get(-1).get(0)));
        this.restoredState.forEach((num, list) -> {
            if (num.intValue() == -1) {
                return;
            }
            ArrayList arrayList = new ArrayList(list.size());
            Iterator it = list.iterator();
            while (it.hasNext()) {
                try {
                    arrayList.add((SourceSplit) this.splitSerializer.deserialize((byte[]) it.next()));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            this.restoredSplitStateMap.put(num, arrayList);
        });
    }

    private void createReaders() throws Exception {
        for (int i = 0; i < this.parallelism.intValue(); i++) {
            CoordinatedReaderContext coordinatedReaderContext = new CoordinatedReaderContext(this, this.source.getBoundedness(), Integer.valueOf(i));
            this.readerContextMap.put(Integer.valueOf(i), coordinatedReaderContext);
            this.readerRunningMap.put(Integer.valueOf(i), new AtomicBoolean(true));
            this.readerMap.put(Integer.valueOf(i), this.source.createReader(coordinatedReaderContext));
        }
    }

    @Override // org.apache.seatunnel.translation.source.BaseSourceFunction
    public void open() throws Exception {
        this.executorService = ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(this.parallelism.intValue(), "parallel-split-enumerator-executor");
        this.splitEnumerator.open();
        this.restoredSplitStateMap.forEach((num, list) -> {
            this.splitEnumerator.addSplitsBack(list, num.intValue());
        });
        this.readerMap.entrySet().parallelStream().forEach(entry -> {
            try {
                ((SourceReader) entry.getValue()).open();
                this.splitEnumerator.registerReader(((Integer) entry.getKey()).intValue());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override // org.apache.seatunnel.translation.source.BaseSourceFunction
    public void run(Collector<T> collector) throws Exception {
        this.readerMap.entrySet().parallelStream().forEach(entry -> {
            AtomicBoolean atomicBoolean = this.readerRunningMap.get(entry.getKey());
            SourceReader sourceReader = (SourceReader) entry.getValue();
            this.executorService.execute(() -> {
                while (atomicBoolean.get()) {
                    try {
                        sourceReader.pollNext(collector);
                        Thread.sleep(SLEEP_TIME_INTERVAL);
                    } catch (Exception e) {
                        this.running = false;
                        atomicBoolean.set(false);
                        throw new RuntimeException(e);
                    }
                }
            });
        });
        this.splitEnumerator.run();
        while (this.running) {
            Thread.sleep(SLEEP_TIME_INTERVAL);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this.running = false;
        for (Map.Entry<Integer, SourceReader<T, SplitT>> entry : this.readerMap.entrySet()) {
            this.readerRunningMap.get(entry.getKey()).set(false);
            entry.getValue().close();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        SourceSplitEnumerator<SplitT, StateT> sourceSplitEnumerator = this.splitEnumerator;
        if (sourceSplitEnumerator != null) {
            sourceSplitEnumerator.close();
        }
    }

    @Override // org.apache.seatunnel.translation.source.BaseSourceFunction
    public Map<Integer, List<byte[]>> snapshotState(long j) throws Exception {
        byte[] serialize = this.enumeratorStateSerializer.serialize((Serializable) this.splitEnumerator.snapshotState(j));
        Map<Integer, List<byte[]>> map = (Map) this.readerMap.entrySet().parallelStream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            try {
                List snapshotState = ((SourceReader) entry.getValue()).snapshotState(j);
                ArrayList arrayList = new ArrayList(snapshotState.size());
                Iterator it = snapshotState.iterator();
                while (it.hasNext()) {
                    arrayList.add(this.splitSerializer.serialize((SourceSplit) it.next()));
                }
                return arrayList;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
        map.put(-1, Collections.singletonList(serialize));
        return map;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.splitEnumerator.notifyCheckpointComplete(j);
        this.readerMap.values().parallelStream().forEach(sourceReader -> {
            try {
                sourceReader.notifyCheckpointComplete(j);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        this.splitEnumerator.notifyCheckpointAborted(j);
        this.readerMap.values().parallelStream().forEach(sourceReader -> {
            try {
                sourceReader.notifyCheckpointAborted(j);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleNoMoreElement(int i) {
        this.readerRunningMap.get(Integer.valueOf(i)).set(false);
        this.readerContextMap.remove(Integer.valueOf(i));
        if (this.completedReader.incrementAndGet() == this.parallelism.intValue()) {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleSplitRequest(int i) {
        this.splitEnumerator.handleSplitRequest(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleReaderEvent(int i, SourceEvent sourceEvent) {
        this.splitEnumerator.handleSourceEvent(i, sourceEvent);
    }

    public int currentReaderCount() {
        return this.readerContextMap.size();
    }

    public Set<Integer> registeredReaders() {
        return this.readerMap.keySet();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addSplits(int i, List<SplitT> list) {
        this.readerMap.get(Integer.valueOf(i)).addSplits(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleNoMoreSplits(int i) {
        this.readerMap.get(Integer.valueOf(i)).handleNoMoreSplits();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleEnumeratorEvent(int i, SourceEvent sourceEvent) {
        this.readerMap.get(Integer.valueOf(i)).handleSourceEvent(sourceEvent);
    }
}
