package org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator;

import com.mongodb.internal.connection.ConcurrentPool;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.class */
public class MongodbSplitEnumerator implements SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> {
    private static final Logger log = LoggerFactory.getLogger(MongodbSplitEnumerator.class);
    private final ArrayList<MongoSplit> pendingSplits;
    private final SourceSplitEnumerator.Context<MongoSplit> context;
    private final MongodbClientProvider clientProvider;
    private final MongoSplitStrategy strategy;

    public MongodbSplitEnumerator(SourceSplitEnumerator.Context<MongoSplit> context, MongodbClientProvider mongodbClientProvider, MongoSplitStrategy mongoSplitStrategy) {
        this(context, mongodbClientProvider, mongoSplitStrategy, Collections.emptyList());
    }

    public MongodbSplitEnumerator(SourceSplitEnumerator.Context<MongoSplit> context, MongodbClientProvider mongodbClientProvider, MongoSplitStrategy mongoSplitStrategy, List<MongoSplit> list) {
        this.pendingSplits = Lists.newArrayList();
        this.context = context;
        this.clientProvider = mongodbClientProvider;
        this.strategy = mongoSplitStrategy;
        this.pendingSplits.addAll(list);
    }

    public void open() {
    }

    public synchronized void run() {
        log.info("Starting MongoSplitEnumerator.");
        Set registeredReaders = this.context.registeredReaders();
        this.pendingSplits.addAll(this.strategy.split());
        log.info("Added {} pending splits for namespace {}.", Integer.valueOf(this.pendingSplits.size()), this.clientProvider.getDefaultCollection().getNamespace().getFullName());
        assignSplits(registeredReaders);
    }

    public void close() {
        if (this.clientProvider != null) {
            this.clientProvider.close();
        }
    }

    public void addSplitsBack(List<MongoSplit> list, int i) {
        if (list != null) {
            log.info("Received {} split(s) back from subtask {}.", Integer.valueOf(list.size()), Integer.valueOf(i));
            this.pendingSplits.addAll(list);
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplits.size();
    }

    public void handleSplitRequest(int i) {
        throw new MongodbConnectorException(CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", Integer.valueOf(i)));
    }

    public void registerReader(int i) {
        log.debug("Register reader {} to MongodbSplitEnumerator.", Integer.valueOf(i));
        if (this.pendingSplits.isEmpty()) {
            return;
        }
        assignSplits(Collections.singletonList(Integer.valueOf(i)));
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public ArrayList<MongoSplit> m193snapshotState(long j) {
        return this.pendingSplits;
    }

    public void notifyCheckpointComplete(long j) {
    }

    private synchronized void assignSplits(Collection<Integer> collection) {
        log.debug("Assign pendingSplits to readers {}", collection);
        int size = collection.size();
        Map map = (Map) this.pendingSplits.stream().collect(Collectors.groupingBy(mongoSplit -> {
            return Integer.valueOf(getSplitOwner(mongoSplit.splitId(), size));
        }));
        collection.forEach(num -> {
            assignSplitsToSubtask(num, map);
        });
        this.pendingSplits.clear();
        SourceSplitEnumerator.Context<MongoSplit> context = this.context;
        context.getClass();
        collection.forEach((v1) -> {
            r1.signalNoMoreSplits(v1);
        });
    }

    private void assignSplitsToSubtask(Integer num, Map<Integer, List<MongoSplit>> map) {
        log.info("Received split request from taskId {}.", num);
        List<MongoSplit> orDefault = map.getOrDefault(num, Collections.emptyList());
        this.context.assignSplit(num.intValue(), orDefault);
        log.info("Assigned {} splits to subtask {}, remaining splits: {}.", new Object[]{Integer.valueOf(orDefault.size()), num, Integer.valueOf(this.pendingSplits.size() - orDefault.size())});
    }

    private static int getSplitOwner(String str, int i) {
        return (str.hashCode() & ConcurrentPool.INFINITE_SIZE) % i;
    }
}
