package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.class */
/**
 * Dispatcher for {@code Key_Shared} subscriptions.
 *
 * <p>Entries read from the cursor are grouped by the hash of their sticky key (reduced modulo the
 * selector's range size) and every group is handed to the consumer that the
 * {@link StickyKeyConsumerSelector} returns for that hash.
 */
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
    private final StickyKeyConsumerSelector selector;
    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a dispatcher that routes entries through the given selector.
     *
     * @param persistentTopic topic passed through to the parent dispatcher
     * @param managedCursor cursor passed through to the parent dispatcher
     * @param subscription subscription passed through to the parent dispatcher
     * @param stickyKeyConsumerSelector selector used to map a key hash to a consumer
     */
    public PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic persistentTopic, ManagedCursor managedCursor, Subscription subscription, StickyKeyConsumerSelector stickyKeyConsumerSelector) {
        super(persistentTopic, managedCursor, subscription);
        this.selector = stickyKeyConsumerSelector;
    }

    /**
     * Registers the consumer with the parent dispatcher and then with the selector.
     *
     * @param consumer consumer to add
     * @throws BrokerServiceException if the parent dispatcher or the selector rejects the consumer
     */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        this.selector.addConsumer(consumer);
    }

    /**
     * Removes the consumer from the parent dispatcher and then from the selector.
     *
     * @param consumer consumer to remove
     * @throws BrokerServiceException if the parent dispatcher fails to remove the consumer
     */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
        super.removeConsumer(consumer);
        this.selector.removeConsumer(consumer);
    }

    /**
     * Groups the entries by sticky-key hash and sends each group to the consumer selected for it.
     *
     * <p>For every group, at most as many entries as the consumer has permits for are sent (a single
     * entry when the consumer is not writable). Entries that could not be sent are recorded in
     * {@code messagesToRedeliver} and released. If the selector has no consumer for a key, every
     * entry still held by this call is released and the cursor is rewound.
     *
     * @param readType whether the entries come from a normal read or a replay
     * @param list entries to dispatch; an empty list only triggers another read
     */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected void sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.ReadType readType, List<Entry> list) {
        if (list.isEmpty()) {
            readMoreEntries();
            return;
        }

        long totalMessagesSent = 0;
        long totalBytesSent = 0;

        // Bucket the entries by sticky-key hash, preserving read order inside each bucket.
        final Map<Integer, List<Entry>> entriesByKeyHash = new HashMap<>();
        for (Entry entry : list) {
            int keyHash = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % this.selector.getRangeSize();
            entriesByKeyHash.computeIfAbsent(keyHash, ignored -> new ArrayList<>()).add(entry);
        }

        final Iterator<Map.Entry<Integer, List<Entry>>> groups = entriesByKeyHash.entrySet().iterator();
        // Counts key groups whose send has not completed successfully yet; the next read is
        // triggered by the listener below once it reaches zero.
        final AtomicInteger pendingKeyGroups = new AtomicInteger(entriesByKeyHash.size());

        while (groups.hasNext() && this.totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
            final Map.Entry<Integer, List<Entry>> group = groups.next();
            final List<Entry> pending = group.getValue();
            final Consumer consumer = this.selector.selectByIndex(group.getKey());

            if (consumer == null) {
                log.info("[{}] rewind because no available consumer found for key {} from total {}", this.name, group.getKey(), this.consumerList.size());
                // Everything left in the map is unsent and still owned by this call. Release all
                // of it, not just the current key's entries, before the cursor is rewound.
                for (List<Entry> unsent : entriesByKeyHash.values()) {
                    unsent.forEach(Entry::release);
                }
                this.cursor.rewind();
                return;
            }

            final boolean writable = consumer.isWritable();
            final int permits = writable ? consumer.getAvailablePermits() : 1;
            if (!writable && log.isDebugEnabled()) {
                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", this.topic.getName(), this.name, consumer);
            }

            final int batchSize = Math.min(pending.size(), permits);
            if (log.isDebugEnabled()) {
                log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}", this.name, consumer.consumerName(), group.getKey(), batchSize, readType);
            }
            if (batchSize <= 0) {
                continue;
            }

            // Detach the dispatched prefix by position. NOTE(review): filterEntriesForConsumer is
            // handed the batch list and may mutate it (not visible here - confirm); removing by
            // equality afterwards could leave already-handled entries behind in 'pending'.
            final List<Entry> head = pending.subList(0, batchSize);
            final List<Entry> batch = new ArrayList<>(head);
            head.clear();

            if (readType == PersistentDispatcherMultipleConsumers.ReadType.Replay) {
                batch.forEach(replayed -> this.messagesToRedeliver.remove(replayed.getLedgerId(), replayed.getEntryId()));
            }

            final SendMessageInfo sendInfo = SendMessageInfo.getThreadLocal();
            final EntryBatchSizes batchSizes = EntryBatchSizes.get(batch.size());
            filterEntriesForConsumer(batch, batchSizes, sendInfo);

            // NOTE(review): 'addListener2' looks like a decompiler rename of 'addListener' - confirm.
            consumer.sendMessages(batch, batchSizes, sendInfo.getTotalMessages(), sendInfo.getTotalBytes(), getRedeliveryTracker()).addListener2(future -> {
                if (future.isSuccess() && pendingKeyGroups.decrementAndGet() == 0) {
                    readMoreEntries();
                }
            });

            this.totalAvailablePermits -= sendInfo.getTotalMessages();
            totalMessagesSent += sendInfo.getTotalMessages();
            totalBytesSent += sendInfo.getTotalBytes();

            if (pending.isEmpty()) {
                groups.remove();
            }
        }

        // Account for what was dispatched against the topic-level and dispatcher-level rate limiters.
        if (this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive()) {
            if (this.topic.getDispatchRateLimiter().isPresent()) {
                this.topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
            if (this.dispatchRateLimiter.isPresent()) {
                this.dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
            }
        }

        // Whatever is still in the map was not sent: remember the positions for replay and release.
        if (!entriesByKeyHash.isEmpty()) {
            int laterReplay = 0;
            for (List<Entry> unsent : entriesByKeyHash.values()) {
                laterReplay += unsent.size();
                unsent.forEach(entry -> {
                    this.messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
                    entry.release();
                });
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", this.name, laterReplay);
            }
        }
    }

    /**
     * Returns the subscription type served by this dispatcher.
     *
     * @return always {@code Key_Shared}
     */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers, org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers, org.apache.pulsar.broker.service.Dispatcher
    public PulsarApi.CommandSubscribe.SubType getType() {
        return PulsarApi.CommandSubscribe.SubType.Key_Shared;
    }

    /**
     * Asks the cursor to replay the given positions, with this dispatcher as the callback and
     * {@code ReadType.Replay} as the context.
     *
     * @param set positions to replay
     * @return whatever the cursor returns for the replay request
     */
    @Override // org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> set) {
        // NOTE(review): the trailing 'true' presumably requests sorted entries - confirm against ManagedCursor.
        return this.cursor.asyncReplayEntries(set, this, PersistentDispatcherMultipleConsumers.ReadType.Replay, true);
    }
}
