/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SnapshotSplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HybridSplitAssigner<C extends SourceConfig>
implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(HybridSplitAssigner.class);
    private final SnapshotSplitAssigner<C> snapshotSplitAssigner;
    private final IncrementalSplitAssigner<C> incrementalSplitAssigner;

    public HybridSplitAssigner(SplitAssigner.Context<C> context, int currentParallelism, int incrementalParallelism, List<TableId> remainingTables, boolean isTableIdCaseSensitive, DataSourceDialect<C> dialect, OffsetFactory offsetFactory) {
        this(new SnapshotSplitAssigner<C>(context, currentParallelism, remainingTables, isTableIdCaseSensitive, dialect), new IncrementalSplitAssigner<C>(context, incrementalParallelism, offsetFactory));
    }

    public HybridSplitAssigner(SplitAssigner.Context<C> context, int currentParallelism, int incrementalParallelism, HybridPendingSplitsState checkpoint, DataSourceDialect<C> dialect, OffsetFactory offsetFactory) {
        this(new SnapshotSplitAssigner<C>(context, currentParallelism, checkpoint.getSnapshotPhaseState(), dialect), new IncrementalSplitAssigner<C>(context, incrementalParallelism, offsetFactory));
    }

    private HybridSplitAssigner(SnapshotSplitAssigner<C> snapshotSplitAssigner, IncrementalSplitAssigner<C> incrementalSplitAssigner) {
        this.snapshotSplitAssigner = snapshotSplitAssigner;
        this.incrementalSplitAssigner = incrementalSplitAssigner;
    }

    @Override
    public void open() {
        this.snapshotSplitAssigner.open();
    }

    @Override
    public Optional<SourceSplitBase> getNext() {
        if (!this.snapshotSplitAssigner.noMoreSplits()) {
            return this.snapshotSplitAssigner.getNext();
        }
        if (!this.snapshotSplitAssigner.isCompleted()) {
            return Optional.empty();
        }
        if (!this.incrementalSplitAssigner.noMoreSplits()) {
            return this.incrementalSplitAssigner.getNext();
        }
        return Optional.empty();
    }

    @Override
    public boolean waitingForCompletedSplits() {
        return this.snapshotSplitAssigner.waitingForCompletedSplits();
    }

    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        this.snapshotSplitAssigner.onCompletedSplits(completedSplitWatermarks);
        this.incrementalSplitAssigner.onCompletedSplits(completedSplitWatermarks);
    }

    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        ArrayList<SourceSplitBase> snapshotSplits = new ArrayList<SourceSplitBase>();
        ArrayList<SourceSplitBase> incrementalSplits = new ArrayList<SourceSplitBase>();
        for (SourceSplitBase split : splits) {
            if (split.isSnapshotSplit()) {
                snapshotSplits.add(split);
                continue;
            }
            incrementalSplits.add(split);
        }
        this.snapshotSplitAssigner.addSplits(snapshotSplits);
        this.incrementalSplitAssigner.addSplits(incrementalSplits);
    }

    @Override
    public PendingSplitsState snapshotState(long checkpointId) {
        return new HybridPendingSplitsState(this.snapshotSplitAssigner.snapshotState(checkpointId), this.incrementalSplitAssigner.snapshotState(checkpointId));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
        this.incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
    }
}

