package org.apache.seatunnel.connectors.doris.source.split;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition;
import org.apache.seatunnel.connectors.doris.rest.RestService;
import org.apache.seatunnel.connectors.doris.source.DorisSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.class */
public class DorisSourceSplitEnumerator implements SourceSplitEnumerator<DorisSourceSplit, DorisSourceState> {
    private static final Logger log = LoggerFactory.getLogger(DorisSourceSplitEnumerator.class);
    private SourceSplitEnumerator.Context<DorisSourceSplit> context;
    private DorisConfig dorisConfig;
    private volatile boolean shouldEnumerate;
    private final Map<Integer, List<DorisSourceSplit>> pendingSplit;
    private SeaTunnelRowType seaTunnelRowType;
    private final Object stateLock;

    public DorisSourceSplitEnumerator(SourceSplitEnumerator.Context<DorisSourceSplit> context, DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
        this(context, dorisConfig, seaTunnelRowType, null);
    }

    public DorisSourceSplitEnumerator(SourceSplitEnumerator.Context<DorisSourceSplit> context, DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType, DorisSourceState dorisSourceState) {
        this.stateLock = new Object();
        this.context = context;
        this.dorisConfig = dorisConfig;
        this.seaTunnelRowType = seaTunnelRowType;
        this.pendingSplit = new ConcurrentHashMap();
        this.shouldEnumerate = dorisSourceState == null;
        if (dorisSourceState != null) {
            this.shouldEnumerate = dorisSourceState.isShouldEnumerate();
            this.pendingSplit.putAll(dorisSourceState.getPendingSplit());
        }
    }

    public void open() {
    }

    public void close() throws IOException {
    }

    public void run() {
        Set registeredReaders = this.context.registeredReaders();
        if (this.shouldEnumerate) {
            List<DorisSourceSplit> dorisSourceSplit = getDorisSourceSplit();
            synchronized (this.stateLock) {
                addPendingSplit(dorisSourceSplit);
                this.shouldEnumerate = false;
                assignSplit(registeredReaders);
            }
        }
        log.debug("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", registeredReaders);
        SourceSplitEnumerator.Context<DorisSourceSplit> context = this.context;
        Objects.requireNonNull(context);
        registeredReaders.forEach((v1) -> {
            r1.signalNoMoreSplits(v1);
        });
    }

    public void addSplitsBack(List<DorisSourceSplit> list, int i) {
        log.debug("Add back splits {} to DorisSourceSplitEnumerator.", list);
        if (list.isEmpty()) {
            return;
        }
        synchronized (this.stateLock) {
            addPendingSplit(list);
            if (this.context.registeredReaders().contains(Integer.valueOf(i))) {
                assignSplit(Collections.singletonList(Integer.valueOf(i)));
            } else {
                log.warn("Reader {} is not registered. Pending splits {} are not assigned.", Integer.valueOf(i), list);
            }
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void handleSplitRequest(int i) {
        throw new DorisConnectorException((SeaTunnelErrorCode) CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", Integer.valueOf(i)));
    }

    public void registerReader(int i) {
        log.debug("Register reader {} to DorisSourceSplitEnumerator.", Integer.valueOf(i));
        if (this.pendingSplit.isEmpty()) {
            return;
        }
        synchronized (this.stateLock) {
            assignSplit(Collections.singletonList(Integer.valueOf(i)));
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public DorisSourceState m1005snapshotState(long j) {
        DorisSourceState dorisSourceState;
        synchronized (this.stateLock) {
            dorisSourceState = new DorisSourceState(this.shouldEnumerate, this.pendingSplit);
        }
        return dorisSourceState;
    }

    public void notifyCheckpointComplete(long j) {
    }

    private List<DorisSourceSplit> getDorisSourceSplit() {
        ArrayList arrayList = new ArrayList();
        for (PartitionDefinition partitionDefinition : RestService.findPartitions(this.seaTunnelRowType, this.dorisConfig, log)) {
            arrayList.add(new DorisSourceSplit(partitionDefinition, String.valueOf(partitionDefinition.hashCode())));
        }
        return arrayList;
    }

    private void addPendingSplit(Collection<DorisSourceSplit> collection) {
        int currentParallelism = this.context.currentParallelism();
        for (DorisSourceSplit dorisSourceSplit : collection) {
            int splitOwner = getSplitOwner(dorisSourceSplit.splitId(), currentParallelism);
            log.info("Assigning split {} to reader {} .", dorisSourceSplit.splitId(), Integer.valueOf(splitOwner));
            this.pendingSplit.computeIfAbsent(Integer.valueOf(splitOwner), num -> {
                return new ArrayList();
            }).add(dorisSourceSplit);
        }
    }

    private static int getSplitOwner(String str, int i) {
        return (str.hashCode() & Integer.MAX_VALUE) % i;
    }

    private void assignSplit(Collection<Integer> collection) {
        for (Integer num : collection) {
            List<DorisSourceSplit> remove = this.pendingSplit.remove(num);
            if (remove != null && !remove.isEmpty()) {
                log.debug("Assign splits {} to reader {}", remove, num);
                this.context.assignSplit(num.intValue(), remove);
            }
        }
    }
}
