package org.apache.camel.component.azure.cosmosdb;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.implementation.apachecommons.lang.RandomStringUtils;
import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.cosmosdb.client.CosmosAsyncClientWrapper;
import org.apache.camel.component.azure.cosmosdb.operations.CosmosDbContainerOperations;
import org.apache.camel.component.azure.cosmosdb.operations.CosmosDbOperationsBuilder;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.EmptyAsyncCallback;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ObjectHelper;

/* loaded from: input_file:org/apache/camel/component/azure/cosmosdb/CosmosDbConsumer.class */
public class CosmosDbConsumer extends DefaultConsumer {
    private Synchronization onCompletion;
    private CosmosAsyncClientWrapper clientWrapper;
    private ChangeFeedProcessor changeFeedProcessor;

    /* loaded from: input_file:org/apache/camel/component/azure/cosmosdb/CosmosDbConsumer$ConsumerOnCompletion.class */
    private class ConsumerOnCompletion extends SynchronizationAdapter {
        private ConsumerOnCompletion() {
        }

        public void onFailure(Exchange exchange) {
            Exception exception = exchange.getException();
            if (exception != null) {
                CosmosDbConsumer.this.getExceptionHandler().handleException("Error during processing exchange.", exchange, exception);
            }
        }
    }

    public CosmosDbConsumer(CosmosDbEndpoint cosmosDbEndpoint, Processor processor) {
        super(cosmosDbEndpoint, processor);
    }

    protected void doInit() throws Exception {
        super.doInit();
        this.clientWrapper = new CosmosAsyncClientWrapper(m1getEndpoint().getCosmosAsyncClient());
        this.onCompletion = new ConsumerOnCompletion();
        this.changeFeedProcessor = getContainerOperations().captureEventsWithChangeFeed(getLeaseContainerOperations().getContainer(), getHostName(), this::onEventListener, getConfiguration().getChangeFeedProcessorOptions());
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.changeFeedProcessor.start().subscribe(r1 -> {
        }, this::onErrorListener);
    }

    protected void doStop() throws Exception {
        if (this.changeFeedProcessor != null) {
            this.changeFeedProcessor.stop().block();
        }
        super.doStop();
    }

    public CosmosDbConfiguration getConfiguration() {
        return m1getEndpoint().getConfiguration();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public CosmosDbEndpoint m1getEndpoint() {
        return super.getEndpoint();
    }

    private void onEventListener(List<Map<String, ?>> list) {
        Exchange createAzureCosmosDbExchange = createAzureCosmosDbExchange(list);
        createAzureCosmosDbExchange.adapt(ExtendedExchange.class).addOnCompletion(this.onCompletion);
        getAsyncProcessor().process(createAzureCosmosDbExchange, EmptyAsyncCallback.get());
    }

    private Exchange createAzureCosmosDbExchange(List<Map<String, ?>> list) {
        Exchange createExchange = createExchange(true);
        createExchange.getIn().setBody(list);
        return createExchange;
    }

    private void onErrorListener(Throwable th) {
        getExceptionHandler().handleException("Error processing exchange", th);
    }

    private CosmosDbContainerOperations getContainerOperations() {
        return CosmosDbOperationsBuilder.withClient(this.clientWrapper).withDatabaseName(getConfiguration().getDatabaseName()).withCreateDatabaseIfNotExist(getConfiguration().isCreateDatabaseIfNotExists()).withContainerName(getConfiguration().getContainerName()).withContainerPartitionKeyPath(getConfiguration().getContainerPartitionKeyPath()).withCreateContainerIfNotExist(getConfiguration().isCreateContainerIfNotExists()).withThroughputProperties(getConfiguration().getThroughputProperties()).buildContainerOperations();
    }

    private CosmosDbContainerOperations getLeaseContainerOperations() {
        return CosmosDbOperationsBuilder.withClient(this.clientWrapper).withDatabaseName(ObjectHelper.isEmpty(getConfiguration().getLeaseDatabaseName()) ? getConfiguration().getDatabaseName() : getConfiguration().getLeaseDatabaseName()).withCreateDatabaseIfNotExist(getConfiguration().isCreateLeaseDatabaseIfNotExists()).withContainerName(getConfiguration().getLeaseContainerName()).withContainerPartitionKeyPath("/id").withCreateContainerIfNotExist(getConfiguration().isCreateLeaseContainerIfNotExists()).withThroughputProperties(getConfiguration().getThroughputProperties()).buildContainerOperations();
    }

    private String getHostName() {
        return ObjectHelper.isEmpty(getConfiguration().getHostName()) ? RandomStringUtils.randomAlphabetic(10) : getConfiguration().getHostName();
    }
}
