/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingEntryReader;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.SafeRun;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multi-consumer persistent dispatcher that reads through a {@link StreamingEntryReader} and
 * forwards entries to consumers one at a time, as each individual read completes.
 *
 * <p>{@link #readEntryComplete(Entry, PendingReadEntryRequest)} and {@link #readMoreEntries()} are
 * {@code synchronized} on this instance, as is the read that {@link #canReadMoreEntries(boolean)}
 * schedules. NOTE(review): {@link #canReadMoreEntries(boolean)} and {@link #cancelPendingRead()}
 * themselves touch {@code havePendingRead} without taking that monitor — confirm callers make
 * this safe.
 */
public class PersistentStreamingDispatcherMultipleConsumers
extends PersistentDispatcherMultipleConsumers
implements StreamingDispatcher {
    private static final Logger log = LoggerFactory.getLogger(PersistentStreamingDispatcherMultipleConsumers.class);

    /** Reader used to issue streaming reads and to cancel outstanding ones. */
    private final StreamingEntryReader streamingEntryReader;

    /**
     * Creates the dispatcher and a streaming reader bound to the dispatcher's cursor and topic.
     *
     * @param topic        topic passed to the parent dispatcher
     * @param cursor       cursor passed to the parent dispatcher; the cursor held by the parent
     *                     must be a {@link ManagedCursorImpl}, otherwise the cast below fails
     * @param subscription subscription passed to the parent dispatcher
     */
    public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
        super(topic, cursor, subscription);
        ManagedCursorImpl cursorImpl = (ManagedCursorImpl) this.cursor;
        this.streamingEntryReader = new StreamingEntryReader(cursorImpl, this, this.topic);
    }

    /**
     * Handles completion of a single entry read.
     *
     * <p>When {@code ctx} is the last request, the read-failure backoff is halved and the
     * pending flag matching the read type is cleared. The read batch size is then doubled, capped
     * at the configured maximum. Finally the entry is either discarded (cursor rewound and a new
     * read requested) or the cursor is moved past it and it is sent to the consumers.
     *
     * @param entry the entry that was read
     * @param ctx   the request this entry belongs to; its {@code ctx} field must hold the
     *              {@code ReadType} of the read
     */
    @Override
    public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
        Object requestContext = ctx.ctx;
        PersistentDispatcherMultipleConsumers.ReadType readType =
                (PersistentDispatcherMultipleConsumers.ReadType) requestContext;
        boolean normalRead = readType == PersistentDispatcherMultipleConsumers.ReadType.Normal;

        if (ctx.isLast()) {
            readFailureBackoff.reduceToHalf();
            if (normalRead) {
                havePendingRead = false;
            } else {
                havePendingReplayRead = false;
            }
        }

        // Grow the batch size geometrically until it reaches the configured ceiling.
        if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
            int grownBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}] Increasing read batch size from {} to {}", name, readBatchSize, grownBatchSize);
            }
            readBatchSize = grownBatchSize;
        }

        if (shouldRewindBeforeReadingOrReplaying && normalRead) {
            // A rewind was requested while this read was in flight: drop the entry, rewind the
            // cursor and start reading again.
            // NOTE(review): this path returns without ctx.recycle(), unlike the dispatch path
            // below — confirm that is intentional.
            entry.release();
            cursor.rewind();
            shouldRewindBeforeReadingOrReplaying = false;
            readMoreEntries();
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Distributing a messages to {} consumers", name, consumerList.size());
            }
            // Move the cursor to the next valid position after the entry being dispatched.
            ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
            PositionImpl entryPosition = (PositionImpl) entry.getPosition();
            cursor.seek(ledger.getNextValidPosition(entryPosition));
            sendMessagesToConsumers(readType, Lists.newArrayList(entry));
            ctx.recycle();
        }
    }

    /**
     * Clears the pending-read flag and schedules another read on the topic's ordered executor.
     *
     * @param withBackoff when {@code true} the read is delayed by the next read-failure backoff
     *                    interval (milliseconds); otherwise it is scheduled with no delay
     */
    @Override
    public void canReadMoreEntries(boolean withBackoff) {
        havePendingRead = false;
        // The scheduled task only hops onto the topic-ordered executor; the read itself runs there.
        Runnable submitToTopicExecutor = () -> topic.getBrokerService().getTopicOrderedExecutor()
                .executeOrdered(topic.getName(), SafeRun.safeRun(this::readMoreEntriesUnlessPending));
        topic.getBrokerService().executor().schedule(
                submitToTopicExecutor,
                withBackoff ? readFailureBackoff.next() : 0L,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Body of the read scheduled by {@link #canReadMoreEntries(boolean)}: under this instance's
     * monitor, starts a read unless one has become pending in the meantime.
     */
    private synchronized void readMoreEntriesUnlessPending() {
        if (havePendingRead) {
            log.info("[{}] Skipping read since we have pendingRead", name);
            return;
        }
        log.info("[{}] Scheduling read operation", name);
        readMoreEntries();
    }

    /**
     * Tells every consumer that the end of the topic was reached, but only when the cursor
     * reports an empty backlog.
     */
    @Override
    public void notifyConsumersEndOfTopic() {
        long backlog = cursor.getNumberOfEntriesInBacklog(false);
        if (backlog != 0L) {
            return;
        }
        consumerList.forEach(Consumer::reachedEndOfTopic);
    }

    /**
     * Cancels outstanding read requests on the streaming reader if a read is pending, clearing
     * the pending-read flag only when the reader reports the cancellation succeeded.
     */
    @Override
    protected void cancelPendingRead() {
        if (!havePendingRead) {
            return;
        }
        if (streamingEntryReader.cancelReadRequests()) {
            havePendingRead = false;
        }
    }

    /**
     * Starts the next read, if permitted.
     *
     * <p>Nothing is read when there are no available permits or no available consumer, or when
     * {@code calculateNumOfMessageToRead} returns {@code -1}. Otherwise messages awaiting replay
     * take priority; failing that, a normal streaming read is issued unless the dispatcher is
     * blocked on unacked messages or a read is already pending.
     */
    @Override
    public synchronized void readMoreEntries() {
        // Work from a single snapshot of the permit counter for the rest of this call.
        int permitsSnapshot = totalAvailablePermits;
        if (permitsSnapshot <= 0 || !isAtleastOneConsumerAvailable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer buffer is full, pause reading", name);
            }
            return;
        }

        int messagesToRead = calculateNumOfMessageToRead(permitsSnapshot);
        if (messagesToRead == -1) {
            return;
        }

        Set<PositionImpl> replayCandidates = getMessagesToReplayNow(messagesToRead);
        if (!replayCandidates.isEmpty()) {
            replayPendingRedeliveries(replayCandidates);
            return;
        }

        if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 1) {
            log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}",
                    name, totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
            return;
        }

        if (havePendingRead) {
            log.debug("[{}] Cannot schedule next read until previous one is done", name);
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead, consumerList.size());
        }
        havePendingRead = true;
        streamingEntryReader.asyncReadEntries(
                messagesToRead,
                serviceConfig.getDispatcherMaxReadSizeBytes(),
                PersistentDispatcherMultipleConsumers.ReadType.Normal);
    }

    /**
     * Issues a replay read for the given positions and removes from the redelivery set every
     * position the replay call returned. Called only from {@link #readMoreEntries()}, i.e. with
     * this instance's monitor held.
     *
     * @param replayCandidates non-empty set of positions selected for replay
     */
    private void replayPendingRedeliveries(Set<PositionImpl> replayCandidates) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Schedule replay of {} messages for {} consumers",
                    name, replayCandidates.size(), consumerList.size());
        }
        havePendingReplayRead = true;

        Set<? extends Position> droppedPositions;
        if (topic.isDelayedDeliveryEnabled()) {
            droppedPositions = asyncReplayEntriesInOrder(replayCandidates);
        } else {
            droppedPositions = asyncReplayEntries(replayCandidates);
        }

        droppedPositions.forEach(position -> {
            PositionImpl dropped = (PositionImpl) position;
            messagesToRedeliver.remove(dropped.getLedgerId(), dropped.getEntryId());
        });

        // Every candidate was dropped, so no replay is pending after all; request another read
        // through the broker executor instead of calling readMoreEntries() directly from here.
        if (replayCandidates.size() == droppedPositions.size()) {
            havePendingReplayRead = false;
            topic.getBrokerService().executor().execute(this::readMoreEntries);
        }
    }
}

