package com.microsoft.azure.spring.integration.eventhub.factory;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.spring.cloud.context.core.util.Memoizer;
import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
import com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubRuntimeException;
import com.microsoft.azure.spring.integration.eventhub.util.HostnameHelper;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/factory/DefaultEventHubClientFactory.class */
/**
 * Default {@link EventHubClientFactory} that creates Event Hub clients, partition senders and
 * event processor hosts on demand and keeps them in per-factory maps so that repeated lookups
 * with the same arguments can be served from those maps.
 *
 * <p>As a {@link DisposableBean}, the factory releases everything it created in
 * {@link #destroy()}: partition senders and processor hosts first, then the clients, and finally
 * the executors that were handed to those clients.
 */
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(DefaultEventHubClientFactory.class);

    // getImplementationVersion() returns null when the jar manifest carries no version
    // (e.g. when running from an IDE), in which case the user agent ends with "null".
    private static final String PROJECT_VERSION = DefaultEventHubClientFactory.class.getPackage().getImplementationVersion();
    private static final String USER_AGENT = "spring-cloud-azure/" + PROJECT_VERSION;

    private final String checkpointStorageConnectionString;
    private final EventHubConnectionStringProvider connectionStringProvider;

    // Backing stores handed to Memoizer; they are also read/modified directly by
    // getEventProcessorHost, removeEventProcessorHost and destroy.
    private final Map<String, EventHubClient> clientsByName = new ConcurrentHashMap<>();
    private final Map<Tuple<EventHubClient, String>, PartitionSender> partitionSenderMap = new ConcurrentHashMap<>();
    private final Map<Tuple<String, String>, EventProcessorHost> processorHostMap = new ConcurrentHashMap<>();

    // Executors created for the clients; the factory owns them and shuts them down in destroy().
    private final Queue<ScheduledExecutorService> clientExecutors = new ConcurrentLinkedQueue<>();

    private final BiFunction<EventHubClient, String, PartitionSender> partitionSenderCreator = Memoizer.memoize(this.partitionSenderMap, this::createPartitionSender);
    private final Function<String, EventHubClient> eventHubClientCreator = Memoizer.memoize(this.clientsByName, this::createEventHubClient);
    private final BiFunction<String, String, EventProcessorHost> processorHostCreator = Memoizer.memoize(this.processorHostMap, this::createEventProcessorHost);

    /**
     * Creates a factory.
     *
     * @param eventHubConnectionStringProvider resolves the connection string for a given event hub
     *                                         name; must not be {@code null}
     * @param str                              connection string of the storage account used for
     *                                         checkpointing; must contain text
     * @throws IllegalArgumentException if either argument is invalid
     */
    public DefaultEventHubClientFactory(@NonNull EventHubConnectionStringProvider eventHubConnectionStringProvider, String str) {
        Assert.hasText(str, "checkpointConnectionString can't be null or empty");
        Assert.notNull(eventHubConnectionStringProvider, "connectionStringProvider can't be null");
        this.connectionStringProvider = eventHubConnectionStringProvider;
        this.checkpointStorageConnectionString = str;
        registerUserAgent();
    }

    /**
     * Prepends this project's user agent to the SDK-wide user agent exactly once.
     *
     * <p>{@code EventHubClientImpl.USER_AGENT} is a global static, so prepending on every
     * construction would make the value grow with each additional factory instance.
     */
    private static synchronized void registerUserAgent() {
        String prefix = USER_AGENT + "/";
        String current = EventHubClientImpl.USER_AGENT;
        if (current == null || !current.startsWith(prefix)) {
            EventHubClientImpl.USER_AGENT = prefix + current;
        }
    }

    /**
     * Creates a new client for the given event hub, backed by its own single-threaded scheduled
     * executor. The executor is remembered so it can be shut down in {@link #destroy()}, and is
     * shut down immediately if client creation fails.
     */
    private EventHubClient createEventHubClient(String eventHubName) {
        String connectionString = this.connectionStringProvider.getConnectionString(eventHubName);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        boolean created = false;
        try {
            EventHubClient client = EventHubClient.createSync(connectionString, executor);
            this.clientExecutors.add(executor);
            created = true;
            return client;
        } catch (EventHubException | IOException e) {
            throw new EventHubRuntimeException("Error when creating event hub client", e);
        } finally {
            if (!created) {
                // Nobody else holds a reference to this executor; don't leak its thread.
                executor.shutdown();
            }
        }
    }

    /** Creates a sender bound to the given partition of the given client. */
    private PartitionSender createPartitionSender(EventHubClient eventHubClient, String partitionId) {
        try {
            return eventHubClient.createPartitionSenderSync(partitionId);
        } catch (EventHubException e) {
            throw new EventHubRuntimeException("Error when creating event hub partition sender", e);
        }
    }

    /**
     * Creates a processor host for the given event hub and consumer group. The event hub name is
     * passed both as the event hub path and as the last constructor argument (the storage
     * container name in the Event Processor Host API).
     */
    private EventProcessorHost createEventProcessorHost(String eventHubName, String consumerGroup) {
        String hostName = EventProcessorHost.createHostName(HostnameHelper.getHostname());
        String eventHubConnectionString = this.connectionStringProvider.getConnectionString(eventHubName);
        return new EventProcessorHost(hostName, eventHubName, consumerGroup, eventHubConnectionString,
                this.checkpointStorageConnectionString, eventHubName);
    }

    /**
     * Starts closing every value currently in {@code map} and returns a future that completes
     * once all of them have finished. Failures are logged, not propagated, so the returned
     * future completes normally even when individual resources fail to close.
     */
    private <K, V> CompletableFuture<Void> close(Map<K, V> map, Function<V, CompletableFuture<Void>> closer) {
        CompletableFuture<?>[] pending = map.values().stream()
                .map(resource -> startClose(resource, closer))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).exceptionally(th -> {
            log.warn("Failed to clean event hub client factory", th);
            return null;
        });
    }

    /**
     * Invokes {@code closer} on a single resource, converting a synchronous failure into a failed
     * future and a {@code null} result into a completed one, so that one misbehaving resource
     * cannot prevent the remaining resources from being closed.
     */
    private static <V> CompletableFuture<Void> startClose(V resource, Function<V, CompletableFuture<Void>> closer) {
        try {
            CompletableFuture<Void> closing = closer.apply(resource);
            return closing != null ? closing : CompletableFuture.completedFuture(null);
        } catch (RuntimeException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /** Shuts down every executor that was created for a client. */
    private void shutdownClientExecutors() {
        ScheduledExecutorService executor;
        while ((executor = this.clientExecutors.poll()) != null) {
            executor.shutdown();
        }
    }

    /**
     * Asynchronously releases all resources created by this factory. The method does not block:
     * partition senders and processor hosts are closed first, the clients they depend on are
     * closed once those have finished, and the client executors are shut down last.
     */
    @Override
    public void destroy() throws Exception {
        CompletableFuture<Void> dependentsClosed = CompletableFuture.allOf(
                close(this.partitionSenderMap, sender -> sender.close()),
                close(this.processorHostMap, host -> host.unregisterEventProcessor()));
        dependentsClosed
                .thenCompose(ignored -> close(this.clientsByName, client -> client.close()))
                // Runs on success and on failure so the executor threads never outlive the factory.
                .whenComplete((ignored, th) -> shutdownClientExecutors());
    }

    /**
     * Returns the client for the given event hub, creating it if necessary.
     *
     * @param str event hub name, as understood by the connection string provider
     * @throws EventHubRuntimeException if the client cannot be created
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventHubClient getOrCreateClient(String str) {
        return this.eventHubClientCreator.apply(str);
    }

    /**
     * Returns the sender for one partition of an event hub, creating the client and/or the
     * sender if necessary.
     *
     * @param str  event hub name
     * @param str2 partition id
     * @throws EventHubRuntimeException if the client or the sender cannot be created
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory
    public PartitionSender getOrCreatePartitionSender(String str, String str2) {
        return this.partitionSenderCreator.apply(getOrCreateClient(str), str2);
    }

    /**
     * Returns the processor host for the given event hub and consumer group, creating it if
     * necessary.
     *
     * @param str  event hub name
     * @param str2 consumer group name
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventProcessorHost getOrCreateEventProcessorHost(String str, String str2) {
        return this.processorHostCreator.apply(str, str2);
    }

    /**
     * Looks up an already created processor host without creating one.
     *
     * @param str  event hub name
     * @param str2 consumer group name
     * @return the cached host, or an empty {@link Optional} if none is registered
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory
    public Optional<EventProcessorHost> getEventProcessorHost(String str, String str2) {
        return Optional.ofNullable(this.processorHostMap.get(Tuple.of(str, str2)));
    }

    /**
     * Removes a processor host from this factory's map. The host itself is not unregistered or
     * closed here; that is left to the caller.
     *
     * @param str  event hub name
     * @param str2 consumer group name
     * @return the removed host, or {@code null} if none was registered
     */
    @Override // com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory
    public EventProcessorHost removeEventProcessorHost(String str, String str2) {
        return this.processorHostMap.remove(Tuple.of(str, str2));
    }
}
