package org.apache.druid.query.lookup;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.server.lookup.namespace.cache.CacheHandler;
import org.apache.druid.server.lookup.namespace.cache.NamespaceExtractionCacheManager;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

@JsonTypeName("kafka")
/* loaded from: input_file:org/apache/druid/query/lookup/KafkaLookupExtractorFactory.class */
public class KafkaLookupExtractorFactory implements LookupExtractorFactory {
    private static final Logger LOG = new Logger(KafkaLookupExtractorFactory.class);
    private final ListeningExecutorService executorService;
    private final AtomicLong doubleEventCount;
    private final NamespaceExtractionCacheManager cacheManager;
    private final String factoryId;
    private final AtomicReference<Map<String, String>> mapRef;
    private final AtomicBoolean started;
    private CacheHandler cacheHandler;
    private volatile ListenableFuture<?> future;

    @JsonProperty
    private final String kafkaTopic;

    @JsonProperty
    private final Map<String, String> kafkaProperties;

    @JsonProperty
    private final long connectTimeout;

    @JsonProperty
    private final boolean injective;

    @JsonCreator
    public KafkaLookupExtractorFactory(@JacksonInject NamespaceExtractionCacheManager namespaceExtractionCacheManager, @JsonProperty("kafkaTopic") String str, @JsonProperty("kafkaProperties") Map<String, String> map, @JsonProperty("connectTimeout") @Min(0) long j, @JsonProperty("injective") boolean z) {
        this.doubleEventCount = new AtomicLong(0L);
        this.mapRef = new AtomicReference<>(null);
        this.started = new AtomicBoolean(false);
        this.future = null;
        this.kafkaTopic = (String) Preconditions.checkNotNull(str, "kafkaTopic required");
        this.kafkaProperties = (Map) Preconditions.checkNotNull(map, "kafkaProperties required");
        this.executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded("kafka-factory-" + StringUtils.encodeForFormat(str) + "-%s", 1));
        this.cacheManager = namespaceExtractionCacheManager;
        this.connectTimeout = j;
        this.injective = z;
        this.factoryId = "kafka-factory-" + str + UUID.randomUUID();
    }

    public KafkaLookupExtractorFactory(NamespaceExtractionCacheManager namespaceExtractionCacheManager, String str, Map<String, String> map) {
        this(namespaceExtractionCacheManager, str, map, 0L, false);
    }

    public String getKafkaTopic() {
        return this.kafkaTopic;
    }

    public Map<String, String> getKafkaProperties() {
        return this.kafkaProperties;
    }

    public long getConnectTimeout() {
        return this.connectTimeout;
    }

    public boolean isInjective() {
        return this.injective;
    }

    public boolean start() {
        synchronized (this.started) {
            if (this.started.get()) {
                LOG.warn("Already started, not starting again", new Object[0]);
                return true;
            }
            if (this.executorService.isShutdown()) {
                LOG.warn("Already shut down, not starting again", new Object[0]);
                return false;
            }
            verifyKafkaProperties();
            final String kafkaTopic = getKafkaTopic();
            LOG.debug("About to listen to topic [%s] with group.id [%s]", new Object[]{kafkaTopic, this.factoryId});
            this.cacheHandler = this.cacheManager.createCache();
            ConcurrentMap cache = this.cacheHandler.getCache();
            this.mapRef.set(cache);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ListenableFuture<?> submit = this.executorService.submit(() -> {
                Consumer<String, String> consumer = getConsumer();
                consumer.subscribe(Collections.singletonList(kafkaTopic));
                while (!this.executorService.isShutdown()) {
                    try {
                        try {
                        } catch (Exception e) {
                            LOG.error(e, "Error reading stream for topic [%s]", new Object[]{kafkaTopic});
                        }
                        if (this.executorService.isShutdown()) {
                            break;
                        }
                        ConsumerRecords poll = consumer.poll(1000L);
                        countDownLatch.countDown();
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            String str = (String) consumerRecord.key();
                            String str2 = (String) consumerRecord.value();
                            if (str == null || str2 == null) {
                                LOG.error("Bad key/message from topic [%s]: [%s]", new Object[]{kafkaTopic, consumerRecord});
                            } else {
                                this.doubleEventCount.incrementAndGet();
                                cache.put(str, str2);
                                this.doubleEventCount.incrementAndGet();
                                LOG.trace("Placed key[%s] val[%s]", new Object[]{str, str2});
                            }
                        }
                    } finally {
                        consumer.close();
                    }
                }
            });
            Futures.addCallback(submit, new FutureCallback<Object>() { // from class: org.apache.druid.query.lookup.KafkaLookupExtractorFactory.1
                public void onSuccess(Object obj) {
                    KafkaLookupExtractorFactory.LOG.debug("Success listening to [%s]", new Object[]{kafkaTopic});
                }

                public void onFailure(Throwable th) {
                    if (th instanceof CancellationException) {
                        KafkaLookupExtractorFactory.LOG.debug("Topic [%s] cancelled", new Object[]{kafkaTopic});
                    } else {
                        KafkaLookupExtractorFactory.LOG.error(th, "Error in listening to [%s]", new Object[]{kafkaTopic});
                    }
                }
            }, Execs.directExecutor());
            this.future = submit;
            Stopwatch createStarted = Stopwatch.createStarted();
            while (!countDownLatch.await(100L, TimeUnit.MILLISECONDS) && this.connectTimeout > 0) {
                try {
                    if (submit.isDone()) {
                        submit.get();
                    } else if (createStarted.elapsed(TimeUnit.MILLISECONDS) > this.connectTimeout) {
                        throw new TimeoutException("Failed to connect to kafka in sufficient time");
                    }
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    this.executorService.shutdown();
                    submit.cancel(true);
                    LOG.error(e, "Failed to start kafka extraction factory", new Object[0]);
                    this.cacheHandler.close();
                    return false;
                }
            }
            this.started.set(true);
            return true;
        }
    }

    public boolean close() {
        synchronized (this.started) {
            if (!this.started.get() || this.executorService.isShutdown()) {
                LOG.info("Already shutdown, ignoring", new Object[0]);
                return !this.started.get();
            }
            this.started.set(false);
            this.executorService.shutdown();
            ListenableFuture<?> listenableFuture = this.future;
            if (listenableFuture != null) {
                listenableFuture.cancel(true);
            }
            this.cacheHandler.close();
            return true;
        }
    }

    public boolean replaces(@Nullable LookupExtractorFactory lookupExtractorFactory) {
        if (this == lookupExtractorFactory) {
            return false;
        }
        if (lookupExtractorFactory == null || getClass() != lookupExtractorFactory.getClass()) {
            return true;
        }
        KafkaLookupExtractorFactory kafkaLookupExtractorFactory = (KafkaLookupExtractorFactory) lookupExtractorFactory;
        return (getKafkaTopic().equals(kafkaLookupExtractorFactory.getKafkaTopic()) && getKafkaProperties().equals(kafkaLookupExtractorFactory.getKafkaProperties()) && getConnectTimeout() == kafkaLookupExtractorFactory.getConnectTimeout() && isInjective() == kafkaLookupExtractorFactory.isInjective()) ? false : true;
    }

    @Nullable
    public LookupIntrospectHandler getIntrospectHandler() {
        return new KafkaLookupExtractorIntrospectionHandler(this);
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public LookupExtractor m1get() {
        Map map = (Map) Preconditions.checkNotNull(this.mapRef.get(), "Not started");
        final long j = this.doubleEventCount.get();
        return new MapLookupExtractor(map, isInjective()) { // from class: org.apache.druid.query.lookup.KafkaLookupExtractorFactory.2
            public byte[] getCacheKey() {
                byte[] utf8 = StringUtils.toUtf8(KafkaLookupExtractorFactory.this.factoryId);
                if (j == KafkaLookupExtractorFactory.this.doubleEventCount.get()) {
                    return ByteBuffer.allocate(utf8.length + 1 + 8).put(utf8).put((byte) -1).putLong(j).array();
                }
                byte[] utf82 = StringUtils.toUtf8(UUID.randomUUID().toString());
                return ByteBuffer.allocate(utf8.length + 1 + utf82.length + 1).put(utf8).put((byte) -1).put(utf82).put((byte) -1).array();
            }
        };
    }

    public long getCompletedEventCount() {
        return this.doubleEventCount.get() >> 1;
    }

    NamespaceExtractionCacheManager getCacheManager() {
        return this.cacheManager;
    }

    AtomicReference<Map<String, String>> getMapRef() {
        return this.mapRef;
    }

    AtomicLong getDoubleEventCount() {
        return this.doubleEventCount;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ListenableFuture<?> getFuture() {
        return this.future;
    }

    private void verifyKafkaProperties() {
        if (this.kafkaProperties.containsKey("group.id")) {
            throw new IAE("Cannot set kafka property [group.id]. Property is randomly generated for you. Found [%s]", new Object[]{this.kafkaProperties.get("group.id")});
        }
        if (this.kafkaProperties.containsKey("auto.offset.reset")) {
            throw new IAE("Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]", new Object[]{this.kafkaProperties.get("auto.offset.reset")});
        }
        Preconditions.checkNotNull(this.kafkaProperties.get("bootstrap.servers"), "bootstrap.servers required property");
    }

    Consumer<String, String> getConsumer() {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Properties consumerProperties = getConsumerProperties();
        try {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProperties, new StringDeserializer(), new StringDeserializer());
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return kafkaConsumer;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    @Nonnull
    private Properties getConsumerProperties() {
        Properties properties = new Properties();
        properties.putAll(this.kafkaProperties);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("group.id", this.factoryId);
        return properties;
    }
}
