package com.microsoft.azure.spring.integration.servicebus.topic;

import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.cloud.context.core.Tuple;
import com.microsoft.azure.spring.integration.core.Checkpointer;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusMessageHandler;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusSendTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.class */
public class ServiceBusTopicTemplate extends ServiceBusSendTemplate<ServiceBusTopicClientFactory> implements ServiceBusTopicOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusTopicTemplate.class);
    private final Map<Tuple<String, String>, Set<Consumer<Iterable<IMessage>>>> consumersByNameAndConsumerGroup;
    private final Function<Tuple<String, String>, Checkpointer<IMessage>> checkpointGetter;

    public ServiceBusTopicTemplate(ServiceBusTopicClientFactory serviceBusTopicClientFactory) {
        super(serviceBusTopicClientFactory);
        this.consumersByNameAndConsumerGroup = new ConcurrentHashMap();
        this.checkpointGetter = Memoizer.memoize(this::createCheckpointer);
    }

    @Override // com.microsoft.azure.spring.integration.core.SubscribeByGroupOperation
    public synchronized boolean subscribe(String str, @NonNull Consumer<Iterable<IMessage>> consumer, @NonNull String str2) {
        Assert.hasText(str, "destination can't be null or empty");
        Tuple<String, String> of = Tuple.of(str, str2);
        this.consumersByNameAndConsumerGroup.putIfAbsent(of, new CopyOnWriteArraySet());
        boolean add = this.consumersByNameAndConsumerGroup.get(of).add(consumer);
        try {
            ((ServiceBusTopicClientFactory) this.senderFactory).getSubscriptionClientCreator().apply(Tuple.of(str, str2)).registerMessageHandler(new ServiceBusMessageHandler(this.consumersByNameAndConsumerGroup.get(of)));
            return add;
        } catch (ServiceBusException | InterruptedException e) {
            LOGGER.error("Failed to register message handler", e);
            throw new ServiceBusRuntimeException("Failed to register message handler", e);
        }
    }

    @Override // com.microsoft.azure.spring.integration.core.SubscribeByGroupOperation
    public synchronized boolean unsubscribe(String str, Consumer<Iterable<IMessage>> consumer, String str2) {
        return this.consumersByNameAndConsumerGroup.get(Tuple.of(str, str2)).remove(consumer);
    }

    @Override // com.microsoft.azure.spring.integration.core.SubscribeByGroupOperation
    public Checkpointer<IMessage> getCheckpointer(String str, String str2) {
        return this.checkpointGetter.apply(Tuple.of(str, str2));
    }

    private Checkpointer<IMessage> createCheckpointer(Tuple<String, String> tuple) {
        return new ServiceBusTopicCheckpointer(((ServiceBusTopicClientFactory) this.senderFactory).getSubscriptionClientCreator().apply(Tuple.of((String) tuple.getFirst(), (String) tuple.getSecond())));
    }
}
