package org.apache.pinot.plugin.stream.kafka20;

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumerManager.class */
/**
 * Static registry of Kafka consumers shared across stream-level consumers.
 *
 * <p>Consumers are cached under a key of (topic name, group id, bootstrap servers). A consumer is
 * handed out to at most one caller at a time. When it is released it is not closed immediately;
 * instead it is kept for {@link #CONSUMER_SHUTDOWN_DELAY_MILLIS} so that a caller asking for the
 * same configuration shortly afterwards can reuse it. All state is guarded by the class monitor.
 */
public class KafkaStreamLevelConsumerManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamLevelConsumerManager.class);

    /** Sentinel stored in {@link #CONSUMER_RELEASE_TIME} while a consumer is handed out. */
    private static final Long IN_USE = -1L;

    /** How long a released consumer is kept open before it becomes eligible for closing. */
    private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60);

    /** Cached consumers keyed by (topic name, group id, bootstrap servers). */
    private static final Map<ImmutableTriple<String, String, String>, KafkaConsumer> CONSUMER_FOR_CONFIG_KEY = new HashMap<>();

    /**
     * For every cached consumer: {@link #IN_USE} while handed out, otherwise the wall-clock time
     * (epoch millis) after which it may be closed. Keyed by identity, not by equals().
     */
    private static final IdentityHashMap<KafkaConsumer, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>();

    /**
     * Returns the consumer for the given stream configuration, creating and subscribing a new one
     * if none is cached.
     *
     * @param kafkaStreamLevelStreamConfig configuration supplying the topic name, group id,
     *     bootstrap servers and the base consumer properties
     * @return the cached or newly created consumer, now marked as in use
     * @throws RuntimeException if the cached consumer for this configuration is already in use,
     *     or if creating/subscribing a new consumer fails
     */
    public static KafkaConsumer acquireKafkaConsumerForConfig(KafkaStreamLevelStreamConfig kafkaStreamLevelStreamConfig) {
        String topicName = kafkaStreamLevelStreamConfig.getKafkaTopicName();
        ImmutableTriple<String, String, String> configKey = new ImmutableTriple<>(topicName, kafkaStreamLevelStreamConfig.getGroupId(), kafkaStreamLevelStreamConfig.getBootstrapServers());
        synchronized (KafkaStreamLevelConsumerManager.class) {
            KafkaConsumer cachedConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
            if (cachedConsumer != null) {
                if (IN_USE.equals(CONSUMER_RELEASE_TIME.get(cachedConsumer))) {
                    throw new RuntimeException("Consumer " + cachedConsumer + " already in use!");
                }
                LOGGER.info("Reusing kafka consumer with id {}", cachedConsumer);
                CONSUMER_RELEASE_TIME.put(cachedConsumer, IN_USE);
                return cachedConsumer;
            }

            LOGGER.info("Creating new kafka consumer and iterator for topic {}", topicName);
            Properties properties = new Properties();
            properties.putAll(kafkaStreamLevelStreamConfig.getKafkaConsumerProperties());
            properties.put("bootstrap.servers", kafkaStreamLevelStreamConfig.getBootstrapServers());
            properties.put("key.deserializer", StringDeserializer.class.getName());
            properties.put("value.deserializer", BytesDeserializer.class.getName());
            // Translate the legacy offset-reset value. Constant-first equals() because
            // getProperty() returns null when the stored value is not a String.
            if ("smallest".equals(properties.getProperty("auto.offset.reset"))) {
                properties.put("auto.offset.reset", "earliest");
            }

            KafkaConsumer newConsumer = new KafkaConsumer(properties);
            try {
                newConsumer.subscribe(Collections.singletonList(topicName));
            } catch (RuntimeException e) {
                // The consumer is not registered yet, so nothing else would ever close it.
                closeQuietly(newConsumer);
                throw e;
            }
            CONSUMER_FOR_CONFIG_KEY.put(configKey, newConsumer);
            CONSUMER_RELEASE_TIME.put(newConsumer, IN_USE);
            LOGGER.info("Created consumer with id {} for topic {}", newConsumer, topicName);
            return newConsumer;
        }
    }

    /**
     * Marks a consumer as no longer in use and schedules a background check that closes it once
     * {@link #CONSUMER_SHUTDOWN_DELAY_MILLIS} has elapsed without it being re-acquired.
     *
     * <p>A consumer that this manager is not tracking (for example one that was already closed by
     * {@link #closeAllConsumers()}) is ignored.
     *
     * @param kafkaConsumer the consumer previously returned by
     *     {@link #acquireKafkaConsumerForConfig(KafkaStreamLevelStreamConfig)}
     */
    public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) {
        synchronized (KafkaStreamLevelConsumerManager.class) {
            // Recording a release time for an untracked consumer would add an entry that the
            // release check never removes, because it only walks CONSUMER_FOR_CONFIG_KEY.
            if (!CONSUMER_RELEASE_TIME.containsKey(kafkaConsumer)) {
                LOGGER.warn("Ignoring release of consumer {} which is not managed by this class", kafkaConsumer);
                return;
            }

            final long releaseTime = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS;
            CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime);
            LOGGER.info("Marking consumer with id {} for release at {}", kafkaConsumer, releaseTime);

            Thread releaseThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS);
                        closeExpiredConsumers(kafkaConsumer, releaseTime);
                    } catch (Exception e) {
                        LOGGER.warn("Caught exception in release of consumer {}", kafkaConsumer, e);
                    }
                }
            }, "kafka-consumer-release");
            releaseThread.start();
        }
    }

    /**
     * Closes every cached consumer, including ones currently in use, and empties the registry.
     * Failures to close an individual consumer are logged and do not stop the remaining ones
     * from being closed.
     */
    public static void closeAllConsumers() {
        try {
            synchronized (KafkaStreamLevelConsumerManager.class) {
                LOGGER.info("Trying to shutdown all the kafka consumers");
                for (KafkaConsumer consumer : CONSUMER_FOR_CONFIG_KEY.values()) {
                    LOGGER.info("Trying to shutdown consumer {}", consumer);
                    closeQuietly(consumer);
                }
                CONSUMER_FOR_CONFIG_KEY.clear();
                CONSUMER_RELEASE_TIME.clear();
            }
        } catch (Exception e) {
            LOGGER.warn("Caught exception during shutting down all kafka consumers", e);
        }
    }

    /**
     * Closes and unregisters every cached consumer that is not in use and whose release time has
     * been reached. Runs on the background thread started by {@link #releaseKafkaConsumer}.
     *
     * @param triggeringConsumer the consumer whose release scheduled this check (logging only)
     * @param scheduledReleaseTime the release time recorded when the check was scheduled (logging only)
     */
    private static void closeExpiredConsumers(KafkaConsumer triggeringConsumer, long scheduledReleaseTime) {
        synchronized (KafkaStreamLevelConsumerManager.class) {
            long now = System.currentTimeMillis();
            LOGGER.info("Executing release check for consumer {} at {}, scheduled at {}", triggeringConsumer, now, scheduledReleaseTime);
            Iterator<Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer>> entries = CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
            while (entries.hasNext()) {
                KafkaConsumer consumer = entries.next().getValue();
                Long consumerReleaseTime = CONSUMER_RELEASE_TIME.get(consumer);
                // Strictly greater: a consumer whose deadline is exactly "now" is due. Treating
                // equality as "not yet" would leave it open, since no later check is scheduled for it.
                if (IN_USE.equals(consumerReleaseTime) || consumerReleaseTime > now) {
                    LOGGER.info("Not releasing consumer {}, it has been reacquired", consumer);
                } else {
                    LOGGER.info("Releasing consumer {}", consumer);
                    closeQuietly(consumer);
                    entries.remove();
                    CONSUMER_RELEASE_TIME.remove(consumer);
                }
            }
        }
    }

    /** Closes the consumer, logging (rather than propagating) any exception it throws. */
    private static void closeQuietly(KafkaConsumer consumer) {
        try {
            consumer.close();
        } catch (Exception e) {
            LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", consumer, e);
        }
    }
}
