package com.microsoft.azure.spring.integration.eventhub;

import com.google.common.base.Strings;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.AzureHeaders;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.core.api.StartPosition;
import com.microsoft.azure.spring.integration.eventhub.converter.EventHubMessageConverter;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/EventHubTemplate.class */
/**
 * {@link EventHubOperation} implementation that sends messages through clients obtained from an
 * {@link EventHubClientFactory} and consumes them through one {@link EventProcessorHost} per
 * (event hub name, consumer group) pair.
 *
 * <p>The subscription registry is a {@link ConcurrentHashMap}; {@link #subscribe} and
 * {@link #unsubscribe} each touch it with a single atomic map operation.
 */
public class EventHubTemplate implements EventHubOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubTemplate.class);

    /** Creates the Event Hub clients, partition senders and processor hosts used by this template. */
    private final EventHubClientFactory clientFactory;

    /** Registered processor hosts, keyed by (event hub name, consumer group). */
    private final ConcurrentHashMap<Tuple<String, String>, EventProcessorHost> processorByNameAndGroup =
            new ConcurrentHashMap<>();

    /** Converts between Spring {@link Message}s and {@link EventData}. */
    private EventHubMessageConverter messageConverter = new EventHubMessageConverter();

    /** Where newly registered processors start reading; applied when a subscription is registered. */
    private StartPosition startPosition = StartPosition.LATEST;

    /** Checkpoint strategy applied by {@link EventHubProcessor#onEvents}. */
    private CheckpointMode checkpointMode = CheckpointMode.BATCH;

    /**
     * Event processor that converts each received {@link EventData} into a {@link Message} and
     * hands it to a consumer, checkpointing according to the enclosing template's
     * {@link CheckpointMode}.
     *
     * @param <T> payload type of the messages delivered to the consumer
     */
    public class EventHubProcessor<T> implements IEventProcessor {
        private final Consumer<Message<T>> consumer;
        private final Class<T> payloadType;

        /**
         * @param consumer callback receiving every converted message
         * @param cls      payload type the event body is converted to
         * @throws NullPointerException if either argument is {@code null}
         */
        public EventHubProcessor(@NonNull Consumer<Message<T>> consumer, @NonNull Class<T> cls) {
            if (consumer == null) {
                throw new NullPointerException("consumer");
            }
            if (cls == null) {
                throw new NullPointerException("payloadType");
            }
            this.consumer = consumer;
            this.payloadType = cls;
        }

        /** Logs that the partition is opening. */
        @Override
        public void onOpen(PartitionContext partitionContext) throws Exception {
            LOGGER.info("Partition {} is opening", partitionContext.getPartitionId());
        }

        /** Logs that the partition is closing, together with the reason. */
        @Override
        public void onClose(PartitionContext partitionContext, CloseReason closeReason) throws Exception {
            LOGGER.info("Partition {} is closing for reason {}", partitionContext.getPartitionId(), closeReason);
        }

        /**
         * Delivers a batch of events to the consumer, one message per event.
         *
         * <ul>
         *   <li>{@code MANUAL}: a checkpointer for the event is attached as the
         *       {@link AzureHeaders#CHECKPOINTER} header and nothing is checkpointed here.</li>
         *   <li>{@code RECORD}: the event is checkpointed right after the consumer returns.</li>
         *   <li>{@code BATCH}: the partition is checkpointed once after the whole batch.</li>
         * </ul>
         *
         * Every message carries the {@link AzureHeaders#PARTITION_ID} header. An exception thrown
         * by the consumer propagates out of this method and skips the remaining events.
         */
        @Override
        public void onEvents(PartitionContext partitionContext, Iterable<EventData> iterable) throws Exception {
            // Read the mode once so a concurrent setCheckpointMode() cannot split a single
            // batch across two different checkpoint strategies.
            final CheckpointMode mode = checkpointMode;

            final HashMap<String, Object> headers = new HashMap<>();
            headers.put(AzureHeaders.PARTITION_ID, partitionContext.getPartitionId());

            for (EventData eventData : iterable) {
                AzureCheckpointer checkpointer = new AzureCheckpointer(() -> partitionContext.checkpoint(eventData));
                if (mode == CheckpointMode.MANUAL) {
                    // Overwritten on every iteration so each message gets its own checkpointer.
                    headers.put(AzureHeaders.CHECKPOINTER, checkpointer);
                }

                this.consumer.accept(messageConverter.toMessage(eventData, new MessageHeaders(headers), this.payloadType));

                if (mode == CheckpointMode.RECORD) {
                    // NOTE(review): the outcome of success() is not observed, so a failed
                    // per-record checkpoint goes unreported - confirm this is intended.
                    checkpointer.success();
                }
            }

            if (mode == CheckpointMode.BATCH) {
                partitionContext.checkpoint().whenComplete((ignored, error) -> {
                    if (error != null) {
                        LOGGER.warn("Failed to checkpoint", error);
                    }
                });
            }
        }

        /** Logs an error reported for the partition. */
        @Override
        public void onError(PartitionContext partitionContext, Throwable th) {
            LOGGER.error("Partition {} onError", partitionContext.getPartitionId(), th);
        }
    }

    /**
     * @param eventHubClientFactory factory supplying clients, partition senders and processor hosts
     */
    public EventHubTemplate(EventHubClientFactory eventHubClientFactory) {
        this.clientFactory = eventHubClientFactory;
    }

    /**
     * Builds processor options whose initial position is the start of the stream for
     * {@code StartPosition.EARLISET} (spelling as declared by the enum) and the end of the
     * stream for any other value, including {@code null}.
     */
    private static EventProcessorOptions buildEventProcessorOptions(StartPosition startPosition) {
        EventProcessorOptions options = EventProcessorOptions.getDefaultOptions();
        if (startPosition == StartPosition.EARLISET) {
            options.setInitialPositionProvider(partitionId -> EventPosition.fromStartOfStream());
        } else {
            options.setInitialPositionProvider(partitionId -> EventPosition.fromEndOfStream());
        }
        return options;
    }

    /**
     * Converts {@code message} to {@link EventData} and sends it to the event hub {@code str}.
     *
     * <p>Routing: a non-empty partition id from {@code partitionSupplier} sends through a
     * partition sender; otherwise a non-empty partition key is passed along with the event;
     * otherwise (including a {@code null} supplier) the event is sent without partition hints.
     *
     * @param str               event hub name; must contain text
     * @param message           message to send
     * @param partitionSupplier optional partition id / partition key; may be {@code null}
     * @return the future of the send; if an {@code EventHubRuntimeException} is thrown while
     *         obtaining the client or sender, a future already completed with that exception
     * @throws NullPointerException if {@code message} is {@code null}
     */
    @Override // com.microsoft.azure.spring.integration.core.api.SendOperation
    public <T> CompletableFuture<Void> sendAsync(String str, @NonNull Message<T> message, PartitionSupplier partitionSupplier) {
        if (message == null) {
            throw new NullPointerException("message");
        }
        Assert.hasText(str, "eventHubName can't be null or empty");
        EventData eventData = this.messageConverter.fromMessage(message, EventData.class);
        try {
            EventHubClient client = this.clientFactory.getEventHubClientCreator().apply(str);
            if (partitionSupplier != null) {
                // An explicit partition id takes precedence over a partition key.
                String partitionId = partitionSupplier.getPartitionId();
                if (!Strings.isNullOrEmpty(partitionId)) {
                    return this.clientFactory.getPartitionSenderCreator()
                            .apply(Tuple.of(client, partitionId))
                            .send(eventData);
                }
                String partitionKey = partitionSupplier.getPartitionKey();
                if (!Strings.isNullOrEmpty(partitionKey)) {
                    return client.send(eventData, partitionKey);
                }
            }
            return client.send(eventData);
        } catch (EventHubRuntimeException e) {
            LOGGER.error("Failed to send to '{}' ", str, e);
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /**
     * Registers {@code consumer} for the event hub {@code str} and consumer group {@code str2}
     * unless a subscription for that pair already exists.
     *
     * @param str      event hub name
     * @param str2     consumer group
     * @param consumer callback receiving the converted messages
     * @param cls      payload type the event bodies are converted to
     * @return {@code true} if this call created the subscription, {@code false} if one was
     *         already present
     */
    @Override // com.microsoft.azure.spring.integration.core.api.SubscribeByGroupOperation
    public boolean subscribe(String str, String str2, Consumer<Message<?>> consumer, Class<?> cls) {
        Tuple<String, String> key = Tuple.of(str, str2);
        // One-element array used as a mutable flag the lambda can set. The flag is only set when
        // computeIfAbsent actually runs the mapping function, so exactly one of several
        // concurrent callers for the same key reports true.
        final boolean[] created = {false};
        this.processorByNameAndGroup.computeIfAbsent(key, nameAndGroup -> {
            EventProcessorHost host = register(nameAndGroup, consumer, cls);
            created[0] = true;
            return host;
        });
        return created[0];
    }

    /**
     * Removes the subscription for the event hub {@code str} and consumer group {@code str2}
     * and unregisters its event processor. Unregistration completes asynchronously; a failure
     * is logged.
     *
     * @param str  event hub name
     * @param str2 consumer group
     * @return {@code true} if a subscription was removed, {@code false} if none existed
     */
    public boolean unsubscribe(String str, String str2) {
        Tuple<String, String> key = Tuple.of(str, str2);
        // A single remove() both tests for and claims the entry, so a concurrent unsubscribe of
        // the same key cannot leave this call dereferencing null.
        EventProcessorHost host = this.processorByNameAndGroup.remove(key);
        if (host == null) {
            return false;
        }
        host.unregisterEventProcessor().whenComplete((ignored, error) -> {
            if (error != null) {
                LOGGER.warn("Failed to unregister consumer '{}' with group '{}'", str, str2, error);
            }
        });
        return true;
    }

    /**
     * Creates the processor host for {@code tuple} and registers a factory producing one
     * {@link EventHubProcessor} per partition, using options derived from the current start
     * position. Registration completes asynchronously; a failure is logged.
     *
     * @param tuple    (event hub name, consumer group)
     * @param consumer callback receiving the converted messages
     * @param cls      payload type the event bodies are converted to
     * @return the processor host the factory was registered on
     */
    // The wildcard-typed consumer/payload class cannot be matched to a single type argument of
    // EventHubProcessor<T>, so the processor is created through its raw type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected EventProcessorHost register(Tuple<String, String> tuple, Consumer<Message<?>> consumer, Class<?> cls) {
        EventProcessorHost host = this.clientFactory.getProcessorHostCreator().apply(tuple);
        host.registerEventProcessorFactory(
                partitionContext -> new EventHubProcessor(consumer, cls),
                buildEventProcessorOptions(this.startPosition))
            .whenComplete((ignored, error) -> {
                if (error != null) {
                    LOGGER.warn("Failed to register event processor for {}", tuple, error);
                }
            });
        return host;
    }

    /** @return the converter used for both sending and receiving */
    public EventHubMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    /** @param eventHubMessageConverter converter to use for both sending and receiving */
    public void setMessageConverter(EventHubMessageConverter eventHubMessageConverter) {
        this.messageConverter = eventHubMessageConverter;
    }

    /** @return the start position applied to subscriptions registered from now on */
    public StartPosition getStartPosition() {
        return this.startPosition;
    }

    /**
     * Sets the start position used when a subscription is registered; the value is read at
     * registration time only.
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.EventHubOperation
    public void setStartPosition(StartPosition startPosition) {
        this.startPosition = startPosition;
    }

    /** Sets the checkpoint strategy; processors read it at the start of each received batch. */
    @Override // com.microsoft.azure.spring.integration.core.api.SubscribeByGroupOperation
    public void setCheckpointMode(CheckpointMode checkpointMode) {
        this.checkpointMode = checkpointMode;
    }
}
