package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.shade.com.google.common.base.MoreObjects;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.channel.ChannelFutureListener;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.channel.ChannelPromise;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/Consumer.class */
public class Consumer {
    private final Subscription subscription;
    private final PulsarApi.CommandSubscribe.SubType subType;
    private final ServerCnx cnx;
    private final String appId;
    private AuthenticationDataSource authenticationData;
    private final String topicName;
    private final int partitionIdx;
    private final PulsarApi.CommandSubscribe.InitialPosition subscriptionInitialPosition;
    private final long consumerId;
    private final int priorityLevel;
    private final boolean readCompacted;
    private final String consumerName;
    private long lastConsumedTimestamp;
    private long lastAckedTimestamp;
    private final ConcurrentLongLongPairHashMap pendingAcks;
    private final ConsumerStats stats;
    private final int maxUnackedMessages;
    private final Map<String, String> metadata;
    private final PulsarApi.KeySharedMeta keySharedMeta;
    private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits");
    private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked");
    private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    private volatile int messagePermits = 0;
    private volatile int permitsReceivedWhileConsumerBlocked = 0;
    private volatile int unackedMessages = 0;
    private volatile boolean blockedConsumerOnUnackedMsgs = false;
    private final Rate msgOut = new Rate();
    private final Rate msgRedeliver = new Rate();

    public Consumer(Subscription subscription, PulsarApi.CommandSubscribe.SubType subType, String str, long j, int i, String str2, int i2, ServerCnx serverCnx, String str3, Map<String, String> map, boolean z, PulsarApi.CommandSubscribe.InitialPosition initialPosition, PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {
        this.subscription = subscription;
        this.subType = subType;
        this.topicName = str;
        this.partitionIdx = TopicName.getPartitionIndex(str);
        this.consumerId = j;
        this.priorityLevel = i;
        this.readCompacted = z;
        this.consumerName = str2;
        this.maxUnackedMessages = i2;
        this.subscriptionInitialPosition = initialPosition;
        this.keySharedMeta = keySharedMeta;
        this.cnx = serverCnx;
        this.appId = str3;
        this.authenticationData = serverCnx.authenticationData;
        PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
        MESSAGE_PERMITS_UPDATER.set(this, 0);
        UNACKED_MESSAGES_UPDATER.set(this, 0);
        this.metadata = map != null ? map : Collections.emptyMap();
        this.stats = new ConsumerStats();
        this.stats.setAddress(serverCnx.clientAddress().toString());
        this.stats.consumerName = str2;
        this.stats.setConnectedSince(DateFormatter.now());
        this.stats.setClientVersion(serverCnx.getClientVersion());
        this.stats.metadata = this.metadata;
        if (Subscription.isIndividualAckMode(subType)) {
            this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
        } else {
            this.pendingAcks = null;
        }
    }

    public PulsarApi.CommandSubscribe.SubType subType() {
        return this.subType;
    }

    public long consumerId() {
        return this.consumerId;
    }

    public String consumerName() {
        return this.consumerName;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyActiveConsumerChange(Consumer consumer) {
        if (Commands.peerSupportsActiveConsumerListener(this.cnx.getRemoteEndpointProtocolVersion())) {
            if (log.isDebugEnabled()) {
                log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", new Object[]{Long.valueOf(this.consumerId), this.topicName, this.subscription.getName(), consumer});
            }
            this.cnx.ctx().writeAndFlush(Commands.newActiveConsumerChange(this.consumerId, this == consumer), this.cnx.ctx().voidPromise());
        }
    }

    public boolean readCompacted() {
        return this.readCompacted;
    }

    public ChannelPromise sendMessages(List<Entry> list, EntryBatchSizes entryBatchSizes, int i, long j, RedeliveryTracker redeliveryTracker) {
        this.lastConsumedTimestamp = System.currentTimeMillis();
        ChannelHandlerContext ctx = this.cnx.ctx();
        ChannelPromise newPromise = ctx.newPromise();
        if (list.isEmpty() || i == 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId)});
            }
            newPromise.setSuccess();
            entryBatchSizes.recyle();
            return newPromise;
        }
        if (this.pendingAcks != null) {
            for (int i2 = 0; i2 < list.size(); i2++) {
                Entry entry = list.get(i2);
                if (entry != null) {
                    this.pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), entryBatchSizes.getBatchSize(i2), 0L);
                }
            }
        }
        MESSAGE_PERMITS_UPDATER.addAndGet(this, -i);
        incrementUnackedMessages(i);
        this.msgOut.recordMultipleEvents(i, j);
        ctx.channel().eventLoop().execute(() -> {
            for (int i3 = 0; i3 < list.size(); i3++) {
                Entry entry2 = (Entry) list.get(i3);
                if (entry2 != null) {
                    if (entryBatchSizes.getBatchSize(i3) <= 1 || this.cnx.isBatchMessageCompatibleVersion()) {
                        PulsarApi.MessageIdData.Builder newBuilder = PulsarApi.MessageIdData.newBuilder();
                        PulsarApi.MessageIdData build = newBuilder.setLedgerId(entry2.getLedgerId()).setEntryId(entry2.getEntryId()).setPartition(this.partitionIdx).build();
                        ByteBuf dataBuffer = entry2.getDataBuffer();
                        dataBuffer.retain();
                        if (this.cnx.getRemoteEndpointProtocolVersion() < PulsarApi.ProtocolVersion.v11.getNumber()) {
                            Commands.skipChecksumIfPresent(dataBuffer);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId), Long.valueOf(entry2.getLedgerId()), Long.valueOf(entry2.getEntryId())});
                        }
                        PositionImpl positionImpl = PositionImpl.get(build.getLedgerId(), build.getEntryId());
                        ctx.write(Commands.newMessage(this.consumerId, build, redeliveryTracker.contains(positionImpl) ? redeliveryTracker.incrementAndGetRedeliveryCount(positionImpl) : 0, dataBuffer), ctx.voidPromise());
                        build.recycle();
                        newBuilder.recycle();
                        entry2.release();
                    } else {
                        log.warn("[{}-{}] Consumer doesn't support batch messages -  consumerId {}, msg id {}-{}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId), Long.valueOf(entry2.getLedgerId()), Long.valueOf(entry2.getEntryId())});
                        ctx.close();
                        entry2.release();
                    }
                }
            }
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, newPromise);
            entryBatchSizes.recyle();
        });
        return newPromise;
    }

    private void incrementUnackedMessages(int i) {
        if (!Subscription.isIndividualAckMode(this.subType) || addAndGetUnAckedMsgs(this, i) < this.maxUnackedMessages || this.maxUnackedMessages <= 0) {
            return;
        }
        this.blockedConsumerOnUnackedMsgs = true;
    }

    public boolean isWritable() {
        return this.cnx.isWritable();
    }

    public void sendError(ByteBuf byteBuf) {
        this.cnx.ctx().writeAndFlush(byteBuf).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE);
    }

    public void close() throws BrokerServiceException {
        close(false);
    }

    public void close(boolean z) throws BrokerServiceException {
        this.subscription.removeConsumer(this, z);
        this.cnx.removedConsumer(this);
    }

    public void disconnect() {
        disconnect(false);
    }

    public void disconnect(boolean z) {
        log.info("Disconnecting consumer: {}", this);
        this.cnx.closeConsumer(this);
        try {
            close(z);
        } catch (BrokerServiceException e) {
            log.warn("Consumer {} was already closed: {}", new Object[]{this, e.getMessage(), e});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void doUnsubscribe(long j) {
        ChannelHandlerContext ctx = this.cnx.ctx();
        this.subscription.doUnsubscribe(this).thenAccept(r8 -> {
            log.info("Unsubscribed successfully from {}", this.subscription);
            this.cnx.removedConsumer(this);
            ctx.writeAndFlush(Commands.newSuccess(j));
        }).exceptionally(th -> {
            log.warn("Unsubscribe failed for {}", this.subscription, th);
            ctx.writeAndFlush(Commands.newError(j, BrokerServiceException.getClientErrorCode(th), th.getCause().getMessage()));
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void messageAcked(PulsarApi.CommandAck commandAck) {
        this.lastAckedTimestamp = System.currentTimeMillis();
        Map<String, Long> emptyMap = Collections.emptyMap();
        if (commandAck.getPropertiesCount() > 0) {
            emptyMap = (Map) commandAck.getPropertiesList().stream().collect(Collectors.toMap(keyLongValue -> {
                return keyLongValue.getKey();
            }, keyLongValue2 -> {
                return Long.valueOf(keyLongValue2.getValue());
            }));
        }
        if (commandAck.getAckType() == PulsarApi.CommandAck.AckType.Cumulative) {
            if (commandAck.getMessageIdCount() != 1) {
                log.warn("[{}] [{}] Received multi-message ack", this.subscription, Long.valueOf(this.consumerId));
                return;
            } else if (Subscription.isIndividualAckMode(this.subType)) {
                log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", this.subscription, Long.valueOf(this.consumerId));
                return;
            } else {
                PulsarApi.MessageIdData messageId = commandAck.getMessageId(0);
                this.subscription.acknowledgeMessage(Collections.singletonList(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())), PulsarApi.CommandAck.AckType.Cumulative, emptyMap);
                return;
            }
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < commandAck.getMessageIdCount(); i++) {
            PulsarApi.MessageIdData messageId2 = commandAck.getMessageId(i);
            PositionImpl positionImpl = PositionImpl.get(messageId2.getLedgerId(), messageId2.getEntryId());
            arrayList.add(positionImpl);
            if (Subscription.isIndividualAckMode(this.subType)) {
                removePendingAcks(positionImpl);
            }
            if (commandAck.hasValidationError()) {
                log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", new Object[]{this.subscription, Long.valueOf(this.consumerId), positionImpl, commandAck.getValidationError()});
            }
        }
        this.subscription.acknowledgeMessage(arrayList, PulsarApi.CommandAck.AckType.Individual, emptyMap);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void flowPermits(int i) {
        int andAdd;
        Preconditions.checkArgument(i > 0);
        if (shouldBlockConsumerOnUnackMsgs() && this.unackedMessages >= this.maxUnackedMessages) {
            this.blockedConsumerOnUnackedMsgs = true;
        }
        if (this.blockedConsumerOnUnackedMsgs) {
            andAdd = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, i);
        } else {
            andAdd = MESSAGE_PERMITS_UPDATER.getAndAdd(this, i);
            this.subscription.consumerFlow(this, i);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {} ", new Object[]{this.topicName, this.subscription, Integer.valueOf(i), Integer.valueOf(andAdd), Boolean.valueOf(this.blockedConsumerOnUnackedMsgs)});
        }
    }

    void flowConsumerBlockedPermits(Consumer consumer) {
        int andSet = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
        MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, andSet);
        this.subscription.consumerFlow(consumer, andSet);
    }

    public int getAvailablePermits() {
        return MESSAGE_PERMITS_UPDATER.get(this);
    }

    public boolean isBlocked() {
        return this.blockedConsumerOnUnackedMsgs;
    }

    public void reachedEndOfTopic() {
        if (this.cnx.getRemoteEndpointProtocolVersion() >= 9) {
            log.info("[{}] Notifying consumer that end of topic has been reached", this);
            this.cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(this.consumerId));
        }
    }

    private boolean shouldBlockConsumerOnUnackMsgs() {
        return Subscription.isIndividualAckMode(this.subType) && this.maxUnackedMessages > 0;
    }

    public void updateRates() {
        this.msgOut.calculateRate();
        this.msgRedeliver.calculateRate();
        this.stats.msgRateOut = this.msgOut.getRate();
        this.stats.msgThroughputOut = this.msgOut.getValueRate();
        this.stats.msgRateRedeliver = this.msgRedeliver.getRate();
    }

    public ConsumerStats getStats() {
        this.stats.lastAckedTimestamp = this.lastAckedTimestamp;
        this.stats.lastConsumedTimestamp = this.lastConsumedTimestamp;
        this.stats.availablePermits = getAvailablePermits();
        this.stats.unackedMessages = this.unackedMessages;
        this.stats.blockedConsumerOnUnackedMsgs = this.blockedConsumerOnUnackedMsgs;
        return this.stats;
    }

    public int getUnackedMessages() {
        return this.unackedMessages;
    }

    public PulsarApi.KeySharedMeta getKeySharedMeta() {
        return this.keySharedMeta;
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("subscription", this.subscription).add("consumerId", this.consumerId).add("consumerName", this.consumerName).add("address", this.cnx.clientAddress()).toString();
    }

    public ChannelHandlerContext ctx() {
        return this.cnx.ctx();
    }

    public void checkPermissions() {
        TopicName topicName = TopicName.get(this.subscription.getTopicName());
        if (this.cnx.getBrokerService().getAuthorizationService() != null) {
            try {
                if (this.cnx.getBrokerService().getAuthorizationService().canConsume(topicName, this.appId, this.authenticationData, this.subscription.getName())) {
                    return;
                }
            } catch (Exception e) {
                log.warn("[{}] Get unexpected error while autorizing [{}]  {}", new Object[]{this.appId, this.subscription.getTopicName(), e.getMessage(), e});
            }
            log.info("[{}] is not allowed to consume from topic [{}] anymore", this.appId, this.subscription.getTopicName());
            disconnect();
        }
    }

    public boolean equals(Object obj) {
        if (!(obj instanceof Consumer)) {
            return false;
        }
        Consumer consumer = (Consumer) obj;
        return Objects.equals(this.cnx.clientAddress(), consumer.cnx.clientAddress()) && this.consumerId == consumer.consumerId;
    }

    public int hashCode() {
        return this.consumerName.hashCode() + (31 * this.cnx.hashCode());
    }

    private void removePendingAcks(PositionImpl positionImpl) {
        Consumer consumer = null;
        if (this.pendingAcks.get(positionImpl.getLedgerId(), positionImpl.getEntryId()) == null) {
            Iterator<Consumer> it = this.subscription.getConsumers().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Consumer next = it.next();
                if (!next.equals(this) && next.getPendingAcks().containsKey(positionImpl.getLedgerId(), positionImpl.getEntryId())) {
                    consumer = next;
                    break;
                }
            }
        } else {
            consumer = this;
        }
        ConcurrentLongLongPairHashMap.LongPair longPair = consumer != null ? consumer.getPendingAcks().get(positionImpl.getLedgerId(), positionImpl.getEntryId()) : null;
        if (longPair != null) {
            int i = (int) longPair.first;
            if (consumer.getPendingAcks().remove(positionImpl.getLedgerId(), positionImpl.getEntryId())) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] consumer {} received ack {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId), positionImpl});
                }
                if (addAndGetUnAckedMsgs(consumer, -i) <= this.maxUnackedMessages / 2 && consumer.blockedConsumerOnUnackedMsgs && consumer.shouldBlockConsumerOnUnackMsgs()) {
                    consumer.blockedConsumerOnUnackedMsgs = false;
                    flowConsumerBlockedPermits(consumer);
                }
            }
        }
    }

    public ConcurrentLongLongPairHashMap getPendingAcks() {
        return this.pendingAcks;
    }

    public int getPriorityLevel() {
        return this.priorityLevel;
    }

    public void redeliverUnacknowledgedMessages() {
        clearUnAckedMsgs();
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received redelivery", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId)});
        }
        if (this.pendingAcks != null) {
            ArrayList<PositionImpl> arrayList = new ArrayList((int) this.pendingAcks.size());
            MutableInt mutableInt = new MutableInt(0);
            this.pendingAcks.forEach((j, j2, j3, j4) -> {
                mutableInt.add((int) j3);
                arrayList.add(new PositionImpl(j, j2));
            });
            for (PositionImpl positionImpl : arrayList) {
                this.pendingAcks.remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
            }
            this.msgRedeliver.recordMultipleEvents(mutableInt.intValue(), mutableInt.intValue());
            this.subscription.redeliverUnacknowledgedMessages(this, arrayList);
        } else {
            this.subscription.redeliverUnacknowledgedMessages(this);
        }
        flowConsumerBlockedPermits(this);
    }

    public void redeliverUnacknowledgedMessages(List<PulsarApi.MessageIdData> list) {
        int i = 0;
        ArrayList newArrayList = Lists.newArrayList();
        for (PulsarApi.MessageIdData messageIdData : list) {
            PositionImpl positionImpl = PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId());
            ConcurrentLongLongPairHashMap.LongPair longPair = this.pendingAcks.get(positionImpl.getLedgerId(), positionImpl.getEntryId());
            if (longPair != null) {
                this.pendingAcks.remove(positionImpl.getLedgerId(), positionImpl.getEntryId());
                i = (int) (i + longPair.first);
                newArrayList.add(positionImpl);
            }
        }
        addAndGetUnAckedMsgs(this, -i);
        this.blockedConsumerOnUnackedMsgs = false;
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", new Object[]{this.topicName, this.subscription, Long.valueOf(this.consumerId), Integer.valueOf(i), Integer.valueOf(newArrayList.size())});
        }
        this.subscription.redeliverUnacknowledgedMessages(this, newArrayList);
        this.msgRedeliver.recordMultipleEvents(i, i);
        int andSet = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);
        if (andSet > 0) {
            MESSAGE_PERMITS_UPDATER.getAndAdd(this, andSet);
            this.subscription.consumerFlow(this, andSet);
        }
    }

    public Subscription getSubscription() {
        return this.subscription;
    }

    private int addAndGetUnAckedMsgs(Consumer consumer, int i) {
        this.subscription.addUnAckedMessages(i);
        return UNACKED_MESSAGES_UPDATER.addAndGet(consumer, i);
    }

    private void clearUnAckedMsgs() {
        this.subscription.addUnAckedMessages(-UNACKED_MESSAGES_UPDATER.getAndSet(this, 0));
    }
}
