package org.apache.camel.processor.resume.kafka;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.resume.ResumeStrategyConfiguration;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JdkService("kafka-resume-strategy")
/* loaded from: input_file:org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.class */
public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
    private Consumer<byte[], byte[]> consumer;
    private Producer<byte[], byte[]> producer;
    private boolean subscribed;
    private ResumeAdapter adapter;
    private KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
    private ExecutorService executorService;
    private CountDownLatch initLatch;
    private CamelContext camelContext;
    private Duration pollDuration = Duration.ofSeconds(1);
    private final ReentrantLock writeLock = new ReentrantLock();

    public SingleNodeKafkaResumeStrategy() {
    }

    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration kafkaResumeStrategyConfiguration) {
        this.resumeStrategyConfiguration = kafkaResumeStrategyConfiguration;
    }

    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration kafkaResumeStrategyConfiguration, ExecutorService executorService) {
        this.resumeStrategyConfiguration = kafkaResumeStrategyConfiguration;
        this.executorService = executorService;
    }

    protected void produce(byte[] bArr, byte[] bArr2, ResumeStrategy.UpdateCallBack updateCallBack) {
        this.producer.send(new ProducerRecord(this.resumeStrategyConfiguration.getTopic(), bArr, bArr2), (recordMetadata, exc) -> {
            if (exc != null) {
                LOG.error("Failed to send message {}", exc.getMessage(), exc);
            }
            if (updateCallBack != null) {
                updateCallBack.onUpdate(exc);
            }
        });
    }

    protected void doAdd(OffsetKey<?> offsetKey, Offset<?> offset) {
        if (this.adapter instanceof Cacheable) {
            this.adapter.add(offsetKey, offset);
        }
    }

    public <T extends Resumable> void updateLastOffset(T t) throws Exception {
        updateLastOffset((SingleNodeKafkaResumeStrategy) t, (ResumeStrategy.UpdateCallBack) null);
    }

    public <T extends Resumable> void updateLastOffset(T t, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        OffsetKey<?> offsetKey = t.getOffsetKey();
        Offset<?> lastOffset = t.getLastOffset();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating offset on Kafka with key {} to {}", offsetKey.getValue(), lastOffset.getValue());
        }
        updateLastOffset(offsetKey, lastOffset);
    }

    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) throws Exception {
        updateLastOffset(offsetKey, offset, null);
    }

    public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, ResumeStrategy.UpdateCallBack updateCallBack) throws Exception {
        ByteBuffer serialize = offsetKey.serialize();
        ByteBuffer serialize2 = offset.serialize();
        try {
            this.writeLock.lock();
            produce(serialize.array(), serialize2.array(), updateCallBack);
            this.writeLock.unlock();
            doAdd(offsetKey, offset);
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    public void loadCache() {
        if (!(this.adapter instanceof Deserializable)) {
            throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
        }
        this.initLatch = new CountDownLatch(this.resumeStrategyConfiguration.getMaxInitializationRetries());
        if (this.executorService == null) {
            this.executorService = this.camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "SingleNodeKafkaResumeStrategy");
        }
        this.executorService.submit(() -> {
            refresh(this.initLatch);
        });
    }

    private void waitForInitialization() {
        try {
            LOG.trace("Waiting for kafka resume strategy async initialization");
            if (!this.initLatch.await(this.resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
                LOG.debug("The initialization timed out");
            }
            LOG.trace("Kafka resume strategy initialization complete");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void refresh(CountDownLatch countDownLatch) {
        LOG.trace("Creating a offset cache refresher");
        try {
            try {
                try {
                    this.consumer = createConsumer();
                    subscribe(this.consumer);
                    LOG.debug("Loading records from topic {}", this.resumeStrategyConfiguration.getTopic());
                    this.consumer.subscribe(Collections.singletonList(this.resumeStrategyConfiguration.getTopic()));
                    poll(this.consumer, countDownLatch);
                    if (this.consumer != null) {
                        this.consumer.unsubscribe();
                        try {
                            this.consumer.close(Duration.ofSeconds(5L));
                        } catch (Exception e) {
                            LOG.warn("Error closing the consumer: {} (this error will be ignored)", e.getMessage(), e);
                        }
                    }
                } catch (Throwable th) {
                    if (this.consumer != null) {
                        this.consumer.unsubscribe();
                        try {
                            this.consumer.close(Duration.ofSeconds(5L));
                        } catch (Exception e2) {
                            LOG.warn("Error closing the consumer: {} (this error will be ignored)", e2.getMessage(), e2);
                        }
                    }
                    throw th;
                }
            } catch (Exception e3) {
                LOG.error("Error while refreshing the local cache: {}", e3.getMessage(), e3);
                if (this.consumer != null) {
                    this.consumer.unsubscribe();
                    try {
                        this.consumer.close(Duration.ofSeconds(5L));
                    } catch (Exception e4) {
                        LOG.warn("Error closing the consumer: {} (this error will be ignored)", e4.getMessage(), e4);
                    }
                }
            }
        } catch (WakeupException e5) {
            LOG.info("Kafka consumer was interrupted during a blocking call");
            if (this.consumer != null) {
                this.consumer.unsubscribe();
                try {
                    this.consumer.close(Duration.ofSeconds(5L));
                } catch (Exception e6) {
                    LOG.warn("Error closing the consumer: {} (this error will be ignored)", e6.getMessage(), e6);
                }
            }
        }
    }

    protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch countDownLatch) {
        Deserializable deserializable = this.adapter;
        boolean z = false;
        while (true) {
            Iterator it = consume(consumer).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                byte[] bArr = (byte[]) consumerRecord.value();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Read from Kafka at {} ({}): {}", new Object[]{Instant.ofEpochMilli(consumerRecord.timestamp()), consumerRecord.timestampType(), bArr});
                }
                if (!deserializable.deserialize(ByteBuffer.wrap((byte[]) consumerRecord.key()), ByteBuffer.wrap((byte[]) consumerRecord.value()))) {
                    LOG.warn("Deserializer indicates that this is the last record to deserialize");
                }
            }
            if (!z) {
                if (countDownLatch.getCount() == 1) {
                    z = true;
                }
                countDownLatch.countDown();
            }
        }
    }

    protected void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String str) {
        if (this.subscribed) {
            return;
        }
        consumer.subscribe(Collections.singletonList(str));
        this.subscribed = true;
    }

    public void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String str, long j) {
        if (this.subscribed) {
            return;
        }
        consumer.subscribe(Collections.singletonList(str), getConsumerRebalanceListener(consumer, j));
        this.subscribed = true;
    }

    private ConsumerRebalanceListener getConsumerRebalanceListener(final Consumer<byte[], byte[]> consumer, final long j) {
        return new ConsumerRebalanceListener() { // from class: org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                for (TopicPartition topicPartition : collection) {
                    long position = consumer.position(topicPartition) - j;
                    if (position >= 0) {
                        consumer.seek(topicPartition, position);
                    } else {
                        SingleNodeKafkaResumeStrategy.LOG.info("Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
                    }
                }
            }
        };
    }

    protected ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> consumer) {
        ConsumerRecords<byte[], byte[]> poll = consumer.poll(this.pollDuration);
        return !poll.isEmpty() ? poll : ConsumerRecords.empty();
    }

    protected ConsumerRecords<byte[], byte[]> consume(int i, Consumer<byte[], byte[]> consumer) {
        while (i > 0) {
            ConsumerRecords<byte[], byte[]> poll = consumer.poll(this.pollDuration);
            if (!poll.isEmpty()) {
                return poll;
            }
            i--;
        }
        return ConsumerRecords.empty();
    }

    private void subscribe(Consumer<byte[], byte[]> consumer) {
        if (!(this.adapter instanceof Cacheable)) {
            checkAndSubscribe(consumer, this.resumeStrategyConfiguration.getTopic());
            return;
        }
        ResumeCache cache = this.adapter.getCache();
        if (cache.capacity() >= 1) {
            checkAndSubscribe(consumer, this.resumeStrategyConfiguration.getTopic(), cache.capacity());
        } else {
            checkAndSubscribe(consumer, this.resumeStrategyConfiguration.getTopic());
        }
    }

    public ResumeAdapter getAdapter() {
        if (this.adapter == null) {
            waitForInitialization();
        }
        return this.adapter;
    }

    public void setAdapter(ResumeAdapter resumeAdapter) {
        this.adapter = resumeAdapter;
    }

    public void build() {
    }

    public void init() {
        LOG.debug("Initializing the Kafka resume strategy");
    }

    private void createProducer() {
        if (this.producer == null) {
            this.producer = new KafkaProducer(this.resumeStrategyConfiguration.getProducerProperties());
        }
    }

    private Consumer<byte[], byte[]> createConsumer() {
        return new KafkaConsumer(this.resumeStrategyConfiguration.getConsumerProperties());
    }

    public void stop() {
        try {
            LOG.trace("Trying to obtain a lock for closing the producer");
            if (!this.writeLock.tryLock(1L, TimeUnit.SECONDS)) {
                LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
            }
            LOG.info("Closing the Kafka producer");
            IOHelper.close(this.producer, "Kafka producer", LOG);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            LOG.warn("Error closing the Kafka producer: {} (this error will be ignored)", e2.getMessage(), e2);
        } finally {
            this.writeLock.unlock();
        }
        try {
            LOG.info("Closing the Kafka consumer");
            this.consumer.wakeup();
            if (this.executorService != null) {
                this.executorService.shutdown();
                if (!this.executorService.awaitTermination(2L, TimeUnit.SECONDS)) {
                    LOG.warn("Kafka consumer did not shutdown within 2 seconds");
                    this.executorService.shutdownNow();
                }
            } else {
                LOG.trace("There's no executor service to shutdown");
            }
        } catch (InterruptedException e3) {
            Thread.currentThread().interrupt();
        }
    }

    public void close() throws IOException {
        stop();
    }

    public void start() {
        LOG.info("Starting the kafka resume strategy");
        createProducer();
    }

    public Duration getPollDuration() {
        return this.pollDuration;
    }

    public void setPollDuration(Duration duration) {
        this.pollDuration = (Duration) Objects.requireNonNull(duration, "The poll duration cannot be null");
    }

    protected Producer<byte[], byte[]> getProducer() {
        return this.producer;
    }

    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
        if (!(resumeStrategyConfiguration instanceof KafkaResumeStrategyConfiguration)) {
            throw new RuntimeCamelException("Invalid resume strategy configuration of type " + ObjectHelper.className(resumeStrategyConfiguration));
        }
        this.resumeStrategyConfiguration = (KafkaResumeStrategyConfiguration) resumeStrategyConfiguration;
    }

    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
        return this.resumeStrategyConfiguration;
    }

    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }
}
