package org.apache.paimon.flink.source.assigners;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.align.PlaceholderSplit;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.class */
public class AlignedSplitAssigner implements SplitAssigner {
    private final Deque<PendingSnapshot> pendingSplitAssignment = new LinkedList();

    /* loaded from: input_file:org/apache/paimon/flink/source/assigners/AlignedSplitAssigner$PendingSnapshot.class */
    private static class PendingSnapshot {
        private final long snapshotId;
        private final boolean isPlaceHolder;
        private final Map<Integer, List<FileStoreSourceSplit>> subtaskSplits;

        public PendingSnapshot(long j, boolean z, Map<Integer, List<FileStoreSourceSplit>> map) {
            this.snapshotId = j;
            this.isPlaceHolder = z;
            this.subtaskSplits = map;
        }

        public List<FileStoreSourceSplit> remove(int i) {
            return this.subtaskSplits.remove(Integer.valueOf(i));
        }

        public void add(int i, FileStoreSourceSplit fileStoreSourceSplit) {
            Preconditions.checkArgument(((DataSplit) fileStoreSourceSplit.split()).snapshotId() == this.snapshotId, "SnapshotId not equal. This is a bug, please file an issue.");
            this.subtaskSplits.computeIfAbsent(Integer.valueOf(i), num -> {
                return new ArrayList();
            }).add(fileStoreSourceSplit);
        }

        public void addAll(int i, List<FileStoreSourceSplit> list) {
            Preconditions.checkArgument(!this.subtaskSplits.containsKey(Integer.valueOf(i)), "Encountered a non-empty list of subtask pending splits. This is a bug, please file an issue.");
            list.forEach(fileStoreSourceSplit -> {
                Preconditions.checkArgument(((DataSplit) fileStoreSourceSplit.split()).snapshotId() == this.snapshotId, "SnapshotId not equal");
            });
            this.subtaskSplits.put(Integer.valueOf(i), list);
        }

        public boolean empty() {
            return this.subtaskSplits.isEmpty() || this.isPlaceHolder;
        }
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public List<FileStoreSourceSplit> getNext(int i, @Nullable String str) {
        PendingSnapshot peek = this.pendingSplitAssignment.peek();
        if (peek == null || peek.isPlaceHolder) {
            return Collections.emptyList();
        }
        List<FileStoreSourceSplit> remove = peek.remove(i);
        return remove != null ? remove : Collections.emptyList();
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public void addSplit(int i, FileStoreSourceSplit fileStoreSourceSplit) {
        long snapshotId = ((DataSplit) fileStoreSourceSplit.split()).snapshotId();
        PendingSnapshot peekLast = this.pendingSplitAssignment.peekLast();
        boolean z = fileStoreSourceSplit.split() instanceof PlaceholderSplit;
        if (peekLast != null && peekLast.snapshotId == snapshotId) {
            peekLast.add(i, fileStoreSourceSplit);
            return;
        }
        PendingSnapshot pendingSnapshot = new PendingSnapshot(snapshotId, z, new HashMap());
        pendingSnapshot.add(i, fileStoreSourceSplit);
        this.pendingSplitAssignment.addLast(pendingSnapshot);
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public void addSplitsBack(int i, List<FileStoreSourceSplit> list) {
        if (list.isEmpty()) {
            return;
        }
        long snapshotId = ((DataSplit) list.get(0).split()).snapshotId();
        boolean z = list.get(0).split() instanceof PlaceholderSplit;
        PendingSnapshot peek = this.pendingSplitAssignment.peek();
        if (peek != null && snapshotId == peek.snapshotId) {
            peek.addAll(i, list);
            return;
        }
        PendingSnapshot pendingSnapshot = new PendingSnapshot(snapshotId, z, new HashMap());
        pendingSnapshot.addAll(i, list);
        this.pendingSplitAssignment.addFirst(pendingSnapshot);
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public Collection<FileStoreSourceSplit> remainingSplits() {
        ArrayList arrayList = new ArrayList();
        Iterator<PendingSnapshot> it = this.pendingSplitAssignment.iterator();
        while (it.hasNext()) {
            Collection values = it.next().subtaskSplits.values();
            arrayList.getClass();
            values.forEach((v1) -> {
                r1.addAll(v1);
            });
        }
        return arrayList;
    }

    @Override // org.apache.paimon.flink.source.assigners.SplitAssigner
    public Optional<Long> getNextSnapshotId(int i) {
        PendingSnapshot peek = this.pendingSplitAssignment.peek();
        return Optional.ofNullable(peek != null ? Long.valueOf(peek.snapshotId) : null);
    }

    public boolean isAligned() {
        PendingSnapshot peek = this.pendingSplitAssignment.peek();
        return peek != null && peek.empty();
    }

    public int remainingSnapshots() {
        return this.pendingSplitAssignment.size();
    }

    public void removeFirst() {
        PendingSnapshot poll = this.pendingSplitAssignment.poll();
        Preconditions.checkArgument(poll != null && poll.empty(), "The head pending splits is not empty. This is a bug, please file an issue.");
    }
}
