package org.apache.seatunnel.connectors.seatunnel.sls.source;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListConsumerGroupResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.sls.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.sls.state.SlsSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.class */
public class SlsSourceSplitEnumerator implements SourceSplitEnumerator<SlsSourceSplit, SlsSourceState> {
    private static final Logger log = LoggerFactory.getLogger(SlsSourceSplitEnumerator.class);
    private final Client slsCleint;
    private final ConsumerMetaData consumerMetaData;
    private final long discoveryIntervalMillis;
    private final SourceSplitEnumerator.Context<SlsSourceSplit> context;
    private SlsSourceState slsSourceState;
    private ScheduledExecutorService executor;
    private ScheduledFuture<?> scheduledFuture;
    private final Map<Integer, SlsSourceSplit> assignedSplit = new HashMap();
    private final Map<Integer, SlsSourceSplit> pendingSplit = new HashMap();

    public SlsSourceSplitEnumerator(SlsSourceConfig slsSourceConfig, SourceSplitEnumerator.Context<SlsSourceSplit> context) {
        this.context = context;
        this.slsCleint = new Client(slsSourceConfig.getEndpoint(), slsSourceConfig.getAccessKeyId(), slsSourceConfig.getAccessKeySecret());
        this.consumerMetaData = slsSourceConfig.getConsumerMetaData();
        this.discoveryIntervalMillis = slsSourceConfig.getDiscoveryIntervalMillis().longValue();
    }

    public SlsSourceSplitEnumerator(SlsSourceConfig slsSourceConfig, SourceSplitEnumerator.Context<SlsSourceSplit> context, SlsSourceState slsSourceState) {
        this.context = context;
        this.slsCleint = new Client(slsSourceConfig.getEndpoint(), slsSourceConfig.getAccessKeyId(), slsSourceConfig.getAccessKeySecret());
        this.consumerMetaData = slsSourceConfig.getConsumerMetaData();
        this.discoveryIntervalMillis = slsSourceConfig.getDiscoveryIntervalMillis().longValue();
        this.slsSourceState = slsSourceState;
        if (slsSourceState != null) {
        }
    }

    public void open() {
        if (this.discoveryIntervalMillis > 0) {
            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("sls-shard-dynamic-discovery");
                return thread;
            });
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    discoverySplits();
                } catch (Exception e) {
                    log.error("Dynamic discovery failure:", e);
                }
            }, this.discoveryIntervalMillis, this.discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void run() throws Exception {
        fetchPendingShardSplit();
        assignSplit();
    }

    public void close() throws IOException {
    }

    public void addSplitsBack(List<SlsSourceSplit> list, int i) {
        if (list.isEmpty()) {
            return;
        }
        list.forEach(slsSourceSplit -> {
            this.pendingSplit.put(slsSourceSplit.getShardId(), slsSourceSplit);
        });
    }

    public int currentUnassignedSplitSize() {
        return 0;
    }

    public void handleSplitRequest(int i) {
    }

    public void registerReader(int i) {
        if (this.pendingSplit.isEmpty()) {
            return;
        }
        assignSplit();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }

    private void discoverySplits() throws LogException {
        fetchPendingShardSplit();
        assignSplit();
    }

    private void fetchPendingShardSplit() throws LogException {
        String project = this.consumerMetaData.getProject();
        String logstore = this.consumerMetaData.getLogstore();
        String consumerGroup = this.consumerMetaData.getConsumerGroup();
        StartMode startMode = this.consumerMetaData.getStartMode();
        int fetchSize = this.consumerMetaData.getFetchSize();
        Consts.CursorMode autoCursorReset = this.consumerMetaData.getAutoCursorReset();
        this.slsCleint.ListShard(project, logstore).GetShards().forEach(shard -> {
            if (this.assignedSplit.containsKey(Integer.valueOf(shard.getShardId())) || this.pendingSplit.containsKey(Integer.valueOf(shard.getShardId()))) {
                return;
            }
            try {
                String initShardCursor = initShardCursor(project, logstore, consumerGroup, shard.getShardId(), startMode, autoCursorReset);
                if (initShardCursor.equals("")) {
                    throw new RuntimeException("shard cursor error");
                }
                this.pendingSplit.put(Integer.valueOf(shard.getShardId()), new SlsSourceSplit(project, logstore, consumerGroup, Integer.valueOf(shard.getShardId()), initShardCursor, Integer.valueOf(fetchSize)));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    private String initShardCursor(String str, String str2, String str3, int i, StartMode startMode, Consts.CursorMode cursorMode) throws Exception {
        switch (startMode) {
            case EARLIEST:
                try {
                    return this.slsCleint.GetCursor(str, str2, i, Consts.CursorMode.BEGIN).GetCursor();
                } catch (LogException e) {
                    throw new RuntimeException(e);
                }
            case LATEST:
                try {
                    return this.slsCleint.GetCursor(str, str2, i, Consts.CursorMode.END).GetCursor();
                } catch (LogException e2) {
                    throw new RuntimeException(e2);
                }
            case GROUP_CURSOR:
                try {
                    if (!checkConsumerGroupExists(str, str2, str3)) {
                        createConsumerGroup(str, str2, str3);
                    }
                    List<ConsumerGroupShardCheckPoint> checkPoints = this.slsCleint.GetCheckPoint(str, str2, str3, i).getCheckPoints();
                    if (checkPoints.size() == 1) {
                        ConsumerGroupShardCheckPoint consumerGroupShardCheckPoint = checkPoints.get(0);
                        if (!consumerGroupShardCheckPoint.getCheckPoint().equals("")) {
                            return consumerGroupShardCheckPoint.getCheckPoint();
                        }
                    }
                    return this.slsCleint.GetCursor(str, str2, i, cursorMode).GetCursor();
                } catch (LogException e3) {
                    if (e3.GetErrorCode().equals("ConsumerGroupNotExist")) {
                        return this.slsCleint.GetCursor(str, str2, i, cursorMode).GetCursor();
                    }
                    throw new RuntimeException(e3);
                }
            default:
                throw new RuntimeException(str + ":" + str2 + ":" + str3 + ":" + startMode + ":fail");
        }
    }

    private synchronized void assignSplit() {
        HashMap hashMap = new HashMap(16);
        for (int i = 0; i < this.context.currentParallelism(); i++) {
            hashMap.computeIfAbsent(Integer.valueOf(i), num -> {
                return new ArrayList();
            });
        }
        this.pendingSplit.forEach((num2, slsSourceSplit) -> {
            if (this.assignedSplit.containsKey(num2)) {
                return;
            }
            ((List) hashMap.get(Integer.valueOf(getSplitOwner(slsSourceSplit.getShardId().intValue(), this.context.currentParallelism())))).add(slsSourceSplit);
        });
        hashMap.forEach((num3, list) -> {
            this.context.assignSplit(num3.intValue(), list);
            if (this.discoveryIntervalMillis <= 0) {
                this.context.signalNoMoreSplits(num3.intValue());
            }
        });
        this.assignedSplit.putAll(this.pendingSplit);
        this.pendingSplit.clear();
    }

    private static int getSplitOwner(int i, int i2) {
        return i % i2;
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public SlsSourceState m434snapshotState(long j) throws Exception {
        return new SlsSourceState(new HashSet(this.assignedSplit.values()));
    }

    public boolean checkConsumerGroupExists(String str, String str2, String str3) throws Exception {
        ListConsumerGroupResponse ListConsumerGroup = this.slsCleint.ListConsumerGroup(str, str2);
        if (ListConsumerGroup == null) {
            return false;
        }
        Iterator<ConsumerGroup> it = ListConsumerGroup.GetConsumerGroups().iterator();
        while (it.hasNext()) {
            if (it.next().getConsumerGroupName().equals(str3)) {
                return true;
            }
        }
        return false;
    }

    public void createConsumerGroup(String str, String str2, String str3) throws LogException {
        try {
            this.slsCleint.CreateConsumerGroup(str, str2, new ConsumerGroup(str3, 100, false));
        } catch (LogException e) {
            if ("ConsumerGroupAlreadyExist".equals(e.GetErrorCode())) {
            }
            throw e;
        }
    }
}
