/*
 * Decompiled with CFR 0.152.
 */
package net.leanix.dropkit.amqp;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.leanix.dropkit.amqp.ConnectionHolder;
import net.leanix.dropkit.amqp.QueueConsumer;
import net.leanix.dropkit.amqp.QueueConsumerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ConsumerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ConsumerRegistry.class);
    public static final long QUEUE_X_EXPIRY_MILLIS = 120000L;
    private final ConnectionHolder connectionHolder;
    private final Map<String, QueueConsumer> consumerMap = new HashMap<String, QueueConsumer>();
    private final QueueConsumerFactory consumerFactory;

    @Inject
    public ConsumerRegistry(ConnectionHolder connectionHolder, QueueConsumerFactory queueingService) {
        this.connectionHolder = connectionHolder;
        this.consumerFactory = queueingService;
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(new RemoveOldConsumerRunnable(this));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregister(QueueConsumer consumer) {
        log.info("unregistering consumer for " + consumer.getQueueName());
        Map<String, QueueConsumer> map = this.consumerMap;
        synchronized (map) {
            QueueConsumer registered = this.consumerMap.remove(consumer.getQueueName());
            if (registered != consumer) {
                throw new IllegalStateException("serious kuddelmuddel if this really happens");
            }
            this.internalPostRemove(consumer);
        }
    }

    private void internalPostRemove(QueueConsumer consumer) {
        consumer.setUnregistering();
        try {
            if (consumer.getChannel().isOpen()) {
                log.info("closing channel for consumer '{}'", (Object)consumer.getQueueName());
                consumer.getChannel().close();
            }
        }
        catch (ShutdownSignalException | IOException e) {
            log.error("could not close channel - ignoring", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public QueueConsumer consumerPresto(String queueName) throws IOException {
        QueueConsumer consumer;
        Map<String, QueueConsumer> map = this.consumerMap;
        synchronized (map) {
            consumer = this.consumerMap.get(queueName);
            if (consumer == null) {
                log.info("for queue name '{}' creating a channel to AMQP server and a consumer using it", (Object)queueName);
                Channel channel = this.connectionHolder.createNewChannel();
                consumer = this.consumerFactory.createConsumer(queueName, channel, this);
                this.consumerMap.put(queueName, consumer);
                Map<String, Long> args = Collections.singletonMap("x-expires", 120000L);
                channel.queueDeclare(queueName, true, false, false, args);
                try {
                    String consumerTag = consumer.getChannel().basicConsume(queueName, false, "", false, true, null, (Consumer)consumer);
                    consumer.setRegisteredConsumerTag(consumerTag);
                }
                catch (Exception e) {
                    log.warn("Internal error subscribing new consumer to queue '{}'", (Object)queueName);
                    throw e;
                }
            }
        }
        return consumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeOldConsumers(long inactivityMillis) {
        long now = System.currentTimeMillis();
        Map<String, QueueConsumer> map = this.consumerMap;
        synchronized (map) {
            log.debug("consumer remover: removing old consumers from {} total consumers", (Object)this.consumerMap.size());
            int count = 0;
            Iterator<QueueConsumer> iter = this.consumerMap.values().iterator();
            while (iter.hasNext()) {
                QueueConsumer consumer = iter.next();
                if (now - consumer.getLastUsed() <= inactivityMillis) continue;
                log.info("removing old consumer with queue name " + consumer.getQueueName());
                iter.remove();
                this.internalPostRemove(consumer);
                ++count;
            }
            log.debug("{} old consumers removed", (Object)count);
        }
    }

    static class RemoveOldConsumerRunnable
    implements Runnable {
        private final ConsumerRegistry consumerRegistry;

        RemoveOldConsumerRunnable(ConsumerRegistry consumerRegistry) {
            this.consumerRegistry = consumerRegistry;
        }

        @Override
        public void run() {
            while (true) {
                this.consumerRegistry.removeOldConsumers(600000L);
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException e) {
                    log.info("Stopping RemoveOldConsumerRunnable loop used to clean up unused consumers.");
                    return;
                }
            }
        }
    }
}

