package com.rabbitmq.perf;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.perf.AgentBase;
import com.rabbitmq.perf.MulticastSet;
import com.rabbitmq.perf.Recovery;
import com.rabbitmq.perf.TopologyRecording;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/rabbitmq/perf/Consumer.class */
public class Consumer extends AgentBase implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    /** Acknowledges a delivery with {@code basic.ack}; the requeue flag is ignored. */
    private static final AckNackOperation ACK_OPERATION = (channel, envelope, multiple, requeue) -> {
        channel.basicAck(envelope.getDeliveryTag(), multiple);
    };

    /** Rejects a delivery with {@code basic.nack}, forwarding both the multiple and requeue flags. */
    private static final AckNackOperation NACK_OPERATION = (channel, envelope, multiple, requeue) -> {
        channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
    };

    /** Reason handed to the completion handler once the message limit has been reached. */
    static final String STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT = "Consumer reached message limit";

    /** Callback registered with the broker in asynchronous mode; assigned when consumers are registered. */
    private volatile ConsumerImpl q;
    private final Channel channel;
    /** Compared with the routing key of each delivery to decide whether a latency is computed. */
    private final String id;
    /** Commit the transaction every {@code txSize} messages; 0 disables commits. */
    private final int txSize;
    private final boolean autoAck;
    /** 0 acknowledges every message individually; otherwise a multiple ack is sent every N messages. */
    private final int multiAckEvery;
    /** Requeue flag passed to the ack/nack operation. */
    private final boolean requeue;
    private final Stats stats;
    /** Maximum number of messages to process; 0 means no limit. */
    private final int msgLimit;
    private final ConsumerLatency consumerLatency;
    /** Extracts the publishing timestamp from the message properties or body. */
    private final BiFunction<AMQP.BasicProperties, byte[], Long> timestampExtractor;
    private final TimestampProvider timestampProvider;
    private final MulticastSet.CompletionHandler completionHandler;
    /** Queue names as originally configured; used to recompute the polled queue names on recovery. */
    private final List<String> initialQueueNames;
    private final ConsumerState state;
    private final Recovery.RecoveryProcess recoveryProcess;
    private final ExecutorService executorService;
    /** When true the consumer polls with {@code basic.get} instead of registering a callback. */
    private final boolean polling;
    /** Pause in milliseconds applied after each poll when greater than 0. */
    private final int pollingInterval;
    private final AckNackOperation ackNackOperation;
    private final Map<String, Object> consumerArguments;
    /** Consumer tag to queue name, filled when asynchronous consumers are registered. */
    private final Map<String, String> consumerTagBranchMap = Collections.synchronizedMap(new HashMap<>());
    /** Flipped once so the completion handler is notified at most one time. */
    private final AtomicBoolean completed = new AtomicBoolean(false);
    /** Queue names currently used by the polling loop. */
    private final AtomicReference<List<String>> queueNames = new AtomicReference<>();
    /** Incremented whenever {@link #queueNames} is replaced, so the polling loop can reload it. */
    private final AtomicLong queueNamesVersion = new AtomicLong(0);

    /**
     * Strategy used to settle a delivery, implemented either with {@code basic.ack}
     * or with {@code basic.nack}.
     */
    @FunctionalInterface
    private interface AckNackOperation {
        /**
         * Settles the delivery described by {@code envelope} on {@code channel}.
         *
         * @param channel  channel the settlement is sent on
         * @param envelope envelope carrying the delivery tag to settle
         * @param multiple passed as the "multiple" flag of the ack/nack
         * @param requeue  passed as the "requeue" flag by the nack implementation
         * @throws IOException if the channel operation fails
         */
        void apply(Channel channel, Envelope envelope, boolean multiple, boolean requeue) throws IOException;
    }

    /** Latency simulation that always spins on the clock for the indicated duration. */
    private static class BusyWaitConsumerLatency implements ConsumerLatency {
        private final ValueIndicator<Long> latencyIndicator;

        private BusyWaitConsumerLatency(ValueIndicator<Long> indicator) {
            this.latencyIndicator = indicator;
        }

        /** Reads the current latency value and busy-waits for it. */
        @Override // com.rabbitmq.perf.Consumer.ConsumerLatency
        public boolean simulateLatency() {
            long latency = this.latencyIndicator.getValue().longValue();
            return Consumer.latencyBusyWait(latency);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/perf/Consumer$ConsumerImpl.class */
    public class ConsumerImpl extends DefaultConsumer {
        private final boolean rateLimitation;

        private ConsumerImpl(Channel channel) {
            super(channel);
            Consumer.this.state.setLastStatsTime(System.currentTimeMillis());
            Consumer.this.state.setMsgCount(0);
            this.rateLimitation = Consumer.this.state.getRateLimit() > 0.0f;
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            handleMessage(envelope, basicProperties, bArr, Consumer.this.channel);
        }

        void handleMessage(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr, Channel channel) throws IOException {
            int incrementMessageCount = Consumer.this.state.incrementMessageCount();
            if (Consumer.this.msgLimit == 0 || incrementMessageCount <= Consumer.this.msgLimit) {
                Consumer.this.stats.handleRecv(Consumer.this.id.equals(envelope.getRoutingKey()) ? Consumer.this.timestampProvider.getDifference(Consumer.this.timestampProvider.getCurrentTime(), ((Long) Consumer.this.timestampExtractor.apply(basicProperties, bArr)).longValue()) : 0L);
                if (Consumer.this.consumerLatency.simulateLatency()) {
                    ackIfNecessary(envelope, incrementMessageCount, channel);
                    commitTransactionIfNecessary(incrementMessageCount, channel);
                    long currentTimeMillis = System.currentTimeMillis();
                    if (this.rateLimitation) {
                        if (currentTimeMillis - Consumer.this.state.getLastStatsTime() > 1000) {
                            Consumer.this.state.setLastStatsTime(currentTimeMillis);
                            Consumer.this.state.setMsgCount(0);
                        }
                        Consumer.this.delay(currentTimeMillis, Consumer.this.state);
                    }
                }
            }
            if (Consumer.this.msgLimit == 0 || incrementMessageCount < Consumer.this.msgLimit) {
                return;
            }
            Consumer.this.countDown(Consumer.STOP_REASON_CONSUMER_REACHED_MESSAGE_LIMIT);
        }

        private void ackIfNecessary(Envelope envelope, int i, Channel channel) throws IOException {
            if (Consumer.this.autoAck) {
                return;
            }
            Consumer.this.dealWithWriteOperation(() -> {
                if (Consumer.this.multiAckEvery == 0) {
                    Consumer.this.ackNackOperation.apply(channel, envelope, false, Consumer.this.requeue);
                } else if (i % Consumer.this.multiAckEvery == 0) {
                    Consumer.this.ackNackOperation.apply(channel, envelope, true, Consumer.this.requeue);
                }
            }, Consumer.this.recoveryProcess);
        }

        private void commitTransactionIfNecessary(int i, Channel channel) throws IOException {
            if (Consumer.this.txSize == 0 || i % Consumer.this.txSize != 0) {
                return;
            }
            Consumer.this.dealWithWriteOperation(() -> {
                channel.txCommit();
            }, Consumer.this.recoveryProcess);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            Consumer.LOGGER.debug("Consumer received shutdown signal, recovery process enabled? {}, condition to trigger connection recovery? {}", Boolean.valueOf(Consumer.this.recoveryProcess.isEnabled()), Boolean.valueOf(Consumer.this.isConnectionRecoveryTriggered(shutdownSignalException)));
            if (Consumer.this.recoveryProcess.isEnabled()) {
                return;
            }
            Consumer.LOGGER.debug("Counting down for consumer");
            Consumer.this.countDown("Consumer shut down");
        }

        public void handleCancel(String str) throws IOException {
            System.out.printf("Consumer cancelled by broker for tag: %s", str);
            if (!Consumer.this.consumerTagBranchMap.containsKey(str)) {
                System.out.printf("Could not find queue for consumer tag: %s", str);
                return;
            }
            String str2 = (String) Consumer.this.consumerTagBranchMap.get(str);
            System.out.printf("Re-consuming. Queue: %s for Tag: %s", str2, str);
            Consumer.this.channel.basicConsume(str2, Consumer.this.autoAck, Consumer.this.consumerArguments, Consumer.this.q);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/rabbitmq/perf/Consumer$ConsumerLatency.class */
    /** Strategy for simulating the time a consumer spends processing a message. */
    public interface ConsumerLatency {
        /**
         * Waits for the simulated processing time.
         *
         * @return {@code true} when the wait completed; the sleeping implementations
         *         return {@code false} when the thread was interrupted, in which case
         *         the message handler skips ack, commit and rate limiting
         */
        boolean simulateLatency();
    }

    /**
     * Mutable counters of a consumer: the configured rate limit, the start of the
     * current statistics window and the number of messages seen.
     */
    public static class ConsumerState implements AgentBase.AgentState {
        private final AtomicInteger msgCount = new AtomicInteger(0);
        private final float rateLimit;
        private volatile long lastStatsTime;

        /**
         * @param rateLimit rate limit reported by {@link #getRateLimit()}
         */
        protected ConsumerState(float rateLimit) {
            this.rateLimit = rateLimit;
        }

        /** Atomically adds one to the message count and returns the new value. */
        @Override // com.rabbitmq.perf.AgentBase.AgentState
        public int incrementMessageCount() {
            return this.msgCount.incrementAndGet();
        }

        /** Returns the current message count. */
        @Override // com.rabbitmq.perf.AgentBase.AgentState
        public int getMsgCount() {
            return this.msgCount.get();
        }

        /** Overwrites the message count. */
        protected void setMsgCount(int count) {
            this.msgCount.set(count);
        }

        /** Returns the time last stored with {@link #setLastStatsTime(long)}. */
        @Override // com.rabbitmq.perf.AgentBase.AgentState
        public long getLastStatsTime() {
            return this.lastStatsTime;
        }

        /** Stores the start time of the current statistics window. */
        protected void setLastStatsTime(long time) {
            this.lastStatsTime = time;
        }

        /** Returns the rate limit given at construction. */
        @Override // com.rabbitmq.perf.AgentBase.AgentState
        public float getRateLimit() {
            return this.rateLimit;
        }
    }

    /* loaded from: input_file:com/rabbitmq/perf/Consumer$NoWaitConsumerLatency.class */
    /** Latency strategy that does not wait at all. */
    private static class NoWaitConsumerLatency implements ConsumerLatency {
        private NoWaitConsumerLatency() {
        }

        /** Returns {@code true} immediately, without any wait. */
        @Override // com.rabbitmq.perf.Consumer.ConsumerLatency
        public boolean simulateLatency() {
            return true;
        }
    }

    /** Latency simulation that always sleeps the thread for the indicated duration. */
    private static class ThreadSleepConsumerLatency implements ConsumerLatency {
        private final ValueIndicator<Long> latencyIndicator;

        private ThreadSleepConsumerLatency(ValueIndicator<Long> indicator) {
            this.latencyIndicator = indicator;
        }

        /** Reads the current latency value and sleeps for it; {@code false} means the sleep was interrupted. */
        @Override // com.rabbitmq.perf.Consumer.ConsumerLatency
        public boolean simulateLatency() {
            long latency = this.latencyIndicator.getValue().longValue();
            return Consumer.latencySleep(latency);
        }
    }

    /**
     * Latency simulation for an indicator whose value changes over time: the
     * waiting technique is chosen again on every call from the current value.
     */
    private static class VariableConsumerLatency implements ConsumerLatency {
        private final ValueIndicator<Long> latencyIndicator;

        private VariableConsumerLatency(ValueIndicator<Long> indicator) {
            this.latencyIndicator = indicator;
        }

        /**
         * No wait for a non-positive value, a thread sleep from 1000 upwards,
         * a busy wait for anything in between.
         */
        @Override // com.rabbitmq.perf.Consumer.ConsumerLatency
        public boolean simulateLatency() {
            long latency = this.latencyIndicator.getValue().longValue();
            if (latency <= 0) {
                return true;
            }
            if (latency >= 1000) {
                return Consumer.latencySleep(latency);
            }
            return Consumer.latencyBusyWait(latency);
        }
    }

    public Consumer(ConsumerParameters consumerParameters) {
        this.channel = consumerParameters.getChannel();
        this.id = consumerParameters.getId();
        this.txSize = consumerParameters.getTxSize();
        this.autoAck = consumerParameters.isAutoAck();
        this.multiAckEvery = consumerParameters.getMultiAckEvery();
        this.requeue = consumerParameters.isRequeue();
        this.stats = consumerParameters.getStats();
        this.msgLimit = consumerParameters.getMsgLimit();
        this.timestampProvider = consumerParameters.getTimestampProvider();
        this.completionHandler = consumerParameters.getCompletionHandler();
        this.executorService = consumerParameters.getExecutorService();
        this.polling = consumerParameters.isPolling();
        this.pollingInterval = consumerParameters.getPollingInterval();
        this.consumerArguments = consumerParameters.getConsumerArguments();
        this.queueNames.set(new ArrayList(consumerParameters.getQueueNames()));
        this.initialQueueNames = new ArrayList(consumerParameters.getQueueNames());
        if (consumerParameters.getConsumerLatenciesIndicator().isVariable()) {
            this.consumerLatency = new VariableConsumerLatency(consumerParameters.getConsumerLatenciesIndicator());
        } else {
            long longValue = consumerParameters.getConsumerLatenciesIndicator().getValue().longValue();
            if (longValue <= 0) {
                this.consumerLatency = new NoWaitConsumerLatency();
            } else if (longValue >= 1000) {
                this.consumerLatency = new ThreadSleepConsumerLatency(consumerParameters.getConsumerLatenciesIndicator());
            } else {
                this.consumerLatency = new BusyWaitConsumerLatency(consumerParameters.getConsumerLatenciesIndicator());
            }
        }
        if (this.timestampProvider.isTimestampInHeader()) {
            this.timestampExtractor = (basicProperties, bArr) -> {
                Object obj = basicProperties.getHeaders().get("timestamp");
                return Long.valueOf(obj == null ? Long.MAX_VALUE : ((Long) obj).longValue());
            };
        } else {
            this.timestampExtractor = (basicProperties2, bArr2) -> {
                DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bArr2));
                try {
                    dataInputStream.readInt();
                    return Long.valueOf(dataInputStream.readLong());
                } catch (IOException e) {
                    throw new RuntimeException("Error while extracting timestamp from body");
                }
            };
        }
        if (consumerParameters.isNack()) {
            this.ackNackOperation = NACK_OPERATION;
        } else {
            this.ackNackOperation = ACK_OPERATION;
        }
        this.state = new ConsumerState(consumerParameters.getRateLimit());
        this.recoveryProcess = consumerParameters.getRecoveryProcess();
        this.recoveryProcess.init(this);
    }

    /**
     * Starts consuming: a polling loop when polling is enabled, otherwise a
     * callback registered on every queue.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (!this.polling) {
            registerAsynchronousConsumer();
            return;
        }
        startBasicGetConsumer();
    }

    /**
     * Submits the polling loop to the executor service. The loop issues
     * {@code basic.get} on each queue in turn until the consumer has completed or
     * the thread is interrupted. It pauses while recovery is in progress and
     * reloads the queue names whenever their version changes.
     */
    private void startBasicGetConsumer() {
        this.executorService.execute(() -> {
            ConsumerImpl delegate = new ConsumerImpl(this.channel);
            boolean pauseBetweenPolls = this.pollingInterval > 0;
            long knownVersion = this.queueNamesVersion.get();
            List<String> queues = this.queueNames.get();
            Channel ch = this.channel;
            Connection connection = this.channel.getConnection();
            while (!this.completed.get() && !Thread.interrupted()) {
                // Read the version before the list: if the names change right after,
                // the version moves past what was recorded and the list is reloaded next time.
                long currentVersion = this.queueNamesVersion.get();
                if (knownVersion != currentVersion) {
                    queues = this.queueNames.get();
                    knownVersion = currentVersion;
                }
                for (String queue : queues) {
                    if (this.recoveryProcess.isRecoverying()) {
                        try {
                            LOGGER.debug("Recovery in progress, sleeping for a sec");
                            Thread.sleep(1000L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    } else {
                        try {
                            GetResponse response = ch.basicGet(queue, this.autoAck);
                            if (response != null) {
                                delegate.handleMessage(response.getEnvelope(), response.getProps(), response.getBody(), ch);
                            }
                        } catch (AlreadyClosedException e) {
                            LOGGER.debug("Tried to basic.get from a closed connection");
                        } catch (IOException e) {
                            LOGGER.debug("Basic.get error on queue {}: {}", queue, e.getMessage());
                            // Carry on with a fresh channel after the failure.
                            try {
                                Channel replacement = connection.createChannel();
                                if (replacement != null) {
                                    ch = replacement;
                                } else {
                                    // createChannel() may return null; keep the current channel rather than fail on null.
                                    LOGGER.debug("No channel available to replace the channel used for queue {}", queue);
                                }
                            } catch (Exception channelCreationError) {
                                LOGGER.debug("Error while trying to create a channel for queue {}: {}", queue, channelCreationError.getMessage());
                            }
                        }
                        if (pauseBetweenPolls) {
                            try {
                                Thread.sleep(this.pollingInterval);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }
            }
        });
    }

    /**
     * Creates the consumer callback and registers it on every queue, recording
     * the queue name under the consumer tag returned for it.
     *
     * @throws RuntimeException wrapping any shutdown signal or I/O failure
     */
    private void registerAsynchronousConsumer() {
        try {
            ConsumerImpl callback = new ConsumerImpl(this.channel);
            this.q = callback;
            for (String queue : this.queueNames.get()) {
                String consumerTag = this.channel.basicConsume(queue, this.autoAck, this.consumerArguments, callback);
                this.consumerTagBranchMap.put(consumerTag, queue);
            }
        } catch (ShutdownSignalException | IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Notifies the completion handler, the first time only; later calls do nothing.
     *
     * @param str reason passed to the completion handler
     */
    public void countDown(String str) {
        boolean firstCompletion = this.completed.compareAndSet(false, true);
        if (!firstCompletion) {
            return;
        }
        this.completionHandler.countDown(str);
    }

    /**
     * Restores consumption after the topology has been recovered.
     *
     * <p>In polling mode the queue names are recomputed from the initial names
     * and published to the polling loop by bumping the version. Otherwise a
     * consumer is registered again for every recorded consumer tag, under that
     * same tag; a failure on one queue is logged and the others are still attempted.
     *
     * @param topologyRecording recording used to resolve the current queue names
     */
    @Override // com.rabbitmq.perf.AgentBase
    public void recover(TopologyRecording topologyRecording) {
        if (this.polling) {
            List<String> recoveredNames = new ArrayList<>(this.initialQueueNames.size());
            for (String initialName : this.initialQueueNames) {
                recoveredNames.add(queueName(topologyRecording, initialName));
            }
            this.queueNames.set(recoveredNames);
            this.queueNamesVersion.incrementAndGet();
            return;
        }
        // Iterating a synchronizedMap requires holding its monitor; copy under the lock
        // so the lock is not held while talking to the broker.
        Map<String, String> tagToQueue;
        synchronized (this.consumerTagBranchMap) {
            tagToQueue = new HashMap<>(this.consumerTagBranchMap);
        }
        for (Map.Entry<String, String> entry : tagToQueue.entrySet()) {
            String queueName = queueName(topologyRecording, entry.getValue());
            LOGGER.debug("Recovering consumer, starting consuming on {}", queueName);
            try {
                this.channel.basicConsume(queueName, this.autoAck, entry.getKey(), false, false, this.consumerArguments, this.q);
            } catch (IOException e) {
                LOGGER.warn("Error while recovering consumer {} on queue {} on connection {}", new Object[]{entry.getKey(), queueName, this.channel.getConnection().getClientProvidedName(), e});
            }
        }
    }

    /**
     * Resolves a queue name through the topology recording.
     *
     * @return the name of the recorded queue, or {@code str} itself when the
     *         recording has no queue for it
     */
    private static String queueName(TopologyRecording topologyRecording, String str) {
        TopologyRecording.RecordedQueue recorded = topologyRecording.queue(str);
        if (recorded != null) {
            return recorded.name();
        }
        return str;
    }

    /**
     * Sleeps the current thread for {@code j / 1000} milliseconds.
     *
     * @param j latency; divided by 1000 to obtain the sleep time in milliseconds
     * @return {@code true} when the sleep completed, {@code false} when it was
     *         interrupted (the interrupt flag is set again in that case)
     */
    public static boolean latencySleep(long j) {
        boolean completed = true;
        try {
            Thread.sleep(j / 1000);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            completed = false;
        }
        return completed;
    }

    /**
     * Spins on {@link System#nanoTime()} until {@code j * 1000} nanoseconds have
     * elapsed since the call started.
     *
     * @param j latency; multiplied by 1000 to obtain the wait in nanoseconds
     * @return always {@code true}
     */
    public static boolean latencyBusyWait(long j) {
        // The start time must be captured once: comparing two fresh nanoTime() readings
        // never yields a growing difference and would spin forever for any positive latency.
        long start = System.nanoTime();
        long waitNanos = j * 1000;
        while (System.nanoTime() - start < waitNanos) {
            // busy wait
        }
        return true;
    }
}
