package org.apache.paimon.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.class */
public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
    private final Map<Integer, LinkedList<FileStoreSourceSplit>> bucketSplits;
    private final long discoveryInterval;
    private final Set<Integer> readersAwaitingSplit;
    private final FileStoreSourceSplitGenerator splitGenerator;
    private final StreamTableScan scan;
    private final int splitBatchSize;

    @Nullable
    private Long nextSnapshotId;
    private boolean finished = false;

    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext, Collection<FileStoreSourceSplit> collection, @Nullable Long l, long j, int i, StreamTableScan streamTableScan) {
        Preconditions.checkArgument(j > 0);
        this.context = (SplitEnumeratorContext) Preconditions.checkNotNull(splitEnumeratorContext);
        this.bucketSplits = new HashMap();
        addSplits(collection);
        this.nextSnapshotId = l;
        this.discoveryInterval = j;
        this.splitBatchSize = i;
        this.readersAwaitingSplit = new HashSet();
        this.splitGenerator = new FileStoreSourceSplitGenerator();
        this.scan = streamTableScan;
    }

    private void addSplits(Collection<FileStoreSourceSplit> collection) {
        collection.forEach(this::addSplit);
    }

    private void addSplit(FileStoreSourceSplit fileStoreSourceSplit) {
        this.bucketSplits.computeIfAbsent(Integer.valueOf(((DataSplit) fileStoreSourceSplit.split()).bucket()), num -> {
            return new LinkedList();
        }).add(fileStoreSourceSplit);
    }

    private void addSplitsBack(Collection<FileStoreSourceSplit> collection) {
        new LinkedList(collection).descendingIterator().forEachRemaining(this::addSplitToHead);
    }

    private void addSplitToHead(FileStoreSourceSplit fileStoreSourceSplit) {
        this.bucketSplits.computeIfAbsent(Integer.valueOf(((DataSplit) fileStoreSourceSplit.split()).bucket()), num -> {
            return new LinkedList();
        }).addFirst(fileStoreSourceSplit);
    }

    public void start() {
        SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext = this.context;
        StreamTableScan streamTableScan = this.scan;
        streamTableScan.getClass();
        splitEnumeratorContext.callAsync(streamTableScan::plan, this::processDiscoveredSplits, 0L, this.discoveryInterval);
    }

    public void close() throws IOException {
    }

    public void addReader(int i) {
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        this.readersAwaitingSplit.add(Integer.valueOf(i));
        assignSplits();
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }

    public void addSplitsBack(List<FileStoreSourceSplit> list, int i) {
        LOG.debug("File Source Enumerator adds splits back: {}", list);
        addSplitsBack(list);
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PendingSplitsCheckpoint m133snapshotState(long j) {
        ArrayList arrayList = new ArrayList();
        Collection<LinkedList<FileStoreSourceSplit>> values = this.bucketSplits.values();
        arrayList.getClass();
        values.forEach((v1) -> {
            r1.addAll(v1);
        });
        PendingSplitsCheckpoint pendingSplitsCheckpoint = new PendingSplitsCheckpoint(arrayList, this.nextSnapshotId);
        LOG.debug("Source Checkpoint is {}", pendingSplitsCheckpoint);
        return pendingSplitsCheckpoint;
    }

    private void processDiscoveredSplits(TableScan.Plan plan, Throwable th) {
        if (th == null) {
            this.nextSnapshotId = this.scan.checkpoint();
            if (plan.splits().isEmpty()) {
                return;
            }
            addSplits(this.splitGenerator.createSplits(plan));
            assignSplits();
            return;
        }
        if (!(th instanceof EndOfScanException)) {
            LOG.error("Failed to enumerate files", th);
            return;
        }
        LOG.debug("Catching EndOfStreamException, the stream is finished.");
        this.finished = true;
        assignSplits();
    }

    private void assignSplits() {
        Map<Integer, List<FileStoreSourceSplit>> createAssignment = createAssignment();
        if (this.finished) {
            Iterator<Integer> it = this.readersAwaitingSplit.iterator();
            while (it.hasNext()) {
                Integer next = it.next();
                if (!createAssignment.containsKey(next)) {
                    this.context.signalNoMoreSplits(next.intValue());
                    it.remove();
                }
            }
        }
        Set<Integer> keySet = createAssignment.keySet();
        Set<Integer> set = this.readersAwaitingSplit;
        set.getClass();
        keySet.forEach((v1) -> {
            r1.remove(v1);
        });
        this.context.assignSplits(new SplitsAssignment(createAssignment));
    }

    private Map<Integer, List<FileStoreSourceSplit>> createAssignment() {
        HashMap hashMap = new HashMap();
        this.bucketSplits.forEach((num, linkedList) -> {
            if (linkedList.size() > 0) {
                int intValue = num.intValue() % this.context.currentParallelism();
                if (this.readersAwaitingSplit.contains(Integer.valueOf(intValue))) {
                    if (!this.context.registeredReaders().containsKey(Integer.valueOf(intValue))) {
                        this.readersAwaitingSplit.remove(Integer.valueOf(intValue));
                        return;
                    }
                    List list = (List) hashMap.computeIfAbsent(Integer.valueOf(intValue), num -> {
                        return new ArrayList();
                    });
                    if (list.size() < this.splitBatchSize) {
                        list.add(linkedList.poll());
                    }
                }
            }
        });
        return hashMap;
    }
}
