package org.apache.flink.table.store.connector.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.table.source.SnapshotEnumerator;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.class */
public class ContinuousFileSplitEnumerator implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
    private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
    private final long discoveryInterval;
    private final Set<Integer> readersAwaitingSplit;
    private final FileStoreSourceSplitGenerator splitGenerator;
    private final SnapshotEnumerator snapshotEnumerator;
    private Long currentSnapshotId;

    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> splitEnumeratorContext, Path path, TableScan tableScan, Collection<FileStoreSourceSplit> collection, long j, long j2) {
        Preconditions.checkArgument(j2 > 0);
        this.context = (SplitEnumeratorContext) Preconditions.checkNotNull(splitEnumeratorContext);
        this.bucketSplits = new HashMap();
        addSplits(collection);
        this.currentSnapshotId = Long.valueOf(j);
        this.discoveryInterval = j2;
        this.readersAwaitingSplit = new HashSet();
        this.splitGenerator = new FileStoreSourceSplitGenerator();
        this.snapshotEnumerator = new SnapshotEnumerator(path, tableScan, j);
    }

    private void addSplits(Collection<FileStoreSourceSplit> collection) {
        collection.forEach(this::addSplit);
    }

    private void addSplit(FileStoreSourceSplit fileStoreSourceSplit) {
        this.bucketSplits.computeIfAbsent(Integer.valueOf(fileStoreSourceSplit.split().bucket()), num -> {
            return new LinkedList();
        }).add(fileStoreSourceSplit);
    }

    public void start() {
        this.context.callAsync(this.snapshotEnumerator, this::processDiscoveredSplits, this.discoveryInterval, this.discoveryInterval);
    }

    public void close() throws IOException {
    }

    public void addReader(int i) {
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        this.readersAwaitingSplit.add(Integer.valueOf(i));
        assignSplits();
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", sourceEvent);
    }

    public void addSplitsBack(List<FileStoreSourceSplit> list, int i) {
        LOG.debug("File Source Enumerator adds splits back: {}", list);
        addSplits(list);
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PendingSplitsCheckpoint m483snapshotState(long j) {
        ArrayList arrayList = new ArrayList();
        Collection<Queue<FileStoreSourceSplit>> values = this.bucketSplits.values();
        arrayList.getClass();
        values.forEach((v1) -> {
            r1.addAll(v1);
        });
        PendingSplitsCheckpoint pendingSplitsCheckpoint = new PendingSplitsCheckpoint(arrayList, this.currentSnapshotId == null ? -1L : this.currentSnapshotId.longValue());
        LOG.debug("Source Checkpoint is {}", pendingSplitsCheckpoint);
        return pendingSplitsCheckpoint;
    }

    private void processDiscoveredSplits(@Nullable SnapshotEnumerator.EnumeratorResult enumeratorResult, Throwable th) {
        if (th != null) {
            LOG.error("Failed to enumerate files", th);
        } else {
            if (enumeratorResult == null) {
                return;
            }
            this.currentSnapshotId = Long.valueOf(enumeratorResult.snapshotId);
            addSplits(this.splitGenerator.createSplits(enumeratorResult.plan));
            assignSplits();
        }
    }

    private void assignSplits() {
        this.bucketSplits.forEach((num, queue) -> {
            if (queue.size() > 0) {
                int intValue = num.intValue() % this.context.currentParallelism();
                if (this.readersAwaitingSplit.remove(Integer.valueOf(intValue)) && this.context.registeredReaders().containsKey(Integer.valueOf(intValue))) {
                    this.context.assignSplit((SourceSplit) queue.poll(), intValue);
                }
            }
        });
    }
}
