package com.microsoft.azure.spring.integration.eventhub;

import com.google.common.base.Strings;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.cloud.context.core.Tuple;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import com.microsoft.azure.spring.integration.core.PartitionSupplier;
import com.microsoft.azure.spring.integration.eventhub.inbound.EventHubCheckpointer;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/EventHubTemplate.class */
/**
 * {@link EventHubOperation} implementation that sends events through clients obtained from an
 * {@link EventHubClientFactory} and fans received event batches out to subscribed consumers.
 *
 * <p>All subscription state is keyed by the tuple (event hub name, consumer group). One
 * {@link EventProcessorHost} is kept per key for as long as at least one consumer is subscribed.
 *
 * <p>{@link #subscribe} and {@link #unsubscribe} are {@code synchronized} on this instance; the
 * lookup tables are {@link ConcurrentHashMap}s because the event-processor callbacks read and write
 * them without holding that lock.
 */
public class EventHubTemplate implements EventHubOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubTemplate.class);

    /** Consumers registered via {@link #subscribe}, keyed by (event hub name, consumer group). */
    private final ConcurrentHashMap<Tuple<String, String>, Set<Consumer<Iterable<EventData>>>> consumersByNameAndConsumerGroup = new ConcurrentHashMap<>();

    /** Checkpointers, created lazily the first time a partition is opened for a key. */
    private final ConcurrentHashMap<Tuple<String, String>, EventHubCheckpointer> checkpointersByNameAndConsumerGroup = new ConcurrentHashMap<>();

    /** Processor hosts that currently have an event processor factory registered by this template. */
    private final ConcurrentHashMap<Tuple<String, String>, EventProcessorHost> processorHostsByNameAndConsumerGroup = new ConcurrentHashMap<>();

    private final EventHubClientFactory clientFactory;

    /**
     * @param eventHubClientFactory factory supplying the client, partition-sender and processor-host
     *                              creators used by this template
     */
    public EventHubTemplate(EventHubClientFactory eventHubClientFactory) {
        this.clientFactory = eventHubClientFactory;
    }

    /**
     * Sends a single event to the named event hub.
     *
     * <p>Routing is decided in this order: no supplier means a plain send; a non-empty partition id
     * sends through a partition sender for that id; a non-empty partition key sends with that key;
     * otherwise a plain send.
     *
     * @param str               the event hub name; must contain text
     * @param eventData         the event to send; must not be {@code null}
     * @param partitionSupplier optional partition id / partition key source; may be {@code null}
     * @return the future returned by the underlying send, or an exceptionally completed future when
     *         an {@link EventHubRuntimeException} is raised while obtaining a client/sender or sending
     * @throws IllegalArgumentException if {@code str} is blank or {@code eventData} is {@code null}
     */
    @Override
    public CompletableFuture<Void> sendAsync(String str, EventData eventData, PartitionSupplier partitionSupplier) {
        Assert.hasText(str, "eventHubName can't be null or empty");
        Assert.notNull(eventData, "message can't be null");
        try {
            EventHubClient client = this.clientFactory.getEventHubClientCreator().apply(str);
            if (partitionSupplier == null) {
                return client.send(eventData);
            }

            // An explicit partition id takes precedence over a partition key.
            String partitionId = partitionSupplier.getPartitionId();
            if (!Strings.isNullOrEmpty(partitionId)) {
                return this.clientFactory.getPartitionSenderCreator()
                        .apply(Tuple.of(client, partitionId))
                        .send(eventData);
            }

            String partitionKey = partitionSupplier.getPartitionKey();
            if (!Strings.isNullOrEmpty(partitionKey)) {
                return client.send(eventData, partitionKey);
            }

            return client.send(eventData);
        } catch (EventHubRuntimeException e) {
            LOGGER.error("Failed to send to '{}' ", str, e);
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /**
     * Looks up the checkpointer for an (event hub name, consumer group) pair.
     *
     * @param str  the event hub name
     * @param str2 the consumer group
     * @return the checkpointer, or {@code null} if no partition has been opened for this pair yet
     */
    @Override
    public Checkpointer<EventData> getCheckpointer(String str, String str2) {
        return this.checkpointersByNameAndConsumerGroup.get(Tuple.of(str, str2));
    }

    /**
     * Registers a consumer for an (event hub name, consumer group) pair, creating the processor host
     * for that pair if none is currently held.
     *
     * @param str      the event hub name
     * @param consumer callback that receives each batch of events
     * @param str2     the consumer group
     * @return {@code true} if the consumer was added; {@code false} if it was already subscribed
     */
    @Override
    public synchronized boolean subscribe(String str, Consumer<Iterable<EventData>> consumer, String str2) {
        Tuple<String, String> key = Tuple.of(str, str2);
        boolean added = this.consumersByNameAndConsumerGroup
                .computeIfAbsent(key, ignored -> new CopyOnWriteArraySet<>())
                .add(consumer);
        if (!added) {
            return false;
        }
        this.processorHostsByNameAndConsumerGroup.computeIfAbsent(key, this::createProcessorHost);
        return true;
    }

    /**
     * Removes a consumer and, once no consumers remain for the pair, unregisters the event processor
     * from the associated host.
     *
     * @param str      the event hub name
     * @param consumer the consumer to remove
     * @param str2     the consumer group
     * @return {@code true} if the consumer had been subscribed; {@code false} otherwise
     */
    @Override
    public synchronized boolean unsubscribe(String str, Consumer<Iterable<EventData>> consumer, String str2) {
        Tuple<String, String> key = Tuple.of(str, str2);
        Set<Consumer<Iterable<EventData>>> consumers = this.consumersByNameAndConsumerGroup.get(key);
        if (consumers == null) {
            return false;
        }
        boolean removed = consumers.remove(consumer);
        if (consumers.isEmpty()) {
            // The (now empty) consumer set stays in the map, so a repeated unsubscribe reaches this
            // point again after the host has already been removed; guard against the null host.
            EventProcessorHost host = this.processorHostsByNameAndConsumerGroup.remove(key);
            if (host != null) {
                host.unregisterEventProcessor();
            }
        }
        return removed;
    }

    /**
     * Creates the processor host for {@code key} and registers a factory whose processors track
     * partition contexts in the key's checkpointer and forward event batches to the key's consumers.
     */
    private EventProcessorHost createProcessorHost(Tuple<String, String> key) {
        EventProcessorHost host = this.clientFactory.getProcessorHostCreator().apply(key);
        host.registerEventProcessorFactory(ignored -> new IEventProcessor() {
            @Override
            public void onOpen(PartitionContext partitionContext) throws Exception {
                LOGGER.info("Partition {} is opening", partitionContext.getPartitionId());
                checkpointersByNameAndConsumerGroup
                        .computeIfAbsent(key, k -> new EventHubCheckpointer())
                        .addPartitionContext(partitionContext);
            }

            @Override
            public void onClose(PartitionContext partitionContext, CloseReason closeReason) throws Exception {
                LOGGER.info("Partition {} is closing for reason {}", partitionContext.getPartitionId(), closeReason);
                EventHubCheckpointer checkpointer = checkpointersByNameAndConsumerGroup.get(key);
                if (checkpointer != null) {
                    checkpointer.removePartitionContext(partitionContext);
                }
            }

            @Override
            public void onEvents(PartitionContext partitionContext, Iterable<EventData> iterable) throws Exception {
                // The consumer set is created in subscribe() before the host and is never removed
                // from the map, so the lookup is non-null here.
                Set<Consumer<Iterable<EventData>>> consumers = consumersByNameAndConsumerGroup.get(key);
                consumers.forEach(subscriber -> subscriber.accept(iterable));
            }

            @Override
            public void onError(PartitionContext partitionContext, Throwable th) {
                LOGGER.error("Partition {} onError", partitionContext.getPartitionId(), th);
            }
        });
        return host;
    }
}
