package org.apache.paimon.flink.source.align;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.paimon.flink.source.ContinuousFileSplitEnumerator;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.class */
/**
 * A {@link ContinuousFileSplitEnumerator} that ties checkpoints to scanned snapshots.
 *
 * <p>Plans produced by {@link #scanNextSnapshot()} are not handed to the parent's discovery
 * callback; they are buffered in a bounded queue of at most {@link #MAX_PENDING_PLAN} entries and
 * drained either by {@link #processDiscoveredSplits(Optional, Throwable)} or, when the assigner is
 * not yet aligned, by the checkpoint snapshot method, which may block for up to
 * {@code alignTimeout} milliseconds waiting for a plan to arrive.
 *
 * <p>On every checkpoint snapshot a {@link CheckpointEvent} is sent to each source reader, and an
 * abort of the checkpoint currently in flight is turned into a {@link FlinkRuntimeException}.
 */
public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnumerator {
    private static final Logger LOG = LoggerFactory.getLogger(AlignedContinuousFileSplitEnumerator.class);

    /** Split id used for the placeholder that stands in for a plan without splits. */
    private static final String PLACEHOLDER_SPLIT = "placeholder";

    /** Capacity of {@link #pendingPlans} and upper bound on snapshots queued in the assigner. */
    private static final int MAX_PENDING_PLAN = 10;

    /** Scanned plans that have not been turned into splits yet. */
    private final ArrayBlockingQueue<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> pendingPlans;

    /** The parent's {@code splitAssigner}, narrowed to its aligned implementation. */
    private final AlignedSplitAssigner alignedAssigner;

    /** Maximum time in milliseconds to wait for a pending plan; {@code 0} waits indefinitely. */
    private final long alignTimeout;

    /** Monitor used to signal "a plan was queued" and "the enumerator was closed". */
    private final Object lock;

    /** Id of the checkpoint currently in flight, or {@link Long#MIN_VALUE} when there is none. */
    private long currentCheckpointId;

    /** Snapshot id reported by the assigner at the last checkpoint snapshot; may be null. */
    private Long lastConsumedSnapshotId;

    /**
     * Set by {@link #close()}. Volatile because it is written and read outside {@link #lock};
     * the wait/notify handshake suggests the writer and reader may be different threads.
     */
    private volatile boolean closed;

    /**
     * Creates the enumerator.
     *
     * @param splitEnumeratorContext forwarded to the parent constructor
     * @param collection forwarded to the parent constructor
     * @param l initial value of {@code nextSnapshotId}; may be null
     * @param j forwarded to the parent constructor
     * @param streamTableScan forwarded to the parent constructor
     * @param bucketMode forwarded to the parent constructor
     * @param j2 alignment timeout in milliseconds, as passed to {@link Object#wait(long)}
     * @param i forwarded to the parent constructor
     */
    public AlignedContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext, Collection<FileStoreSourceSplit> collection, @Nullable Long l, long j, StreamTableScan streamTableScan, BucketMode bucketMode, long j2, int i) {
        super(splitEnumeratorContext, collection, l, j, streamTableScan, bucketMode, i);
        this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
        this.alignedAssigner = (AlignedSplitAssigner) this.splitAssigner;
        this.nextSnapshotId = l;
        this.alignTimeout = j2;
        this.lock = new Object();
        this.currentCheckpointId = Long.MIN_VALUE;
        this.lastConsumedSnapshotId = null;
        this.closed = false;
    }

    /**
     * Adds the given splits to the assigner, one snapshot at a time in ascending snapshot-id
     * order, and within a snapshot grouped by the task suggested for each split.
     *
     * @param collection splits to add; each split's payload is cast to {@link DataSplit}
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    protected void addSplits(Collection<FileStoreSourceSplit> collection) {
        // TreeMap keeps the snapshots sorted so that older snapshots reach the assigner first.
        Map<Long, List<FileStoreSourceSplit>> splitsBySnapshot = new TreeMap<>();
        for (FileStoreSourceSplit split : collection) {
            long snapshotId = ((DataSplit) split.split()).snapshotId();
            splitsBySnapshot.computeIfAbsent(snapshotId, id -> new ArrayList<>()).add(split);
        }
        for (List<FileStoreSourceSplit> snapshotSplits : splitsBySnapshot.values()) {
            for (Map.Entry<Integer, List<FileStoreSourceSplit>> entry : computeForBucket(snapshotSplits).entrySet()) {
                int task = entry.getKey();
                for (FileStoreSourceSplit split : entry.getValue()) {
                    this.splitAssigner.addSplit(task, split);
                }
            }
        }
    }

    /**
     * Groups splits by the task index returned from {@code assignSuggestedTask}.
     *
     * @param collection splits to group
     * @return suggested task index mapped to the splits suggested for it, in encounter order
     */
    private Map<Integer, List<FileStoreSourceSplit>> computeForBucket(Collection<FileStoreSourceSplit> collection) {
        Map<Integer, List<FileStoreSourceSplit>> splitsByTask = new HashMap<>();
        for (FileStoreSourceSplit split : collection) {
            splitsByTask.computeIfAbsent(assignSuggestedTask(split), task -> new ArrayList<>()).add(split);
        }
        return splitsByTask;
    }

    /**
     * Marks the enumerator as closed and wakes any thread blocked waiting for a pending plan.
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    public void close() throws IOException {
        this.closed = true;
        synchronized (this.lock) {
            this.lock.notifyAll();
        }
    }

    /**
     * Delegates unchanged to the parent implementation.
     *
     * @param list splits handed back
     * @param i forwarded to the parent together with the splits
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    public void addSplitsBack(List<FileStoreSourceSplit> list, int i) {
        super.addSplitsBack(list, i);
    }

    /**
     * Takes the checkpoint snapshot of this enumerator.
     *
     * <p>If the assigner is not aligned (and the enumerator is not closed), waits for a pending
     * plan, adds its splits, advances {@code nextSnapshotId} and assigns splits. Afterwards the
     * assigner must be aligned; its first snapshot is recorded as last consumed and removed, and a
     * {@link CheckpointEvent} is sent to every reader index below the current parallelism.
     *
     * <p>NOTE(review): unlike {@link #processDiscoveredSplits(Optional, Throwable)}, a pending
     * plan without splits is not replaced by a placeholder split here - confirm whether the
     * assigner can become aligned in that case.
     *
     * @param j id of the checkpoint being taken
     * @return the assigner's remaining splits together with the current {@code nextSnapshotId}
     * @throws IllegalArgumentException if the enumerator was closed while waiting, no plan arrived
     *     within the timeout, or the assigner is still not aligned
     * @throws InterruptedException if interrupted while waiting for a plan
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    /* renamed from: snapshotState */
    public PendingSplitsCheckpoint mo208snapshotState(long j) throws Exception {
        if (!this.alignedAssigner.isAligned() && !this.closed) {
            awaitPendingPlan();
            ContinuousFileSplitEnumerator.PlanWithNextSnapshotId pendingPlan = Objects.requireNonNull(this.pendingPlans.poll());
            addSplits(this.splitGenerator.createSplits(pendingPlan.plan()));
            this.nextSnapshotId = pendingPlan.nextSnapshotId();
            assignSplits();
        }
        Preconditions.checkArgument(this.alignedAssigner.isAligned(), "Split assigner is not aligned while taking the checkpoint snapshot.");
        this.lastConsumedSnapshotId = this.alignedAssigner.getNextSnapshotId(0).orElse(null);
        this.alignedAssigner.removeFirst();
        this.currentCheckpointId = j;
        // Tell every reader which checkpoint this snapshot belongs to.
        CheckpointEvent checkpointEvent = new CheckpointEvent(j);
        for (int reader = 0; reader < this.context.currentParallelism(); reader++) {
            this.context.sendEventToSourceReader(reader, checkpointEvent);
        }
        return new PendingSplitsCheckpoint(this.alignedAssigner.remainingSplits(), this.nextSnapshotId);
    }

    /**
     * Blocks until a pending plan is available, the enumerator is closed, or the alignment
     * timeout elapses. Returns immediately if a plan is already queued.
     *
     * <p>The wait runs in a guarded loop so that a spurious wakeup is not mistaken for a timeout,
     * and {@code closed} is re-checked under the lock so that a {@link #close()} racing with the
     * caller cannot be missed.
     *
     * @throws IllegalArgumentException if the enumerator is closed or no plan arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    private void awaitPendingPlan() throws InterruptedException {
        synchronized (this.lock) {
            if (!this.pendingPlans.isEmpty()) {
                return;
            }
            if (this.alignTimeout == 0L) {
                // Object.wait(0) means "no timeout"; keep that meaning.
                while (this.pendingPlans.isEmpty() && !this.closed) {
                    this.lock.wait();
                }
            } else {
                final long startNanos = System.nanoTime();
                long remainingMillis = this.alignTimeout;
                while (this.pendingPlans.isEmpty() && !this.closed && remainingMillis > 0L) {
                    this.lock.wait(remainingMillis);
                    remainingMillis = this.alignTimeout - (System.nanoTime() - startNanos) / 1_000_000L;
                }
            }
            Preconditions.checkArgument(!this.closed, "Enumerator has been closed.");
            Preconditions.checkArgument(!this.pendingPlans.isEmpty(), "Timeout while waiting for snapshot from paimon source.");
        }
    }

    /**
     * Fails the job when the checkpoint currently in flight is aborted.
     *
     * @param j id of the aborted checkpoint
     * @throws FlinkRuntimeException if {@code j} is the checkpoint recorded by the last snapshot
     */
    public void notifyCheckpointAborted(long j) {
        if (this.currentCheckpointId == j) {
            throw new FlinkRuntimeException("Checkpoint failure is not allowed in aligned mode.");
        }
    }

    /**
     * Clears the in-flight checkpoint id and notifies the scan with the snapshot id following the
     * last consumed one, or with null if no snapshot has been recorded.
     *
     * @param j id of the completed checkpoint (not used)
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    public void notifyCheckpointComplete(long j) {
        this.currentCheckpointId = Long.MIN_VALUE;
        Long nextSnapshot = null;
        if (this.lastConsumedSnapshotId != null) {
            nextSnapshot = this.lastConsumedSnapshotId + 1;
        }
        this.scan.notifyCheckpointComplete(nextSnapshot);
    }

    /**
     * Scans the next snapshot through the parent and, if the result is present and not a
     * {@link SnapshotNotExistPlan}, queues it in {@link #pendingPlans} and wakes any waiter.
     * Nothing is scanned while the queue is full.
     *
     * @return always {@link Optional#empty()}; results are delivered through the queue instead
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    public Optional<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> scanNextSnapshot() {
        if (this.pendingPlans.remainingCapacity() <= 0) {
            return Optional.empty();
        }
        Optional<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> scanned = super.scanNextSnapshot();
        if (scanned.isPresent() && !(scanned.get().plan() instanceof SnapshotNotExistPlan)) {
            synchronized (this.lock) {
                this.pendingPlans.add(scanned.get());
                this.lock.notifyAll();
            }
        }
        return Optional.empty();
    }

    /**
     * Handles the completion of a discovery round.
     *
     * <p>An {@link EndOfScanException} marks the enumerator as finished; any other failure is
     * logged and rethrown wrapped in a {@link RuntimeException}. Unless the assigner already holds
     * {@link #MAX_PENDING_PLAN} or more snapshots, one pending plan is taken from the queue,
     * {@code nextSnapshotId} is advanced, and the plan's splits are added - or a single
     * placeholder split for snapshot {@code nextSnapshotId - 1} when the plan has none. Splits are
     * assigned in every non-failing case.
     *
     * @param optional ignored; plans are read from {@link #pendingPlans}
     * @param th failure raised by the discovery round, or null
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    protected void processDiscoveredSplits(Optional<ContinuousFileSplitEnumerator.PlanWithNextSnapshotId> optional, Throwable th) {
        if (th != null) {
            if (!(th instanceof EndOfScanException)) {
                LOG.error("Failed to enumerate files", th);
                throw new RuntimeException(th);
            }
            LOG.debug("Catching EndOfScanException, the stream is finished.");
            this.finished = true;
        }
        if (this.alignedAssigner.remainingSnapshots() >= MAX_PENDING_PLAN) {
            assignSplits();
            return;
        }
        ContinuousFileSplitEnumerator.PlanWithNextSnapshotId pendingPlan = this.pendingPlans.poll();
        if (pendingPlan != null) {
            this.nextSnapshotId = pendingPlan.nextSnapshotId();
            Objects.requireNonNull(this.nextSnapshotId);
            TableScan.Plan plan = pendingPlan.plan();
            if (plan.splits().isEmpty()) {
                // Keep a slot for this snapshot in the assigner even though it carries no data.
                addSplits(Collections.singletonList(new FileStoreSourceSplit(PLACEHOLDER_SPLIT, new PlaceholderSplit(this.nextSnapshotId.longValue() - 1))));
            } else {
                addSplits(this.splitGenerator.createSplits(plan));
            }
        }
        assignSplits();
    }

    /**
     * @return true only if the parent reports no more splits, the assigner holds no remaining
     *     snapshots, and no plan is waiting in the queue
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    public boolean noMoreSplits() {
        return super.noMoreSplits() && this.alignedAssigner.remainingSnapshots() == 0 && this.pendingPlans.isEmpty();
    }

    /**
     * Always creates an {@link AlignedSplitAssigner}.
     *
     * @param bucketMode ignored
     */
    @Override // org.apache.paimon.flink.source.ContinuousFileSplitEnumerator
    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
        return new AlignedSplitAssigner();
    }
}
