/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.EntryWrapper;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableCollection;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.prometheus.client.Gauge;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.PositionAckSetUtil;
import org.apache.pulsar.shade.org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.shade.org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBaseDispatcher
implements Dispatcher {
    private static final Logger log = LoggerFactory.getLogger(AbstractBaseDispatcher.class);
    protected final Subscription subscription;
    private static final Gauge PENDING_BYTES_TO_DISPATCH = (Gauge)((Gauge.Builder)((Gauge.Builder)Gauge.build().name("pulsar_broker_pending_bytes_to_dispatch")).help("Amount of bytes loaded in memory to be dispatched to Consumers")).register();
    protected final ServiceConfiguration serviceConfig;
    protected final boolean dispatchThrottlingOnBatchMessageEnabled;
    protected ImmutableList<EntryFilterWithClassLoader> entryFilters;
    protected final FilterContext filterContext;

    protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
        this.subscription = subscription;
        this.serviceConfig = serviceConfig;
        this.dispatchThrottlingOnBatchMessageEnabled = serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
        if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic().getBrokerService().getEntryFilters())) {
            this.entryFilters = ((ImmutableCollection)subscription.getTopic().getBrokerService().getEntryFilters().values()).asList();
            this.filterContext = new FilterContext();
        } else {
            this.entryFilters = ImmutableList.of();
            this.filterContext = FilterContext.FILTER_CONTEXT_DISABLED;
        }
    }

    protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<Entry> entries) {
        int totalMessages = 0;
        int entriesSize = entries.size();
        for (int i = 0; i < entriesSize; ++i) {
            EntryWrapper entryWrapper;
            Entry entry = entries.get(i);
            if (entry == null) continue;
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, this.subscription.toString(), -1L);
            entryWrappers[i] = entryWrapper = EntryWrapper.get(entry, msgMetadata);
            int batchSize = msgMetadata.getNumMessagesInBatch();
            totalMessages += batchSize;
        }
        return totalMessages;
    }

    public int filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
        return this.filterEntriesForConsumer(null, 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor, isReplayRead, consumer);
    }

    public int filterEntriesForConsumer(@Nullable EntryWrapper[] entryWrapper, int entryWrapperOffset, List<? extends Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead, Consumer consumer) {
        int totalMessages = 0;
        long totalBytes = 0L;
        int totalChunkedMessages = 0;
        int totalEntries = 0;
        ArrayList<Position> entriesToFiltered = CollectionUtils.isNotEmpty(this.entryFilters) ? new ArrayList<Position>() : null;
        ArrayList<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(this.entryFilters) ? new ArrayList<PositionImpl>() : null;
        int entriesSize = entries.size();
        for (int i = 0; i < entriesSize; ++i) {
            BrokerInterceptor interceptor;
            Entry entry = entries.get(i);
            if (entry == null) continue;
            ByteBuf metadataAndPayload = entry.getDataBuffer();
            int entryWrapperIndex = i + entryWrapperOffset;
            MessageMetadata msgMetadata = entryWrapper != null && entryWrapper[entryWrapperIndex] != null ? entryWrapper[entryWrapperIndex].getMetadata() : null;
            msgMetadata = msgMetadata == null ? Commands.peekMessageMetadata(metadataAndPayload, this.subscription.toString(), -1L) : msgMetadata;
            EntryFilter.FilterResult filterResult = EntryFilter.FilterResult.ACCEPT;
            if (CollectionUtils.isNotEmpty(this.entryFilters)) {
                this.fillContext(this.filterContext, msgMetadata, this.subscription, consumer);
                filterResult = AbstractBaseDispatcher.getFilterResult(this.filterContext, entry, this.entryFilters);
                if (filterResult == EntryFilter.FilterResult.REJECT) {
                    entriesToFiltered.add(entry.getPosition());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
                    entriesToRedeliver.add((PositionImpl)entry.getPosition());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
            }
            if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
                if (Markers.isTxnMarker(msgMetadata)) {
                    this.individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
                if (((PersistentTopic)this.subscription.getTopic()).isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
                    this.individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                    entries.set(i, null);
                    entry.release();
                    continue;
                }
            }
            if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
                PositionImpl pos = (PositionImpl)entry.getPosition();
                if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
                    this.processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
                }
                entries.set(i, null);
                entry.release();
                this.individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
                continue;
            }
            if (this.trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
                entries.set(i, null);
                entry.release();
                continue;
            }
            ++totalEntries;
            int batchSize = msgMetadata.getNumMessagesInBatch();
            totalMessages += batchSize;
            totalBytes += (long)metadataAndPayload.readableBytes();
            totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
            batchSizes.setBatchSize(i, batchSize);
            long[] ackSet = null;
            if (indexesAcks != null && cursor != null) {
                PositionImpl positionInPendingAck;
                PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                ackSet = cursor.getDeletedBatchIndexesAsLongArray(position);
                if (this.subscription instanceof PersistentSubscription && ((PersistentSubscription)this.subscription).getPendingAckHandle() instanceof PendingAckHandleImpl && (positionInPendingAck = ((PersistentSubscription)this.subscription).getPositionInPendingAck(position)) != null) {
                    if (positionInPendingAck.hasAckSet()) {
                        if (PositionAckSetUtil.isAckSetEmpty(ackSet = ackSet != null ? PositionAckSetUtil.andAckSet(ackSet, positionInPendingAck.getAckSet()) : positionInPendingAck.getAckSet())) {
                            entries.set(i, null);
                            entry.release();
                            continue;
                        }
                    } else {
                        entries.set(i, null);
                        entry.release();
                        continue;
                    }
                }
                if (ackSet != null) {
                    indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                } else {
                    indexesAcks.setIndexesAcks(i, null);
                }
            }
            if (null == (interceptor = this.subscription.interceptor())) continue;
            interceptor.beforeSendMessage(this.subscription, entry, ackSet, msgMetadata);
        }
        if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
            this.subscription.acknowledgeMessage(entriesToFiltered, CommandAck.AckType.Individual, Collections.emptyMap());
            int filtered = entriesToFiltered.size();
            Topic topic = this.subscription.getTopic();
            if (topic instanceof AbstractTopic) {
                ((AbstractTopic)topic).addFilteredEntriesCount(filtered);
            }
        }
        if (CollectionUtils.isNotEmpty(entriesToRedeliver)) {
            this.subscription.getTopic().getBrokerService().getPulsar().getExecutor().schedule(() -> this.subscription.redeliverUnacknowledgedMessages(consumer, entriesToRedeliver), (long)this.serviceConfig.getDispatcherEntryFilterRescheduledMessageDelay(), TimeUnit.MILLISECONDS);
        }
        sendMessageInfo.setTotalMessages(totalMessages);
        sendMessageInfo.setTotalBytes(totalBytes);
        sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
        return totalEntries;
    }

    private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
        if (!(this.subscription instanceof CompactorSubscription)) {
            this.subscription.acknowledgeMessage(Collections.singletonList(position), CommandAck.AckType.Individual, properties);
        }
    }

    private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry, ImmutableList<EntryFilterWithClassLoader> entryFilters) {
        for (EntryFilter entryFilter : entryFilters) {
            EntryFilter.FilterResult filterResult = entryFilter.filterEntry(entry, filterContext);
            if (filterResult == null) {
                filterResult = EntryFilter.FilterResult.ACCEPT;
            }
            if (filterResult == EntryFilter.FilterResult.ACCEPT) continue;
            return filterResult;
        }
        return EntryFilter.FilterResult.ACCEPT;
    }

    private void fillContext(FilterContext context, MessageMetadata msgMetadata, Subscription subscription, Consumer consumer) {
        context.reset();
        context.setMsgMetadata(msgMetadata);
        context.setSubscription(subscription);
        context.setConsumer(consumer);
    }

    protected abstract boolean isConsumersExceededOnSubscription();

    protected boolean isConsumersExceededOnSubscription(AbstractTopic topic, int consumerSize) {
        if (topic.isSystemTopic()) {
            return false;
        }
        Integer maxConsumersPerSubscription = topic.getHierarchyTopicPolicies().getMaxConsumersPerSubscription().get();
        return maxConsumersPerSubscription != null && maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerSize;
    }

    private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
        Commands.skipMessageMetadata(headersAndPayload);
        try {
            ReplicatedSubscriptionsSnapshot snapshot = Markers.parseReplicatedSubscriptionsSnapshot(headersAndPayload);
            this.subscription.processReplicatedSubscriptionSnapshot(snapshot);
        }
        catch (Throwable t) {
            log.warn("Failed to process replicated subscription snapshot at {} -- {}", new Object[]{pos, t.getMessage(), t});
            return;
        }
    }

    @Override
    public void resetCloseFuture() {
    }

    protected abstract void reScheduleRead();

    protected boolean reachDispatchRateLimit(DispatchRateLimiter dispatchRateLimiter) {
        if (dispatchRateLimiter.isDispatchRateLimitingEnabled() && !dispatchRateLimiter.hasMessageDispatchPermit()) {
            this.reScheduleRead();
            return true;
        }
        return false;
    }

    protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter, int messagesToRead, long bytesToRead) {
        return AbstractBaseDispatcher.computeReadLimits(messagesToRead, (int)dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(), bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
    }

    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg, long bytesToRead, long availablePermitsOnByte) {
        if (availablePermitsOnMsg > 0) {
            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
        }
        if (availablePermitsOnByte > 0L) {
            bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
        }
        return Pair.of(messagesToRead, bytesToRead);
    }

    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
        return Commands.peekStickyKey(metadataAndPayload, this.subscription.getTopicName(), this.subscription.getName());
    }

    protected final void updatePendingBytesToDispatch(long size) {
        PENDING_BYTES_TO_DISPATCH.inc(size);
    }
}

