package org.apache.seatunnel.translation.flink.source;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.event.ReaderCloseEvent;
import org.apache.seatunnel.api.source.event.ReaderOpenEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/source/FlinkSourceReader.class */
/**
 * Adapts a SeaTunnel source reader to the Flink source-reader interface.
 *
 * <p>Every Flink callback is delegated to the wrapped SeaTunnel reader. Splits travel through
 * Flink wrapped in {@link SplitWrapper} and are unwrapped before they reach the SeaTunnel reader.
 * When a poll emits no rows, an availability future is published and completed by a
 * single-threaded scheduler after {@link #DEFAULT_WAIT_TIME_MILLIS}.
 *
 * <p>Note: the unqualified {@code SourceReader} used by this class is the Flink interface; the
 * SeaTunnel reader is always referred to by its fully-qualified name.
 *
 * @param <SplitT> the SeaTunnel split type handled by the wrapped reader
 */
public class FlinkSourceReader<SplitT extends SourceSplit> implements SourceReader<SeaTunnelRow, SplitWrapper<SplitT>> {
    /** Delay before an "idle" availability future is completed; also the sleep used once input has ended. */
    private static final long DEFAULT_WAIT_TIME_MILLIS = 1000L;

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReader.class);

    private final org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader;
    private final SourceReader.Context context;
    private final FlinkRowCollector flinkRowCollector;
    private final ScheduledExecutorService scheduledExecutor;

    /** Future handed out by {@link #isAvailable()}; {@code null} until the first empty poll. */
    private volatile CompletableFuture<Void> availabilityFuture;

    /** Status returned by a non-empty poll; switched to END_OF_INPUT by a {@code NoMoreElementEvent}. */
    private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;

    /**
     * Creates the adapter and its daemon scheduler thread, named after the subtask index.
     *
     * @param sourceReader the SeaTunnel reader to delegate to
     * @param context the Flink reader context
     * @param config configuration passed to the row collector
     */
    public FlinkSourceReader(org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader, SourceReader.Context context, Config config) {
        String schedulerThreadName = String.format("source-reader-scheduler-%d", context.getIndexOfSubtask());
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat(schedulerThreadName).build());
        this.sourceReader = sourceReader;
        this.context = context;
        this.flinkRowCollector = new FlinkRowCollector(config, context.getMetricsContext());
    }

    /**
     * Opens the wrapped reader and then publishes a {@link ReaderOpenEvent}.
     *
     * @throws RuntimeException wrapping any exception raised while opening
     */
    public void start() {
        try {
            this.sourceReader.open();
            this.context.getEventListener().onEvent(new ReaderOpenEvent());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the wrapped reader once.
     *
     * <p>If the context reports that the no-more-element event has already been sent, the reader
     * is not polled; the call sleeps for {@link #DEFAULT_WAIT_TIME_MILLIS} and returns the current
     * input status. Otherwise the wrapped reader is polled, and if that poll produced no rows a
     * pending availability future is ensured and {@link InputStatus#NOTHING_AVAILABLE} is returned.
     *
     * @param readerOutput the Flink output that collected rows are emitted to
     * @return {@link InputStatus#NOTHING_AVAILABLE} after an empty poll, otherwise the current status
     * @throws Exception if the wrapped reader fails or the sleep is interrupted
     */
    public InputStatus pollNext(ReaderOutput<SeaTunnelRow> readerOutput) throws Exception {
        if (((FlinkSourceReaderContext) this.context).isSendNoMoreElementEvent()) {
            Thread.sleep(DEFAULT_WAIT_TIME_MILLIS);
            return this.inputStatus;
        }

        this.sourceReader.pollNext(this.flinkRowCollector.withReaderOutput(readerOutput));
        if (!this.flinkRowCollector.isEmptyThisPollNext()) {
            return this.inputStatus;
        }

        synchronized (this) {
            // Only arm a new timer when there is no future still waiting on a previous one.
            if (this.availabilityFuture == null || this.availabilityFuture.isDone()) {
                this.availabilityFuture = new CompletableFuture<>();
                scheduleComplete(this.availabilityFuture);
                LOGGER.debug("No data available, wait for next poll.");
            }
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    /**
     * Snapshots the wrapped reader's splits and wraps each one in a {@link SplitWrapper}.
     *
     * @param checkpointId the checkpoint being taken
     * @return the wrapped splits
     * @throws RuntimeException wrapping any exception raised by the wrapped reader
     */
    public List<SplitWrapper<SplitT>> snapshotState(long checkpointId) {
        try {
            List<SplitT> splits = this.sourceReader.snapshotState(checkpointId);
            return splits.stream().map(SplitWrapper::new).collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the current availability future.
     *
     * @return the future created by the last empty poll, or an already-completed future if no
     *     empty poll has happened yet
     */
    public CompletableFuture<Void> isAvailable() {
        // Read the volatile field once so the null check and the return see the same value.
        CompletableFuture<Void> current = this.availabilityFuture;
        return current != null ? current : CompletableFuture.completedFuture(null);
    }

    /**
     * Unwraps the given splits and hands them to the wrapped reader.
     *
     * @param splits the wrapped splits assigned to this reader
     */
    public void addSplits(List<SplitWrapper<SplitT>> splits) {
        List<SplitT> unwrapped = splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList());
        this.sourceReader.addSplits(unwrapped);
    }

    /** Tells the wrapped reader that no further splits will be assigned. */
    public void notifyNoMoreSplits() {
        this.sourceReader.handleNoMoreSplits();
    }

    /**
     * Handles an event sent to this reader.
     *
     * <p>A {@code NoMoreElementEvent} switches the status returned by later polls to
     * {@link InputStatus#END_OF_INPUT}; a {@code SourceEventWrapper} is unwrapped and forwarded
     * to the wrapped reader. Other events are ignored.
     *
     * @param sourceEvent the received event
     */
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            this.inputStatus = InputStatus.END_OF_INPUT;
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            this.sourceReader.handleSourceEvent(((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    /**
     * Completes any pending availability future, closes the wrapped reader, publishes a
     * {@link ReaderCloseEvent} and shuts the scheduler down.
     *
     * <p>The scheduler is shut down in a {@code finally} block so that its thread is released
     * even when closing the wrapped reader or notifying the listener fails.
     *
     * @throws Exception if the wrapped reader or the event listener fails
     */
    public void close() throws Exception {
        CompletableFuture<Void> pending = this.availabilityFuture;
        if (pending != null && !pending.isDone()) {
            pending.complete(null);
        }
        try {
            this.sourceReader.close();
            this.context.getEventListener().onEvent(new ReaderCloseEvent());
        } finally {
            this.scheduledExecutor.shutdown();
        }
    }

    /**
     * Forwards a checkpoint-complete notification to the wrapped reader.
     *
     * @param checkpointId the completed checkpoint
     * @throws Exception if the wrapped reader fails
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.sourceReader.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Forwards a checkpoint-aborted notification to the wrapped reader.
     *
     * @param checkpointId the aborted checkpoint
     * @throws Exception if the wrapped reader fails
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.sourceReader.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Schedules the given future to be completed after {@link #DEFAULT_WAIT_TIME_MILLIS}.
     *
     * @param completableFuture the future to complete
     */
    private void scheduleComplete(CompletableFuture<Void> completableFuture) {
        this.scheduledExecutor.schedule(
                () -> completableFuture.complete(null), DEFAULT_WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS);
    }
}
