package org.apache.flink.cdc.connectors.base.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.Fetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SplitReader} that reads {@link SourceSplitBase}s for the incremental source framework.
 *
 * <p>Splits handed over through {@link #handleSplitsChanges(SplitsChange)} are queued by kind
 * (snapshot or stream). On every {@link #fetch()} the reader either submits the next queued split
 * to a scan/stream fetcher or keeps polling the fetcher that is currently active, switching
 * between the two fetcher kinds when newly queued splits require it.
 *
 * @param <C> concrete {@link SourceConfig} type handed to the {@link DataSourceDialect}
 */
@Experimental
public class IncrementalSourceSplitReader<C extends SourceConfig> implements SplitReader<SourceRecords, SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceSplitReader.class);

    /** Snapshot splits waiting to be submitted to a scan fetcher, in arrival order. */
    private final ArrayDeque<SnapshotSplit> snapshotSplits = new ArrayDeque<>();

    /** Stream splits waiting to be submitted to a stream fetcher, in arrival order. */
    private final ArrayDeque<StreamSplit> streamSplits = new ArrayDeque<>(1);

    private final int subtaskId;

    /** Fetcher that is currently being polled; {@code null} when no split is being read. */
    @Nullable
    private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;

    /** Lazily created scan fetcher; reset to {@code null} by {@link #closeScanFetcher()}. */
    @Nullable
    private IncrementalSourceScanFetcher reusedScanFetcher;

    /** Lazily created stream fetcher; reset to {@code null} by {@link #closeStreamFetcher()}. */
    @Nullable
    private IncrementalSourceStreamFetcher reusedStreamFetcher;

    /** Id of the split most recently submitted; {@code null} once it was reported as finished. */
    @Nullable
    private String currentSplitId;

    private final DataSourceDialect<C> dataSourceDialect;
    private final C sourceConfig;
    private final IncrementalSourceReaderContext context;
    private final SnapshotPhaseHooks snapshotHooks;

    /**
     * Creates a split reader.
     *
     * @param subtaskId index passed to the fetchers this reader creates
     * @param dataSourceDialect dialect used to create fetch tasks and fetch task contexts
     * @param sourceConfig configuration passed to the dialect when creating a fetch task context
     * @param context reader context queried for the stream-split suspension/assignment state
     * @param snapshotHooks hooks installed on every snapshot fetch task before it is submitted
     */
    public IncrementalSourceSplitReader(
            int subtaskId,
            DataSourceDialect<C> dataSourceDialect,
            C sourceConfig,
            IncrementalSourceReaderContext context,
            SnapshotPhaseHooks snapshotHooks) {
        this.subtaskId = subtaskId;
        this.dataSourceDialect = dataSourceDialect;
        this.sourceConfig = sourceConfig;
        this.context = context;
        this.snapshotHooks = snapshotHooks;
    }

    /**
     * Stops the stream read task if the context requests a suspension, then polls the next batch
     * of records from the active (or newly submitted) split.
     *
     * @return the polled records, or a "finished split" marker when nothing was polled
     * @throws IOException wrapping any exception raised while suspending or polling
     */
    public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
        try {
            suspendStreamReaderIfNeed();
            return pollSplitRecords();
        } catch (Exception e) {
            LOG.warn("fetch data failed.", e);
            throw new IOException(e);
        }
    }

    /**
     * Stops the read task of the active stream fetcher when the context reports that the stream
     * split reader is suspended and the fetcher has not finished yet. Does nothing otherwise.
     */
    private void suspendStreamReaderIfNeed() throws Exception {
        // instanceof is false for null, so this also covers "no active fetcher".
        if (this.currentFetcher instanceof IncrementalSourceStreamFetcher
                && this.context.isStreamSplitReaderSuspended()
                && !this.currentFetcher.isFinished()) {
            ((IncrementalSourceStreamFetcher) this.currentFetcher).stopReadTask();
            LOG.info("Suspend stream reader to wait the stream split update.");
        }
    }

    /**
     * Queues the added splits; they are submitted to a fetcher by later {@link #fetch()} calls.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     */
    public void handleSplitsChanges(SplitsChange<SourceSplitBase> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChange);
        for (SourceSplitBase split : splitsChange.splits()) {
            if (split.isSnapshotSplit()) {
                this.snapshotSplits.add(split.asSnapshotSplit());
            } else {
                this.streamSplits.add(split.asStreamSplit());
            }
        }
    }

    /** Intentionally a no-op: this reader takes no action on wake-up. */
    public void wakeUp() {
    }

    /** Closes the scan fetcher and the stream fetcher, if they exist. */
    public void close() throws Exception {
        closeScanFetcher();
        closeStreamFetcher();
    }

    /**
     * Core state machine behind {@link #fetch()}.
     *
     * <ol>
     *   <li>No active fetcher: submit a queued stream split if there is one, otherwise a queued
     *       snapshot split, and poll it.
     *   <li>Active scan fetcher: poll it, then either hand back to stream reading (a stream split
     *       has already been assigned) or move on to the next queued snapshot split.
     *   <li>Active stream fetcher: poll it and switch to a queued snapshot split if there is one.
     * </ol>
     *
     * @throws IllegalStateException if the active fetcher is neither a scan nor a stream fetcher
     */
    private ChangeEventRecords pollSplitRecords() throws InterruptedException {
        if (this.currentFetcher == null) {
            // (1) Stream splits take precedence over snapshot splits.
            if (!this.streamSplits.isEmpty()) {
                submitStreamSplit(this.streamSplits.poll());
            } else if (!this.snapshotSplits.isEmpty()) {
                submitSnapshotSplit(this.snapshotSplits.poll());
            } else {
                LOG.info("No available split to read.");
            }

            Iterator<SourceRecords> records = null;
            if (this.currentFetcher != null) {
                records = this.currentFetcher.pollSplitRecords();
            } else {
                // Nothing was submitted, so there is no split to report on.
                this.currentSplitId = null;
            }
            return records == null ? finishedSplit() : forRecords(records);
        }

        if (this.currentFetcher instanceof IncrementalSourceScanFetcher) {
            // (2) A snapshot split is being read.
            Iterator<SourceRecords> records = this.currentFetcher.pollSplitRecords();
            if (records == null) {
                return finishedSplit();
            }

            ChangeEventRecords result;
            if (this.context.isHasAssignedStreamSplit()) {
                // Also lists the stream split id in the reported set and closes both fetchers.
                result = forNewAddedTableFinishedSplit(this.currentSplitId, records);
                closeScanFetcher();
                closeStreamFetcher();
            } else {
                // forRecords() closes the scan fetcher (and clears currentFetcher) because the
                // active fetcher is a scan fetcher in this branch.
                result = forRecords(records);
                SnapshotSplit nextSplit = this.snapshotSplits.poll();
                if (nextSplit != null) {
                    // The scan fetcher was just closed by forRecords(), so asserting that it is
                    // still non-null here could never hold and would fail the fetch after the
                    // split had already been removed from the queue. submitSnapshotSplit()
                    // lazily creates a fresh scan fetcher instead, exactly as in case (1).
                    submitSnapshotSplit(nextSplit);
                } else {
                    // Already closed by forRecords(); kept as an idempotent safeguard.
                    closeScanFetcher();
                }
            }
            return result;
        }

        if (this.currentFetcher instanceof IncrementalSourceStreamFetcher) {
            // (3) The stream split is being read.
            Iterator<SourceRecords> records = this.currentFetcher.pollSplitRecords();
            if (records == null) {
                // Nothing polled from the stream fetcher: end the current stream split reading.
                closeStreamFetcher();
                return finishedSplit();
            }
            SnapshotSplit nextSplit = this.snapshotSplits.poll();
            if (nextSplit != null) {
                closeStreamFetcher();
                LOG.info("It's turn to switch next fetch reader to snapshot split reader");
                submitSnapshotSplit(nextSplit);
            }
            // The already polled records still belong to the stream split.
            return ChangeEventRecords.forRecords(StreamSplit.STREAM_SPLIT_ID, records);
        }

        throw new IllegalStateException("Unsupported reader type.");
    }

    /**
     * Tells whether a new split may be handed to this reader.
     *
     * @return {@code true} if there is no active fetcher or the active fetcher is finished
     */
    @VisibleForTesting
    public boolean canAssignNextSplit() {
        return this.currentFetcher == null || this.currentFetcher.isFinished();
    }

    /** Builds the "finished split" result for the current split id and clears that id. */
    private ChangeEventRecords finishedSplit() {
        ChangeEventRecords finishedRecords = ChangeEventRecords.forFinishedSplit(this.currentSplitId);
        this.currentSplitId = null;
        return finishedRecords;
    }

    /**
     * Wraps the records of the given snapshot split together with a split-id set containing both
     * that split id and {@link StreamSplit#STREAM_SPLIT_ID}, then clears the current split id.
     */
    private ChangeEventRecords forNewAddedTableFinishedSplit(String splitId, Iterator<SourceRecords> records) {
        HashSet<String> finishedSplitIds = new HashSet<>();
        finishedSplitIds.add(splitId);
        finishedSplitIds.add(StreamSplit.STREAM_SPLIT_ID);
        this.currentSplitId = null;
        return new ChangeEventRecords(splitId, records, finishedSplitIds);
    }

    /**
     * Wraps the polled records for the current split. When the active fetcher is a scan fetcher
     * the records are wrapped as snapshot records and the scan fetcher is closed afterwards.
     */
    private ChangeEventRecords forRecords(Iterator<SourceRecords> records) {
        if (this.currentFetcher instanceof IncrementalSourceScanFetcher) {
            ChangeEventRecords snapshotRecords = ChangeEventRecords.forSnapshotRecords(this.currentSplitId, records);
            closeScanFetcher();
            return snapshotRecords;
        }
        return ChangeEventRecords.forRecords(this.currentSplitId, records);
    }

    /** Makes the scan fetcher active and submits a fetch task (with hooks) for the split. */
    private void submitSnapshotSplit(SnapshotSplit snapshotSplit) {
        this.currentSplitId = snapshotSplit.splitId();
        this.currentFetcher = getScanFetcher();
        FetchTask<SourceSplitBase> fetchTask = this.dataSourceDialect.createFetchTask(snapshotSplit);
        ((AbstractScanFetchTask) fetchTask).setSnapshotPhaseHooks(this.snapshotHooks);
        this.currentFetcher.submitTask(fetchTask);
    }

    /** Makes the stream fetcher active and submits a fetch task for the split. */
    private void submitStreamSplit(StreamSplit streamSplit) {
        this.currentSplitId = streamSplit.splitId();
        this.currentFetcher = getStreamFetcher();
        this.currentFetcher.submitTask(this.dataSourceDialect.createFetchTask(streamSplit));
    }

    /** Returns the scan fetcher, creating it on first use or after it was closed. */
    private IncrementalSourceScanFetcher getScanFetcher() {
        if (this.reusedScanFetcher == null) {
            this.reusedScanFetcher = new IncrementalSourceScanFetcher(this.dataSourceDialect.createFetchTaskContext(this.sourceConfig), this.subtaskId);
        }
        return this.reusedScanFetcher;
    }

    /** Returns the stream fetcher, creating it on first use or after it was closed. */
    private IncrementalSourceStreamFetcher getStreamFetcher() {
        if (this.reusedStreamFetcher == null) {
            this.reusedStreamFetcher = new IncrementalSourceStreamFetcher(this.dataSourceDialect.createFetchTaskContext(this.sourceConfig), this.subtaskId);
        }
        return this.reusedStreamFetcher;
    }

    /** Closes and forgets the scan fetcher; also clears the active fetcher if it was that one. */
    private void closeScanFetcher() {
        if (this.reusedScanFetcher != null) {
            LOG.debug("Close snapshot reader {}", this.reusedScanFetcher.getClass().getCanonicalName());
            this.reusedScanFetcher.close();
            if (this.currentFetcher == this.reusedScanFetcher) {
                this.currentFetcher = null;
            }
            this.reusedScanFetcher = null;
        }
    }

    /** Closes and forgets the stream fetcher; also clears the active fetcher if it was that one. */
    private void closeStreamFetcher() {
        if (this.reusedStreamFetcher != null) {
            LOG.debug("Close stream reader {}", this.reusedStreamFetcher.getClass().getCanonicalName());
            this.reusedStreamFetcher.close();
            if (this.currentFetcher == this.reusedStreamFetcher) {
                this.currentFetcher = null;
            }
            this.reusedStreamFetcher = null;
        }
    }
}
