package org.apache.camel.component.vertx.kafka;

import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfiguration;
import org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.class */
public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
    private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaConsumer.class);
    private KafkaConsumer<Object, Object> kafkaConsumer;

    public VertxKafkaConsumer(VertxKafkaEndpoint vertxKafkaEndpoint, Processor processor) {
        super(vertxKafkaEndpoint, processor);
    }

    protected void doStart() throws Exception {
        super.doStart();
        String bootstrapBrokers = m1getEndpoint().m2getComponent().getVertxKafkaClientFactory().getBootstrapBrokers(getConfiguration());
        if (bootstrapBrokers != null) {
            LOG.debug("Creating KafkaConsumer connecting to BootstrapBrokers: {}", bootstrapBrokers);
        }
        this.kafkaConsumer = m1getEndpoint().m2getComponent().getVertxKafkaClientFactory().getVertxKafkaConsumer(m1getEndpoint().getVertx(), getConfiguration().createConsumerConfiguration());
        new VertxKafkaConsumerOperations(this.kafkaConsumer, getConfiguration()).receiveEvents(this::onEventListener, this::onErrorListener);
    }

    protected void doStop() throws Exception {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
        }
        super.doStop();
    }

    protected void doSuspend() throws Exception {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.pause();
        }
    }

    protected void doResume() throws Exception {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.resume();
        }
    }

    public VertxKafkaConfiguration getConfiguration() {
        return m1getEndpoint().getConfiguration();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public VertxKafkaEndpoint m1getEndpoint() {
        return super.getEndpoint();
    }

    private void onEventListener(KafkaConsumerRecord<Object, Object> kafkaConsumerRecord) {
        Exchange createExchange = m1getEndpoint().createExchange(kafkaConsumerRecord);
        new VertxKafkaHeadersPropagation(getConfiguration().getHeaderFilterStrategy()).getPropagatedHeaders(kafkaConsumerRecord.headers(), createExchange.getIn()).forEach((str, buffer) -> {
            createExchange.getIn().setHeader(str, buffer);
        });
        createExchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.vertx.kafka.VertxKafkaConsumer.1
            public void onComplete(Exchange exchange) {
            }

            public void onFailure(Exchange exchange) {
                VertxKafkaConsumer.this.processRollback(exchange);
            }
        });
        getAsyncProcessor().process(createExchange, z -> {
            LOG.trace("Processing exchange [{}] done.", createExchange);
        });
    }

    private void onErrorListener(Throwable th) {
        Exchange createExchange = m1getEndpoint().createExchange();
        createExchange.setException(th);
        if (createExchange.getException() != null) {
            getExceptionHandler().handleException("Error processing exchange", createExchange, createExchange.getException());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processRollback(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            getExceptionHandler().handleException("Error during processing exchange.", exchange, exception);
        }
    }
}
