package org.apache.pulsar.client.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.HandlerBase;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Iterables;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.pulsar.checksum.utils.Crc32cChecksum;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.shade.org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/client/impl/ConsumerImpl.class */
public class ConsumerImpl extends ConsumerBase {
    private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
    /** Id of this consumer, obtained from {@code PulsarClientImpl.newConsumerId()} in the constructor. */
    private final long consumerId;
    /** Accessed through {@link #AVAILABLE_PERMITS_UPDATER}; reset to 0 at construction and after each successful subscribe. */
    private volatile int availablePermits;
    /** Id of the last message handed out by {@code fetchSingleMessageFromBroker()}; read by {@code clearReceiverQueue()}. */
    private MessageIdImpl lastDequeuedMessage;
    /** Absolute deadline in {@code System.currentTimeMillis()} terms: construction time plus the client operation timeout. */
    private long subscribeTimeout;
    /** Partition index passed to the constructor; values greater than -1 are treated specially on first connect (presumably "is a partition" - confirm). */
    private final int partitionIndex;
    /** Half of the configured receiver queue size. */
    private final int receiverQueueRefillThreshold;
    private final CompressionCodecProvider codecProvider;
    /** True while {@code fetchSingleMessageFromBroker()} is waiting for a message in zero-queue mode. */
    private volatile boolean waitingOnReceiveForZeroQueueSize;
    /** Guards the hand-off between incoming messages and pending receives, and the per-batch ack bit sets. */
    private final ReadWriteLock lock;
    /** Non-null only when the configured receiver queue size is at most 1; serializes zero-queue {@code internalReceive()}. */
    private final ReadWriteLock zeroQueueLock;
    /** Real tracker when an ack timeout is configured, otherwise the shared disabled instance. */
    private final UnAckedMessageTracker unAckedMessageTracker;
    /** One bit set per tracked batch, keyed by the batch's (ledger, entry, partition) id; bits are cleared as messages are acked. */
    private final ConcurrentNavigableMap<MessageIdImpl, BitSet> batchMessageAckTracker;
    /** Real stats when the client's stats interval is positive, otherwise the shared disabled instance. */
    protected final ConsumerStats stats;
    private final int priorityLevel;
    private final SubscriptionMode subscriptionMode;
    /** Position sent to the broker for non-durable subscriptions; recomputed by {@code clearReceiverQueue()} on every (re)connect. */
    private BatchMessageIdImpl startMessageId;
    private volatile boolean hasReachedEndOfTopic;
    /** Created only when the configuration supplies a crypto key reader; otherwise stays null. */
    private MessageCrypto msgCrypto;
    /** Unmodifiable copy of the consumer properties from the configuration (empty map when none). */
    private final Map<String, String> metadata;
    private final boolean readCompacted;
    private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits");
    private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/client/impl/ConsumerImpl$SubscriptionMode.class */
    /**
     * How the subscription is requested from the broker in {@code connectionOpened}.
     *
     * <p>{@code Durable} subscribes with the durable flag set and no start message id;
     * {@code NonDurable} subscribes with the durable flag cleared and sends the consumer's
     * current start message id.
     */
    public enum SubscriptionMode {
        Durable,
        NonDurable
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Convenience constructor: delegates to the full constructor with
     * {@link SubscriptionMode#Durable} and no start message id.
     */
    public ConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, String str2, ConsumerConfiguration consumerConfiguration, ExecutorService executorService, int i, CompletableFuture<Consumer> completableFuture) {
        this(pulsarClientImpl, str, str2, consumerConfiguration, executorService, i, completableFuture, SubscriptionMode.Durable, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerImpl(PulsarClientImpl pulsarClientImpl, String str, String str2, ConsumerConfiguration consumerConfiguration, ExecutorService executorService, int i, CompletableFuture<Consumer> completableFuture, SubscriptionMode subscriptionMode, MessageId messageId) {
        super(pulsarClientImpl, str, str2, consumerConfiguration, consumerConfiguration.getReceiverQueueSize(), executorService, completableFuture);
        this.availablePermits = 0;
        this.waitingOnReceiveForZeroQueueSize = false;
        this.lock = new ReentrantReadWriteLock();
        this.msgCrypto = null;
        this.consumerId = pulsarClientImpl.newConsumerId();
        this.subscriptionMode = subscriptionMode;
        this.startMessageId = messageId != null ? new BatchMessageIdImpl((MessageIdImpl) messageId) : null;
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
        this.subscribeTimeout = System.currentTimeMillis() + pulsarClientImpl.getConfiguration().getOperationTimeoutMs();
        this.partitionIndex = i;
        this.receiverQueueRefillThreshold = consumerConfiguration.getReceiverQueueSize() / 2;
        this.codecProvider = new CompressionCodecProvider();
        this.priorityLevel = consumerConfiguration.getPriorityLevel();
        this.batchMessageAckTracker = new ConcurrentSkipListMap();
        this.readCompacted = consumerConfiguration.getReadCompacted();
        if (pulsarClientImpl.getConfiguration().getStatsIntervalSeconds() > 0) {
            this.stats = new ConsumerStats(pulsarClientImpl, consumerConfiguration, this);
        } else {
            this.stats = ConsumerStats.CONSUMER_STATS_DISABLED;
        }
        if (consumerConfiguration.getReceiverQueueSize() <= 1) {
            this.zeroQueueLock = new ReentrantReadWriteLock();
        } else {
            this.zeroQueueLock = null;
        }
        if (consumerConfiguration.getAckTimeoutMillis() != 0) {
            this.unAckedMessageTracker = new UnAckedMessageTracker(pulsarClientImpl, this, consumerConfiguration.getAckTimeoutMillis());
        } else {
            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
        }
        if (consumerConfiguration.getCryptoKeyReader() != null) {
            this.msgCrypto = new MessageCrypto("[" + str + "] [" + str2 + "]", false);
        }
        if (consumerConfiguration.getProperties().isEmpty()) {
            this.metadata = Collections.emptyMap();
        } else {
            this.metadata = Collections.unmodifiableMap(new HashMap(consumerConfiguration.getProperties()));
        }
        grabCnx();
    }

    /**
     * @return the tracker assigned in the constructor: a real tracker when an ack timeout is
     *         configured, otherwise the shared disabled instance
     */
    public UnAckedMessageTracker getUnAckedMessageTracker() {
        return this.unAckedMessageTracker;
    }

    /**
     * Sends an unsubscribe request to the broker.
     *
     * <p>Fails immediately when the consumer is already closing or closed, or when it is not
     * connected. On success the consumer is removed from its connection, both ack trackers are
     * released and the state becomes {@code Closed}; on failure the state is put back to
     * {@code Ready}.
     *
     * @return a future completed when the broker confirms, or completed exceptionally on failure
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> unsubscribeAsync() {
        if (getState() == HandlerBase.State.Closing || getState() == HandlerBase.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }

        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (!isConnected()) {
            result.completeExceptionally(new PulsarClientException("Not connected to broker"));
            return result;
        }

        setState(HandlerBase.State.Closing);
        final long requestId = this.client.newRequestId();
        final ByteBuf unsubscribeCmd = Commands.newUnsubscribe(this.consumerId, requestId);
        final ClientCnx connection = cnx();
        connection.sendRequestWithId(unsubscribeCmd, requestId).thenRun(() -> {
            connection.removeConsumer(this.consumerId);
            log.info("[{}][{}] Successfully unsubscribed from topic", this.topic, this.subscription);
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            result.complete(null);
            setState(HandlerBase.State.Closed);
        }).exceptionally(failure -> {
            final Throwable cause = failure.getCause();
            log.error("[{}][{}] Failed to unsubscribe: {}", this.topic, this.subscription, cause.getMessage());
            result.completeExceptionally(cause);
            setState(HandlerBase.State.Ready);
            return null;
        });
        return result;
    }

    /**
     * Blocks until a message is available and returns it.
     *
     * <p>With a non-zero receiver queue the message is taken from the incoming queue. With a
     * zero-size queue a single message is requested from the broker while holding the
     * zero-queue write lock.
     *
     * @return the next message
     * @throws PulsarClientException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message internalReceive() throws PulsarClientException {
        if (this.conf.getReceiverQueueSize() != 0) {
            try {
                final Message next = this.incomingMessages.take();
                messageProcessed(next);
                return next;
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag before surfacing the failure.
                Thread.currentThread().interrupt();
                this.stats.incrementNumReceiveFailed();
                throw new PulsarClientException(interrupted);
            }
        }

        // Zero-queue mode: only one caller at a time may pull a message from the broker.
        Preconditions.checkArgument(this.zeroQueueLock != null, "Receiver queue size can't be modified");
        this.zeroQueueLock.writeLock().lock();
        try {
            return fetchSingleMessageFromBroker();
        } finally {
            this.zeroQueueLock.writeLock().unlock();
        }
    }

    /**
     * Returns a future for the next message without blocking.
     *
     * <p>Under the write lock, one message is polled from the incoming queue; if none is
     * available the future is parked in {@code pendingReceives}. After the lock is released, a
     * polled message completes the future, and in zero-queue mode an empty poll requests one
     * permit from the broker.
     *
     * <p>The lock is released exactly once, in a {@code finally} block. The code after the
     * locked section runs without the lock, so an exception there propagates unchanged instead
     * of triggering a second {@code unlock()} and an {@code IllegalMonitorStateException}.
     *
     * @return a future that yields the next message, or fails if the thread was interrupted
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected CompletableFuture<Message> internalReceiveAsync() {
        CompletableFuture<Message> completableFuture = new CompletableFuture<>();
        Message message = null;

        // Acquire outside the try so a failed lock() is never followed by unlock().
        this.lock.writeLock().lock();
        try {
            message = this.incomingMessages.poll(0L, TimeUnit.MILLISECONDS);
            if (message == null) {
                this.pendingReceives.add(completableFuture);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            completableFuture.completeExceptionally(e);
        } finally {
            this.lock.writeLock().unlock();
        }

        if (message == null && this.conf.getReceiverQueueSize() == 0) {
            sendFlowPermitsToBroker(cnx(), 1);
        } else if (message != null) {
            messageProcessed(message);
            completableFuture.complete(message);
        }
        return completableFuture;
    }

    /**
     * Zero-queue receive: asks the broker for one message and blocks until a message from the
     * current connection arrives.
     *
     * <p>On every exit path {@code waitingOnReceiveForZeroQueueSize} is reset to false and the
     * incoming queue is cleared.
     *
     * @return the received message
     * @throws PulsarClientException if the calling thread is interrupted while waiting
     */
    private Message fetchSingleMessageFromBroker() throws PulsarClientException {
        Preconditions.checkArgument(this.conf.getReceiverQueueSize() == 0);
        // Anything already queued is unexpected in zero-queue mode; log it and drop it.
        if (this.incomingMessages.size() > 0) {
            log.error("The incoming message queue should never be greater than 0 when Queue size is 0");
            this.incomingMessages.clear();
        }
        try {
            try {
                // Raise the flag before requesting a permit; messageReceived() and connectionOpened() both read it.
                this.waitingOnReceiveForZeroQueueSize = true;
                synchronized (this) {
                    if (isConnected()) {
                        sendFlowPermitsToBroker(cnx(), 1);
                    }
                }
                while (true) {
                    Message take = this.incomingMessages.take();
                    this.lastDequeuedMessage = (MessageIdImpl) take.getMessageId();
                    ClientCnx cnx = ((MessageImpl) take).getCnx();
                    synchronized (this) {
                        // Only accept a message delivered on the connection that is current now;
                        // anything else is skipped and the loop waits for the next one.
                        if (cnx == cnx()) {
                            this.waitingOnReceiveForZeroQueueSize = false;
                            this.stats.updateNumMsgsReceived(take);
                            this.waitingOnReceiveForZeroQueueSize = false;
                            this.incomingMessages.clear();
                            return take;
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag, count the failure and surface it as a client exception.
                Thread.currentThread().interrupt();
                this.stats.incrementNumReceiveFailed();
                throw new PulsarClientException(e);
            }
        } catch (Throwable th) {
            // Cleanup for every failure path (acts as the finally block of the method).
            this.waitingOnReceiveForZeroQueueSize = false;
            this.incomingMessages.clear();
            throw th;
        }
    }

    /**
     * Waits up to the given timeout for a message.
     *
     * @param i        maximum time to wait
     * @param timeUnit unit of {@code i}
     * @return the next message, or {@code null} if none arrived in time or the wait was
     *         interrupted while the consumer is closing or closed
     * @throws PulsarClientException if the wait is interrupted while the consumer is still open
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    protected Message internalReceive(int i, TimeUnit timeUnit) throws PulsarClientException {
        final Message polled;
        try {
            polled = this.incomingMessages.poll(i, timeUnit);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            final HandlerBase.State current = getState();
            final boolean shuttingDown = current == HandlerBase.State.Closing || current == HandlerBase.State.Closed;
            if (!shuttingDown) {
                this.stats.incrementNumReceiveFailed();
                throw new PulsarClientException(interrupted);
            }
            // An interrupt during shutdown is expected; report "no message".
            return null;
        }
        if (polled != null) {
            messageProcessed(polled);
        }
        return polled;
    }

    /**
     * Drops every tracked batch strictly before {@code messageIdImpl} and sends a cumulative
     * ack to the broker for the closest earlier batch.
     *
     * <p>The key is passed to {@code headMap} directly. The previous code cast the
     * {@code MessageIdImpl} key to {@code ConcurrentNavigableMap}, which does not type-check.
     *
     * @param batchMessageIdImpl the batch message being acknowledged (used for logging only)
     * @param messageIdImpl      tracker key of the batch that contains it
     * @param map                ack properties forwarded to the broker
     */
    private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageIdImpl, MessageIdImpl messageIdImpl, Map<String, Long> map) {
        MessageIdImpl lowerKey = this.batchMessageAckTracker.lowerKey(messageIdImpl);
        if (lowerKey == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] no messages prior to message {}", new Object[]{this.subscription, Long.valueOf(this.consumerId), batchMessageIdImpl});
            }
            return;
        }
        // Inclusive view of every tracked batch up to and including lowerKey.
        ConcurrentNavigableMap<MessageIdImpl, BitSet> earlierBatches = this.batchMessageAckTracker.headMap(lowerKey, true);
        for (MessageIdImpl key : earlierBatches.keySet()) {
            earlierBatches.remove(key);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] ack prior message {} to broker on cumulative ack for message {}", new Object[]{this.subscription, Long.valueOf(this.consumerId), lowerKey, batchMessageIdImpl});
        }
        sendAcknowledge(lowerKey, PulsarApi.CommandAck.AckType.Cumulative, map);
    }

    /**
     * Records an acknowledgement for one message of a batch and reports whether the batch can
     * now be acknowledged to the broker.
     *
     * <p>An individual ack clears the message's bit; a cumulative ack clears every bit up to and
     * including it. When bits remain, a cumulative ack still tries to ack earlier batches.
     *
     * @param batchMessageIdImpl id of the message inside the batch
     * @param ackType            individual or cumulative
     * @param map                ack properties, forwarded when earlier batches are acked
     * @return {@code true} if the batch is not tracked or has no bits left;
     *         {@code false} while bits remain set
     */
    boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageIdImpl, PulsarApi.CommandAck.AckType ackType, Map<String, Long> map) {
        // The tracker is keyed by the batch's entry id, not by the per-message batch id.
        final MessageIdImpl batchKey = new MessageIdImpl(batchMessageIdImpl.getLedgerId(), batchMessageIdImpl.getEntryId(), batchMessageIdImpl.getPartitionIndex());
        final BitSet pendingBits = this.batchMessageAckTracker.get(batchKey);
        if (pendingBits == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] message not found {} for ack {}", this.subscription, this.consumerId, batchMessageIdImpl, ackType);
            }
            return true;
        }

        final int batchIndex = batchMessageIdImpl.getBatchIndex();
        this.lock.writeLock().lock();
        try {
            // Capture the length before clearing; it is reported to the stats below.
            final int batchLength = pendingBits.length();
            if (ackType == PulsarApi.CommandAck.AckType.Individual) {
                pendingBits.clear(batchIndex);
            } else {
                // The upper bound of clear(from, to) is exclusive, hence + 1.
                pendingBits.clear(0, batchIndex + 1);
            }
            final boolean fullyAcked = pendingBits.isEmpty();
            final int outstanding = log.isDebugEnabled() ? pendingBits.cardinality() : 0;

            if (fullyAcked) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] can ack message to broker {}, acktype {}, cardinality {}, length {}", this.subscription, this.consumerName, batchMessageIdImpl, ackType, outstanding, batchLength);
                }
                if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
                    this.batchMessageAckTracker.keySet().removeIf(tracked -> tracked.compareTo((MessageId) batchKey) <= 0);
                }
                this.batchMessageAckTracker.remove(batchKey);
                if (ackType == PulsarApi.CommandAck.AckType.Individual) {
                    this.stats.incrementNumAcksSent(batchLength);
                }
                return true;
            }

            // This batch still has pending messages, but earlier batches may be ackable.
            if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
                ackMessagesInEarlierBatch(batchMessageIdImpl, batchKey, map);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", this.subscription, this.consumerName, batchMessageIdImpl, ackType, outstanding);
            }
            return false;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Drops every tracked batch strictly before {@code messageIdImpl}. Called for cumulative
     * acks of non-batch message ids.
     *
     * <p>The key is passed to {@code headMap} directly. The previous code cast the
     * {@code MessageIdImpl} key to {@code ConcurrentNavigableMap}, which does not type-check.
     *
     * @param messageIdImpl id being cumulatively acknowledged
     * @param ackType       not read by this method; kept for signature compatibility
     */
    private void updateBatchAckTracker(MessageIdImpl messageIdImpl, PulsarApi.CommandAck.AckType ackType) {
        if (this.batchMessageAckTracker.isEmpty()) {
            return;
        }
        MessageIdImpl lowerKey = this.batchMessageAckTracker.lowerKey(messageIdImpl);
        if (lowerKey == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] no messages to clean up prior to message {}", new Object[]{this.subscription, Long.valueOf(this.consumerId), messageIdImpl});
            }
            return;
        }
        // Inclusive view of every tracked batch up to and including lowerKey.
        ConcurrentNavigableMap<MessageIdImpl, BitSet> earlierBatches = this.batchMessageAckTracker.headMap(lowerKey, true);
        for (MessageIdImpl key : earlierBatches.keySet()) {
            earlierBatches.remove(key);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] updated batch ack tracker up to message {} on cumulative ack for message {}", new Object[]{this.subscription, Long.valueOf(this.consumerId), lowerKey, messageIdImpl});
        }
    }

    /** @return {@code true} when no batch is currently held in the batch ack tracker */
    public boolean isBatchingAckTrackerEmpty() {
        return this.batchMessageAckTracker.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acknowledges a message, going through the batch tracker first when needed.
     *
     * <p>For a batch message id the ack is only sent to the broker once
     * {@code markAckForBatchMessage} reports the batch as complete; until then the returned
     * future completes immediately. A cumulative ack of a non-batch id first prunes earlier
     * batches from the tracker.
     *
     * @param messageId must be a {@code MessageIdImpl}
     * @param ackType   individual or cumulative
     * @param map       ack properties forwarded to the broker
     * @return a future for the acknowledgement; failed when the consumer is neither
     *         {@code Ready} nor {@code Connecting}
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public CompletableFuture<Void> doAcknowledge(MessageId messageId, PulsarApi.CommandAck.AckType ackType, Map<String, Long> map) {
        Preconditions.checkArgument(messageId instanceof MessageIdImpl);
        if (getState() != HandlerBase.State.Ready && getState() != HandlerBase.State.Connecting) {
            this.stats.incrementNumAcksFailed();
            return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + getState()));
        }

        final boolean batchId = messageId instanceof BatchMessageIdImpl;
        if (batchId) {
            final boolean batchComplete = markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, map);
            if (!batchComplete) {
                // Other messages of the batch are still pending; nothing to send yet.
                return CompletableFuture.completedFuture(null);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] acknowledging message - {}, acktype {}", this.subscription, this.consumerName, messageId, ackType);
            }
        } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
            updateBatchAckTracker((MessageIdImpl) messageId, ackType);
        }
        return sendAcknowledge(messageId, ackType, map);
    }

    /**
     * Writes an ack command to the broker and reports the outcome through the returned future.
     *
     * <p>Changes from the previous version:
     * <ul>
     *   <li>The listener is registered with {@code addListener}; {@code addListener2} is a
     *       decompiler-generated name that does not exist on the Netty future.</li>
     *   <li>The ack command is built only when connected. Before, it was allocated up front and
     *       never written or released on the not-connected path.</li>
     * </ul>
     *
     * @param messageId id to acknowledge; must be a {@code MessageIdImpl}
     * @param ackType   individual or cumulative
     * @param map       ack properties forwarded to the broker
     * @return a future completed when the write succeeds, or completed exceptionally when the
     *         write fails or the consumer is not connected
     */
    private CompletableFuture<Void> sendAcknowledge(final MessageId messageId, final PulsarApi.CommandAck.AckType ackType, Map<String, Long> map) {
        final MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        if (!isConnected()) {
            this.stats.incrementNumAcksFailed();
            completableFuture.completeExceptionally(new PulsarClientException("Not connected to broker. State: " + getState()));
            return completableFuture;
        }

        ByteBuf newAck = Commands.newAck(this.consumerId, messageIdImpl.getLedgerId(), messageIdImpl.getEntryId(), ackType, null, map);
        // The broker sends no response for acks, so completion is tied to the socket write.
        cnx().ctx().writeAndFlush(newAck).addListener(new GenericFutureListener<Future<Void>>() {
            @Override // org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(Future<Void> future) throws Exception {
                if (!future.isSuccess()) {
                    ConsumerImpl.this.stats.incrementNumAcksFailed();
                    completableFuture.completeExceptionally(new PulsarClientException(future.cause()));
                    return;
                }
                if (ackType == PulsarApi.CommandAck.AckType.Individual) {
                    ConsumerImpl.this.unAckedMessageTracker.remove(messageIdImpl);
                    // Batch acks are counted in markAckForBatchMessage; count only plain ids here.
                    if (!(messageId instanceof BatchMessageIdImpl)) {
                        ConsumerImpl.this.stats.incrementNumAcksSent(1L);
                    }
                } else if (ackType == PulsarApi.CommandAck.AckType.Cumulative) {
                    ConsumerImpl.this.stats.incrementNumAcksSent(ConsumerImpl.this.unAckedMessageTracker.removeMessagesTill(messageIdImpl));
                }
                if (ConsumerImpl.log.isDebugEnabled()) {
                    ConsumerImpl.log.debug("[{}] [{}] [{}] Successfully acknowledged message - {}, acktype {}", new Object[]{ConsumerImpl.this.subscription, ConsumerImpl.this.topic, ConsumerImpl.this.consumerName, messageId, ackType});
                }
                completableFuture.complete(null);
            }
        });
        return completableFuture;
    }

    /**
     * Called when a connection to the broker is available: registers this consumer on it and
     * sends the subscribe request.
     *
     * <p>Before subscribing, the receiver queue and both ack trackers are cleared and the start
     * position is recomputed. On success the consumer becomes ready and flow permits are sent;
     * on failure it reconnects later, or fails the subscribe future when retrying is not
     * possible.
     *
     * @param clientCnx the connection that was just opened
     */
    @Override // org.apache.pulsar.client.impl.HandlerBase
    void connectionOpened(ClientCnx clientCnx) {
        setClientCnx(clientCnx);
        clientCnx.registerConsumer(this.consumerId, this);
        log.info("[{}][{}] Subscribing to topic on cnx {}", this.topic, this.subscription, clientCnx.ctx().channel());
        final long requestId = this.client.newRequestId();

        final int queuedBeforeReset;
        synchronized (this) {
            queuedBeforeReset = this.incomingMessages.size();
            this.startMessageId = clearReceiverQueue();
            this.unAckedMessageTracker.clear();
            this.batchMessageAckTracker.clear();
        }

        // Only non-durable subscriptions send an explicit start position.
        final boolean durable = this.subscriptionMode == SubscriptionMode.Durable;
        PulsarApi.MessageIdData startPosition = null;
        if (!durable) {
            PulsarApi.MessageIdData.Builder positionBuilder = PulsarApi.MessageIdData.newBuilder();
            positionBuilder.setLedgerId(this.startMessageId.getLedgerId());
            positionBuilder.setEntryId(this.startMessageId.getEntryId());
            if (this.startMessageId instanceof BatchMessageIdImpl) {
                positionBuilder.setBatchIndex(this.startMessageId.getBatchIndex());
            }
            startPosition = positionBuilder.build();
            positionBuilder.recycle();
        }

        ByteBuf subscribeCmd = Commands.newSubscribe(this.topic, this.subscription, this.consumerId, requestId, getSubType(), this.priorityLevel, this.consumerName, durable, startPosition, this.metadata, this.readCompacted);
        if (startPosition != null) {
            startPosition.recycle();
        }

        clientCnx.sendRequestWithId(subscribeCmd, requestId).thenRun(() -> {
            synchronized (this) {
                if (changeToReadyState()) {
                    log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", this.topic, this.subscription, clientCnx.channel().remoteAddress(), this.consumerId);
                    AVAILABLE_PERMITS_UPDATER.set(this, 0);
                    // Re-request the single permit for a blocked zero-queue receive or a dropped message.
                    if (this.waitingOnReceiveForZeroQueueSize || (this.conf.getReceiverQueueSize() == 0 && queuedBeforeReset > 0)) {
                        sendFlowPermitsToBroker(clientCnx, 1);
                    }
                    resetBackoff();
                    final boolean firstConnect = this.subscribeFuture.complete(this);
                    // Skip the initial flow only on the first connect of a consumer whose partition index is > -1.
                    final boolean skipInitialFlow = firstConnect && this.partitionIndex > -1;
                    if (!skipInitialFlow && this.conf.getReceiverQueueSize() != 0) {
                        sendFlowPermitsToBroker(clientCnx, this.conf.getReceiverQueueSize());
                    }
                } else {
                    // The consumer was closed while the subscribe was in flight.
                    setState(HandlerBase.State.Closed);
                    clientCnx.removeConsumer(this.consumerId);
                    clientCnx.channel().close();
                }
            }
        }).exceptionally(failure -> {
            clientCnx.removeConsumer(this.consumerId);
            if (getState() == HandlerBase.State.Closing || getState() == HandlerBase.State.Closed) {
                clientCnx.channel().close();
                return null;
            }
            log.warn("[{}][{}] Failed to subscribe to topic on {}", this.topic, this.subscription, clientCnx.channel().remoteAddress());
            final boolean retriableBeforeDeadline = (failure.getCause() instanceof PulsarClientException) && isRetriableError((PulsarClientException) failure.getCause()) && System.currentTimeMillis() < this.subscribeTimeout;
            // Also retry when the subscribe future is already done.
            if (retriableBeforeDeadline || this.subscribeFuture.isDone()) {
                reconnectLater(failure.getCause());
                return null;
            }
            setState(HandlerBase.State.Failed);
            this.subscribeFuture.completeExceptionally(failure);
            this.client.cleanupConsumer(this);
            return null;
        });
    }

    /**
     * Empties the incoming queue and computes the id to use as the start position for the next
     * subscribe.
     *
     * <p>If messages were queued, the result is the id just before the first drained message
     * (previous batch index for a batch id, previous entry otherwise). If nothing was queued,
     * the result is the last dequeued message id when there is one, else the current start id.
     *
     * @return the start position for the next subscribe; may be {@code null}
     */
    private BatchMessageIdImpl clearReceiverQueue() {
        final List<Message> drained = new ArrayList<>(this.incomingMessages.size());
        this.incomingMessages.drainTo(drained);

        if (!drained.isEmpty()) {
            final MessageIdImpl firstId = (MessageIdImpl) drained.get(0).getMessageId();
            if (firstId instanceof BatchMessageIdImpl) {
                return new BatchMessageIdImpl(firstId.getLedgerId(), firstId.getEntryId(), firstId.getPartitionIndex(), ((BatchMessageIdImpl) firstId).getBatchIndex() - 1);
            }
            return new BatchMessageIdImpl(firstId.getLedgerId(), firstId.getEntryId() - 1, firstId.getPartitionIndex(), -1);
        }

        if (this.lastDequeuedMessage != null) {
            return new BatchMessageIdImpl(this.lastDequeuedMessage);
        }
        return this.startMessageId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Writes a flow command granting {@code i} additional permits on the given connection.
     * Does nothing when the connection is {@code null}.
     *
     * @param clientCnx connection to write to, may be {@code null}
     * @param i         number of permits to grant
     */
    public void sendFlowPermitsToBroker(ClientCnx clientCnx, int i) {
        if (clientCnx == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Adding {} additional permits", this.topic, this.subscription, i);
        }
        final ByteBuf flowCmd = Commands.newFlow(this.consumerId, i);
        clientCnx.ctx().writeAndFlush(flowCmd, clientCnx.ctx().voidPromise());
    }

    /**
     * Called when a connection attempt fails. Once the subscribe deadline has passed and the
     * subscribe future could still be failed, the consumer is marked {@code Failed} and
     * removed from the client; otherwise nothing happens.
     *
     * @param pulsarClientException the connection failure
     */
    @Override // org.apache.pulsar.client.impl.HandlerBase
    void connectionFailed(PulsarClientException pulsarClientException) {
        final boolean deadlinePassed = System.currentTimeMillis() > this.subscribeTimeout;
        if (deadlinePassed && this.subscribeFuture.completeExceptionally(pulsarClientException)) {
            setState(HandlerBase.State.Failed);
            log.info("[{}] Consumer creation failed for consumer {}", this.topic, Long.valueOf(this.consumerId));
            this.client.cleanupConsumer(this);
        }
    }

    /**
     * Closes the consumer.
     *
     * <p>If it is already closing or closed, or not connected, local resources are released
     * and a completed future is returned. Otherwise a close request is sent to the broker; the
     * returned future fails only when the request fails while the channel is still active.
     *
     * @return a future completed once the consumer is closed
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase, org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> closeAsync() {
        if (getState() == HandlerBase.State.Closing || getState() == HandlerBase.State.Closed) {
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            return CompletableFuture.completedFuture(null);
        }

        if (!isConnected()) {
            log.info("[{}] [{}] Closed Consumer (not connected)", this.topic, this.subscription);
            setState(HandlerBase.State.Closed);
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            this.client.cleanupConsumer(this);
            return CompletableFuture.completedFuture(null);
        }

        final Timeout statsTimer = this.stats.getStatTimeout();
        if (statsTimer != null) {
            statsTimer.cancel();
        }

        setState(HandlerBase.State.Closing);
        final long requestId = this.client.newRequestId();
        final ByteBuf closeCmd = Commands.newCloseConsumer(this.consumerId, requestId);
        final CompletableFuture<Void> closed = new CompletableFuture<>();
        final ClientCnx connection = cnx();
        connection.sendRequestWithId(closeCmd, requestId).handle((response, failure) -> {
            connection.removeConsumer(this.consumerId);
            // A failure on a dead channel still counts as closed.
            final boolean failedOnLiveChannel = failure != null && connection.ctx().channel().isActive();
            if (failedOnLiveChannel) {
                closed.completeExceptionally(failure);
                return null;
            }
            log.info("[{}] [{}] Closed consumer", this.topic, this.subscription);
            setState(HandlerBase.State.Closed);
            this.batchMessageAckTracker.clear();
            this.unAckedMessageTracker.close();
            closed.complete(null);
            this.client.cleanupConsumer(this);
            failPendingReceive();
            return null;
        });
        return closed;
    }

    /**
     * Fails every parked {@code receiveAsync} future with an "already closed" error. Skipped
     * when the listener executor is missing or already shut down. Runs under the read lock.
     */
    private void failPendingReceive() {
        this.lock.readLock().lock();
        try {
            if (this.listenerExecutor == null || this.listenerExecutor.isShutdown()) {
                return;
            }
            while (!this.pendingReceives.isEmpty()) {
                final CompletableFuture<Message> parked = this.pendingReceives.poll();
                if (parked == null) {
                    // Another thread drained the queue between the check and the poll.
                    break;
                }
                parked.completeExceptionally(new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Processes one entry received on a client connection: verifies the checksum, parses the metadata,
     * decrypts and decompresses the payload, then hands the resulting message(s) to a pending receive or the
     * incoming queue. When a listener is configured, one listener callback per contained message is scheduled
     * on the listener executor.
     *
     * <p>An entry that fails checksum verification, or whose metadata cannot be parsed, is discarded with
     * {@code ChecksumMismatch}. The decrypt/decompress helpers do their own discarding and signal it by
     * returning {@code null}.
     *
     * @param messageIdData ledger/entry position of the received entry
     * @param byteBuf headers and payload exactly as received
     * @param clientCnx connection the entry arrived on
     */
    public void messageReceived(PulsarApi.MessageIdData messageIdData, ByteBuf byteBuf, ClientCnx clientCnx) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received message: {}/{}", new Object[]{this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId())});
        }
        if (!verifyChecksum(byteBuf, messageIdData)) {
            discardCorruptedMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        PulsarApi.MessageMetadata metadata;
        try {
            metadata = Commands.parseMessageMetadata(byteBuf);
        } catch (Throwable parseFailure) {
            // Only an unparseable header is reported as a corrupted entry. Failures in later stages must not be
            // acknowledged as a checksum mismatch, since the message may already have been handed out.
            discardCorruptedMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.ChecksumMismatch);
            return;
        }
        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageIdData, metadata, byteBuf, clientCnx);
        if (decryptedPayload == null) {
            // Already discarded (or delivery refused) by the decryption step.
            return;
        }
        ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageIdData, metadata, decryptedPayload, clientCnx);
        decryptedPayload.release();
        if (uncompressedPayload == null) {
            // Already discarded by the decompression step.
            return;
        }
        final int numMessagesInBatch = metadata.getNumMessagesInBatch();
        if (numMessagesInBatch != 1 || metadata.hasNumMessagesInBatch()) {
            if (this.conf.getReceiverQueueSize() == 0) {
                log.warn("Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size", this.subscription, this.consumerName);
                closeAsync().handle((ignoredResult, ignoredError) -> {
                    notifyPendingReceivedCallback(null, new PulsarClientException.InvalidMessageException(String.format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ", this.subscription, this.consumerName)));
                    return null;
                });
            } else {
                receiveIndividualMessagesFromBatch(metadata, uncompressedPayload, messageIdData, clientCnx);
            }
            uncompressedPayload.release();
            metadata.recycle();
        } else {
            MessageImpl message = new MessageImpl(messageIdData, metadata, uncompressedPayload, getPartitionIndex(), clientCnx);
            uncompressedPayload.release();
            metadata.recycle();
            this.lock.readLock().lock();
            try {
                this.unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
                boolean receiveIsPending = !this.pendingReceives.isEmpty();
                // Queue the message only when nobody is waiting asynchronously; with a zero-size receiver queue
                // it is queued only while a blocking receive is waiting for it.
                if ((this.conf.getReceiverQueueSize() != 0 || this.waitingOnReceiveForZeroQueueSize) && !receiveIsPending) {
                    this.incomingMessages.add(message);
                }
                if (receiveIsPending) {
                    notifyPendingReceivedCallback(message, null);
                }
            } finally {
                this.lock.readLock().unlock();
            }
        }
        if (this.listener != null) {
            this.listenerExecutor.execute(() -> {
                // One dequeue + callback per message contained in the received entry.
                for (int i = 0; i < numMessagesInBatch; i++) {
                    try {
                        Message received = internalReceive(0, TimeUnit.MILLISECONDS);
                        if (received == null) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] [{}] Message has been cleared from the queue", this.topic, this.subscription);
                            }
                            return;
                        }
                        try {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{}] Calling message listener for message {}", new Object[]{this.topic, this.subscription, received.getMessageId()});
                            }
                            this.listener.received(this, received);
                        } catch (Throwable listenerFailure) {
                            // A failing listener must not stop delivery of the remaining messages.
                            log.error("[{}][{}] Message listener error in processing message: {}", new Object[]{this.topic, this.subscription, received.getMessageId(), listenerFailure});
                        }
                    } catch (PulsarClientException e) {
                        log.warn("[{}] [{}] Failed to dequeue the message for listener", new Object[]{this.topic, this.subscription, e});
                        return;
                    }
                }
            });
        }
    }

    /**
     * Completes the oldest pending receive future, if any, with either a message or a failure.
     *
     * <p>With a zero-size receiver queue the future is completed on the calling thread. Otherwise the message is
     * first accounted for through {@link #messageProcessed(Message)} and the future is completed on the
     * listener executor. Failures are always delivered on the listener executor.
     *
     * @param messageImpl message to deliver; must be non-null when {@code exc} is null
     * @param exc failure to deliver, or null to deliver {@code messageImpl}
     */
    void notifyPendingReceivedCallback(MessageImpl messageImpl, Exception exc) {
        if (this.pendingReceives.isEmpty()) {
            return;
        }
        CompletableFuture<Message> pending = this.pendingReceives.poll();
        if (exc != null) {
            // poll() can return null if the queue was drained after the isEmpty() check above.
            if (pending != null) {
                this.listenerExecutor.execute(() -> {
                    pending.completeExceptionally(exc);
                });
            }
            return;
        }
        Preconditions.checkNotNull(messageImpl, "received message can't be null");
        if (pending == null) {
            return;
        }
        if (this.conf.getReceiverQueueSize() == 0) {
            pending.complete(messageImpl);
        } else {
            messageProcessed(messageImpl);
            this.listenerExecutor.execute(() -> {
                pending.complete(messageImpl);
            });
        }
    }

    /**
     * Splits a batch entry into its individual messages and delivers each one to a pending receive or the
     * incoming queue.
     *
     * <p>The whole batch is registered in {@code batchMessageAckTracker} (one bit per contained message) and in
     * {@code unAckedMessageTracker} under the entry-level message id. For a non-durable subscription, messages
     * of the start entry whose batch index is at or before {@code startMessageId} are skipped, and one permit
     * per skipped message is returned through {@code increaseAvailablePermits}.
     *
     * <p>If a message cannot be deserialized, the batch is removed from the ack tracker, the entry is discarded
     * with {@code BatchDeSerializeError}, and processing of the remaining messages stops.
     *
     * @param messageMetadata metadata of the batch entry
     * @param byteBuf uncompressed payload holding all messages of the batch
     * @param messageIdData ledger/entry position of the batch entry
     * @param clientCnx connection the entry arrived on
     */
    void receiveIndividualMessagesFromBatch(PulsarApi.MessageMetadata messageMetadata, ByteBuf byteBuf, PulsarApi.MessageIdData messageIdData, ClientCnx clientCnx) {
        int numMessagesInBatch = messageMetadata.getNumMessagesInBatch();
        BitSet bitSet = new BitSet(numMessagesInBatch);
        MessageIdImpl batchMessageId = new MessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), getPartitionIndex());
        bitSet.set(0, numMessagesInBatch);
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] added bit set for message {}, cardinality {}, length {}", new Object[]{this.subscription, this.consumerName, batchMessageId, Integer.valueOf(bitSet.cardinality()), Integer.valueOf(bitSet.length())});
        }
        this.batchMessageAckTracker.put(batchMessageId, bitSet);
        this.unAckedMessageTracker.add(batchMessageId);
        int skippedMessages = 0;
        for (int batchIndex = 0; batchIndex < numMessagesInBatch; batchIndex++) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] processing message num - {} in batch", new Object[]{this.subscription, this.consumerName, Integer.valueOf(batchIndex)});
                }
                PulsarApi.SingleMessageMetadata.Builder singleMetadataBuilder = PulsarApi.SingleMessageMetadata.newBuilder();
                ByteBuf singlePayload = Commands.deSerializeSingleMessageInBatch(byteBuf, singleMetadataBuilder, batchIndex, numMessagesInBatch);
                if (this.subscriptionMode == SubscriptionMode.NonDurable && this.startMessageId != null && messageIdData.getLedgerId() == this.startMessageId.getLedgerId() && messageIdData.getEntryId() == this.startMessageId.getEntryId() && batchIndex <= this.startMessageId.getBatchIndex()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Ignoring message from before the startMessageId", this.subscription, this.consumerName);
                    }
                    // A skipped message owns the same resources as a delivered one; give them back here too.
                    singlePayload.release();
                    singleMetadataBuilder.recycle();
                    skippedMessages++;
                    continue;
                }
                MessageImpl message = new MessageImpl(new BatchMessageIdImpl(messageIdData.getLedgerId(), messageIdData.getEntryId(), getPartitionIndex(), batchIndex), messageMetadata, singleMetadataBuilder.build(), singlePayload, clientCnx);
                this.lock.readLock().lock();
                try {
                    if (this.pendingReceives.isEmpty()) {
                        this.incomingMessages.add(message);
                    } else {
                        notifyPendingReceivedCallback(message, null);
                    }
                } finally {
                    // Exactly one unlock per lock, whether or not delivery threw.
                    this.lock.readLock().unlock();
                }
                singlePayload.release();
                singleMetadataBuilder.recycle();
            } catch (IOException e) {
                log.warn("[{}] [{}] unable to obtain message in batch", this.subscription, this.consumerName);
                this.batchMessageAckTracker.remove(batchMessageId);
                discardCorruptedMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.BatchDeSerializeError);
                // The entry has been discarded as a whole; continuing would re-read a broken buffer and could
                // discard (ack + permit) the same entry again.
                break;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", new Object[]{this.subscription, this.consumerName, Integer.valueOf(this.incomingMessages.size()), Integer.valueOf(this.incomingMessages.remainingCapacity())});
        }
        if (skippedMessages > 0) {
            increaseAvailablePermits(clientCnx, skippedMessages);
        }
    }

    /**
     * Book-keeping performed when a message is handed to the application.
     *
     * <p>Always records the message id as {@code lastDequeuedMessage}. If the message arrived on the connection
     * that is current now, one permit is added, receive stats are updated and, when an ack timeout is
     * configured, the entry-level message id is added to the un-acked tracker (partition index {@code -1}) or
     * removed from it (any other partition index).
     *
     * @param message message being handed out; must be a {@code MessageImpl}
     */
    protected synchronized void messageProcessed(Message message) {
        ClientCnx currentCnx = cnx();
        ClientCnx originCnx = ((MessageImpl) message).getCnx();
        this.lastDequeuedMessage = (MessageIdImpl) message.getMessageId();
        if (originCnx == currentCnx) {
            increaseAvailablePermits(currentCnx);
            this.stats.updateNumMsgsReceived(message);
            if (this.conf.getAckTimeoutMillis() == 0) {
                return;
            }
            MessageIdImpl receivedId = (MessageIdImpl) message.getMessageId();
            // Tracking is done per entry, so a batch message id is reduced to its ledger/entry position.
            MessageIdImpl trackedId = receivedId instanceof BatchMessageIdImpl
                    ? new MessageIdImpl(receivedId.getLedgerId(), receivedId.getEntryId(), getPartitionIndex())
                    : receivedId;
            if (this.partitionIndex == -1) {
                this.unAckedMessageTracker.add(trackedId);
            } else {
                this.unAckedMessageTracker.remove(trackedId);
            }
        }
    }

    /**
     * Adds a single permit; shorthand for {@code increaseAvailablePermits(clientCnx, 1)}.
     *
     * @param clientCnx connection passed through to the two-argument overload
     */
    void increaseAvailablePermits(ClientCnx clientCnx) {
        final int singlePermit = 1;
        increaseAvailablePermits(clientCnx, singlePermit);
    }

    /**
     * Adds {@code i} to the available-permit counter and, once the counter has reached
     * {@code receiverQueueRefillThreshold}, resets it to zero with a compare-and-set and passes the
     * accumulated amount to {@code sendFlowPermitsToBroker}.
     *
     * @param clientCnx connection handed to {@code sendFlowPermitsToBroker}
     * @param i number of permits to add
     */
    private void increaseAvailablePermits(ClientCnx clientCnx, int i) {
        int accumulated = AVAILABLE_PERMITS_UPDATER.addAndGet(this, i);
        while (accumulated >= this.receiverQueueRefillThreshold) {
            if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, accumulated, 0)) {
                sendFlowPermitsToBroker(clientCnx, accumulated);
                break;
            }
            // Lost the race against a concurrent update: re-read and re-evaluate the threshold.
            accumulated = AVAILABLE_PERMITS_UPDATER.get(this);
        }
    }

    /**
     * Returns the payload to continue processing with, decrypting it when the metadata lists encryption keys.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>no encryption keys: the input buffer, retained;</li>
     *   <li>decryption succeeds: the decrypted buffer;</li>
     *   <li>no {@code CryptoKeyReader} configured, or decryption returns null: depends on the configured crypto
     *       failure action. {@code CONSUME} returns the (still encrypted) input buffer, retained;
     *       {@code DISCARD} discards the message with {@code DecryptionError} and returns null; any other
     *       value logs an error and returns null.</li>
     * </ul>
     *
     * @return buffer the caller must release, or null when the message must not be delivered
     */
    private ByteBuf decryptPayloadIfNeeded(PulsarApi.MessageIdData messageIdData, PulsarApi.MessageMetadata messageMetadata, ByteBuf byteBuf, ClientCnx clientCnx) {
        if (messageMetadata.getEncryptionKeysCount() == 0) {
            return byteBuf.retain();
        }

        if (this.conf.getCryptoKeyReader() != null) {
            ByteBuf decrypted = this.msgCrypto.decrypt(messageMetadata, byteBuf, this.conf.getCryptoKeyReader());
            if (decrypted != null) {
                return decrypted;
            }
            ConsumerCryptoFailureAction onDecryptFailure = this.conf.getCryptoFailureAction();
            if (onDecryptFailure == ConsumerCryptoFailureAction.CONSUME) {
                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to consume.", new Object[]{this.topic, this.subscription, this.consumerName, messageIdData});
                return byteBuf.retain();
            }
            if (onDecryptFailure == ConsumerCryptoFailureAction.DISCARD) {
                log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config is set to discard", new Object[]{this.topic, this.subscription, this.consumerName, messageIdData});
                discardMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.DecryptionError);
                return null;
            }
            log.error("[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming message", new Object[]{this.topic, this.subscription, this.consumerName, messageIdData});
            return null;
        }

        // Encrypted message but no key reader configured.
        ConsumerCryptoFailureAction onMissingReader = this.conf.getCryptoFailureAction();
        if (onMissingReader == ConsumerCryptoFailureAction.CONSUME) {
            log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming encrypted message.", new Object[]{this.topic, this.subscription, this.consumerName});
            return byteBuf.retain();
        }
        if (onMissingReader == ConsumerCryptoFailureAction.DISCARD) {
            log.warn("[{}][{}][{}] Skipping decryption since CryptoKeyReader interface is not implemented and config is set to discard", new Object[]{this.topic, this.subscription, this.consumerName});
            discardMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.DecryptionError);
            return null;
        }
        log.error("[{}][{}][{}] Message delivery failed since CryptoKeyReader interface is not implemented to consume encrypted message", new Object[]{this.topic, this.subscription, this.consumerName});
        return null;
    }

    /**
     * Decodes the payload with the codec selected by the metadata's compression type.
     *
     * <p>A payload with more than 5232640 readable bytes is discarded with {@code UncompressedSizeCorruption};
     * a codec {@link IOException} leads to a discard with {@code DecompressionError}. Both cases return null.
     *
     * @return the decoded buffer, or null when the message was discarded
     */
    private ByteBuf uncompressPayloadIfNeeded(PulsarApi.MessageIdData messageIdData, PulsarApi.MessageMetadata messageMetadata, ByteBuf byteBuf, ClientCnx clientCnx) {
        // NOTE(review): 5232640 looks like an inlined protocol max-message-size constant — confirm.
        final int payloadSizeLimit = 5232640;
        PulsarApi.CompressionType compressionType = messageMetadata.getCompression();
        CompressionCodec decoder = this.codecProvider.getCodec(compressionType);
        int expectedSize = messageMetadata.getUncompressedSize();
        int payloadSize = byteBuf.readableBytes();
        if (payloadSize <= payloadSizeLimit) {
            try {
                return decoder.decode(byteBuf, expectedSize);
            } catch (IOException e) {
                log.error("[{}][{}] Failed to decompress message with {} at {}: {}", new Object[]{this.topic, this.subscription, compressionType, messageIdData, e.getMessage(), e});
                discardCorruptedMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.DecompressionError);
                return null;
            }
        }
        log.error("[{}][{}] Got corrupted payload message size {} at {}", new Object[]{this.topic, this.subscription, Integer.valueOf(payloadSize), messageIdData});
        discardCorruptedMessage(messageIdData, clientCnx, PulsarApi.CommandAck.ValidationError.UncompressedSizeCorruption);
        return null;
    }

    /**
     * Checks the checksum carried by the buffer, if there is one, against the checksum computed over the
     * buffer.
     *
     * @param byteBuf received headers and payload
     * @param messageIdData position of the entry, used for logging only
     * @return true when the buffer carries no checksum or the checksums match; false on a mismatch
     */
    private boolean verifyChecksum(ByteBuf byteBuf, PulsarApi.MessageIdData messageIdData) {
        if (!Commands.hasChecksum(byteBuf)) {
            return true;
        }
        int receivedChecksum = Commands.readChecksum(byteBuf).intValue();
        int computedChecksum = Crc32cChecksum.computeChecksum(byteBuf);
        if (receivedChecksum == computedChecksum) {
            return true;
        }
        // Both values are ints: format them the same way so a checksum with the high bit set is not
        // sign-extended to 16 hex digits next to an 8-digit computed value.
        log.error("[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}", new Object[]{this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId()), Integer.toHexString(receivedChecksum), Integer.toHexString(computedChecksum)});
        return false;
    }

    /**
     * Logs the position of a corrupted entry at error level and then discards it via
     * {@code discardMessage} with the given validation error.
     */
    private void discardCorruptedMessage(PulsarApi.MessageIdData messageIdData, ClientCnx clientCnx, PulsarApi.CommandAck.ValidationError validationError) {
        Object[] logArgs = new Object[]{this.topic, this.subscription, Long.valueOf(messageIdData.getLedgerId()), Long.valueOf(messageIdData.getEntryId())};
        log.error("[{}][{}] Discarding corrupted message at {}:{}", logArgs);
        discardMessage(messageIdData, clientCnx, validationError);
    }

    /**
     * Drops a message: writes an individual ack carrying the validation error on the given connection, adds
     * one permit, and increments the receive-failure statistic.
     */
    private void discardMessage(PulsarApi.MessageIdData messageIdData, ClientCnx clientCnx, PulsarApi.CommandAck.ValidationError validationError) {
        clientCnx.ctx().writeAndFlush(
                Commands.newAck(
                        this.consumerId,
                        messageIdData.getLedgerId(),
                        messageIdData.getEntryId(),
                        PulsarApi.CommandAck.AckType.Individual,
                        validationError,
                        Collections.emptyMap()),
                clientCnx.ctx().voidPromise());
        increaseAvailablePermits(clientCnx);
        this.stats.incrementNumReceiveFailed();
    }

    /** Returns the subscription name as this handler's name. */
    @Override // org.apache.pulsar.client.impl.HandlerBase
    String getHandlerName() {
        final String handlerName = this.subscription;
        return handlerName;
    }

    /** Returns true when a client connection is present and the handler state is {@code Ready}. */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public boolean isConnected() {
        if (getClientCnx() == null) {
            return false;
        }
        return getState() == HandlerBase.State.Ready;
    }

    /** Returns the value of the {@code partitionIndex} field. */
    int getPartitionIndex() {
        final int index = this.partitionIndex;
        return index;
    }

    /** Returns the current value of the available-permit counter. */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int getAvailablePermits() {
        final int permits = AVAILABLE_PERMITS_UPDATER.get(this);
        return permits;
    }

    /** Returns the number of messages currently held in the incoming queue. */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public int numMessagesInQueue() {
        final int queued = this.incomingMessages.size();
        return queued;
    }

    /**
     * Asks the broker to redeliver every unacknowledged message of this consumer.
     *
     * <p>When connected to an endpoint speaking at least protocol {@code v2}, the incoming queue and both
     * trackers are cleared, the redelivery command is written, and one permit is added per message that was
     * dropped from the queue. Otherwise a warning is logged; unless there is no connection or the handler is
     * still {@code Connecting}, the connection is also closed.
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void redeliverUnacknowledgedMessages() {
        ClientCnx cnx = cnx();
        boolean canRedeliver = isConnected() && cnx.getRemoteEndpointProtocolVersion() >= PulsarApi.ProtocolVersion.v2.getNumber();
        if (canRedeliver) {
            int drained;
            synchronized (this) {
                drained = this.incomingMessages.size();
                this.incomingMessages.clear();
                this.unAckedMessageTracker.clear();
                this.batchMessageAckTracker.clear();
            }
            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(this.consumerId), cnx.ctx().voidPromise());
            if (drained > 0) {
                increaseAvailablePermits(cnx, drained);
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", new Object[]{this.subscription, this.topic, this.consumerName, Integer.valueOf(drained)});
            }
            return;
        }
        if (cnx == null || getState() == HandlerBase.State.Connecting) {
            log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this);
        } else {
            log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
            cnx.ctx().close();
        }
    }

    /**
     * Asks the broker to redeliver the given messages.
     *
     * <p>For any subscription type other than {@code Shared} this delegates to the no-argument overload.
     * Otherwise, when connected to an endpoint speaking at least protocol {@code v2}, expired messages are
     * first removed from the incoming queue, then the ids are sent in chunks of at most
     * {@code MAX_REDELIVER_UNACKNOWLEDGED}, and one permit is added per message removed from the queue. When
     * that precondition does not hold a warning is logged; unless there is no connection or the handler is
     * still {@code Connecting}, the connection is also closed.
     *
     * @param set ids to redeliver; may be extended by {@code removeExpiredMessagesFromQueue}
     */
    @Override // org.apache.pulsar.client.impl.ConsumerBase
    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> set) {
        if (this.conf.getSubscriptionType() != SubscriptionType.Shared) {
            redeliverUnacknowledgedMessages();
            return;
        }
        ClientCnx cnx = cnx();
        if (!isConnected() || cnx.getRemoteEndpointProtocolVersion() < PulsarApi.ProtocolVersion.v2.getNumber()) {
            if (cnx == null || getState() == HandlerBase.State.Connecting) {
                log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this);
                return;
            } else {
                log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
                cnx.ctx().close();
                return;
            }
        }
        int removedFromQueue = removeExpiredMessagesFromQueue(set);
        Iterable<List<MessageIdImpl>> chunks = Iterables.partition(set, MAX_REDELIVER_UNACKNOWLEDGED);
        // A single builder is reused for every id and recycled once all chunks have been sent.
        PulsarApi.MessageIdData.Builder idBuilder = PulsarApi.MessageIdData.newBuilder();
        chunks.forEach(chunk -> {
            List<PulsarApi.MessageIdData> idsToRedeliver = chunk.stream().map(messageIdImpl -> {
                this.batchMessageAckTracker.remove(messageIdImpl);
                idBuilder.setPartition(messageIdImpl.getPartitionIndex());
                idBuilder.setLedgerId(messageIdImpl.getLedgerId());
                idBuilder.setEntryId(messageIdImpl.getEntryId());
                return idBuilder.build();
            }).collect(Collectors.toList());
            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(this.consumerId, idsToRedeliver), cnx.ctx().voidPromise());
            idsToRedeliver.forEach(idData -> {
                idData.recycle();
            });
        });
        if (removedFromQueue > 0) {
            increaseAvailablePermits(cnx, removedFromQueue);
        }
        idBuilder.recycle();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", new Object[]{this.subscription, this.topic, this.consumerName, Integer.valueOf(removedFromQueue)});
        }
    }

    /**
     * Blocking variant of {@link #seekAsync(MessageId)}.
     *
     * @param messageId position to seek to
     * @throws PulsarClientException wrapping the {@link ExecutionException} or {@link InterruptedException}
     *     raised while waiting; on interruption the thread's interrupt flag is set again before throwing
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public void seek(MessageId messageId) throws PulsarClientException {
        try {
            seekAsync(messageId).get();
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new PulsarClientException(e);
        } catch (ExecutionException e) {
            throw new PulsarClientException(e);
        }
    }

    /**
     * Sends a seek request for the given message id and returns a future tracking its outcome.
     *
     * <p>The returned future is already failed when the consumer is closing/closed
     * ({@code AlreadyClosedException}) or not connected. Otherwise it completes when the broker answers the
     * request, or fails with the cause reported for the request.
     *
     * @param messageId position to seek to; must be a {@code MessageIdImpl}
     * @return future completed once the request has been answered
     */
    @Override // org.apache.pulsar.client.api.Consumer
    public CompletableFuture<Void> seekAsync(MessageId messageId) {
        if (getState() == HandlerBase.State.Closing || getState() == HandlerBase.State.Closed) {
            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }
        if (!isConnected()) {
            return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        long newRequestId = this.client.newRequestId();
        MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
        ByteBuf newSeek = Commands.newSeek(this.consumerId, newRequestId, messageIdImpl.getLedgerId(), messageIdImpl.getEntryId());
        ClientCnx cnx = cnx();
        log.info("[{}][{}] Seek subscription to message id {}", new Object[]{this.topic, this.subscription, messageId});
        cnx.sendRequestWithId(newSeek, newRequestId).thenRun(() -> {
            log.info("[{}][{}] Successfully reset subscription to message id {}", new Object[]{this.topic, this.subscription, messageId});
            completableFuture.complete(null);
        }).exceptionally(th -> {
            // Fall back to the throwable itself when it carries no cause; otherwise an NPE thrown here would
            // leave the returned future uncompleted.
            Throwable failure = th.getCause() != null ? th.getCause() : th;
            log.error("[{}][{}] Failed to reset subscription: {}", new Object[]{this.topic, this.subscription, failure.getMessage()});
            completableFuture.completeExceptionally(failure);
            return null;
        });
        return completableFuture;
    }

    /**
     * Returns the message's id; a batch message id is replaced by a plain {@code MessageIdImpl} built from
     * its ledger id, its entry id and this consumer's partition index.
     */
    private MessageIdImpl getMessageIdImpl(Message message) {
        MessageIdImpl id = (MessageIdImpl) message.getMessageId();
        if (!(id instanceof BatchMessageIdImpl)) {
            return id;
        }
        return new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
    }

    /**
     * Drains messages from the head of the incoming queue while their ids are contained in {@code set}.
     *
     * <p>Nothing is removed when the queue is empty or its head is not in {@code set}. Draining stops after
     * the first polled message whose id is not in {@code set}; that message is still counted and its id is
     * added to {@code set}.
     *
     * @param set ids considered expired; may gain one additional id
     * @return number of messages polled from the queue
     */
    private int removeExpiredMessagesFromQueue(Set<MessageIdImpl> set) {
        Message head = this.incomingMessages.peek();
        if (head == null || !set.contains(getMessageIdImpl(head))) {
            return 0;
        }
        int polled = 0;
        for (Message next = this.incomingMessages.poll(); next != null; next = this.incomingMessages.poll()) {
            polled++;
            MessageIdImpl nextId = getMessageIdImpl(next);
            if (!set.contains(nextId)) {
                set.add(nextId);
                break;
            }
        }
        return polled;
    }

    /** Returns the consumer stats, or null when the stats object is a {@code ConsumerStatsDisabled}. */
    @Override // org.apache.pulsar.client.api.Consumer
    public ConsumerStats getStats() {
        boolean statsDisabled = this.stats instanceof ConsumerStatsDisabled;
        if (!statsDisabled) {
            return this.stats;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks this consumer as having reached the end of the topic and, when a listener is configured, calls
     * its {@code reachedEndOfTopic} callback.
     */
    public void setTerminated() {
        Object[] logArgs = new Object[]{this.subscription, this.topic, this.consumerName};
        log.info("[{}] [{}] [{}] Consumer has reached the end of topic", logArgs);
        this.hasReachedEndOfTopic = true;
        if (this.listener == null) {
            return;
        }
        this.listener.reachedEndOfTopic(this);
    }

    /** Returns the flag that {@code setTerminated()} sets to true. */
    @Override // org.apache.pulsar.client.api.Consumer
    public boolean hasReachedEndOfTopic() {
        final boolean reachedEnd = this.hasReachedEndOfTopic;
        return reachedEnd;
    }

    /** Hash code derived from the topic, subscription and consumer name. */
    public int hashCode() {
        final int hash = Objects.hash(this.topic, this.subscription, this.consumerName);
        return hash;
    }
}
