package com.microsoft.azure.spring.integration.eventhub;

import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.management.eventhub.EventHubNamespace;
import com.microsoft.azure.spring.cloud.context.core.AzureAdmin;
import com.microsoft.azure.spring.cloud.context.core.AzureUtil;
import com.microsoft.azure.spring.cloud.context.core.Tuple;
import com.microsoft.azure.spring.integration.core.Memoizer;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/DefaultEventHubClientFactory.class */
/**
 * Default {@link EventHubClientFactory} bound to a single Event Hub namespace.
 *
 * <p>Event Hub clients, partition senders and event processor hosts are created on demand and
 * kept in per-type maps that are handed to {@code Memoizer.memoize}. When the bean is destroyed,
 * every instance held in those maps is asked to close asynchronously, and the executors created
 * for the clients are shut down once those close operations have finished.
 */
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHubClientFactory.class);

    /** Event Hub clients keyed by event hub name. */
    private final Map<String, EventHubClient> clientsByName = new ConcurrentHashMap<>();

    /** Partition senders keyed by (client, value passed to {@code createPartitionSenderSync}). */
    private final Map<Tuple<EventHubClient, String>, PartitionSender> partitionSenderMap = new ConcurrentHashMap<>();

    /** Processor hosts keyed by (event hub name, second constructor argument of the host). */
    private final Map<Tuple<String, String>, EventProcessorHost> processorHostMap = new ConcurrentHashMap<>();

    /** Executors handed to created clients; tracked so that {@link #destroy()} can shut them down. */
    private final Queue<ExecutorService> clientExecutors = new ConcurrentLinkedQueue<>();

    /**
     * Single memoized lookup of connection strings by event hub name. Kept in a field so that the
     * same memoizing function is reused by every caller; building a new one per call (presumably
     * with its own empty cache) would never produce a cache hit.
     */
    private final Function<String, String> connectionStringProvider = Memoizer.memoize(this::getConnectionString);

    private final AzureAdmin azureAdmin;
    private final EventHubNamespace namespace;

    /** Unset until {@link #initCheckpointConnectionString(String)} has been called. */
    private String checkpointStorageConnectionString;

    /**
     * Creates a factory for the given namespace, obtaining it through
     * {@code AzureAdmin.getOrCreateEventHubNamespace}.
     *
     * @param azureAdmin admin facade used to resolve Azure resources; must not be {@code null}
     * @param str name of the Event Hub namespace; must contain text
     * @throws IllegalArgumentException if an argument fails the assertions above
     */
    public DefaultEventHubClientFactory(AzureAdmin azureAdmin, String str) {
        Assert.notNull(azureAdmin, "azureAdmin can't be null.");
        Assert.hasText(str, "namespace can't be null or empty");
        this.azureAdmin = azureAdmin;
        this.namespace = azureAdmin.getOrCreateEventHubNamespace(str);
    }

    /**
     * Resolves the storage account via {@code AzureAdmin.getOrCreateStorageAccount} and stores its
     * connection string for later use when event processor hosts are created.
     *
     * @param str name of the checkpoint storage account; must contain text
     * @throws IllegalArgumentException if {@code str} is {@code null} or blank
     */
    public void initCheckpointConnectionString(String str) {
        Assert.hasText(str, "checkpointStorageAccount can't be null or empty");
        this.checkpointStorageConnectionString =
                AzureUtil.getConnectionString(this.azureAdmin.getOrCreateStorageAccount(str));
    }

    /**
     * Returns a function mapping an event hub name to its client, memoized over this factory's
     * client map.
     */
    @Override
    public Function<String, EventHubClient> getEventHubClientCreator() {
        return Memoizer.memoize(this.clientsByName, this::createEventHubClient);
    }

    /**
     * Returns a function mapping a (client, partition) tuple to a partition sender, memoized over
     * this factory's sender map.
     */
    @Override
    public Function<Tuple<EventHubClient, String>, PartitionSender> getPartitionSenderCreator() {
        return Memoizer.memoize(this.partitionSenderMap, this::createPartitionSender);
    }

    /**
     * Returns a function mapping an (event hub name, consumer group) tuple to an event processor
     * host, memoized over this factory's host map. NOTE(review): the second tuple element is
     * presumably the consumer group name; confirm against the {@code EventProcessorHost}
     * constructor.
     */
    @Override
    public Function<Tuple<String, String>, EventProcessorHost> getProcessorHostCreator() {
        return Memoizer.memoize(this.processorHostMap, this::createEventProcessorHost);
    }

    /**
     * Creates a client for the named event hub, backed by its own single-thread executor.
     *
     * @param eventHubName name of the event hub to connect to
     * @return the newly created client
     * @throws EventHubRuntimeException if the client cannot be created
     */
    private EventHubClient createEventHubClient(String eventHubName) {
        // Resolve the connection string first so that a lookup failure cannot leak an executor.
        String connectionString = this.connectionStringProvider.apply(eventHubName);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        boolean created = false;
        try {
            EventHubClient client = EventHubClient.createSync(connectionString, executor);
            this.clientExecutors.add(executor);
            created = true;
            return client;
        } catch (EventHubException | IOException e) {
            throw new EventHubRuntimeException("Error when creating event hub client", e);
        } finally {
            if (!created) {
                // Nothing will ever use this executor; release its worker thread right away.
                executor.shutdown();
            }
        }
    }

    /**
     * Creates a partition sender from the tuple's client for the tuple's second element.
     *
     * @param clientAndPartition client together with the value passed to
     *                           {@code createPartitionSenderSync}
     * @return the newly created sender
     * @throws EventHubRuntimeException if the sender cannot be created
     */
    private PartitionSender createPartitionSender(Tuple<EventHubClient, String> clientAndPartition) {
        try {
            return clientAndPartition.getFirst().createPartitionSenderSync(clientAndPartition.getSecond());
        } catch (EventHubException e) {
            throw new EventHubRuntimeException("Error when creating event hub partition sender", e);
        }
    }

    /**
     * Creates an event processor host for the tuple's event hub. The event hub name is passed both
     * as the second and as the last constructor argument, and the checkpoint storage connection
     * string is passed as stored (it is {@code null} if
     * {@link #initCheckpointConnectionString(String)} was never called).
     *
     * @param hubAndGroup event hub name and the value passed as the host's third argument
     * @return the newly created host
     */
    private EventProcessorHost createEventProcessorHost(Tuple<String, String> hubAndGroup) {
        String eventHubName = hubAndGroup.getFirst();
        String hostName = EventProcessorHost.createHostName(HostnameHelper.getHostname());
        return new EventProcessorHost(
                hostName,
                eventHubName,
                hubAndGroup.getSecond(),
                this.connectionStringProvider.apply(eventHubName),
                this.checkpointStorageConnectionString,
                eventHubName);
    }

    /**
     * Builds the connection string for an event hub from the primary connection string of the
     * first authorization rule listed for the namespace.
     *
     * @param eventHubName event hub name to set on the connection string
     * @return the connection string including the event hub name
     * @throws RuntimeException if the namespace lists no authorization rule (or a mapping step
     *                          yields {@code null})
     */
    private String getConnectionString(String eventHubName) {
        return this.namespace.listAuthorizationRules().stream()
                .findFirst()
                .map(rule -> rule.getKeys())
                .map(keys -> keys.primaryConnectionString())
                .map(primary -> new ConnectionStringBuilder(primary).setEventHubName(eventHubName).toString())
                .orElseThrow(() -> new RuntimeException(
                        String.format("Failed to fetch connection string of '%s'", eventHubName), null));
    }

    /**
     * Starts the asynchronous close of every value in {@code map}.
     *
     * @param map cache whose values should be closed
     * @param closer operation that starts closing a single value
     * @return a future that completes when all closes are done; a failure is logged as a warning
     *         and then swallowed, so the returned future itself completes normally
     */
    private <K, V> CompletableFuture<Void> close(Map<K, V> map, Function<V, CompletableFuture<Void>> closer) {
        CompletableFuture<?>[] pending = map.values().stream()
                .map(closer)
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).exceptionally(failure -> {
            LOGGER.warn("Failed to clean event hub client factory", failure);
            return null;
        });
    }

    /** Shuts down every executor that was handed to a created client. */
    private void shutdownClientExecutors() {
        ExecutorService executor;
        while ((executor = this.clientExecutors.poll()) != null) {
            executor.shutdown();
        }
    }

    /**
     * Starts closing all cached clients and partition senders and unregistering all cached
     * processor hosts. The call does not wait for those operations; once all of them have
     * finished, the executors created for the clients are shut down.
     */
    @Override
    public void destroy() throws Exception {
        CompletableFuture<Void> clientsClosed = close(this.clientsByName, EventHubClient::close);
        CompletableFuture<Void> sendersClosed = close(this.partitionSenderMap, PartitionSender::close);
        CompletableFuture<Void> hostsClosed =
                close(this.processorHostMap, EventProcessorHost::unregisterEventProcessor);
        // Only release the executors after the closes are done, since the close operations may
        // still need them while in flight.
        CompletableFuture.allOf(clientsClosed, sendersClosed, hostsClosed)
                .thenRun(this::shutdownClientExecutors);
    }
}
