/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.reader;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that handles both snapshot splits and incremental splits.
 *
 * <p>Snapshot splits that have finished reading are collected in {@link #finishedUnackedSplits}
 * and reported to the enumerator as a {@link CompletedSnapshotSplitsReportEvent}; all other splits
 * are handed to the base reader for fetching.
 *
 * @param <T> type of the elements emitted to the {@link Collector}
 * @param <C> type of the source configuration
 */
public class IncrementalSourceReader<T, C extends SourceConfig>
        extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, SourceSplitBase, SourceSplitStateBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceReader.class);

    /** Finished snapshot splits that still have to be reported, keyed by split id. */
    private final Map<String, SnapshotSplit> finishedUnackedSplits;

    /** Becomes {@code true} after the first {@link #pollNext(Collector)} call. */
    private volatile boolean running = false;

    /** Index of this subtask, taken from the reader context; used for logging only. */
    private final int subtaskId;

    /** Source configuration supplied at construction time. */
    private final C sourceConfig;

    /**
     * Creates the reader and the single-thread fetcher manager that drives the split readers.
     *
     * @param elementsQueue queue shared between the fetcher and this reader
     * @param splitReaderSupplier factory for the split reader used by the fetcher
     * @param recordEmitter emitter passed through to the base reader
     * @param options reader options passed through to the base reader
     * @param context reader context; also provides the subtask index
     * @param sourceConfig source configuration kept by this reader
     */
    public IncrementalSourceReader(
            BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
            Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
            RecordEmitter<SourceRecords, T, SourceSplitStateBase> recordEmitter,
            SourceReaderOptions options,
            SourceReader.Context context,
            C sourceConfig) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get),
                recordEmitter,
                options,
                context);
        this.sourceConfig = sourceConfig;
        this.finishedUnackedSplits = new HashMap<>();
        this.subtaskId = context.getIndexOfSubtask();
    }

    /**
     * Polls the next records from the base reader.
     *
     * <p>On the very first call, a split request is sent when no split is currently assigned.
     *
     * @param output collector receiving the emitted elements
     * @throws Exception propagated from the base reader
     */
    @Override
    public void pollNext(Collector<T> output) throws Exception {
        if (!running) {
            if (getNumberOfCurrentlyAssignedSplits() == 0) {
                context.sendSplitRequest();
            }
            running = true;
        }
        super.pollNext(output);
    }

    /**
     * Intentionally a no-op.
     *
     * <p>NOTE(review): presumably implements a checkpoint-listener callback inherited through
     * {@link SourceReader}; if so, add {@code @Override} — confirm against the supertype.
     *
     * @param checkpointId id of the completed checkpoint (unused)
     * @throws Exception never thrown by this implementation
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    /**
     * Adds splits to this reader.
     *
     * <p>Snapshot splits whose read has already finished are not fetched again; they are recorded
     * and reported to the enumerator. Every other split (unfinished snapshot splits and
     * incremental splits) is forwarded to the base reader.
     *
     * @param splits splits to add
     */
    @Override
    public void addSplits(List<SourceSplitBase> splits) {
        List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
        for (SourceSplitBase split : splits) {
            if (split.isSnapshotSplit()) {
                SnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (snapshotSplit.isSnapshotReadFinished()) {
                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                } else {
                    unfinishedSplits.add(split);
                }
            } else {
                unfinishedSplits.add(split.asIncrementalSplit());
            }
        }
        // Report finished snapshot splits before handing the remaining ones to the base reader.
        reportFinishedSnapshotSplitsIfNeed();
        if (!unfinishedSplits.isEmpty()) {
            super.addSplits(unfinishedSplits);
        }
    }

    /**
     * Records the finished splits, reports them to the enumerator and requests a new split.
     *
     * @param finishedSplitIds states of the finished splits, keyed by split id
     * @throws IllegalStateException if any finished split is not a snapshot split
     */
    @Override
    protected void onSplitFinished(Map<String, SourceSplitStateBase> finishedSplitIds) {
        for (SourceSplitStateBase splitState : finishedSplitIds.values()) {
            SourceSplitBase sourceSplit = splitState.toSourceSplit();
            // Template form: the message is only formatted when the check actually fails.
            Preconditions.checkState(
                    sourceSplit.isSnapshotSplit(),
                    "Only snapshot split could finish, but the actual split is incremental split %s",
                    sourceSplit);
            finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
        }
        reportFinishedSnapshotSplitsIfNeed();
        context.sendSplitRequest();
    }

    /**
     * Sends the high watermarks of all recorded finished snapshot splits to the enumerator in a
     * single {@link CompletedSnapshotSplitsReportEvent}, then clears the record. Does nothing when
     * no finished split is recorded.
     */
    private void reportFinishedSnapshotSplitsIfNeed() {
        if (finishedUnackedSplits.isEmpty()) {
            return;
        }
        List<SnapshotSplitWatermark> completedSnapshotSplitWatermarks = new ArrayList<>();
        for (SnapshotSplit split : finishedUnackedSplits.values()) {
            completedSnapshotSplitWatermarks.add(
                    new SnapshotSplitWatermark(split.splitId(), split.getHighWatermark()));
        }
        CompletedSnapshotSplitsReportEvent reportEvent = new CompletedSnapshotSplitsReportEvent();
        reportEvent.setCompletedSnapshotSplitWatermarks(completedSnapshotSplitWatermarks);
        context.sendSourceEventToEnumerator(reportEvent);
        // The splits are dropped right after sending; no acknowledgement is awaited here.
        finishedUnackedSplits.clear();
        log.debug(
                "The subtask {} reports offsets of finished snapshot splits {}.",
                subtaskId,
                completedSnapshotSplitWatermarks);
    }

    /**
     * Creates the initial mutable state for a newly added split.
     *
     * @param split split to wrap
     * @return a {@link SnapshotSplitState} for snapshot splits, otherwise an
     *     {@link IncrementalSplitState}
     */
    @Override
    protected SourceSplitStateBase initializedState(SourceSplitBase split) {
        if (split.isSnapshotSplit()) {
            return new SnapshotSplitState(split.asSnapshotSplit());
        }
        return new IncrementalSplitState(split.asIncrementalSplit());
    }

    /**
     * Snapshots the splits owned by this reader.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the base reader's splits, minus any whose id is recorded as finished, followed by
     *     the recorded finished snapshot splits
     */
    @Override
    public List<SourceSplitBase> snapshotState(long checkpointId) {
        List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
        // Collect into an explicit ArrayList: the result is appended to below, and
        // Collectors.toList() gives no mutability guarantee.
        List<SourceSplitBase> unfinishedSplits = stateSplits.stream()
                .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
                .collect(Collectors.toCollection(ArrayList::new));
        unfinishedSplits.addAll(finishedUnackedSplits.values());
        return unfinishedSplits;
    }

    /**
     * Converts a split state back to its split.
     *
     * @param splitId id of the split (unused; the state carries the split)
     * @param splitState state to convert
     * @return the split produced by {@code splitState.toSourceSplit()}
     */
    @Override
    protected SourceSplitBase toSplitType(String splitId, SourceSplitStateBase splitState) {
        return splitState.toSourceSplit();
    }
}

