package org.apache.iceberg.flink.source.assigner;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitState;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
import org.apache.iceberg.flink.source.split.SerializableComparator;

/**
 * Split assigner that keeps every not-yet-assigned split in a single in-memory queue.
 *
 * <p>When no comparator is supplied the queue is an {@link ArrayDeque}, so splits are handed out
 * in insertion order; otherwise it is a {@link PriorityQueue} ordered by that comparator.
 *
 * <p>Neither queue implementation is thread-safe, so every method that reads or mutates the
 * queue (or the availability future) holds this instance's monitor.
 */
@Internal
public class DefaultSplitAssigner implements SplitAssigner {
    /** Splits waiting to be handed out; guarded by {@code this}. */
    private final Queue<IcebergSourceSplit> pendingSplits;

    /** Lazily created by {@link #isAvailable()}, cleared whenever splits are added; guarded by {@code this}. */
    private CompletableFuture<Void> availableFuture;

    /**
     * Creates an assigner with no pending splits.
     *
     * @param serializableComparator ordering used to pick the next split, or {@code null} to hand
     *     splits out in the order they were added
     */
    public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit> serializableComparator) {
        if (serializableComparator == null) {
            this.pendingSplits = new ArrayDeque<>();
        } else {
            this.pendingSplits = new PriorityQueue<>(serializableComparator);
        }
    }

    /**
     * Creates an assigner pre-populated with the splits of the given states.
     *
     * <p>The status carried by each state is not inspected; every split is queued as pending.
     *
     * @param serializableComparator ordering used to pick the next split, or {@code null} to hand
     *     splits out in the order they were added
     * @param collection split states whose splits become the initial pending set
     */
    public DefaultSplitAssigner(SerializableComparator<IcebergSourceSplit> serializableComparator, Collection<IcebergSourceSplitState> collection) {
        this(serializableComparator);
        for (IcebergSourceSplitState splitState : collection) {
            this.pendingSplits.add(splitState.split());
        }
    }

    /**
     * Removes and returns the head of the pending queue.
     *
     * @param str not used by this implementation
     * @return a result wrapping the polled split, or {@link GetSplitResult#unavailable()} when the
     *     queue is empty
     */
    @Override
    public synchronized GetSplitResult getNext(@Nullable String str) {
        if (this.pendingSplits.isEmpty()) {
            return GetSplitResult.unavailable();
        }
        return GetSplitResult.forSplit(this.pendingSplits.poll());
    }

    /**
     * Queues newly discovered splits.
     *
     * @param collection splits to add; an empty collection is a no-op
     */
    @Override
    public void onDiscoveredSplits(Collection<IcebergSourceSplit> collection) {
        addSplits(collection);
    }

    /**
     * Re-queues splits that were handed back to the assigner.
     *
     * @param collection splits to add; an empty collection is a no-op
     */
    @Override
    public void onUnassignedSplits(Collection<IcebergSourceSplit> collection) {
        addSplits(collection);
    }

    /** Appends the splits to the pending queue and, if any were added, signals availability. */
    private synchronized void addSplits(Collection<IcebergSourceSplit> collection) {
        if (collection.isEmpty()) {
            // Nothing new arrived, so a future handed out by isAvailable() must stay pending.
            return;
        }
        this.pendingSplits.addAll(collection);
        completeAvailableFuturesIfNeeded();
    }

    /**
     * Snapshots the pending splits.
     *
     * @return a new list with one {@link IcebergSourceSplitStatus#UNASSIGNED} state per pending
     *     split
     */
    @Override
    public synchronized Collection<IcebergSourceSplitState> state() {
        return this.pendingSplits.stream()
                .map(split -> new IcebergSourceSplitState(split, IcebergSourceSplitStatus.UNASSIGNED))
                .collect(Collectors.toList());
    }

    /**
     * Returns the future that is completed the next time a non-empty batch of splits is added.
     *
     * <p>The same instance is returned on repeated calls until that happens; afterwards a fresh
     * future is created on demand.
     *
     * @return the current availability future, never {@code null}
     */
    @Override
    public synchronized CompletableFuture<Void> isAvailable() {
        if (this.availableFuture == null) {
            this.availableFuture = new CompletableFuture<>();
        }
        return this.availableFuture;
    }

    /**
     * @return the number of splits currently waiting in the queue
     */
    @Override
    public synchronized int pendingSplitCount() {
        return this.pendingSplits.size();
    }

    /**
     * Sums {@code task().estimatedRowsCount()} over all pending splits.
     *
     * <p>Synchronized like every other accessor: the queue is iterated here, and iterating an
     * {@link ArrayDeque} or {@link PriorityQueue} while another thread adds or polls is unsafe.
     *
     * @return the estimated total row count of the pending splits, {@code 0} when none are pending
     */
    @Override
    public synchronized long pendingRecords() {
        return this.pendingSplits.stream()
                .mapToLong(split -> split.task().estimatedRowsCount())
                .sum();
    }

    /**
     * Completes the outstanding availability future, if there is one and splits are pending, then
     * drops the reference so the next {@link #isAvailable()} call creates a new future.
     */
    private synchronized void completeAvailableFuturesIfNeeded() {
        CompletableFuture<Void> outstanding = this.availableFuture;
        // The field is reset unconditionally, matching the completed-or-not handling below.
        this.availableFuture = null;
        if (outstanding != null && !this.pendingSplits.isEmpty()) {
            outstanding.complete(null);
        }
    }
}
