package org.apache.camel.component.azure.eventhubs;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/azure/eventhubs/EventHubsConsumer.class */
public class EventHubsConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class);
    private EventProcessorClient processorClient;

    public EventHubsConsumer(EventHubsEndpoint eventHubsEndpoint, Processor processor) {
        super(eventHubsEndpoint, processor);
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.processorClient = EventHubsClientFactory.createEventProcessorClient(getConfiguration(), this::onEventListener, this::onErrorListener);
        this.processorClient.start();
    }

    protected void doStop() throws Exception {
        if (this.processorClient != null) {
            this.processorClient.stop();
        }
        super.doStop();
    }

    public EventHubsConfiguration getConfiguration() {
        return m2getEndpoint().getConfiguration();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public EventHubsEndpoint m2getEndpoint() {
        return super.getEndpoint();
    }

    private Exchange createAzureEventHubExchange(EventContext eventContext) {
        Exchange createExchange = createExchange(true);
        Message in = createExchange.getIn();
        in.setBody(eventContext.getEventData().getBody());
        in.setHeader(EventHubsConstants.PARTITION_ID, eventContext.getPartitionContext().getPartitionId());
        in.setHeader(EventHubsConstants.PARTITION_KEY, eventContext.getEventData().getPartitionKey());
        in.setHeader(EventHubsConstants.OFFSET, eventContext.getEventData().getOffset());
        in.setHeader(EventHubsConstants.ENQUEUED_TIME, eventContext.getEventData().getEnqueuedTime());
        in.setHeader(EventHubsConstants.SEQUENCE_NUMBER, eventContext.getEventData().getSequenceNumber());
        return createExchange;
    }

    private Exchange createAzureEventHubExchange(ErrorContext errorContext) {
        Exchange createExchange = createExchange(true);
        createExchange.getIn().setHeader(EventHubsConstants.PARTITION_ID, errorContext.getPartitionContext().getPartitionId());
        createExchange.setException(errorContext.getThrowable());
        return createExchange;
    }

    private void onEventListener(final EventContext eventContext) {
        Exchange createAzureEventHubExchange = createAzureEventHubExchange(eventContext);
        createAzureEventHubExchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.1
            public void onComplete(Exchange exchange) {
                EventHubsConsumer.this.processCommit(exchange, eventContext);
            }

            public void onFailure(Exchange exchange) {
                EventHubsConsumer.this.processRollback(exchange);
            }
        });
        getAsyncProcessor().process(createAzureEventHubExchange, defaultConsumerCallback(createAzureEventHubExchange, true));
    }

    private void onErrorListener(ErrorContext errorContext) {
        Exchange createAzureEventHubExchange = createAzureEventHubExchange(errorContext);
        if (createAzureEventHubExchange.getException() != null) {
            getExceptionHandler().handleException("Error processing exchange", createAzureEventHubExchange, createAzureEventHubExchange.getException());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processCommit(Exchange exchange, EventContext eventContext) {
        try {
            eventContext.updateCheckpoint();
        } catch (Exception e) {
            getExceptionHandler().handleException("Error occurred during updating the checkpoint. This exception is ignored.", exchange, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processRollback(Exchange exchange) {
        Exception exception = exchange.getException();
        if (exception != null) {
            getExceptionHandler().handleException("Error during processing exchange.", exchange, exception);
        }
    }
}
