package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.class */
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
    private final boolean allowOutOfOrderDelivery;
    private final StickyKeyConsumerSelector selector;
    private boolean isDispatcherStuckOnReplays;
    private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal
        public Map<Consumer, List<Entry>> initialValue() throws Exception {
            return new HashMap();
        }
    };
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription, ServiceConfiguration serviceConfiguration, PulsarApi.KeySharedMeta keySharedMeta) {
        super(persistentTopic, managedCursor, subscription);
        this.isDispatcherStuckOnReplays = false;
        this.allowOutOfOrderDelivery = keySharedMeta.getAllowOutOfOrderDelivery();
        this.recentlyJoinedConsumers = this.allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
        switch (keySharedMeta.getKeySharedMode()) {
            case AUTO_SPLIT:
                if (serviceConfiguration.isSubscriptionKeySharedUseConsistentHashing()) {
                    this.selector = new ConsistentHashingStickyKeyConsumerSelector(serviceConfiguration.getSubscriptionKeySharedConsistentHashingReplicaPoints());
                    return;
                } else {
                    this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
                    return;
                }
            case STICKY:
                this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
                return;
            default:
                throw new IllegalArgumentException("Invalid key-shared mode: " + keySharedMeta.getKeySharedMode());
        }
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        this.selector.addConsumer(consumer);
        if (this.allowOutOfOrderDelivery || this.consumerList.size() <= 1 || this.cursor.getNumberOfEntriesSinceFirstNotAckedMessage() <= 1) {
            return;
        }
        this.recentlyJoinedConsumers.put(consumer, (PositionImpl) this.cursor.getReadPosition());
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        super.removeConsumer(consumer);
        this.selector.removeConsumer(consumer);
        this.recentlyJoinedConsumers.remove(consumer);
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected void sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> list) {
        long j = 0;
        long j2 = 0;
        int size = list.size();
        if (size == 0) {
            readMoreEntries();
            return;
        }
        if (this.consumerSet.isEmpty()) {
            list.forEach((v0) -> {
                v0.release();
            });
            this.cursor.rewind();
            return;
        }
        Map<Consumer, List<Entry>> map = localGroupedEntries.get();
        map.clear();
        for (int i = 0; i < size; i++) {
            Entry entry = list.get(i);
            map.computeIfAbsent(this.selector.select(peekStickyKey(entry.getDataBuffer())), consumer -> {
                return new ArrayList();
            }).add(entry);
        }
        AtomicInteger atomicInteger = new AtomicInteger(map.size());
        for (Map.Entry<Consumer, List<Entry>> entry2 : map.entrySet()) {
            Consumer key = entry2.getKey();
            List<Entry> value = entry2.getValue();
            int size2 = value.size();
            int restrictedMaxEntriesForConsumer = getRestrictedMaxEntriesForConsumer(key, value, Math.min(size2, key.getAvailablePermits()));
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} with messages num {}, read type is {}", new Object[]{this.name, key.consumerName(), Integer.valueOf(restrictedMaxEntriesForConsumer), readType});
            }
            if (restrictedMaxEntriesForConsumer < size2) {
                for (int i2 = restrictedMaxEntriesForConsumer; i2 < size2; i2++) {
                    Entry entry3 = value.get(i2);
                    this.messagesToRedeliver.add(entry3.getLedgerId(), entry3.getEntryId());
                    entry3.release();
                    value.set(i2, null);
                }
            }
            if (restrictedMaxEntriesForConsumer > 0) {
                if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                    for (int i3 = 0; i3 < restrictedMaxEntriesForConsumer; i3++) {
                        Entry entry4 = value.get(i3);
                        this.messagesToRedeliver.remove(entry4.getLedgerId(), entry4.getEntryId());
                    }
                }
                SendMessageInfo threadLocal = SendMessageInfo.getThreadLocal();
                EntryBatchSizes entryBatchSizes = EntryBatchSizes.get(restrictedMaxEntriesForConsumer);
                EntryBatchIndexesAcks entryBatchIndexesAcks = EntryBatchIndexesAcks.get();
                filterEntriesForConsumer(value, entryBatchSizes, threadLocal, entryBatchIndexesAcks, this.cursor);
                key.sendMessages(value, entryBatchSizes, entryBatchIndexesAcks, threadLocal.getTotalMessages(), threadLocal.getTotalBytes(), threadLocal.getTotalChunkedMessages(), getRedeliveryTracker()).addListener2(future -> {
                    if (future.isSuccess() && atomicInteger.decrementAndGet() == 0) {
                        readMoreEntries();
                    }
                });
                TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(threadLocal.getTotalMessages() - entryBatchIndexesAcks.getTotalAckedIndexCount()));
                j += threadLocal.getTotalMessages();
                j2 += threadLocal.getTotalBytes();
            }
        }
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(j, j2);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                this.dispatchRateLimiter.get().tryDispatchPermit(j, j2);
            }
        }
        if (j == 0 && this.recentlyJoinedConsumers.isEmpty()) {
            this.isDispatcherStuckOnReplays = true;
            readMoreEntries();
        }
    }

    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> list, int i) {
        if (i == 0) {
            return 0;
        }
        PositionImpl positionImpl = this.recentlyJoinedConsumers.get(consumer);
        if (positionImpl == null) {
            return i;
        }
        if (positionImpl.compareTo(((PositionImpl) this.cursor.getMarkDeletedPosition()).getNext()) <= 0) {
            this.recentlyJoinedConsumers.remove(consumer);
            return i;
        }
        for (int i2 = 0; i2 < i; i2++) {
            if (((PositionImpl) list.get(i2).getPosition()).compareTo(positionImpl) >= 0) {
                return i2;
            }
        }
        return i;
    }

    @Override // org.apache.pulsar.broker.service.Dispatcher
    public synchronized void acknowledgementWasProcessed() {
        if (this.recentlyJoinedConsumers.isEmpty()) {
            return;
        }
        readMoreEntries();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    public synchronized Set<PositionImpl> getMessagesToReplayNow(int i) {
        if (!this.isDispatcherStuckOnReplays) {
            return super.getMessagesToReplayNow(i);
        }
        this.isDispatcherStuckOnReplays = false;
        return Collections.emptySet();
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Key_Shared;
    }

    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        return this.cursor.asyncReplayEntries(set, this, PersistentDispatcherMultipleConsumers.ReadType.Replay, true);
    }
}
