package org.apache.seatunnel.connectors.seatunnel.tablestore.source;

import com.alicloud.openservices.tablestore.core.utils.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.class */
/**
 * Split enumerator for the Tablestore source.
 *
 * <p>On the first {@link #run()} one split is created per entry of the comma-separated table
 * option. Each split is queued for the reader whose index equals
 * {@code splitId % currentParallelism} and is handed to that reader through the enumerator
 * context.
 *
 * <p>Thread-safety: {@code pendingSplits} is only read or modified while holding
 * {@code stateLock}, so a snapshot always observes a consistent view of the queue together with
 * the {@code shouldEnumerate} flag.
 */
public class TableStoreDBSourceSplitEnumerator implements SourceSplitEnumerator<TableStoreDBSourceSplit, TableStoreDBSourceState> {
    private static final Logger log = LoggerFactory.getLogger(TableStoreDBSourceSplitEnumerator.class);

    private final SourceSplitEnumerator.Context<TableStoreDBSourceSplit> enumeratorContext;
    /** Splits waiting to be assigned, keyed by the index of the reader that owns them. */
    private final Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits = new HashMap<>();
    private final TablestoreOptions tablestoreOptions;
    /** Guards {@link #pendingSplits} and the transition of {@link #shouldEnumerate}. */
    private final Object stateLock = new Object();
    /** {@code true} until the tables have been turned into splits once. */
    private volatile boolean shouldEnumerate;

    /**
     * Creates an enumerator with no restored state; splits are discovered on the first
     * {@link #run()}.
     *
     * @param context enumerator context used to query readers and assign splits
     * @param tablestoreOptions connector options providing the table list and primary keys
     */
    public TableStoreDBSourceSplitEnumerator(SourceSplitEnumerator.Context<TableStoreDBSourceSplit> context, TablestoreOptions tablestoreOptions) {
        this(context, tablestoreOptions, null);
    }

    /**
     * Creates an enumerator, optionally restoring it from a previously snapshotted state.
     *
     * @param context enumerator context used to query readers and assign splits
     * @param tablestoreOptions connector options providing the table list and primary keys
     * @param tableStoreDBSourceState state to restore from, or {@code null} to start fresh
     */
    public TableStoreDBSourceSplitEnumerator(SourceSplitEnumerator.Context<TableStoreDBSourceSplit> context, TablestoreOptions tablestoreOptions, TableStoreDBSourceState tableStoreDBSourceState) {
        this.enumeratorContext = context;
        this.tablestoreOptions = tablestoreOptions;
        if (tableStoreDBSourceState == null) {
            this.shouldEnumerate = true;
        } else {
            this.shouldEnumerate = tableStoreDBSourceState.isShouldEnumerate();
            // Copy the lists as well as the map so that later assignments do not mutate the
            // state object that was handed in.
            this.pendingSplits.putAll(copyPendingSplits(tableStoreDBSourceState.getPendingSplits()));
        }
    }

    /** Nothing to initialise. */
    public void open() {
    }

    /**
     * Enumerates the splits once (if that has not happened yet) and assigns them to the readers
     * that were registered when this method was entered.
     *
     * @throws Exception declared by the enumerator contract
     */
    public void run() throws Exception {
        Set<Integer> readers = this.enumeratorContext.registeredReaders();
        if (!this.shouldEnumerate) {
            return;
        }
        Set<TableStoreDBSourceSplit> discovered = getTableStoreDBSourceSplit();
        synchronized (this.stateLock) {
            addPendingSplit(discovered);
            this.shouldEnumerate = false;
        }
        assignSplit(readers);
    }

    /**
     * Hands every queued split of the given readers to the enumerator context. When the context
     * rejects an assignment, the splits are put back into the queue so they are not lost.
     */
    private void assignSplit(Set<Integer> readers) {
        // The lock is held across the context call on purpose: a concurrent snapshot must see
        // the splits either as pending or as handed over, never in between.
        synchronized (this.stateLock) {
            for (Integer reader : readers) {
                List<TableStoreDBSourceSplit> assignment = this.pendingSplits.remove(reader);
                if (assignment == null || assignment.isEmpty()) {
                    continue;
                }
                log.info("Assign splits {} to reader {}", assignment, reader);
                try {
                    this.enumeratorContext.assignSplit(reader, assignment);
                } catch (Exception e) {
                    log.error("Failed to assign splits {} to reader {}", assignment, reader, e);
                    this.pendingSplits.put(reader, assignment);
                }
            }
        }
    }

    /**
     * Builds one split per configured table. The split id is the position of the table in the
     * comma-separated table option, and the primary key at the same position is attached to it.
     */
    private Set<TableStoreDBSourceSplit> getTableStoreDBSourceSplit() {
        Set<TableStoreDBSourceSplit> splits = new HashSet<>();
        String[] tables = this.tablestoreOptions.getTable().split(StringUtils.COMMA_SEPARATOR);
        for (int i = 0; i < tables.length; i++) {
            splits.add(new TableStoreDBSourceSplit(i, tables[i], this.tablestoreOptions.getPrimaryKeys().get(i)));
        }
        return splits;
    }

    /** Queues each split for the reader with index {@code splitId % currentParallelism}. */
    private void addPendingSplit(Collection<TableStoreDBSourceSplit> splits) {
        int parallelism = this.enumeratorContext.currentParallelism();
        synchronized (this.stateLock) {
            for (TableStoreDBSourceSplit split : splits) {
                int owner = split.getSplitId() % parallelism;
                this.pendingSplits.computeIfAbsent(owner, key -> new ArrayList<>()).add(split);
            }
        }
    }

    /**
     * Closes the enumerator. This class holds no resources of its own, so there is nothing to
     * release; the method must not throw, otherwise a regular shutdown would fail.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
        // Intentionally empty.
    }

    /**
     * Re-queues splits that were returned by a reader and immediately tries to assign the
     * pending splits of that reader again, then signals that no more splits will follow.
     * A {@code null} or empty list is ignored.
     *
     * @param splits splits handed back
     * @param subtaskId index of the reader the splits came from
     */
    public void addSplitsBack(List<TableStoreDBSourceSplit> splits, int subtaskId) {
        log.debug("Add back splits {} to tablestore.", splits);
        if (splits == null || splits.isEmpty()) {
            return;
        }
        synchronized (this.stateLock) {
            addPendingSplit(splits);
            assignSplit(Collections.singleton(subtaskId));
        }
        this.enumeratorContext.signalNoMoreSplits(subtaskId);
    }

    /**
     * Returns the number of splits that are still waiting to be assigned, summed over all
     * readers.
     *
     * @return total count of pending splits
     */
    public int currentUnassignedSplitSize() {
        synchronized (this.stateLock) {
            int total = 0;
            for (List<TableStoreDBSourceSplit> queued : this.pendingSplits.values()) {
                total += queued.size();
            }
            return total;
        }
    }

    /** Split requests are not used; splits are pushed to readers instead. */
    public void handleSplitRequest(int subtaskId) {
    }

    /**
     * Assigns any splits that are already queued for the newly registered reader.
     *
     * @param subtaskId index of the reader that registered
     */
    public void registerReader(int subtaskId) {
        log.debug("Register reader {} to TablestoreSplitEnumerator.", subtaskId);
        synchronized (this.stateLock) {
            if (this.pendingSplits.isEmpty()) {
                return;
            }
            assignSplit(Collections.singleton(subtaskId));
        }
    }

    /**
     * Captures the enumeration flag and a copy of the pending splits. The copy keeps the
     * returned state stable even when splits are assigned afterwards.
     *
     * @param checkpointId id of the checkpoint being taken (not used)
     * @return snapshot of the current enumerator state
     * @throws Exception declared by the enumerator contract
     */
    public TableStoreDBSourceState snapshotState(long checkpointId) throws Exception {
        synchronized (this.stateLock) {
            return new TableStoreDBSourceState(this.shouldEnumerate, copyPendingSplits(this.pendingSplits));
        }
    }

    /**
     * Alias of {@link #snapshotState(long)}, kept for callers that still use this name.
     * NOTE(review): the name looks decompiler-generated (a rename of {@code snapshotState});
     * confirm whether any caller depends on it before removing.
     *
     * @param j id of the checkpoint being taken
     * @return snapshot of the current enumerator state
     * @throws Exception declared by the enumerator contract
     */
    public TableStoreDBSourceState m1101snapshotState(long j) throws Exception {
        return snapshotState(j);
    }

    /** Nothing to do when a checkpoint completes. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }

    /**
     * Returns a new map whose lists are copies of the given ones. A {@code null} map yields an
     * empty result and {@code null} lists are skipped.
     */
    private static Map<Integer, List<TableStoreDBSourceSplit>> copyPendingSplits(Map<Integer, List<TableStoreDBSourceSplit>> source) {
        Map<Integer, List<TableStoreDBSourceSplit>> copy = new HashMap<>();
        if (source == null) {
            return copy;
        }
        for (Map.Entry<Integer, List<TableStoreDBSourceSplit>> entry : source.entrySet()) {
            if (entry.getValue() != null) {
                copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            }
        }
        return copy;
    }
}
