package com.microsoft.azure.spring.integration.servicebus.topic;

import com.google.common.collect.Sets;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusRuntimeException;
import com.microsoft.azure.spring.integration.servicebus.ServiceBusTemplate;
import com.microsoft.azure.spring.integration.servicebus.factory.ServiceBusTopicClientFactory;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate.class */
public class ServiceBusTopicTemplate extends ServiceBusTemplate<ServiceBusTopicClientFactory> implements ServiceBusTopicOperation {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusTopicTemplate.class);
    private Set<Tuple<String, String>> nameAndConsumerGroups;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/microsoft/azure/spring/integration/servicebus/topic/ServiceBusTopicTemplate$TopicMessageHandler.class */
    public class TopicMessageHandler<U> extends ServiceBusTemplate<ServiceBusTopicClientFactory>.ServiceBusMessageHandler<U> {
        private final ISubscriptionClient subscriptionClient;

        public TopicMessageHandler(Consumer<Message<U>> consumer, Class<U> cls, ISubscriptionClient iSubscriptionClient) {
            super(ServiceBusTopicTemplate.this, consumer, cls);
            this.subscriptionClient = iSubscriptionClient;
        }

        @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusTemplate.ServiceBusMessageHandler
        protected CompletableFuture<Void> success(UUID uuid) {
            return this.subscriptionClient.completeAsync(uuid);
        }

        @Override // com.microsoft.azure.spring.integration.servicebus.ServiceBusTemplate.ServiceBusMessageHandler
        protected CompletableFuture<Void> failure(UUID uuid) {
            return this.subscriptionClient.abandonAsync(uuid);
        }
    }

    public ServiceBusTopicTemplate(ServiceBusTopicClientFactory serviceBusTopicClientFactory) {
        super(serviceBusTopicClientFactory);
        this.nameAndConsumerGroups = Sets.newConcurrentHashSet();
    }

    @Override // com.microsoft.azure.spring.integration.core.api.SubscribeByGroupOperation
    public boolean subscribe(String str, String str2, @NonNull Consumer<Message<?>> consumer, Class<?> cls) {
        Assert.hasText(str, "destination can't be null or empty");
        Tuple<String, String> of = Tuple.of(str, str2);
        if (this.nameAndConsumerGroups.contains(of)) {
            return false;
        }
        this.nameAndConsumerGroups.add(of);
        internalSubscribe(str, str2, consumer, cls);
        return true;
    }

    public boolean unsubscribe(String str, String str2) {
        return this.nameAndConsumerGroups.remove(Tuple.of(str, str2));
    }

    protected void internalSubscribe(String str, String str2, Consumer<Message<?>> consumer, Class<?> cls) {
        ISubscriptionClient apply = ((ServiceBusTopicClientFactory) this.senderFactory).getSubscriptionClientCreator().apply(Tuple.of(str, str2));
        try {
            apply.registerMessageHandler(new TopicMessageHandler(consumer, cls, apply), this.options);
        } catch (ServiceBusException | InterruptedException e) {
            LOGGER.error("Failed to register topic message handler", e);
            throw new ServiceBusRuntimeException("Failed to register topic message handler", e);
        }
    }
}
