package org.apache.camel.component.azure.eventhubs.operations;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
import org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.class */
/**
 * Converts the content of a Camel {@link Exchange} into {@link EventData} instances and sends them through an
 * {@link EventHubProducerAsyncClient}.
 *
 * <p>
 * The message body may be a single payload or an {@link Iterable}. Elements of an iterable body may themselves be
 * {@link Exchange}s, {@link Message}s or plain objects; each element becomes one event.
 */
public class EventHubsProducerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(EventHubsProducerOperations.class);
    private final EventHubProducerAsyncClient producerAsyncClient;
    private final EventHubsConfigurationOptionsProxy configurationOptionsProxy;

    /**
     * Creates the operations helper.
     *
     * @param eventHubProducerAsyncClient client used to send the events; must not be {@code null}
     * @param eventHubsConfiguration      endpoint configuration, wrapped in an
     *                                    {@link EventHubsConfigurationOptionsProxy} to resolve the partition key
     *                                    and partition id per exchange
     */
    public EventHubsProducerOperations(EventHubProducerAsyncClient eventHubProducerAsyncClient, EventHubsConfiguration eventHubsConfiguration) {
        ObjectHelper.notNull(eventHubProducerAsyncClient, "client cannot be null");
        this.producerAsyncClient = eventHubProducerAsyncClient;
        this.configurationOptionsProxy = new EventHubsConfigurationOptionsProxy(eventHubsConfiguration);
    }

    /**
     * Builds the events and send options from the exchange and starts the send.
     *
     * @param  exchange                 exchange whose IN message supplies the payload and headers
     * @param  asyncCallback            callback invoked with {@code done(false)} when the send completes or fails
     * @return                          always {@code false}; completion is reported through {@code asyncCallback}
     * @throws IllegalArgumentException if both a partition key and a partition id are resolved, or if a payload
     *                                  cannot be converted to {@code byte[]}
     */
    public boolean sendEvents(Exchange exchange, AsyncCallback asyncCallback) {
        ObjectHelper.notNull(exchange, "exchange cannot be null");
        ObjectHelper.notNull(asyncCallback, "callback cannot be null");

        // Events are built before the send options, matching the original evaluation order.
        final Iterable<EventData> events = createEventData(exchange);
        final SendOptions sendOptions = createSendOptions(
                this.configurationOptionsProxy.getPartitionKey(exchange),
                this.configurationOptionsProxy.getPartitionId(exchange));

        return sendAsyncEvents(events, sendOptions, exchange, asyncCallback);
    }

    /**
     * Subscribes to the send operation and wires its outcome to the exchange and callback.
     *
     * <p>
     * On error the throwable is stored on the exchange before the callback is invoked. In both the error and the
     * completion case the callback receives {@code false}.
     */
    private boolean sendAsyncEvents(Iterable<EventData> events, SendOptions sendOptions, Exchange exchange, AsyncCallback asyncCallback) {
        sendAsyncEventsWithSuitableMethod(events, sendOptions).subscribe(
                unused -> LOG.debug("Processed one event..."),
                error -> {
                    LOG.debug("Error processing async exchange with error: {}", error.getMessage());
                    exchange.setException(error);
                    asyncCallback.done(false);
                },
                () -> {
                    LOG.debug("All events with exchange have been sent successfully.");
                    asyncCallback.done(false);
                });
        return false;
    }

    /**
     * Chooses the client {@code send} overload: without options when {@code sendOptions} is empty (this class
     * passes {@code null} when neither partition key nor partition id is set), with options otherwise.
     */
    private Mono<Void> sendAsyncEventsWithSuitableMethod(Iterable<EventData> events, SendOptions sendOptions) {
        if (ObjectHelper.isEmpty(sendOptions)) {
            return this.producerAsyncClient.send(events);
        }
        return this.producerAsyncClient.send(events, sendOptions);
    }

    /**
     * Creates the send options for the given partition key / partition id pair.
     *
     * @return                          {@code null} when neither value is set, otherwise options carrying both
     *                                  values (one of which is empty)
     * @throws IllegalArgumentException if both values are set
     */
    private SendOptions createSendOptions(String partitionKey, String partitionId) {
        final boolean hasPartitionKey = ObjectHelper.isNotEmpty(partitionKey);
        final boolean hasPartitionId = ObjectHelper.isNotEmpty(partitionId);

        if (hasPartitionKey && hasPartitionId) {
            throw new IllegalArgumentException("Both partitionKey and partitionId are set. Only one or the other can be set.");
        }
        if (ObjectHelper.isEmpty(partitionKey) && ObjectHelper.isEmpty(partitionId)) {
            return null;
        }
        return new SendOptions().setPartitionId(partitionId).setPartitionKey(partitionKey);
    }

    /**
     * Produces the events for the exchange: one per element when the IN body is an {@link Iterable}, otherwise a
     * single event built from the IN message.
     */
    private Iterable<EventData> createEventData(Exchange exchange) {
        final Message in = exchange.getIn();
        // Read the body once instead of looking it up for both the type check and the cast.
        final Object body = in.getBody();

        if (body instanceof Iterable) {
            return createEventDataFromIterable((Iterable<?>) body, exchange.getContext().getTypeConverter(), in.getHeaders());
        }
        return Collections.singletonList(createEventDataFromExchange(exchange));
    }

    /**
     * Converts every element of an iterable body into an event.
     *
     * @param items         elements of the body; each may be an {@link Exchange}, a {@link Message} or a payload
     * @param typeConverter converter of the sending exchange, used for plain payloads and as a fallback for
     *                      messages without an owning exchange
     * @param headers       headers of the sending message, copied onto events built from plain payloads
     */
    private Iterable<EventData> createEventDataFromIterable(Iterable<?> items, TypeConverter typeConverter, Map<String, Object> headers) {
        final List<EventData> events = new ArrayList<>();
        for (Object item : items) {
            if (item instanceof Exchange) {
                events.add(createEventDataFromExchange((Exchange) item));
            } else if (item instanceof Message) {
                events.add(createEventDataFromMessage((Message) item, typeConverter));
            } else {
                events.add(createEventDataFromObject(item, typeConverter, headers));
            }
        }
        return events;
    }

    /** Builds one event from the IN message of the given exchange. */
    private EventData createEventDataFromExchange(Exchange exchange) {
        return createEventDataFromMessage(exchange.getIn(), exchange.getContext().getTypeConverter());
    }

    /**
     * Builds one event from a message, using the message's own body and headers.
     *
     * @param message           message to convert
     * @param fallbackConverter converter used only when the message has no owning exchange, which avoids a
     *                          {@link NullPointerException} for such messages
     */
    private EventData createEventDataFromMessage(Message message, TypeConverter fallbackConverter) {
        final Exchange owner = message.getExchange();
        final TypeConverter converter = owner != null ? owner.getContext().getTypeConverter() : fallbackConverter;
        return createEventDataFromObject(message.getBody(), converter, message.getHeaders());
    }

    /**
     * Converts a payload to {@code byte[]} and wraps it in an event whose properties receive the given headers.
     *
     * @throws IllegalArgumentException if the converted payload is considered empty by {@link ObjectHelper}
     */
    private EventData createEventDataFromObject(Object payload, TypeConverter typeConverter, Map<String, Object> headers) {
        final byte[] payloadBytes = typeConverter.convertTo(byte[].class, payload);
        if (ObjectHelper.isEmpty(payloadBytes)) {
            throw new IllegalArgumentException(String.format("Cannot convert message body %s to byte[]. You will need to make sure the data encoded in byte[] or add a Camel TypeConverter to convert the data to byte[]", payload));
        }
        final EventData eventData = new EventData(payloadBytes);
        eventData.getProperties().putAll(headers);
        return eventData;
    }
}
