package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.cli.HelpFormatter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.class */
public class RocketMqSourceSplitEnumerator implements SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> {
    private static final Logger log = LoggerFactory.getLogger(RocketMqSourceSplitEnumerator.class);
    private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60000;
    private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
    private final ConsumerMetadata metadata;
    private final SourceSplitEnumerator.Context<RocketMqSourceSplit> context;
    private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
    private ScheduledExecutorService executor;
    private ScheduledFuture scheduledFuture;
    private long discoveryIntervalMillis;

    public RocketMqSourceSplitEnumerator(ConsumerMetadata consumerMetadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
        this.metadata = consumerMetadata;
        this.context = context;
        this.assignedSplit = new HashMap();
        this.pendingSplit = new HashMap();
    }

    public RocketMqSourceSplitEnumerator(ConsumerMetadata consumerMetadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context, long j) {
        this(consumerMetadata, context);
        this.discoveryIntervalMillis = j;
    }

    private static int getSplitOwner(MessageQueue messageQueue, int i) {
        return ((((messageQueue.getQueueId() * 31) & Integer.MAX_VALUE) % i) + messageQueue.getQueueId()) % i;
    }

    public void open() {
        this.discoveryIntervalMillis = this.discoveryIntervalMillis > 0 ? this.discoveryIntervalMillis : 60000L;
        if (this.discoveryIntervalMillis > 0) {
            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                thread.setName("RocketMq-messageQueue-dynamic-discovery");
                return thread;
            });
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                try {
                    discoverySplits();
                } catch (Exception e) {
                    log.error("Dynamic discovery failure:", e);
                }
            }, this.discoveryIntervalMillis, this.discoveryIntervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void run() throws Exception {
        fetchPendingPartitionSplit();
        setPartitionStartOffset();
        assignSplit();
    }

    public void close() throws IOException {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    public void addSplitsBack(List<RocketMqSourceSplit> list, int i) {
        if (list.isEmpty()) {
            return;
        }
        this.pendingSplit.putAll(convertToNextSplit(list));
        assignSplit();
    }

    private Map<MessageQueue, ? extends RocketMqSourceSplit> convertToNextSplit(List<RocketMqSourceSplit> list) {
        try {
            Map<MessageQueue, Long> listOffsets = listOffsets((Collection) list.stream().map((v0) -> {
                return v0.getMessageQueue();
            }).collect(Collectors.toList()), ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            list.forEach(rocketMqSourceSplit -> {
                rocketMqSourceSplit.setStartOffset(rocketMqSourceSplit.getEndOffset() + 1);
                rocketMqSourceSplit.setEndOffset(((Long) listOffsets.get(rocketMqSourceSplit.getMessageQueue())).longValue());
            });
            return (Map) list.stream().collect(Collectors.toMap(rocketMqSourceSplit2 -> {
                return rocketMqSourceSplit2.getMessageQueue();
            }, rocketMqSourceSplit3 -> {
                return rocketMqSourceSplit3;
            }));
        } catch (Exception e) {
            throw new RocketMqConnectorException(RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplit.size();
    }

    public void handleSplitRequest(int i) {
    }

    public void registerReader(int i) {
        if (this.pendingSplit.isEmpty()) {
            return;
        }
        assignSplit();
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public RocketMqSourceState m1313snapshotState(long j) throws Exception {
        return new RocketMqSourceState((Set) this.assignedSplit.values().stream().collect(Collectors.toSet()));
    }

    public void notifyCheckpointComplete(long j) throws Exception {
    }

    private void discoverySplits() {
        fetchPendingPartitionSplit();
        assignSplit();
    }

    private void fetchPendingPartitionSplit() {
        getTopicInfo().forEach(rocketMqSourceSplit -> {
            if (this.assignedSplit.containsKey(rocketMqSourceSplit.getMessageQueue()) || this.pendingSplit.containsKey(rocketMqSourceSplit.getMessageQueue())) {
                return;
            }
            this.pendingSplit.put(rocketMqSourceSplit.getMessageQueue(), rocketMqSourceSplit);
        });
    }

    private Set<RocketMqSourceSplit> getTopicInfo() {
        log.info("Configured topics: {}", this.metadata.getTopics());
        List<Map<MessageQueue, TopicOffset>> offsetTopics = RocketMqAdminUtil.offsetTopics(this.metadata.getBaseConfig(), this.metadata.getTopics());
        Set<RocketMqSourceSplit> newConcurrentHashSet = Sets.newConcurrentHashSet();
        offsetTopics.forEach(map -> {
            map.forEach((messageQueue, topicOffset) -> {
                newConcurrentHashSet.add(new RocketMqSourceSplit(messageQueue, topicOffset.getMinOffset(), topicOffset.getMaxOffset()));
            });
        });
        return newConcurrentHashSet;
    }

    private void setPartitionStartOffset() throws MQClientException {
        Map<MessageQueue, Long> specificStartOffsets;
        Set<MessageQueue> keySet = this.pendingSplit.keySet();
        switch (this.metadata.getStartMode()) {
            case CONSUME_FROM_FIRST_OFFSET:
                specificStartOffsets = listOffsets(keySet, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                break;
            case CONSUME_FROM_LAST_OFFSET:
                specificStartOffsets = listOffsets(keySet, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                break;
            case CONSUME_FROM_TIMESTAMP:
                specificStartOffsets = listOffsets(keySet, ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
                break;
            case CONSUME_FROM_GROUP_OFFSETS:
                specificStartOffsets = listConsumerGroupOffsets(keySet);
                if (specificStartOffsets.isEmpty()) {
                    specificStartOffsets = listOffsets(keySet, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                    break;
                }
                break;
            case CONSUME_FROM_SPECIFIC_OFFSETS:
                specificStartOffsets = this.metadata.getSpecificStartOffsets();
                setMessageQueueBroker(keySet, specificStartOffsets);
                break;
            default:
                throw new RocketMqConnectorException(RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR, this.metadata.getStartMode().name());
        }
        specificStartOffsets.entrySet().forEach(entry -> {
            if (this.pendingSplit.containsKey(entry.getKey())) {
                this.pendingSplit.get(entry.getKey()).setStartOffset(((Long) entry.getValue()).longValue());
            }
        });
    }

    private void setMessageQueueBroker(Collection<MessageQueue> collection, Map<MessageQueue, Long> map) {
        Map map2 = (Map) collection.stream().collect(Collectors.toMap(messageQueue -> {
            return messageQueue.getTopic() + HelpFormatter.DEFAULT_OPT_PREFIX + messageQueue.getQueueId();
        }, (v0) -> {
            return v0.getBrokerName();
        }));
        for (MessageQueue messageQueue2 : map.keySet()) {
            String str = messageQueue2.getTopic() + HelpFormatter.DEFAULT_OPT_PREFIX + messageQueue2.getQueueId();
            if (map2.containsKey(str)) {
                messageQueue2.setBrokerName((String) map2.get(str));
            }
        }
    }

    private Map<MessageQueue, Long> listOffsets(Collection<MessageQueue> collection, ConsumeFromWhere consumeFromWhere) {
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        Map<MessageQueue, TopicOffset> flatOffsetTopics = RocketMqAdminUtil.flatOffsetTopics(this.metadata.getBaseConfig(), this.metadata.getTopics());
        switch (consumeFromWhere) {
            case CONSUME_FROM_FIRST_OFFSET:
                collection.forEach(messageQueue -> {
                    newConcurrentMap.put(messageQueue, Long.valueOf(((TopicOffset) flatOffsetTopics.get(messageQueue)).getMinOffset()));
                });
                break;
            case CONSUME_FROM_LAST_OFFSET:
                collection.forEach(messageQueue2 -> {
                    newConcurrentMap.put(messageQueue2, Long.valueOf(((TopicOffset) flatOffsetTopics.get(messageQueue2)).getMaxOffset()));
                });
                break;
            case CONSUME_FROM_TIMESTAMP:
                newConcurrentMap.putAll(RocketMqAdminUtil.searchOffsetsByTimestamp(this.metadata.getBaseConfig(), collection, this.metadata.getStartOffsetsTimestamp()));
                break;
        }
        return newConcurrentMap;
    }

    public Map<MessageQueue, Long> listConsumerGroupOffsets(Collection<MessageQueue> collection) {
        return RocketMqAdminUtil.currentOffsets(this.metadata.getBaseConfig(), this.metadata.getTopics(), new HashSet(collection));
    }

    private synchronized void assignSplit() {
        HashMap hashMap = new HashMap(16);
        for (int i = 0; i < this.context.currentParallelism(); i++) {
            hashMap.computeIfAbsent(Integer.valueOf(i), num -> {
                return new ArrayList();
            });
        }
        this.pendingSplit.entrySet().forEach(entry -> {
            if (this.assignedSplit.containsKey(entry.getKey())) {
                return;
            }
            ((List) hashMap.get(Integer.valueOf(getSplitOwner((MessageQueue) entry.getKey(), this.context.currentParallelism())))).add(entry.getValue());
        });
        SourceSplitEnumerator.Context<RocketMqSourceSplit> context = this.context;
        context.getClass();
        hashMap.forEach((v1, v2) -> {
            r1.assignSplit(v1, v2);
        });
        this.assignedSplit.putAll(this.pendingSplit);
        this.pendingSplit.clear();
    }
}
