package org.apache.camel.component.azure.servicebus;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
import org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/camel/component/azure/servicebus/ServiceBusProducer.class */
public class ServiceBusProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusProducer.class);
    private ServiceBusSenderAsyncClientWrapper senderClientWrapper;
    private ServiceBusConfigurationOptionsProxy configurationOptionsProxy;
    private ServiceBusSenderOperations serviceBusSenderOperations;
    private final Map<ServiceBusProducerOperationDefinition, BiConsumer<Exchange, AsyncCallback>> operationsToExecute;

    public ServiceBusProducer(Endpoint endpoint) {
        super(endpoint);
        this.operationsToExecute = new HashMap();
        bind(ServiceBusProducerOperationDefinition.sendMessages, sendMessages());
        bind(ServiceBusProducerOperationDefinition.scheduleMessages, scheduleMessages());
    }

    protected void doInit() throws Exception {
        super.doInit();
        this.configurationOptionsProxy = new ServiceBusConfigurationOptionsProxy(getConfiguration());
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.senderClientWrapper = new ServiceBusSenderAsyncClientWrapper(getConfiguration().getSenderAsyncClient() != null ? getConfiguration().getSenderAsyncClient() : ServiceBusClientFactory.createServiceBusSenderAsyncClient(getConfiguration()));
        this.serviceBusSenderOperations = new ServiceBusSenderOperations(this.senderClientWrapper);
    }

    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            invokeOperation(this.configurationOptionsProxy.getServiceBusProducerOperationDefinition(exchange), exchange, asyncCallback);
            return false;
        } catch (Exception e) {
            exchange.setException(e);
            asyncCallback.done(true);
            return true;
        }
    }

    protected void doStop() throws Exception {
        if (this.senderClientWrapper != null) {
            this.senderClientWrapper.close();
        }
        super.doStop();
    }

    /* renamed from: getEndpoint, reason: merged with bridge method [inline-methods] */
    public ServiceBusEndpoint m5getEndpoint() {
        return super.getEndpoint();
    }

    public ServiceBusConfiguration getConfiguration() {
        return m5getEndpoint().getConfiguration();
    }

    private void bind(ServiceBusProducerOperationDefinition serviceBusProducerOperationDefinition, BiConsumer<Exchange, AsyncCallback> biConsumer) {
        this.operationsToExecute.put(serviceBusProducerOperationDefinition, biConsumer);
    }

    private void invokeOperation(ServiceBusProducerOperationDefinition serviceBusProducerOperationDefinition, Exchange exchange, AsyncCallback asyncCallback) {
        ServiceBusProducerOperationDefinition serviceBusProducerOperationDefinition2 = ObjectHelper.isEmpty(serviceBusProducerOperationDefinition) ? ServiceBusProducerOperationDefinition.sendMessages : serviceBusProducerOperationDefinition;
        BiConsumer<Exchange, AsyncCallback> biConsumer = this.operationsToExecute.get(serviceBusProducerOperationDefinition2);
        if (biConsumer == null) {
            throw new RuntimeCamelException("Operation not supported. Value: " + serviceBusProducerOperationDefinition2);
        }
        biConsumer.accept(exchange, asyncCallback);
    }

    private BiConsumer<Exchange, AsyncCallback> sendMessages() {
        return (exchange, asyncCallback) -> {
            Object body = exchange.getMessage().getBody();
            Map<String, Object> map = (Map) exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
            subscribeToMono(exchange.getMessage().getBody() instanceof Iterable ? this.serviceBusSenderOperations.sendMessages((Object) convertBodyToList((Iterable) body), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map) : this.serviceBusSenderOperations.sendMessages(exchange.getMessage().getBody(String.class), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map), exchange, r1 -> {
            }, asyncCallback);
        };
    }

    private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
        return (exchange, asyncCallback) -> {
            Object body = exchange.getMessage().getBody();
            Map<String, Object> map = (Map) exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.class);
            subscribeToMono(exchange.getMessage().getBody() instanceof Iterable ? this.serviceBusSenderOperations.scheduleMessages((Object) convertBodyToList((Iterable) body), this.configurationOptionsProxy.getScheduledEnqueueTime(exchange), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map) : this.serviceBusSenderOperations.scheduleMessages(exchange.getMessage().getBody(String.class), this.configurationOptionsProxy.getScheduledEnqueueTime(exchange), this.configurationOptionsProxy.getServiceBusTransactionContext(exchange), map), exchange, list -> {
                exchange.getMessage().setBody(list);
            }, asyncCallback);
        };
    }

    private List<String> convertBodyToList(Iterable<Object> iterable) {
        return (List) StreamSupport.stream(iterable.spliterator(), false).map(obj -> {
            return (String) m5getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, obj);
        }).collect(Collectors.toList());
    }

    private <T> void subscribeToMono(Mono<T> mono, Exchange exchange, Consumer<T> consumer, AsyncCallback asyncCallback) {
        mono.subscribe(consumer, th -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error processing async exchange with error: {}", th.getMessage());
            }
            exchange.setException(th);
            asyncCallback.done(false);
        }, () -> {
            LOG.trace("All events with exchange have been sent successfully.");
            asyncCallback.done(false);
        });
    }
}
