package org.apache.seatunnel.translation.flink.source;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.class */
public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT> implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceEnumerator.class);
    private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;
    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;
    private final int parallelism;
    private final Object lock = new Object();
    private volatile boolean isRun = false;
    private volatile int currentRegisterReaders = 0;

    public FlinkSourceEnumerator(SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator, SplitEnumeratorContext<SplitWrapper<SplitT>> splitEnumeratorContext) {
        this.sourceSplitEnumerator = sourceSplitEnumerator;
        this.enumeratorContext = splitEnumeratorContext;
        this.parallelism = this.enumeratorContext.currentParallelism();
    }

    public void start() {
        this.sourceSplitEnumerator.open();
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        this.sourceSplitEnumerator.handleSplitRequest(i);
    }

    public void addSplitsBack(List<SplitWrapper<SplitT>> list, int i) {
        this.sourceSplitEnumerator.addSplitsBack((List) list.stream().map((v0) -> {
            return v0.getSourceSplit();
        }).collect(Collectors.toList()), i);
    }

    public void addReader(int i) {
        this.sourceSplitEnumerator.registerReader(i);
        synchronized (this.lock) {
            this.currentRegisterReaders++;
            if (!this.isRun && this.currentRegisterReaders == this.parallelism) {
                try {
                    this.sourceSplitEnumerator.run();
                    this.isRun = true;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public EnumStateT snapshotState(long j) throws Exception {
        return (EnumStateT) this.sourceSplitEnumerator.snapshotState(j);
    }

    public void close() throws IOException {
        this.sourceSplitEnumerator.close();
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            LOGGER.info("Received NoMoreElementEvent from reader [{}], total registered readers [{}]", Integer.valueOf(i), Integer.valueOf(this.enumeratorContext.currentParallelism()));
            this.enumeratorContext.sendEventToSourceReader(i, sourceEvent);
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            this.sourceSplitEnumerator.handleSourceEvent(i, ((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.sourceSplitEnumerator.notifyCheckpointComplete(j);
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        this.sourceSplitEnumerator.notifyCheckpointAborted(j);
    }
}
