package org.apache.iceberg.flink.source.assigner;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;

/**
 * Split assigner that keeps not-yet-assigned splits in a single queue and hands them out in
 * first-in, first-out order: new splits are appended to the tail, {@link #getNext(String)} takes
 * from the head.
 *
 * <p>Every method that touches the pending queue or the availability future synchronizes on this
 * instance, so the queue contents and the future are always observed and updated together.
 */
@Internal
public class SimpleSplitAssigner implements SplitAssigner {
    /** Splits waiting to be handed out; the head is the next split to assign. */
    private final Deque<IcebergSourceSplit> pendingSplits;

    /**
     * Future handed out by {@link #isAvailable()}; {@code null} when no caller is currently
     * waiting for a notification.
     */
    private CompletableFuture<Void> availableFuture;

    /** Creates an assigner with no pending splits. */
    public SimpleSplitAssigner() {
        this.pendingSplits = new ArrayDeque<>();
    }

    /**
     * Creates an assigner whose pending queue is seeded with the split of every given state, in
     * the iteration order of {@code collection}.
     *
     * @param collection split states whose splits become pending
     */
    public SimpleSplitAssigner(Collection<IcebergSourceSplitState> collection) {
        this.pendingSplits = new ArrayDeque<>(collection.size());
        for (IcebergSourceSplitState splitState : collection) {
            this.pendingSplits.add(splitState.split());
        }
    }

    /**
     * Removes and returns the split at the head of the pending queue.
     *
     * @param str not used by this assigner
     * @return {@code GetSplitResult.unavailable()} when nothing is pending, otherwise
     *     {@code GetSplitResult.forSplit(...)} wrapping the removed split
     */
    @Override // org.apache.iceberg.flink.source.assigner.SplitAssigner
    public synchronized GetSplitResult getNext(@Nullable String str) {
        if (this.pendingSplits.isEmpty()) {
            return GetSplitResult.unavailable();
        }
        return GetSplitResult.forSplit(this.pendingSplits.poll());
    }

    /**
     * Appends the given splits to the pending queue.
     *
     * @param collection splits to enqueue
     */
    @Override // org.apache.iceberg.flink.source.assigner.SplitAssigner
    public void onDiscoveredSplits(Collection<IcebergSourceSplit> collection) {
        addSplits(collection);
    }

    /**
     * Appends the given splits to the tail of the pending queue, exactly as
     * {@link #onDiscoveredSplits(Collection)} does.
     *
     * @param collection splits to enqueue
     */
    @Override // org.apache.iceberg.flink.source.assigner.SplitAssigner
    public void onUnassignedSplits(Collection<IcebergSourceSplit> collection) {
        addSplits(collection);
    }

    /**
     * Enqueues the splits and then signals any outstanding availability future. An empty
     * collection is a no-op and leaves the outstanding future untouched.
     */
    private synchronized void addSplits(Collection<IcebergSourceSplit> collection) {
        if (collection.isEmpty()) {
            return;
        }
        this.pendingSplits.addAll(collection);
        completeAvailableFuturesIfNeeded();
    }

    /**
     * Snapshots the pending queue.
     *
     * @return a new list holding one {@link IcebergSourceSplitStatus#UNASSIGNED} state per
     *     pending split, in queue order
     */
    @Override // org.apache.iceberg.flink.source.assigner.SplitAssigner
    public synchronized Collection<IcebergSourceSplitState> state() {
        return this.pendingSplits.stream()
                .map(split -> new IcebergSourceSplitState(split, IcebergSourceSplitStatus.UNASSIGNED))
                .collect(Collectors.toList());
    }

    /**
     * Returns the future that is completed the next time splits are added. The same instance is
     * returned on repeated calls until that happens.
     *
     * <p>This method does not look at the queue: if splits are already pending, the returned
     * future still only completes on the next addition.
     *
     * @return the outstanding availability future, created on demand
     */
    @Override // org.apache.iceberg.flink.source.assigner.SplitAssigner
    public synchronized CompletableFuture<Void> isAvailable() {
        if (this.availableFuture == null) {
            this.availableFuture = new CompletableFuture<>();
        }
        return this.availableFuture;
    }

    /**
     * Completes the outstanding future when there is one and splits are pending, then always
     * drops the reference so the next {@link #isAvailable()} call creates a fresh future.
     */
    private synchronized void completeAvailableFuturesIfNeeded() {
        if (this.availableFuture != null && !this.pendingSplits.isEmpty()) {
            this.availableFuture.complete(null);
        }
        this.availableFuture = null;
    }
}
