/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split assigner for the snapshot phase: hands out {@link SnapshotSplit}s table by table and
 * tracks which assigned splits have reported completion.
 *
 * <p>Life cycle as implemented here:
 * <ol>
 *   <li>{@link #open()} creates the chunk splitter and, when the remaining tables were not
 *       restored from a checkpoint, discovers them through the dialect.</li>
 *   <li>{@link #getNext()} drains pending splits, generating the splits of the next remaining
 *       table on demand.</li>
 *   <li>{@link #onCompletedSplits(List)} records the high watermark of completed splits.</li>
 *   <li>Once every assigned split has a recorded offset the assigner is marked completed,
 *       either immediately (parallelism 1) or after a checkpoint whose id is at least the one
 *       remembered in {@link #snapshotState(long)} has completed.</li>
 * </ol>
 *
 * <p>The class performs no synchronization of its own.
 *
 * @param <C> the source configuration type
 */
public class SnapshotSplitAssigner<C extends SourceConfig>
implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);
    private final SplitAssigner.Context<C> context;
    private final C sourceConfig;
    /** Tables whose splits have already been generated. */
    private final List<TableId> alreadyProcessedTables;
    /** Splits generated (or added back) but not yet handed out. */
    private final List<SnapshotSplit> remainingSplits;
    /** Splits handed out by {@link #getNext()}, keyed by split id. */
    private final Map<String, SnapshotSplit> assignedSplits;
    /** High watermark reported for each completed split, keyed by split id. */
    private final Map<String, Offset> splitCompletedOffsets;
    private boolean assignerCompleted;
    private final int currentParallelism;
    /** Tables whose splits have not been generated yet. */
    private final LinkedList<TableId> remainingTables;
    private final boolean isRemainingTablesCheckpointed;
    /** Created in {@link #open()}; {@code null} until then. */
    private ChunkSplitter chunkSplitter;
    /** Not final: {@link #open()} refreshes it from the dialect when tables are discovered. */
    private boolean isTableIdCaseSensitive;
    /** Id of the first checkpoint taken after all splits completed; {@code null} until then. */
    private Long checkpointIdToFinish;
    private final DataSourceDialect<C> dialect;

    /**
     * Creates a fresh assigner with no processed tables, no splits and no completed offsets.
     *
     * @param context assigner context, also the source of the {@link SourceConfig}
     * @param currentParallelism parallelism of the job
     * @param remainingTables tables still to be split; copied into an internal list
     * @param isTableIdCaseSensitive whether table ids are case sensitive
     * @param dialect dialect used to create the chunk splitter and discover tables
     */
    SnapshotSplitAssigner(SplitAssigner.Context<C> context, int currentParallelism, List<TableId> remainingTables, boolean isTableIdCaseSensitive, DataSourceDialect<C> dialect) {
        this(context, currentParallelism, new ArrayList<>(), new ArrayList<>(), new HashMap<>(), new HashMap<>(), false, remainingTables, isTableIdCaseSensitive, true, dialect);
    }

    /**
     * Restores an assigner from a previously taken {@link SnapshotPhaseState}.
     *
     * @param context assigner context, also the source of the {@link SourceConfig}
     * @param currentParallelism parallelism of the job
     * @param checkpoint state to restore from
     * @param dialect dialect used to create the chunk splitter and discover tables
     */
    SnapshotSplitAssigner(SplitAssigner.Context<C> context, int currentParallelism, SnapshotPhaseState checkpoint, DataSourceDialect<C> dialect) {
        this(context, currentParallelism, checkpoint.getAlreadyProcessedTables(), checkpoint.getRemainingSplits(), checkpoint.getAssignedSplits(), checkpoint.getSplitCompletedOffsets(), checkpoint.isAssignerCompleted(), checkpoint.getRemainingTables(), checkpoint.isTableIdCaseSensitive(), checkpoint.isRemainingTablesCheckpointed(), dialect);
    }

    /**
     * Canonical constructor. All collections except {@code remainingTables} are stored by
     * reference; {@code remainingTables} is copied into a {@link LinkedList}.
     */
    private SnapshotSplitAssigner(SplitAssigner.Context<C> context, int currentParallelism, List<TableId> alreadyProcessedTables, List<SnapshotSplit> remainingSplits, Map<String, SnapshotSplit> assignedSplits, Map<String, Offset> splitCompletedOffsets, boolean assignerCompleted, List<TableId> remainingTables, boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed, DataSourceDialect<C> dialect) {
        this.context = context;
        this.sourceConfig = context.getSourceConfig();
        this.currentParallelism = currentParallelism;
        this.alreadyProcessedTables = alreadyProcessedTables;
        this.remainingSplits = remainingSplits;
        this.assignedSplits = assignedSplits;
        this.splitCompletedOffsets = splitCompletedOffsets;
        this.assignerCompleted = assignerCompleted;
        this.remainingTables = new LinkedList<>(remainingTables);
        this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
        this.isTableIdCaseSensitive = isTableIdCaseSensitive;
        this.dialect = dialect;
    }

    /**
     * Creates the chunk splitter and, if the remaining tables were not checkpointed and the
     * assigner is not yet completed, discovers the tables to capture through the dialect.
     *
     * @throws RuntimeException if table discovery fails; the original exception is the cause
     */
    @Override
    public void open() {
        this.chunkSplitter = this.dialect.createChunkSplitter(this.sourceConfig);
        if (this.isRemainingTablesCheckpointed || this.assignerCompleted) {
            return;
        }
        try {
            // Work on a private copy: the list returned by the dialect belongs to the dialect
            // and may be unmodifiable, so removeAll must not be applied to it directly.
            List<TableId> discoveredTables = new ArrayList<>(this.dialect.discoverDataCollections(this.sourceConfig));
            this.context.getCapturedTables().addAll(discoveredTables);
            discoveredTables.removeAll(this.alreadyProcessedTables);
            this.remainingTables.addAll(discoveredTables);
            this.isTableIdCaseSensitive = this.dialect.isDataCollectionIdCaseSensitive(this.sourceConfig);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to discover remaining tables to capture", e);
        }
    }

    /**
     * Returns the next snapshot split to assign, generating splits for the next remaining
     * table when no split is pending.
     *
     * @return the next split, or an empty {@link Optional} when {@link #open()} has not been
     *     called yet or when there are neither pending splits nor remaining tables
     */
    @Override
    public Optional<SourceSplitBase> getNext() {
        if (this.chunkSplitter == null) {
            return Optional.empty();
        }
        // Iterative rather than recursive so that a long run of tables yielding no splits
        // cannot grow the call stack.
        while (true) {
            if (!this.remainingSplits.isEmpty()) {
                Iterator<SnapshotSplit> iterator = this.remainingSplits.iterator();
                SnapshotSplit split = iterator.next();
                iterator.remove();
                this.assignedSplits.put(split.splitId(), split);
                this.context.getAssignedSnapshotSplit().put(split.splitId(), split);
                return Optional.of(split);
            }
            TableId nextTable = this.remainingTables.pollFirst();
            if (nextTable == null) {
                return Optional.empty();
            }
            Collection<SnapshotSplit> splits = this.chunkSplitter.generateSplits(nextTable);
            this.remainingSplits.addAll(splits);
            this.alreadyProcessedTables.add(nextTable);
        }
    }

    /**
     * @return {@code true} while some table or split is still pending, or some assigned split
     *     has no recorded completion offset
     */
    @Override
    public boolean waitingForCompletedSplits() {
        return !this.allSplitsCompleted();
    }

    /**
     * Records the high watermark of each completed split. When all splits are then completed
     * and the parallelism is 1, the assigner is marked completed immediately; otherwise
     * completion is deferred to {@link #notifyCheckpointComplete(long)}.
     *
     * @param completedSplitWatermarks watermarks reported for completed splits
     */
    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        for (SnapshotSplitWatermark watermark : completedSplitWatermarks) {
            this.splitCompletedOffsets.put(watermark.getSplitId(), watermark.getHighWatermark());
        }
        if (!this.allSplitsCompleted()) {
            return;
        }
        if (this.currentParallelism == 1) {
            this.assignerCompleted = true;
            LOG.info("Snapshot split assigner received all splits completed and the job parallelism is 1, snapshot split assigner is turn into completed status.");
        } else {
            LOG.info("Snapshot split assigner received all splits completed, waiting for a complete checkpoint to mark the assigner completed.");
        }
    }

    /**
     * Puts splits back into the pending list and forgets their assignment and any recorded
     * completion offset, so they will be handed out again by {@link #getNext()}.
     *
     * @param splits splits to re-queue; each is converted with {@code asSnapshotSplit()}
     */
    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        for (SourceSplitBase split : splits) {
            String splitId = split.splitId();
            this.remainingSplits.add(split.asSnapshotSplit());
            this.assignedSplits.remove(splitId);
            this.splitCompletedOffsets.remove(splitId);
        }
    }

    /**
     * Builds the state for a checkpoint and, the first time all splits are found completed
     * while the assigner is not yet marked completed, remembers this checkpoint id as the one
     * that must complete before the assigner can finish.
     *
     * <p>The internal collections are passed to the state constructor as-is (no copy is made
     * here). NOTE(review): whether {@link SnapshotPhaseState} copies them is not visible from
     * this class.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the state describing this assigner
     */
    @Override
    public SnapshotPhaseState snapshotState(long checkpointId) {
        // The trailing `true` is presumably the "remaining tables checkpointed" flag, by
        // analogy with the restore constructor - confirm against SnapshotPhaseState.
        SnapshotPhaseState state = new SnapshotPhaseState(this.alreadyProcessedTables, this.remainingSplits, this.assignedSplits, this.splitCompletedOffsets, this.assignerCompleted, this.remainingTables, this.isTableIdCaseSensitive, true);
        if (this.checkpointIdToFinish == null && !this.assignerCompleted && this.allSplitsCompleted()) {
            this.checkpointIdToFinish = checkpointId;
        }
        return state;
    }

    /**
     * Marks the assigner completed once a checkpoint with an id at least equal to the one
     * remembered by {@link #snapshotState(long)} has completed and all splits are completed.
     * Earlier checkpoints leave the assigner untouched.
     *
     * @param checkpointId id of the checkpoint that completed
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (this.checkpointIdToFinish == null || this.assignerCompleted || !this.allSplitsCompleted()) {
            return;
        }
        // Only flip the flag - and only log - when the awaited checkpoint has really been
        // reached; an older checkpoint must not produce a "completed" log line.
        if (checkpointId >= this.checkpointIdToFinish) {
            this.assignerCompleted = true;
            LOG.info("Snapshot split assigner is turn into completed status.");
        }
    }

    /**
     * @return {@code true} when there are neither remaining tables nor pending splits
     */
    public boolean noMoreSplits() {
        return this.remainingTables.isEmpty() && this.remainingSplits.isEmpty();
    }

    /**
     * @return {@code true} once the assigner has been marked completed
     */
    public boolean isCompleted() {
        return this.assignerCompleted;
    }

    /**
     * @return the live map of assigned splits keyed by split id (not a copy)
     */
    public Map<String, SnapshotSplit> getAssignedSplits() {
        return this.assignedSplits;
    }

    /**
     * @return the live map of completion offsets keyed by split id (not a copy)
     */
    public Map<String, Offset> getSplitCompletedOffsets() {
        return this.splitCompletedOffsets;
    }

    /**
     * All splits count as completed when nothing is left to assign and the number of recorded
     * completion offsets equals the number of assigned splits.
     */
    private boolean allSplitsCompleted() {
        return this.noMoreSplits() && this.assignedSplits.size() == this.splitCompletedOffsets.size();
    }
}

