/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Murmur3_32Hash;

public class HashRangeExclusiveStickyKeyConsumerSelector
implements StickyKeyConsumerSelector {
    private final int rangeSize;
    private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;

    public HashRangeExclusiveStickyKeyConsumerSelector() {
        this(65536);
    }

    public HashRangeExclusiveStickyKeyConsumerSelector(int rangeSize) {
        if (rangeSize < 1) {
            throw new IllegalArgumentException("range size must greater than 0");
        }
        this.rangeSize = rangeSize;
        this.rangeMap = new ConcurrentSkipListMap();
    }

    @Override
    public void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
        this.validateKeySharedMeta(consumer);
        for (PulsarApi.IntRange intRange : consumer.getKeySharedMeta().getHashRangesList()) {
            this.rangeMap.put(intRange.getStart(), consumer);
            this.rangeMap.put(intRange.getEnd(), consumer);
        }
    }

    @Override
    public void removeConsumer(Consumer consumer) {
        this.rangeMap.entrySet().removeIf(entry -> ((Consumer)entry.getValue()).equals(consumer));
    }

    @Override
    public Consumer select(byte[] stickyKey) {
        return this.select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
    }

    @Override
    public Map<String, List<String>> getConsumerKeyHashRanges() {
        HashMap<String, List<String>> result = new HashMap<String, List<String>>();
        Map.Entry<Integer, Consumer> prev = null;
        for (Map.Entry<Integer, Consumer> entry : this.rangeMap.entrySet()) {
            if (prev == null) {
                prev = entry;
                continue;
            }
            if (prev.getValue().equals(entry.getValue())) {
                result.computeIfAbsent(entry.getValue().consumerName(), key -> new ArrayList()).add("[" + prev.getKey() + ", " + entry.getKey() + "]");
            }
            prev = null;
        }
        return result;
    }

    Consumer select(int hash) {
        if (this.rangeMap.size() > 0) {
            Consumer floorConsumer;
            int slot = hash % this.rangeSize;
            Map.Entry<Integer, Consumer> ceilingEntry = this.rangeMap.ceilingEntry(slot);
            Map.Entry<Integer, Consumer> floorEntry = this.rangeMap.floorEntry(slot);
            Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
            Consumer consumer = floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
            if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
                return ceilingConsumer;
            }
            return null;
        }
        return null;
    }

    private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
        if (consumer.getKeySharedMeta() == null) {
            throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
        }
        List<PulsarApi.IntRange> ranges = consumer.getKeySharedMeta().getHashRangesList();
        if (ranges.isEmpty()) {
            throw new BrokerServiceException.ConsumerAssignException("Ranges for KeyShared policy must not be empty.");
        }
        for (PulsarApi.IntRange intRange : ranges) {
            if (intRange.getStart() > intRange.getEnd()) {
                throw new BrokerServiceException.ConsumerAssignException("Fixed hash range start > end");
            }
            Map.Entry<Integer, Consumer> ceilingEntry = this.rangeMap.ceilingEntry(intRange.getStart());
            Map.Entry<Integer, Consumer> floorEntry = this.rangeMap.floorEntry(intRange.getEnd());
            if (floorEntry != null && floorEntry.getKey() >= intRange.getStart()) {
                throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + floorEntry.getValue());
            }
            if (ceilingEntry != null && ceilingEntry.getKey() <= intRange.getEnd()) {
                throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
            }
            if (ceilingEntry == null || floorEntry == null || !ceilingEntry.getValue().equals(floorEntry.getValue())) continue;
            PulsarApi.KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta();
            for (PulsarApi.IntRange range : keySharedMeta.getHashRangesList()) {
                int start = Math.max(intRange.getStart(), range.getStart());
                int end = Math.min(intRange.getEnd(), range.getEnd());
                if (end < start) continue;
                throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
            }
        }
    }

    Map<Integer, Consumer> getRangeConsumer() {
        return Collections.unmodifiableMap(this.rangeMap);
    }
}

