package org.apache.camel.processor.idempotent.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.Metadata;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Configurer(metadataOnly = true)
@ManagedResource(description = "Kafka IdempotentRepository")
@Metadata(label = "bean", description = "Idempotent repository that uses Kafka to store message ids. Uses a local cache of previously seen Message IDs. The topic used must be unique per logical repository (i.e. two routes de-duplicate using different repositories, and different topics) On startup, the instance consumes the full content of the topic, rebuilding the cache to the latest state.", annotations = {"interfaceName=org.apache.camel.spi.IdempotentRepository"})
/* loaded from: input_file:org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.class */
public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository, CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaIdempotentRepository.class);
    private static final int DEFAULT_MAXIMUM_CACHE_SIZE = 1000;
    private static final int DEFAULT_POLL_DURATION_MS = 100;
    private CamelContext camelContext;
    private Map<String, Object> cache;
    private Consumer<String, String> consumer;
    private Producer<String, String> producer;
    private Properties producerConfig;
    private Properties consumerConfig;
    private String groupId;

    @Metadata(description = "Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should use a different topic.", required = true)
    private String topic;

    @Metadata(description = "The URL for the kafka brokers to use", required = true)
    private String bootstrapServers;

    @Metadata(description = "Sets the maximum size of the local key cache.", defaultValue = "1000")
    private int maxCacheSize;

    @Metadata(description = "Sets the poll duration of the Kafka consumer. The local caches are updated immediately; this value will affect how far behind other peers in the cluster are, which are updating their caches from the topic, relative to the idempotent consumer instance issued the cache action message. The default value of this is 100 If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will operate in an inconsistent state relative to its peers until it catches up.", defaultValue = "100")
    private int pollDurationMs;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository$CacheAction.class */
    public enum CacheAction {
        add,
        remove,
        clear
    }

    public KafkaIdempotentRepository() {
        this.maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
        this.pollDurationMs = DEFAULT_POLL_DURATION_MS;
    }

    public KafkaIdempotentRepository(String str, String str2) {
        this(str, str2, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
    }

    @Deprecated
    public KafkaIdempotentRepository(String str, String str2, String str3) {
        this(str, str2, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, str3);
    }

    public KafkaIdempotentRepository(String str, String str2, int i, int i2) {
        this.maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
        this.pollDurationMs = DEFAULT_POLL_DURATION_MS;
        this.topic = str;
        this.bootstrapServers = str2;
        this.maxCacheSize = i;
        this.pollDurationMs = i2;
    }

    public KafkaIdempotentRepository(String str, Properties properties, Properties properties2) {
        this(str, properties, properties2, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS);
    }

    @Deprecated
    public KafkaIdempotentRepository(String str, Properties properties, Properties properties2, String str2) {
        this(str, properties, properties2, DEFAULT_MAXIMUM_CACHE_SIZE, DEFAULT_POLL_DURATION_MS, str2);
    }

    public KafkaIdempotentRepository(String str, Properties properties, Properties properties2, int i, int i2) {
        this.maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
        this.pollDurationMs = DEFAULT_POLL_DURATION_MS;
        this.topic = str;
        this.consumerConfig = properties;
        this.producerConfig = properties2;
        this.maxCacheSize = i;
        this.pollDurationMs = i2;
    }

    @Deprecated
    public KafkaIdempotentRepository(String str, String str2, int i, int i2, String str3) {
        this.maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
        this.pollDurationMs = DEFAULT_POLL_DURATION_MS;
        this.topic = str;
        this.bootstrapServers = str2;
        this.maxCacheSize = i;
        this.pollDurationMs = i2;
        this.groupId = str3;
    }

    @Deprecated
    public KafkaIdempotentRepository(String str, Properties properties, Properties properties2, int i, int i2, String str2) {
        this.maxCacheSize = DEFAULT_MAXIMUM_CACHE_SIZE;
        this.pollDurationMs = DEFAULT_POLL_DURATION_MS;
        this.topic = str;
        this.consumerConfig = properties;
        this.producerConfig = properties2;
        this.maxCacheSize = i;
        this.pollDurationMs = i2;
        this.groupId = str2;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public String getBootstrapServers() {
        return this.bootstrapServers;
    }

    public void setBootstrapServers(String str) {
        this.bootstrapServers = str;
    }

    public Properties getProducerConfig() {
        return this.producerConfig;
    }

    public void setProducerConfig(Properties properties) {
        this.producerConfig = properties;
    }

    public Properties getConsumerConfig() {
        return this.consumerConfig;
    }

    public void setConsumerConfig(Properties properties) {
        this.consumerConfig = properties;
    }

    public int getMaxCacheSize() {
        return this.maxCacheSize;
    }

    public void setMaxCacheSize(int i) {
        this.maxCacheSize = i;
    }

    public int getPollDurationMs() {
        return this.pollDurationMs;
    }

    public void setPollDurationMs(int i) {
        this.pollDurationMs = i;
    }

    @Deprecated
    public String getGroupId() {
        return this.groupId;
    }

    @Deprecated
    public void setGroupId(String str) {
        this.groupId = str;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.camelContext, "camelContext");
        StringHelper.notEmpty(this.topic, "topic");
        this.cache = LRUCacheFactory.newLRUCache(this.maxCacheSize);
        if (this.consumerConfig == null) {
            this.consumerConfig = new Properties();
            StringHelper.notEmpty(this.bootstrapServers, "bootstrapServers");
            this.consumerConfig.put("bootstrap.servers", this.bootstrapServers);
        }
        if (this.producerConfig == null) {
            this.producerConfig = new Properties();
            StringHelper.notEmpty(this.bootstrapServers, "bootstrapServers");
            this.producerConfig.put("bootstrap.servers", this.bootstrapServers);
        }
        ObjectHelper.notNull(this.consumerConfig, "consumerConfig");
        ObjectHelper.notNull(this.producerConfig, "producerConfig");
        this.consumerConfig.put("enable.auto.commit", Boolean.FALSE.toString());
        this.consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
        this.consumerConfig.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer(this.consumerConfig);
        this.producerConfig.put("key.serializer", StringSerializer.class.getName());
        this.producerConfig.put("value.serializer", StringSerializer.class.getName());
        this.producerConfig.putIfAbsent("acks", "1");
        this.producerConfig.putIfAbsent("batch.size", "0");
        this.producer = new KafkaProducer(this.producerConfig);
        populateCache();
    }

    private void populateCache() {
        LOG.debug("Getting partitions of topic {}", this.topic);
        Collection collection = (Collection) this.consumer.partitionsFor(this.topic).stream().map(partitionInfo -> {
            return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
        }).collect(Collectors.toUnmodifiableList());
        LOG.debug("Assigning consumer to partitions {}", collection);
        this.consumer.assign(collection);
        LOG.debug("Seeking consumer to beginning of partitions {}", collection);
        this.consumer.seekToBeginning(collection);
        Map endOffsets = this.consumer.endOffsets(collection);
        LOG.debug("Consuming records from partitions {} till end offsets {}", collection, endOffsets);
        while (!KafkaConsumerUtil.isReachedOffsets(this.consumer, endOffsets)) {
            Iterator it = this.consumer.poll(Duration.ofMillis(this.pollDurationMs)).iterator();
            while (it.hasNext()) {
                addToCache((ConsumerRecord) it.next());
            }
        }
    }

    private void addToCache(ConsumerRecord<String, String> consumerRecord) {
        CacheAction cacheAction = null;
        try {
            cacheAction = CacheAction.valueOf((String) consumerRecord.value());
        } catch (IllegalArgumentException e) {
            LOG.error("Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.", new Object[]{consumerRecord.key(), consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset())});
        }
        String str = (String) consumerRecord.key();
        if (cacheAction == CacheAction.add) {
            LOG.debug("Adding to cache messageId:{}", str);
            this.cache.put(str, str);
        } else if (cacheAction == CacheAction.remove) {
            LOG.debug("Removing from cache messageId:{}", str);
            this.cache.remove(str);
        } else {
            if (cacheAction != CacheAction.clear) {
                throw new RuntimeException("Illegal action " + String.valueOf(cacheAction) + " for key " + ((String) consumerRecord.key()));
            }
            this.cache.clear();
        }
    }

    protected void doStop() {
        IOHelper.close(this.consumer, "consumer", LOG);
        IOHelper.close(this.producer, "producer", LOG);
    }

    public boolean add(String str) {
        if (this.cache.containsKey(str)) {
            return false;
        }
        this.cache.put(str, str);
        broadcastAction(str, CacheAction.add);
        return true;
    }

    private void broadcastAction(String str, CacheAction cacheAction) {
        try {
            LOG.debug("Broadcasting action:{} for key:{}", cacheAction, str);
            ObjectHelper.notNull(this.producer, "producer");
            this.producer.send(new ProducerRecord(this.topic, str, cacheAction.toString())).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeCamelException(e);
        } catch (ExecutionException e2) {
            throw new RuntimeCamelException(e2);
        }
    }

    @ManagedOperation(description = "Does the store contain the given key")
    public boolean contains(String str) {
        LOG.debug("Checking cache for key:{}", str);
        return this.cache.containsKey(str);
    }

    @ManagedOperation(description = "Remove the key from the store")
    public boolean remove(String str) {
        this.cache.remove(str, str);
        broadcastAction(str, CacheAction.remove);
        return true;
    }

    public boolean confirm(String str) {
        return true;
    }

    public void clear() {
        broadcastAction(null, CacheAction.clear);
    }
}
