package com.microsoft.azure.spring.integration.servicebus.queue;

import com.google.common.collect.Sets;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageSession;
import com.microsoft.azure.servicebus.IQueueClient;
import com.microsoft.azure.servicebus.ISessionHandler;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.integration.core.AzureCheckpointer;
import com.microsoft.azure.spring.integration.core.api.CheckpointMode;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusClientConfig;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.class */
public class ServiceBusQueueTemplate extends ServiceBusTemplate<ServiceBusQueueClientFactory> implements ServiceBusQueueOperation {
    private static final Logger log = LoggerFactory.getLogger(ServiceBusQueueTemplate.class);
    private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s in queue '%s'";
    private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in queue '%s' in %s mode";
    private final Set<String> subscribedQueues;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate$QueueMessageHandler.class */
    public class QueueMessageHandler<U> extends ServiceBusMessageHandler<U> implements ISessionHandler {
        private final IQueueClient queueClient;

        public QueueMessageHandler(Consumer<Message<U>> consumer, Class<U> cls, IQueueClient iQueueClient) {
            super(consumer, cls, ServiceBusQueueTemplate.this.getCheckpointConfig(), ServiceBusQueueTemplate.this.getMessageConverter());
            this.queueClient = iQueueClient;
        }

        @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler
        protected CompletableFuture<Void> success(UUID uuid) {
            return this.queueClient.completeAsync(uuid);
        }

        @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler
        protected CompletableFuture<Void> failure(UUID uuid) {
            return this.queueClient.abandonAsync(uuid);
        }

        @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler
        protected String buildCheckpointFailMessage(Message<?> message) {
            return String.format(ServiceBusQueueTemplate.MSG_FAIL_CHECKPOINT, message, this.queueClient.getQueueName());
        }

        @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler
        protected String buildCheckpointSuccessMessage(Message<?> message) {
            return String.format(ServiceBusQueueTemplate.MSG_SUCCESS_CHECKPOINT, message, this.queueClient.getQueueName(), ServiceBusQueueTemplate.this.getCheckpointConfig().getCheckpointMode());
        }

        public CompletableFuture<Void> onMessageAsync(IMessageSession iMessageSession, IMessage iMessage) {
            HashMap hashMap = new HashMap();
            AzureCheckpointer azureCheckpointer = new AzureCheckpointer(() -> {
                return iMessageSession.completeAsync(iMessage.getLockToken());
            }, () -> {
                return iMessageSession.abandonAsync(iMessage.getLockToken());
            });
            if (this.checkpointConfig.getCheckpointMode() == CheckpointMode.MANUAL) {
                hashMap.put("azure_checkpointer", azureCheckpointer);
            }
            Message<U> message = this.messageConverter.toMessage(iMessage, new MessageHeaders(hashMap), this.payloadType);
            this.consumer.accept(message);
            return this.checkpointConfig.getCheckpointMode() == CheckpointMode.RECORD ? azureCheckpointer.success().whenComplete((r6, th) -> {
                super.checkpointHandler(message, th);
            }) : CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<Void> OnCloseSessionAsync(IMessageSession iMessageSession) {
            ServiceBusQueueTemplate.log.info("Closed session '" + iMessageSession.getSessionId() + "' for subscription: " + iMessageSession.getEntityPath());
            return CompletableFuture.completedFuture(null);
        }
    }

    public ServiceBusQueueTemplate(ServiceBusQueueClientFactory serviceBusQueueClientFactory) {
        super(serviceBusQueueClientFactory);
        this.subscribedQueues = Sets.newConcurrentHashSet();
    }

    public boolean subscribe(String str, @NonNull Consumer<Message<?>> consumer, @NonNull Class<?> cls) {
        Assert.hasText(str, "destination can't be null or empty");
        if (this.subscribedQueues.contains(str)) {
            return false;
        }
        this.subscribedQueues.add(str);
        internalSubscribe(str, consumer, cls);
        return true;
    }

    public boolean unsubscribe(String str) {
        return this.subscribedQueues.remove(str);
    }

    protected void internalSubscribe(String str, Consumer<Message<?>> consumer, Class<?> cls) {
        IQueueClient orCreateClient = ((ServiceBusQueueClientFactory) this.senderFactory).getOrCreateClient(str);
        String format = String.format("%s-handler", str);
        try {
            orCreateClient.setPrefetchCount(this.clientConfig.getPrefetchCount());
            if (this.clientConfig.isSessionsEnabled()) {
                orCreateClient.registerSessionHandler(new QueueMessageHandler(consumer, cls, orCreateClient), buildSessionHandlerOptions(), buildHandlerExecutors(format));
            } else {
                orCreateClient.registerMessageHandler(new QueueMessageHandler(consumer, cls, orCreateClient), buildHandlerOptions(), buildHandlerExecutors(format));
            }
        } catch (ServiceBusException | InterruptedException e) {
            log.error("Failed to register queue message handler", e);
            throw new ServiceBusRuntimeException("Failed to register queue message handler", e);
        }
    }

    @Override // com.microsoft.azure.spring.integration.servicebus.queue.ServiceBusQueueOperation
    public void setClientConfig(@NonNull ServiceBusClientConfig serviceBusClientConfig) {
        this.clientConfig = serviceBusClientConfig;
    }
}
