package com.microsoft.azure.spring.integration.servicebus.queue;

import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusSendTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusQueueClientFactory;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/queue/ServiceBusQueueTemplate.class */
public class ServiceBusQueueTemplate extends ServiceBusSendTemplate<ServiceBusQueueClientFactory> implements ServiceBusQueueOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueTemplate.class);
    private final Map<String, Set<Consumer<Iterable<IMessage>>>> consumersByName;
    private final Function<String, Checkpointer<IMessage>> checkpointGetter;

    public ServiceBusQueueTemplate(ServiceBusQueueClientFactory serviceBusQueueClientFactory) {
        super(serviceBusQueueClientFactory);
        this.consumersByName = new ConcurrentHashMap();
        this.checkpointGetter = Memoizer.memoize(this::createCheckpointer);
    }

    @Override // com.microsoft.azure.spring.integration.core.SubscribeOperation
    public synchronized boolean subscribe(String str, @NonNull Consumer<Iterable<IMessage>> consumer) {
        Assert.hasText(str, "destination can't be null or empty");
        this.consumersByName.putIfAbsent(str, new CopyOnWriteArraySet());
        boolean add = this.consumersByName.get(str).add(consumer);
        try {
            ((ServiceBusQueueClientFactory) this.senderFactory).getQueueClientCreator().apply(str).registerMessageHandler(new ServiceBusMessageHandler(this.consumersByName.get(str)));
            return add;
        } catch (ServiceBusException | InterruptedException e) {
            LOGGER.error("Failed to register message handler", e);
            throw new ServiceBusRuntimeException("Failed to register message handler", e);
        }
    }

    @Override // com.microsoft.azure.spring.integration.core.SubscribeOperation
    public synchronized boolean unsubscribe(String str, Consumer<Iterable<IMessage>> consumer) {
        return this.consumersByName.get(str).remove(consumer);
    }

    @Override // com.microsoft.azure.spring.integration.core.SubscribeOperation
    public Checkpointer<IMessage> getCheckpointer(String str) {
        return this.checkpointGetter.apply(str);
    }

    private Checkpointer<IMessage> createCheckpointer(String str) {
        return new ServiceBusQueueCheckpointer(((ServiceBusQueueClientFactory) this.senderFactory).getQueueClientCreator().apply(str));
    }
}
