package org.apache.camel.component.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.task.ForegroundTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.TimeUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/KafkaFetchRecords.class */
public class KafkaFetchRecords implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFetchRecords.class);
    // Owning Camel consumer: source of endpoint/component configuration and of the lifecycle checks
    // (run allowed, stopping, suspended) used by this task.
    private final KafkaConsumer kafkaConsumer;
    // Kafka client created by createConsumer(); null until the first successful creation.
    private Consumer consumer;
    // Resolved in createConsumer(): the "client.id" property, else the client's own field read reflectively, else "".
    private volatile String clientId;
    // Topic name(s); split on ',' when subscribing. Used only when topicPattern is null.
    private final String topicName;
    // When non-null, the subscription is made with this pattern instead of topicName.
    private final Pattern topicPattern;
    // Identifier used in log messages, built in the constructor as "<topic>-Thread <id>".
    private final String threadId;
    // Properties handed to the Kafka client factory when creating the consumer.
    private final Properties kafkaProps;
    // Created in createConsumer(); its canContinue() decides whether polling and the run loop go on.
    private PollExceptionStrategy pollExceptionStrategy;
    // Exposed through getBridge(); not used otherwise in this class.
    private final BridgeExceptionHandlerToErrorHandler bridge;
    // Created in createConsumerTask() right after the consumer itself.
    private CommitManager commitManager;
    // Last recorded failure (cleared before each polling round); reported through healthState().
    private volatile Exception lastError;
    // Optional listener (may be null); receives the consumer and the seek policy, and is consulted after each poll.
    private final KafkaConsumerListener consumerListener;
    // Set to true when run() gives up creating or subscribing the consumer.
    private volatile boolean terminated;
    // Backoff interval (ms) of the create/subscribe phase currently or most recently attempted.
    private volatile long currentBackoffInterval;
    // Requests that run() loops again to re-create the consumer.
    private volatile boolean reconnect;
    // True once the consumer has been created and subscribed; cleared to force a reconnect.
    private volatile boolean connected;
    // Held by startPolling() for the whole polling round; safeStop() tries to acquire it before waking the consumer.
    private final ReentrantLock lock = new ReentrantLock();
    // Pause/resume state machine; written by pause()/resume() and advanced by updateTaskState().
    private volatile State state = State.RUNNING;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/camel/component/kafka/KafkaFetchRecords$State.class */
    /**
     * Pause/resume state of the polling task. Requests are recorded by {@code pause()} and {@code resume()} and are
     * acted upon by {@code updateTaskState()} from within the polling loop.
     */
    public enum State {
        // Initial state of a task; also the state reached after a resume request has been processed.
        RUNNING,
        // Set by pause(); the assigned partitions are paused on the next updateTaskState() call.
        PAUSE_REQUESTED,
        // The assigned partitions have been paused; break-on-error reconnects are not triggered in this state.
        PAUSED,
        // Set by resume(); the consumer seeks to the committed offsets and resumes on the next updateTaskState() call.
        RESUME_REQUESTED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a fetch task; no Kafka client is created until {@link #run()} executes.
     *
     * @param kafkaConsumer                       the owning Camel consumer
     * @param bridgeExceptionHandlerToErrorHandler bridge returned by {@link #getBridge()}
     * @param str                                 topic name(s), comma separated; also the prefix of the thread id
     * @param pattern                             topic pattern, or {@code null} to subscribe by name
     * @param str2                                identifier appended to the thread id
     * @param properties                          properties used to create the Kafka client
     * @param kafkaConsumerListener               optional listener, may be {@code null}
     */
    public KafkaFetchRecords(KafkaConsumer kafkaConsumer, BridgeExceptionHandlerToErrorHandler bridgeExceptionHandlerToErrorHandler, String str, Pattern pattern, String str2, Properties properties, KafkaConsumerListener kafkaConsumerListener) {
        // Collaborators.
        this.kafkaConsumer = kafkaConsumer;
        this.bridge = bridgeExceptionHandlerToErrorHandler;
        this.consumerListener = kafkaConsumerListener;
        this.kafkaProps = properties;

        // Subscription target and the identifier used in logs.
        this.topicName = str;
        this.topicPattern = pattern;
        this.threadId = str + "-Thread " + str2;
    }

    /**
     * Task entry point: (re)connects the Kafka consumer when needed and polls until the poll exception strategy
     * refuses to continue (and no reconnect is pending) or the owning Camel consumer is no longer runnable.
     * <p>
     * If the consumer cannot be created or subscribed within the configured number of attempts, the failure is
     * recorded as the last error, the task is flagged as terminated and the loop ends. The consumer is closed
     * before returning.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (!isKafkaConsumerRunnable()) {
            return;
        }
        do {
            this.terminated = false;
            if (!isConnected() && !connectAndSubscribe()) {
                // Out of attempts: the failure has been recorded, give up on this consumer.
                this.terminated = true;
                break;
            }
            setLastError(null);
            startPolling();
        } while ((this.pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable());
        if (LOG.isInfoEnabled()) {
            LOG.info("Terminating KafkaConsumer thread {} receiving from {}", this.threadId, getPrintableTopic());
        }
        safeConsumerClose();
    }

    /**
     * Creates the Kafka consumer and subscribes it, each phase retried with its own backoff settings.
     *
     * @return {@code true} when the consumer is created, subscribed and flagged as connected; {@code false} when
     *         one of the phases ran out of attempts (the failure is then stored as the last error)
     */
    private boolean connectAndSubscribe() {
        int createMaxAttempts = this.kafkaConsumer.m2getEndpoint().m5getComponent().getCreateConsumerBackoffMaxAttempts();
        this.currentBackoffInterval = this.kafkaConsumer.m2getEndpoint().m5getComponent().getCreateConsumerBackoffInterval();
        ForegroundTask createTask = newBackoffTask("Create KafkaConsumer", createMaxAttempts, this.currentBackoffInterval);
        if (!createTask.run(this::createConsumerTask)) {
            setupCreateConsumerException(createTask, createMaxAttempts);
            return false;
        }

        // Report the subscribe limit (not the create limit) when subscribing fails, so that the
        // "after N attempts" message matches the budget that was actually exhausted.
        int subscribeMaxAttempts = this.kafkaConsumer.m2getEndpoint().m5getComponent().getSubscribeConsumerBackoffMaxAttempts();
        this.currentBackoffInterval = this.kafkaConsumer.m2getEndpoint().m5getComponent().getSubscribeConsumerBackoffInterval();
        ForegroundTask subscribeTask = newBackoffTask("Subscribe KafkaConsumer", subscribeMaxAttempts, this.currentBackoffInterval);
        if (!subscribeTask.run(this::initializeConsumerTask)) {
            setupInitializeErrorException(subscribeTask, subscribeMaxAttempts);
            return false;
        }

        setConnected(true);
        return true;
    }

    /** Builds a foreground task that runs immediately and retries up to {@code maxAttempts} times at a fixed interval. */
    private static ForegroundTask newBackoffTask(String name, int maxAttempts, long intervalMillis) {
        return Tasks.foregroundTask()
                .withName(name)
                .withBudget(Budgets.iterationBudget()
                        .withMaxIterations(maxAttempts)
                        .withInitialDelay(Duration.ZERO)
                        .withInterval(Duration.ofMillis(intervalMillis))
                        .build())
                .build();
    }

    /**
     * Logs and records, as the last error, that subscribing the consumer was abandoned.
     *
     * @param foregroundTask the exhausted subscribe task, used for its elapsed time
     * @param i              the number of attempts to report in the message
     */
    private void setupInitializeErrorException(ForegroundTask foregroundTask, int i) {
        final String elapsed = TimeUtils.printDuration(foregroundTask.elapsed(), true);
        final StringBuilder message = new StringBuilder("Gave up subscribing org.apache.kafka.clients.consumer.KafkaConsumer ");
        message.append(threadId).append(" to ").append(getPrintableTopic());
        message.append(" after ").append(i).append(" attempts (elapsed: ").append(elapsed).append(").");
        final String text = message.toString();
        LOG.warn(text);
        // Keep the previously recorded failure as the cause of the fatal exception.
        setLastError(new KafkaConsumerFatalException(text, lastError));
    }

    /**
     * Records, as the last error, that creating the consumer was abandoned.
     *
     * @param foregroundTask the exhausted create task, used for its elapsed time
     * @param i              the number of attempts to report in the message
     */
    private void setupCreateConsumerException(ForegroundTask foregroundTask, int i) {
        final String elapsed = TimeUtils.printDuration(foregroundTask.elapsed(), true);
        final StringBuilder message = new StringBuilder("Gave up creating org.apache.kafka.clients.consumer.KafkaConsumer ");
        message.append(threadId).append(" to ").append(getPrintableTopic());
        message.append(" after ").append(i).append(" attempts (elapsed: ").append(elapsed).append(").");
        // Keep the previously recorded failure as the cause of the fatal exception.
        setLastError(new KafkaConsumerFatalException(message.toString(), lastError));
    }

    /**
     * One subscribe attempt, shaped for use as a retryable task.
     *
     * @return {@code true} on success; {@code false} after flagging the task as disconnected and recording the
     *         failure as the last error
     */
    private boolean initializeConsumerTask() {
        boolean subscribed;
        try {
            initializeConsumer();
            subscribed = true;
        } catch (Exception cause) {
            setConnected(false);
            LOG.warn("Error subscribing org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", cause.getMessage(), cause);
            setLastError(cause);
            subscribed = false;
        }
        return subscribed;
    }

    /**
     * One consumer-creation attempt, shaped for use as a retryable task. Creates the consumer and its commit
     * manager and, when a listener is configured, hands it the consumer and the effective seek policy
     * (endpoint setting, else component setting, else {@code SeekPolicy.BEGINNING}).
     *
     * @return {@code true} on success; {@code false} after flagging the task as disconnected and recording the
     *         failure as the last error
     */
    private boolean createConsumerTask() {
        try {
            createConsumer();
            commitManager = CommitManagers.createCommitManager(consumer, kafkaConsumer, threadId, getPrintableTopic());
            if (consumerListener == null) {
                return true;
            }

            consumerListener.setConsumer(consumer);
            SeekPolicy policy = kafkaConsumer.m2getEndpoint().getConfiguration().getSeekTo();
            if (policy == null) {
                // Fall back to the component-level configuration.
                policy = kafkaConsumer.m2getEndpoint().m5getComponent().getConfiguration().getSeekTo();
            }
            if (policy == null) {
                policy = SeekPolicy.BEGINNING;
            }
            consumerListener.setSeekPolicy(policy);
            return true;
        } catch (Exception cause) {
            setConnected(false);
            LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", cause.getMessage(), cause);
            setLastError(cause);
            return false;
        }
    }

    /**
     * Creates the Kafka client through the endpoint's client factory, resolves the client id on first use and
     * builds the poll exception strategy for the new client.
     * <p>
     * The thread context class loader is switched to the one that loaded the Kafka consumer class for the
     * duration of the call and is restored afterwards, whether or not creation succeeds.
     */
    protected void createConsumer() {
        final ClassLoader previousLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());

            // A non-null consumer means an earlier client exists, i.e. this is a reconnection.
            final String action = consumer == null ? "Connecting" : "Reconnecting";
            final long pollTimeoutMs = kafkaConsumer.m2getEndpoint().getConfiguration().getPollTimeoutMs().longValue();
            LOG.info("{} Kafka consumer thread ID {} with poll timeout of {} ms", action, threadId, pollTimeoutMs);

            consumer = kafkaConsumer.m2getEndpoint().getKafkaClientFactory().getConsumer(kafkaProps);

            if (clientId == null) {
                String resolvedId = getKafkaProps().getProperty("client.id");
                if (resolvedId == null) {
                    try {
                        // Not configured explicitly: read the id held by the client itself.
                        resolvedId = (String) ReflectionHelper.getField(consumer.getClass().getDeclaredField("clientId"), consumer);
                    } catch (Exception ignored) {
                        // Best effort only; fall back to an empty id.
                        resolvedId = "";
                    }
                }
                clientId = resolvedId;
            }

            pollExceptionStrategy = KafkaErrorStrategies.strategies(this, kafkaConsumer.m2getEndpoint(), consumer);
        } finally {
            Thread.currentThread().setContextClassLoader(previousLoader);
        }
    }

    /**
     * Subscribes the freshly created consumer and resets the reconnect bookkeeping.
     * <p>
     * The pending reconnect request is cleared here because the (re)connection is complete once the subscription
     * succeeds. The flag is raised by the polling loop when a reconnect is forced and nothing else in this class
     * clears it; leaving it set would make {@code run()} keep looping, and {@code isRecoverable()} keep reporting
     * {@code true}, even after the poll exception strategy has decided to stop.
     */
    private void initializeConsumer() {
        subscribe();
        // Previously this cleared the "connected" flag, which is already false on this path; the flag
        // that actually needs resetting after a successful (re)subscription is "reconnect".
        setReconnect(false);
        this.pollExceptionStrategy.reset();
    }

    /**
     * Subscribes the consumer to the topic pattern when one is set, otherwise to the comma-separated topic names.
     * A resume-aware rebalance listener is registered when the owning consumer has a resume strategy, a classic
     * one otherwise.
     */
    private void subscribe() {
        final ConsumerRebalanceListener rebalanceListener;
        if (kafkaConsumer.getResumeStrategy() == null) {
            rebalanceListener = new ClassicRebalanceListener(threadId, kafkaConsumer.m2getEndpoint().getConfiguration(), commitManager, consumer);
        } else {
            rebalanceListener = new ResumeRebalanceListener(threadId, kafkaConsumer.m2getEndpoint().getConfiguration(), commitManager, consumer, kafkaConsumer.getResumeStrategy());
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
        }

        if (topicPattern == null) {
            consumer.subscribe(Arrays.asList(topicName.split(",")), rebalanceListener);
            return;
        }
        consumer.subscribe(topicPattern, rebalanceListener);
    }

    /**
     * Runs one polling round: polls and processes records until the owning consumer stops, the task is flagged
     * as disconnected, or the poll exception strategy refuses to continue.
     * <p>
     * The lock is held for the whole round so that {@code safeStop()} can wait for in-flight processing before
     * waking the consumer up. Exceptions are handled from most to least specific:
     * <ul>
     * <li>{@link InterruptException}: reported to the exception handler, offsets are committed, the consumer is
     * unsubscribed and the thread's interrupt flag is restored;</li>
     * <li>{@link WakeupException}: expected when the consumer is woken up; only traced;</li>
     * <li>any other exception: logged and delegated to the poll exception strategy.</li>
     * </ul>
     * Whatever the outcome, the consumer is unsubscribed and closed when the strategy does not allow continuing.
     */
    protected void startPolling() {
        // Acquire before entering the try block so that the finally clause only ever releases a lock we hold.
        this.lock.lock();
        try {
            long pollTimeoutMs = this.kafkaConsumer.m2getEndpoint().getConfiguration().getPollTimeoutMs().longValue();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Polling {} from {} with timeout: {}", this.threadId, getPrintableTopic(), pollTimeoutMs);
            }
            KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(this.kafkaConsumer, this.threadId, this.commitManager, this.consumerListener);
            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
            ProcessingResult lastResult = null;

            while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && this.pollExceptionStrategy.canContinue()) {
                ConsumerRecords<Object, Object> polledRecords = this.consumer.poll(pollDuration);
                if (this.consumerListener != null && !this.consumerListener.afterConsume(this.consumer)) {
                    // The listener vetoed processing of this batch.
                    continue;
                }

                if (LOG.isTraceEnabled()) {
                    if (lastResult != null) {
                        LOG.trace("This polling iteration is using lastresult on partition {} and offset {}", lastResult.getPartition(), lastResult.getPartitionLastOffset());
                    } else {
                        LOG.trace("This polling iteration is using lastresult of null");
                    }
                }

                ProcessingResult result = recordProcessorFacade.processPolledRecords(polledRecords, lastResult);

                if (LOG.isTraceEnabled()) {
                    if (result != null) {
                        LOG.trace("This polling iteration had a result returned for partition {} and offset {}", result.getPartition(), result.getPartitionLastOffset());
                    } else {
                        LOG.trace("This polling iteration had a result returned as null");
                    }
                }

                updateTaskState();

                if (result != null && result.isBreakOnErrorHit() && this.state != State.PAUSED) {
                    LOG.debug("We hit an error ... setting flags to force reconnect");
                    setReconnect(true);
                    setConnected(false);
                } else {
                    lastResult = result;
                    // The result may be null on this branch; guard the dereference made only for tracing.
                    if (lastResult != null && LOG.isTraceEnabled()) {
                        LOG.trace("Setting lastresult to partition {} and offset {}", lastResult.getPartition(), lastResult.getPartitionLastOffset());
                    }
                }
            }

            if (!isConnected()) {
                LOG.debug("Not reconnecting, check whether to auto-commit or not ...");
                this.commitManager.commit();
            }
            safeUnsubscribe();
        } catch (InterruptException e) {
            this.kafkaConsumer.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", e);
            this.commitManager.commit();
            LOG.info("Unsubscribing {} from {}", this.threadId, getPrintableTopic());
            safeUnsubscribe();
            Thread.currentThread().interrupt();
        } catch (WakeupException e) {
            // Expected: the consumer is woken up on purpose when the task is being stopped.
            if (LOG.isTraceEnabled()) {
                LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", this.threadId, getPrintableTopic());
            }
        } catch (Exception e) {
            if (LOG.isDebugEnabled()) {
                // Trailing throwable argument: the stack trace is included when debug logging is on.
                LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}", e.getClass().getName(), this.threadId, getPrintableTopic(), e.getMessage(), e);
            } else {
                LOG.warn("Exception {} caught by thread {} while polling {} from kafka: {}", e.getClass().getName(), this.threadId, getPrintableTopic(), e.getMessage());
            }
            this.pollExceptionStrategy.handle(-1L, e);
        } finally {
            // Only tear the consumer down when the strategy does not want to retry with it.
            if (!this.pollExceptionStrategy.canContinue()) {
                safeUnsubscribe();
                safeConsumerClose();
            }
            this.lock.unlock();
        }
    }

    /**
     * Applies a pending pause or resume request to the consumer; does nothing in any other state.
     * <p>
     * On pause, all assigned partitions are paused and the state becomes {@code PAUSED}. On resume, every assigned
     * partition with a committed offset is moved back to that offset, all assigned partitions are resumed and the
     * state becomes {@code RUNNING}.
     */
    private void updateTaskState() {
        switch (this.state) {
            case PAUSE_REQUESTED:
                LOG.info("Pausing the consumer as a response to a pause request");
                this.consumer.pause(this.consumer.assignment());
                this.state = State.PAUSED;
                return;
            case RESUME_REQUESTED:
                LOG.info("Resuming the consumer as a response to a resume request");
                // Look the committed offsets up once: the call may block on the broker, and the map that
                // is null-checked must be the same one that is iterated.
                java.util.Map<?, ?> committedOffsets = this.consumer.committed(this.consumer.assignment());
                if (committedOffsets != null) {
                    committedOffsets.forEach((partition, committed) -> {
                        // Skip entries without a committed offset.
                        if (committed != null) {
                            TopicPartition topicPartition = (TopicPartition) partition;
                            long offset = ((OffsetAndMetadata) committed).offset();
                            LOG.info("Resuming from the offset {} for the topic {} with partition {}", offset, topicPartition.topic(), topicPartition.partition());
                            this.consumer.seek(topicPartition, offset);
                        }
                    });
                }
                this.consumer.resume(this.consumer.assignment());
                this.state = State.RUNNING;
                return;
            default:
                return;
        }
    }

    /**
     * Closes the Kafka consumer through {@code IOHelper}, logging instead of propagating any failure.
     */
    private void safeConsumerClose() {
        try {
            LOG.debug("Closing consumer {}", threadId);
            final String description = "Kafka consumer (thread ID " + threadId + ")";
            IOHelper.close(consumer, description, LOG);
        } catch (Exception failure) {
            // Closing is best effort: report and carry on.
            LOG.error("Error closing the Kafka consumer: {} (this error will be ignored)", failure.getMessage(), failure);
        }
    }

    /**
     * Unsubscribes the consumer, if one exists, without letting failures propagate. An
     * {@link IllegalStateException} is only logged as a warning; any other exception is passed to the owning
     * consumer's exception handler.
     */
    private void safeUnsubscribe() {
        if (consumer != null) {
            final String topic = getPrintableTopic();
            try {
                LOG.debug("Unsubscribing from Kafka");
                consumer.unsubscribe();
                LOG.debug("Done unsubscribing from Kafka");
            } catch (IllegalStateException alreadyClosed) {
                LOG.warn("The consumer is likely already closed. Skipping unsubscribing thread {} from kafka {}", threadId, topic);
            } catch (Exception failure) {
                LOG.debug("Something went wrong while unsubscribing from Kafka: {}", failure.getMessage());
                final String description = "Error unsubscribing thread " + threadId + " from kafka " + topic;
                kafkaConsumer.getExceptionHandler().handleException(description, failure);
            }
        }
    }

    /**
     * Describes the subscription target for log messages.
     *
     * @return {@code "topic pattern <pattern>"} when a pattern is set, otherwise {@code "topic <name>"}
     */
    private String getPrintableTopic() {
        if (topicPattern == null) {
            return "topic " + topicName;
        }
        return "topic pattern " + topicPattern;
    }

    /**
     * @return {@code true} when the owning consumer is allowed to run and is neither stopping/stopped nor
     *         suspending/suspended
     */
    private boolean isKafkaConsumerRunnable() {
        return kafkaConsumer.isRunAllowed()
                && !kafkaConsumer.isStoppingOrStopped()
                && !kafkaConsumer.isSuspendingOrSuspended();
    }

    /**
     * Like {@link #isKafkaConsumerRunnable()} but without the suspension check.
     *
     * @return {@code true} when the owning consumer is allowed to run and is not stopping or stopped
     */
    private boolean isKafkaConsumerRunnableAndNotStopped() {
        if (!kafkaConsumer.isRunAllowed()) {
            return false;
        }
        return !kafkaConsumer.isStoppingOrStopped();
    }

    /** @return whether a reconnect has been requested */
    private boolean isReconnect() {
        return this.reconnect;
    }

    /**
     * Requests (or cancels a request for) a reconnect; {@link #run()} keeps looping while a reconnect is requested
     * and the owning consumer is runnable.
     *
     * @param z {@code true} to request a reconnect
     */
    public void setReconnect(boolean z) {
        this.reconnect = z;
    }

    /**
     * Wakes the Kafka consumer up so that the polling thread leaves its poll call. Does nothing when no consumer
     * has been created.
     * <p>
     * Before waking the consumer, waits up to the configured shutdown timeout (in milliseconds) for the polling
     * lock, giving in-flight processing a chance to finish. The consumer is woken up regardless of whether the
     * lock was obtained, and also when the wait is interrupted (the interrupt flag is then restored).
     */
    private void safeStop() {
        if (this.consumer == null) {
            return;
        }
        long shutdownTimeout = this.kafkaConsumer.m2getEndpoint().getConfiguration().getShutdownTimeout();
        try {
            LOG.info("Waiting up to {} milliseconds for the processing to finish", shutdownTimeout);
            if (!this.lock.tryLock(shutdownTimeout, TimeUnit.MILLISECONDS)) {
                // The timeout is expressed in milliseconds; the message used to claim seconds.
                LOG.warn("The processing of the current record did not finish within {} milliseconds", shutdownTimeout);
            }
            this.consumer.wakeup();
        } catch (InterruptedException e) {
            this.consumer.wakeup();
            Thread.currentThread().interrupt();
        } finally {
            // tryLock may have timed out or been interrupted, so only release a lock we actually hold.
            if (this.lock.isHeldByCurrentThread()) {
                this.lock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Stops this task by delegating to {@code safeStop()}. */
    public void stop() {
        safeStop();
    }

    /** @return the current value of the connected flag */
    public boolean isConnected() {
        return this.connected;
    }

    /**
     * Tells whether the Kafka consumer currently has paused partitions.
     *
     * @return {@code true} when a consumer exists and reports at least one paused partition; {@code false}
     *         otherwise, including when no consumer has been created yet
     */
    public boolean isPaused() {
        // The consumer is only created once the task runs; avoid a NullPointerException before that.
        Consumer<?, ?> current = this.consumer;
        return current != null && !current.paused().isEmpty();
    }

    /**
     * Sets the connected flag. While it is {@code false}, the polling loop does not iterate and {@link #run()}
     * re-creates and re-subscribes the consumer on its next iteration.
     *
     * @param z the new value of the flag
     */
    public void setConnected(boolean z) {
        this.connected = z;
    }

    /**
     * Readiness probe used by {@link #healthState()}.
     * <p>
     * Returns {@code false} when the task is not connected. Otherwise, when the client is a
     * {@code org.apache.kafka.clients.consumer.KafkaConsumer}, its {@code client} field is read reflectively and
     * asked whether it has ready nodes. Any failure of that check is logged at debug level and ignored, in which
     * case the task is reported as ready.
     *
     * @return whether the task is considered ready
     */
    private boolean isReady() {
        if (!this.connected) {
            return false;
        }
        boolean ready = true;
        try {
            if (this.consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer) {
                // Explicit downcast: the field is declared with the Consumer interface type.
                org.apache.kafka.clients.consumer.KafkaConsumer<?, ?> kafkaClient = (org.apache.kafka.clients.consumer.KafkaConsumer<?, ?>) this.consumer;
                ConsumerNetworkClient networkClient = (ConsumerNetworkClient) ReflectionHelper.getField(kafkaClient.getClass().getDeclaredField("client"), kafkaClient);
                LOG.trace("Health-Check calling org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.hasReadyNode");
                ready = networkClient.hasReadyNodes(System.currentTimeMillis());
            }
        } catch (Exception e) {
            // Best effort: the reflective lookup depends on client internals, so failures must not affect health.
            LOG.debug("Cannot check hasReadyNodes on KafkaConsumer client (ConsumerNetworkClient) due to: {}. This exception is ignored.", e.getMessage(), e);
        }
        return ready;
    }

    /** @return the properties used to create the Kafka client */
    private Properties getKafkaProps() {
        return this.kafkaProps;
    }

    /** @return whether {@link #run()} gave up creating or subscribing the consumer */
    private boolean isTerminated() {
        return this.terminated;
    }

    /**
     * @return {@code true} when the poll exception strategy (if already created) allows continuing or a reconnect
     *         is requested, and the owning consumer is still runnable
     */
    private boolean isRecoverable() {
        final boolean strategyAllowsContinue = pollExceptionStrategy != null && pollExceptionStrategy.canContinue();
        if (!strategyAllowsContinue && !isReconnect()) {
            return false;
        }
        return isKafkaConsumerRunnable();
    }

    /**
     * Captures the current health of this task.
     *
     * @return a snapshot built from the readiness, terminated and recoverable indicators, the last error, the
     *         client id, the current backoff interval and the client properties
     */
    public TaskHealthState healthState() {
        final boolean ready = isReady();
        final boolean isTerminatedNow = isTerminated();
        final boolean recoverable = isRecoverable();
        return new TaskHealthState(ready, isTerminatedNow, recoverable, lastError, clientId, currentBackoffInterval, kafkaProps);
    }

    /** @return the exception-handler bridge supplied at construction time */
    public BridgeExceptionHandlerToErrorHandler getBridge() {
        return this.bridge;
    }

    /**
     * Records a pause request. The consumer itself is not touched here: the request is applied by the polling
     * loop, after it has processed the current batch.
     */
    public void pause() {
        LOG.info("A pause request was issued and the consumer thread will pause after current processing has finished");
        this.state = State.PAUSE_REQUESTED;
    }

    /**
     * Records a resume request. The consumer itself is not touched here: the request is applied by the polling
     * loop, after it has processed the current batch.
     */
    public void resume() {
        LOG.info("A resume request was issued and the consumer thread will resume after current processing has finished");
        this.state = State.RESUME_REQUESTED;
    }

    /**
     * Records the most recent failure, or clears it.
     *
     * @param exc the failure to record, or {@code null} to clear the previous one
     */
    private synchronized void setLastError(Exception exc) {
        this.lastError = exc;
    }
}
