package org.apache.iceberg.flink.source.enumerator;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.assigner.GetSplitResult;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.class */
/**
 * Base split enumerator for the Iceberg Flink source.
 *
 * <p>Readers ask for work by sending a {@link SplitRequestEvent}. The enumerator records each
 * requesting subtask in an insertion-ordered map and serves the awaiting readers with splits
 * obtained from the {@link SplitAssigner}. Subclasses decide, through
 * {@link #shouldWaitForMoreSplits()}, whether a reader that finds no split available keeps
 * waiting or is signalled that no more splits will arrive.
 */
abstract class AbstractIcebergEnumerator implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);

    private final SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext;
    private final SplitAssigner assigner;

    /** Readers waiting for a split: subtask id mapped to the hostname carried by its request. */
    private final Map<Integer, String> readersAwaitingSplit = new LinkedHashMap<>();

    /** Holds the registered availability callback, or {@code null} when none is pending. */
    private final AtomicReference<CompletableFuture<Void>> availableFuture = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the enumerator.
     *
     * @param splitEnumeratorContext context used to look up registered readers, assign splits,
     *     signal the end of input and schedule work on the coordinator thread
     * @param splitAssigner source of the splits handed out to readers
     */
    public AbstractIcebergEnumerator(SplitEnumeratorContext<IcebergSourceSplit> splitEnumeratorContext, SplitAssigner splitAssigner) {
        this.enumeratorContext = splitEnumeratorContext;
        this.assigner = splitAssigner;
    }

    /** Starts the underlying split assigner. */
    public void start() {
        assigner.start();
    }

    /**
     * Closes the underlying split assigner.
     *
     * @throws IOException declared by the enumerator contract
     */
    public void close() throws IOException {
        assigner.close();
    }

    /**
     * Always rejects the default split request, because readers of this source are expected to
     * send a {@link SplitRequestEvent} instead.
     *
     * @param subtaskId id of the requesting subtask
     * @param requesterHostname hostname of the requester; unused
     * @throws UnsupportedOperationException always
     */
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        throw new UnsupportedOperationException(
                String.format(
                        "Received invalid default split request event from subtask %d as Iceberg source uses custom split request event",
                        subtaskId));
    }

    /**
     * Handles a custom event sent by a reader. Only {@link SplitRequestEvent} is accepted: the
     * finished split ids it carries are reported to the assigner, the sender is queued as an
     * awaiting reader, and an assignment round is triggered.
     *
     * @param subtaskId id of the subtask that sent the event
     * @param sourceEvent the received event
     * @throws IllegalArgumentException if the event is not a {@link SplitRequestEvent}
     */
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof SplitRequestEvent) {
            SplitRequestEvent splitRequestEvent = (SplitRequestEvent) sourceEvent;
            LOG.info("Received request split event from subtask {}", subtaskId);
            assigner.onCompletedSplits(splitRequestEvent.finishedSplitIds());
            readersAwaitingSplit.put(subtaskId, splitRequestEvent.requesterHostname());
            assignSplits();
        } else {
            throw new IllegalArgumentException(
                    String.format(
                            "Received unknown event from subtask %d: %s",
                            subtaskId, sourceEvent.getClass().getCanonicalName()));
        }
    }

    /**
     * Hands splits back to the assigner as unassigned and triggers an assignment round.
     *
     * @param splits splits to return to the assigner
     * @param subtaskId id of the subtask the splits came from
     */
    public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
        LOG.info("Add {} splits back to the pool for failed subtask {}", splits.size(), subtaskId);
        assigner.onUnassignedSplits(splits);
        assignSplits();
    }

    /**
     * Logs the newly added reader; nothing is assigned until the reader requests a split.
     *
     * @param subtaskId id of the added reader
     */
    public void addReader(int subtaskId) {
        LOG.info("Added reader: {}", subtaskId);
    }

    /**
     * Walks the awaiting readers in insertion order and tries to serve each of them one split.
     *
     * <p>The round stops early, leaving the remaining readers queued, as soon as the assigner is
     * constrained or has nothing available while more splits are still expected; in both cases a
     * callback is registered so that the round is retried once the assigner reports availability.
     */
    private void assignSplits() {
        LOG.info("Assigning splits for {} awaiting readers", readersAwaitingSplit.size());
        Iterator<Map.Entry<Integer, String>> awaitingReaders = readersAwaitingSplit.entrySet().iterator();
        while (awaitingReaders.hasNext()) {
            Map.Entry<Integer, String> awaiting = awaitingReaders.next();

            // A reader that is no longer registered cannot receive a split; forget its request.
            if (!enumeratorContext.registeredReaders().containsKey(awaiting.getKey())) {
                awaitingReaders.remove();
                continue;
            }

            int subtaskId = awaiting.getKey();
            GetSplitResult result = assigner.getNext(awaiting.getValue());
            GetSplitResult.Status status = result.status();

            if (status == GetSplitResult.Status.AVAILABLE) {
                LOG.info("Assign split to subtask {}: {}", subtaskId, result.split());
                enumeratorContext.assignSplit(result.split(), subtaskId);
                awaitingReaders.remove();
            } else if (status == GetSplitResult.Status.CONSTRAINED) {
                getAvailableFutureIfNeeded();
                return;
            } else if (status == GetSplitResult.Status.UNAVAILABLE) {
                if (shouldWaitForMoreSplits()) {
                    getAvailableFutureIfNeeded();
                    return;
                }
                LOG.info("No more splits available for subtask {}", subtaskId);
                enumeratorContext.signalNoMoreSplits(subtaskId);
                awaitingReaders.remove();
            } else {
                throw new IllegalArgumentException("Unsupported status: " + status);
            }
        }
    }

    /**
     * Tells whether a reader that currently finds no split available should stay queued.
     *
     * @return {@code true} to keep the reader waiting and retry once the assigner becomes
     *     available; {@code false} to signal the reader that no more splits will come
     */
    protected abstract boolean shouldWaitForMoreSplits();

    /**
     * Registers a callback on the assigner's availability future that re-runs
     * {@link #assignSplits()}, unless such a callback is already pending.
     */
    private synchronized void getAvailableFutureIfNeeded() {
        if (availableFuture.get() != null) {
            return;
        }
        CompletableFuture<Void> future =
                assigner.isAvailable()
                        .thenAccept(
                                ignored ->
                                        // Re-enter through the coordinator thread rather than
                                        // running the round on whichever thread completes the
                                        // assigner's future.
                                        enumeratorContext.runInCoordinatorThread(
                                                () -> {
                                                    LOG.debug("Executing callback of assignSplits");
                                                    // Clear first so the round below can register
                                                    // a fresh callback if it has to wait again.
                                                    availableFuture.set(null);
                                                    assignSplits();
                                                }));
        availableFuture.set(future);
        LOG.debug("Registered callback for future available splits");
    }
}
