package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.class */
/**
 * {@link SplitAssigner} that hands out {@link IncrementalSplit}s.
 *
 * <p>On the first {@link #getNext()} call the captured tables that are not yet covered by an
 * assigned split are distributed round-robin over at most {@code incrementalParallelism} splits.
 * Each split starts from the smallest offset found among the completed snapshot splits of its
 * tables (when enabled) and the per-table watermarks restored through {@link #addSplits}; if no
 * such offset exists, the startup offset from the source configuration is used.
 *
 * @param <C> concrete source configuration type
 */
public class IncrementalSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSplitAssigner.class);

    /** Format of an incremental split id; the argument is the split index. */
    protected static final String INCREMENTAL_SPLIT_ID = "incremental-split-%d";

    private final SplitAssigner.Context<C> context;
    private final int incrementalParallelism;
    private final OffsetFactory offsetFactory;

    /** Startup offsets taken from splits handed back through {@link #addSplits}, keyed by table. */
    private final Map<TableId, Offset> tableWatermarks = new HashMap<>();

    /** Set once the incremental splits have been created by {@link #getNext()}. */
    private boolean splitAssigned = false;

    /** Splits that were created but not yet returned by {@link #getNext()}. */
    private final List<IncrementalSplit> remainingSplits = new ArrayList<>();

    /** Splits already returned by {@link #getNext()}, keyed by split id. */
    private final Map<String, IncrementalSplit> assignedSplits = new HashMap<>();

    /** Cleared by {@link #addSplits} as soon as at least one table watermark is known. */
    private boolean startWithSnapshotMinimumOffset = true;

    /** Taken from the most recently processed split in {@link #addSplits}; may be {@code null}. */
    private List<CatalogTable> checkpointTables;

    /** Taken from the most recently processed split in {@link #addSplits}; may be {@code null}. */
    private Map<TableId, byte[]> historyTableChanges;

    /**
     * @param context shared assigner context (captured tables, snapshot split bookkeeping, config)
     * @param incrementalParallelism maximum number of incremental splits to create
     * @param offsetFactory factory passed to the startup/stop configuration to resolve offsets
     */
    public IncrementalSplitAssigner(
            SplitAssigner.Context<C> context,
            int incrementalParallelism,
            OffsetFactory offsetFactory) {
        this.context = context;
        this.incrementalParallelism = incrementalParallelism;
        this.offsetFactory = offsetFactory;
    }

    /** No-op. */
    @Override
    public void open() {
    }

    /**
     * Returns the next pending incremental split, creating the splits on first use.
     *
     * @return the next split (which is then recorded as assigned), or empty when none is pending
     */
    @Override
    public Optional<SourceSplitBase> getNext() {
        if (remainingSplits.isEmpty() && !splitAssigned) {
            remainingSplits.addAll(createIncrementalSplits(startWithSnapshotMinimumOffset));
            splitAssigned = true;
        }
        if (remainingSplits.isEmpty()) {
            return Optional.empty();
        }
        IncrementalSplit split = remainingSplits.remove(0);
        assignedSplits.put(split.splitId(), split);
        return Optional.of(split);
    }

    /**
     * @return {@code true} when every captured table is covered by an assigned split and no
     *     created split is still pending
     */
    public boolean noMoreSplits() {
        return getRemainingTables().isEmpty() && remainingSplits.isEmpty();
    }

    /** Captured tables that do not appear in any already assigned split. */
    private Set<TableId> getRemainingTables() {
        Set<TableId> remaining = new HashSet<>(context.getCapturedTables());
        for (IncrementalSplit assigned : assignedSplits.values()) {
            assigned.getTableIds().forEach(remaining::remove);
        }
        return remaining;
    }

    /** Always {@code false}. */
    @Override
    public boolean waitingForCompletedSplits() {
        return false;
    }

    /**
     * Stores every reported watermark in the context's completed offsets, keyed by split id.
     *
     * @param completedSplitWatermarks watermarks of completed snapshot splits
     */
    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        Map<String, SnapshotSplitWatermark> completedOffsets = context.getSplitCompletedOffsets();
        for (SnapshotSplitWatermark watermark : completedSplitWatermarks) {
            completedOffsets.put(watermark.getSplitId(), watermark);
        }
    }

    /**
     * Absorbs the state carried by the given incremental splits.
     *
     * <p>The splits themselves are not queued. For every split, the completed snapshot split
     * infos and the startup offset are recorded for those tables that are still captured, and
     * the split's checkpoint tables and history table changes replace the current ones (the last
     * split processed wins). If at least one table watermark is known afterwards, later split
     * creation no longer derives the start offset from the snapshot splits.
     *
     * @param splits splits to absorb; each element is converted with {@code asIncrementalSplit()}
     */
    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        splits.stream()
                .map(SourceSplitBase::asIncrementalSplit)
                .forEach(this::restoreFromSplit);
        if (!tableWatermarks.isEmpty()) {
            startWithSnapshotMinimumOffset = false;
        }
    }

    /** Copies the bookkeeping of a single returned split into this assigner and its context. */
    private void restoreFromSplit(IncrementalSplit split) {
        Offset startupOffset = split.getStartupOffset();
        for (CompletedSnapshotSplitInfo info : split.getCompletedSnapshotSplitInfos()) {
            if (context.getCapturedTables().contains(info.getTableId())) {
                context.getSplitCompletedOffsets().put(info.getSplitId(), info.getWatermark());
                context.getAssignedSnapshotSplit().put(info.getSplitId(), info.asSnapshotSplit());
            }
        }
        for (TableId tableId : split.getTableIds()) {
            if (context.getCapturedTables().contains(tableId)) {
                tableWatermarks.put(tableId, startupOffset);
            }
        }
        checkpointTables = split.getCheckpointTables();
        historyTableChanges = split.getHistoryTableChanges();
    }

    /**
     * @param checkpointId ignored
     * @return a new {@link IncrementalPhaseState}
     */
    @Override
    public IncrementalPhaseState snapshotState(long checkpointId) {
        return new IncrementalPhaseState();
    }

    /** No-op. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Distributes the not-yet-assigned captured tables round-robin over
     * {@code incrementalParallelism} buckets and creates one split per non-empty bucket.
     *
     * <p>Buckets that receive no table (fewer tables than parallelism) are skipped. Because of
     * the round-robin distribution such buckets are always the trailing ones, so the split ids of
     * the created splits are consecutive starting at 0.
     *
     * @param useSnapshotMinimumOffset whether the start offset may be derived from the watermarks
     *     of the completed snapshot splits
     * @return the created splits, possibly empty
     */
    public List<IncrementalSplit> createIncrementalSplits(boolean useSnapshotMinimumOffset) {
        List<List<TableId>> buckets = new ArrayList<>(incrementalParallelism);
        for (int b = 0; b < incrementalParallelism; b++) {
            buckets.add(new ArrayList<>());
        }
        int position = 0;
        for (TableId tableId : getRemainingTables()) {
            buckets.get(position % incrementalParallelism).add(tableId);
            position++;
        }

        List<IncrementalSplit> splits = new ArrayList<>();
        for (int index = 0; index < buckets.size(); index++) {
            List<TableId> bucket = buckets.get(index);
            if (bucket.isEmpty()) {
                // A split without tables has nothing to read; previously this case ended in an NPE.
                continue;
            }
            splits.add(createIncrementalSplit(bucket, index, useSnapshotMinimumOffset));
        }
        return splits;
    }

    /**
     * Builds the incremental split for one bucket of tables.
     *
     * @param capturedTables tables covered by the split
     * @param index split index, used to format the split id
     * @param useSnapshotMinimumOffset whether snapshot split watermarks take part in choosing the
     *     start offset
     */
    private IncrementalSplit createIncrementalSplit(
            List<TableId> capturedTables, int index, boolean useSnapshotMinimumOffset) {
        C sourceConfig = context.getSourceConfig();
        // Sorted by split id so the completed-split list has a stable order.
        List<SnapshotSplit> snapshotSplits =
                context.getAssignedSnapshotSplit().values().stream()
                        .filter(split -> capturedTables.contains(split.getTableId()))
                        .sorted(Comparator.comparing(SourceSplitBase::splitId))
                        .collect(Collectors.toList());
        Map<String, SnapshotSplitWatermark> completedOffsets = context.getSplitCompletedOffsets();

        List<CompletedSnapshotSplitInfo> completedInfos = new ArrayList<>();
        Offset minOffset = null;
        for (SnapshotSplit snapshotSplit : snapshotSplits) {
            SnapshotSplitWatermark watermark = completedOffsets.get(snapshotSplit.splitId());
            if (useSnapshotMinimumOffset) {
                // Exactly-once starts from the high watermark, otherwise from the low watermark.
                Offset candidate =
                        sourceConfig.isExactlyOnce()
                                ? watermark.getHighWatermark()
                                : watermark.getLowWatermark();
                if (minOffset == null || candidate.isBefore(minOffset)) {
                    minOffset = candidate;
                    LOG.debug("Find the min offset {} of change log in split {}", candidate, watermark);
                }
            }
            completedInfos.add(
                    new CompletedSnapshotSplitInfo(
                            snapshotSplit.splitId(),
                            snapshotSplit.getTableId(),
                            snapshotSplit.getSplitKeyType(),
                            snapshotSplit.getSplitStart(),
                            snapshotSplit.getSplitEnd(),
                            watermark));
        }

        for (TableId tableId : capturedTables) {
            Offset tableOffset = tableWatermarks.get(tableId);
            if (minOffset == null || (tableOffset != null && tableOffset.isBefore(minOffset))) {
                minOffset = tableOffset;
                LOG.debug(
                        "Find the min offset {} of change log in table-watermarks {}",
                        tableOffset,
                        tableId);
            }
        }

        Offset startupOffset =
                minOffset != null
                        ? minOffset
                        : sourceConfig.getStartupConfig().getStartupOffset(offsetFactory);
        return new IncrementalSplit(
                String.format(INCREMENTAL_SPLIT_ID, index),
                capturedTables,
                startupOffset,
                sourceConfig.getStopConfig().getStopOffset(offsetFactory),
                completedInfos,
                checkpointTables,
                historyTableChanges);
    }

    @VisibleForTesting
    void setSplitAssigned(boolean splitAssigned) {
        this.splitAssigned = splitAssigned;
    }

    /**
     * Drops the snapshot bookkeeping (assigned snapshot splits and their completed offsets) of
     * the given tables from the context.
     *
     * @param tableIds tables whose snapshot bookkeeping is removed
     * @return {@code true} when no assigned snapshot split and no completed offset is left
     * @throws IllegalArgumentException if the incremental splits have not all been assigned yet
     */
    public boolean completedSnapshotPhase(List<TableId> tableIds) {
        Preconditions.checkArgument(
                splitAssigned && noMoreSplits(),
                "Incremental splits must all be assigned before completing the snapshot phase");
        Map<String, SnapshotSplit> assignedSnapshotSplits = context.getAssignedSnapshotSplit();
        Map<String, SnapshotSplitWatermark> completedOffsets = context.getSplitCompletedOffsets();
        // Iterate over a copy of the keys because entries are removed while walking.
        for (String splitKey : new ArrayList<>(assignedSnapshotSplits.keySet())) {
            SnapshotSplit snapshotSplit = assignedSnapshotSplits.get(splitKey);
            if (tableIds.contains(snapshotSplit.getTableId())) {
                assignedSnapshotSplits.remove(splitKey);
                completedOffsets.remove(snapshotSplit.splitId());
            }
        }
        return assignedSnapshotSplits.isEmpty() && completedOffsets.isEmpty();
    }

    /**
     * @return {@code true} until the splits have been created and {@link #noMoreSplits()} holds
     */
    public boolean waitingForAssignedSplits() {
        return !(splitAssigned && noMoreSplits());
    }
}
