/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.MathUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.bootstrap.Bootstrap;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.apache.pulsar.shade.io.netty.channel.ChannelException;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.resolver.AddressResolver;
import org.apache.pulsar.shade.io.netty.resolver.dns.DnsAddressResolverGroup;
import org.apache.pulsar.shade.io.netty.resolver.dns.DnsNameResolverBuilder;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates and caches {@link ClientCnx} connections.
 *
 * <p>Connections are stored as futures in a two-level map: the outer key is the logical address,
 * the inner key is an integer slot in the range {@code [0, connectionsPerBroker)}. When the
 * configured number of connections per broker is {@code 0}, pooling is disabled: every request
 * opens a fresh connection and {@link #releaseConnection(ClientCnx)} closes it.
 */
public class ConnectionPool implements AutoCloseable {
    private static final Random random = new Random();
    private static final Logger log = LoggerFactory.getLogger(ConnectionPool.class);

    /** Logical address -> (connection slot -> pending or established connection). */
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
    private final Bootstrap bootstrap;
    private final PulsarChannelInitializer channelInitializerHandler;
    private final ClientConfigurationData clientConfig;
    private final EventLoopGroup eventLoopGroup;
    /** Value of {@code conf.getConnectionsPerBroker()}; {@code 0} disables pooling. */
    private final int maxConnectionsPerHosts;
    /** True when TLS is on, a proxy protocol is set and a non-blank proxy service URL is configured. */
    private final boolean isSniProxy;
    protected final AddressResolver<InetSocketAddress> addressResolver;
    /** True only when this pool created {@link #addressResolver} itself and therefore must close it. */
    private final boolean shouldCloseDnsResolver;

    /**
     * Creates a pool whose connections are plain {@link ClientCnx} instances.
     *
     * @param conf client configuration
     * @param eventLoopGroup event loop group used for channels and DNS resolution
     * @throws PulsarClientException if the channel initializer cannot be created
     */
    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
        this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup));
    }

    /**
     * Creates a pool with a custom connection factory and an internally created address resolver.
     *
     * @param conf client configuration
     * @param eventLoopGroup event loop group used for channels and DNS resolution
     * @param clientCnxSupplier factory handed to the channel initializer
     * @throws PulsarClientException if the channel initializer cannot be created
     */
    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
            Supplier<ClientCnx> clientCnxSupplier) throws PulsarClientException {
        this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty());
    }

    /**
     * Creates a pool with a custom connection factory and, optionally, an externally owned resolver.
     *
     * @param conf client configuration
     * @param eventLoopGroup event loop group used for channels and DNS resolution
     * @param clientCnxSupplier factory handed to the channel initializer
     * @param addressResolver resolver to use; when empty one is created here and closed by
     *        {@link #close()}, when present it is never closed by this pool
     * @throws PulsarClientException if the channel initializer cannot be created
     */
    public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
            Supplier<ClientCnx> clientCnxSupplier,
            Optional<AddressResolver<InetSocketAddress>> addressResolver) throws PulsarClientException {
        this.eventLoopGroup = eventLoopGroup;
        this.clientConfig = conf;
        this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
        this.isSniProxy = this.clientConfig.isUseTls()
                && this.clientConfig.getProxyProtocol() != null
                && StringUtils.isNotBlank(this.clientConfig.getProxyServiceUrl());
        this.pool = new ConcurrentHashMap<>();

        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup);
        this.bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        this.bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        this.bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

        try {
            this.channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
            this.bootstrap.handler(this.channelInitializerHandler);
        } catch (Exception e) {
            // Log the cause as well; the rethrown exception may be handled far from here.
            log.error("Failed to create channel initializer", e);
            throw new PulsarClientException(e);
        }

        this.shouldCloseDnsResolver = !addressResolver.isPresent();
        this.addressResolver = addressResolver.orElseGet(() -> createAddressResolver(conf, eventLoopGroup));
    }

    /**
     * Builds the default DNS-based resolver, bound to the configured local address when one is set.
     */
    private static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData conf,
            EventLoopGroup eventLoopGroup) {
        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder()
                .traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup));
        if (conf.getDnsLookupBindAddress() != null) {
            InetSocketAddress addr =
                    new InetSocketAddress(conf.getDnsLookupBindAddress(), conf.getDnsLookupBindPort());
            dnsNameResolverBuilder.localAddress(addr);
        }
        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
        return new DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next());
    }

    /**
     * Picks a connection slot at random.
     *
     * @return {@code -1} when pooling is disabled, otherwise a value in
     *         {@code [0, connectionsPerBroker)}
     */
    public int genRandomKeyToSelectCon() {
        return nextConnectionKey();
    }

    /**
     * Gets a connection to {@code address}, using it as both logical and physical address and
     * a randomly chosen slot.
     *
     * @param address broker address
     * @return future completed with the connection, or exceptionally if it could not be opened
     */
    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress address) {
        return getConnection(address, address, nextConnectionKey());
    }

    /** Shared slot selection: {@code -1} when pooling is disabled, else a random slot index. */
    private int nextConnectionKey() {
        if (maxConnectionsPerHosts == 0) {
            return -1;
        }
        return MathUtils.signSafeMod(random.nextInt(), maxConnectionsPerHosts);
    }

    /**
     * Closes every pooled connection. Connections still being established are closed once their
     * future completes successfully; failed futures are ignored.
     */
    void closeAllConnections() {
        for (ConcurrentMap<Integer, CompletableFuture<ClientCnx>> connections : pool.values()) {
            for (CompletableFuture<ClientCnx> future : connections.values()) {
                if (!future.isDone()) {
                    future.thenAccept(ClientCnx::close);
                } else if (!future.isCompletedExceptionally()) {
                    future.join().close();
                }
            }
        }
    }

    /**
     * Gets the pooled connection for {@code logicalAddress} in slot {@code randomKey}, creating
     * it when absent. With pooling disabled a new connection is opened on every call.
     *
     * @param logicalAddress address used as the pool key
     * @param physicalAddress address that is resolved and connected to
     * @param randomKey connection slot, see {@link #genRandomKeyToSelectCon()}
     * @return future completed with the connection, or exceptionally if it could not be opened
     */
    public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int randomKey) {
        if (maxConnectionsPerHosts == 0) {
            return createConnection(logicalAddress, physicalAddress, -1);
        }
        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> innerPool =
                pool.computeIfAbsent(logicalAddress, a -> new ConcurrentHashMap<>());
        CompletableFuture<ClientCnx> completableFuture = innerPool.computeIfAbsent(randomKey,
                k -> createConnection(logicalAddress, physicalAddress, randomKey));
        if (completableFuture.isCompletedExceptionally()) {
            // A failed future must not stay cached. The failure handlers also clean up, but they
            // may have run before computeIfAbsent stored the future, so remove it here too.
            cleanupConnection(logicalAddress, randomKey, completableFuture);
        }
        return completableFuture;
    }

    /**
     * Opens a channel and returns a future that completes once the connection handshake is done.
     * On any failure the future is completed exceptionally and removed from the pool.
     */
    private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {
        if (log.isDebugEnabled()) {
            log.debug("Connection for {} not found in cache", logicalAddress);
        }
        CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<>();
        createConnection(logicalAddress, physicalAddress)
                .thenAccept(channel -> onChannelConnected(channel, logicalAddress, connectionKey, cnxFuture))
                .exceptionally(exception -> {
                    eventLoopGroup.execute(() -> {
                        log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getMessage());
                        cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                        cnxFuture.completeExceptionally(new PulsarClientException(exception));
                    });
                    return null;
                });
        return cnxFuture;
    }

    /**
     * Wires a freshly connected channel to {@code cnxFuture}: registers pool cleanup on channel
     * close, then completes the future when the connection's handshake future completes.
     */
    private void onChannelConnected(Channel channel, InetSocketAddress logicalAddress, int connectionKey,
            CompletableFuture<ClientCnx> cnxFuture) {
        log.info("[{}] Connected to server", channel);

        channel.closeFuture().addListener(v -> {
            if (log.isDebugEnabled()) {
                log.debug("Removing closed connection from pool: {}", v);
            }
            cleanupConnection(logicalAddress, connectionKey, cnxFuture);
        });

        ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
        if (!channel.isActive() || cnx == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Connection was already closed by the time we got notified", channel);
            }
            cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
            return;
        }

        cnx.connectionFuture().thenRun(() -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Connection handshake completed", cnx.channel());
            }
            cnxFuture.complete(cnx);
        }).exceptionally(exception -> {
            log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage());
            cnxFuture.completeExceptionally(exception);
            cleanupConnection(logicalAddress, connectionKey, cnxFuture);
            cnx.ctx().close();
            return null;
        });
    }

    /**
     * Resolves the address to connect to and tries the resolved addresses in order. In SNI proxy
     * mode the proxy service URL is resolved instead, and the unresolved physical address is
     * passed along as the SNI host.
     */
    private CompletableFuture<Channel> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress unresolvedPhysicalAddress) {
        InetSocketAddress addressToResolve;
        if (isSniProxy) {
            try {
                URI proxyURI = new URI(clientConfig.getProxyServiceUrl());
                addressToResolve = InetSocketAddress.createUnresolved(proxyURI.getHost(), proxyURI.getPort());
            } catch (URISyntaxException | IllegalArgumentException e) {
                // createUnresolved rejects a missing host or port with IllegalArgumentException;
                // report that through the returned future instead of throwing at the caller.
                log.error("Invalid Proxy url {}", clientConfig.getProxyServiceUrl(), e);
                return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
                        "Invalid url " + clientConfig.getProxyServiceUrl(), e));
            }
        } else {
            addressToResolve = unresolvedPhysicalAddress;
        }
        return resolveName(addressToResolve).thenCompose(inetAddresses ->
                connectToResolvedAddresses(logicalAddress, unresolvedPhysicalAddress, inetAddresses.iterator(),
                        isSniProxy ? unresolvedPhysicalAddress : null));
    }

    /**
     * Tries each remaining resolved address in turn until one connects.
     *
     * @return future completed with the first channel that connects, or exceptionally with the
     *         failure of the last attempt (or a descriptive error if there was nothing to try)
     */
    private CompletableFuture<Channel> connectToResolvedAddresses(InetSocketAddress logicalAddress,
            InetSocketAddress unresolvedPhysicalAddress, Iterator<InetSocketAddress> resolvedPhysicalAddress,
            InetSocketAddress sniHost) {
        if (!resolvedPhysicalAddress.hasNext()) {
            // A resolver can report success with no addresses; next() would then throw an
            // uninformative NoSuchElementException.
            return FutureUtil.failedFuture(
                    new PulsarClientException("No addresses resolved for " + unresolvedPhysicalAddress));
        }
        CompletableFuture<Channel> future = new CompletableFuture<>();
        connectToAddress(logicalAddress, resolvedPhysicalAddress.next(), unresolvedPhysicalAddress, sniHost)
                .thenAccept(future::complete)
                .exceptionally(exception -> {
                    if (resolvedPhysicalAddress.hasNext()) {
                        // Fall through to the next resolved address.
                        connectToResolvedAddresses(logicalAddress, unresolvedPhysicalAddress,
                                resolvedPhysicalAddress, sniHost)
                                .thenAccept(future::complete)
                                .exceptionally(ex -> {
                                    future.completeExceptionally(ex);
                                    return null;
                                });
                    } else {
                        // Every address failed; surface the last error.
                        future.completeExceptionally(exception);
                    }
                    return null;
                });
        return future;
    }

    /**
     * Resolves {@code unresolvedAddress} to all of its addresses using {@link #addressResolver}.
     *
     * @param unresolvedAddress address to resolve
     * @return future completed with the resolved addresses, or exceptionally with the resolver's cause
     */
    CompletableFuture<List<InetSocketAddress>> resolveName(InetSocketAddress unresolvedAddress) {
        CompletableFuture<List<InetSocketAddress>> future = new CompletableFuture<>();
        addressResolver.resolveAll(unresolvedAddress)
                .addListener((Future<List<InetSocketAddress>> resolveFuture) -> {
                    if (resolveFuture.isSuccess()) {
                        future.complete(resolveFuture.get());
                    } else {
                        future.completeExceptionally(resolveFuture.cause());
                    }
                });
        return future;
    }

    /**
     * Registers a channel, runs the initializer steps (TLS first when enabled, then SOCKS5 if
     * configured, then the client connection setup) and finally connects to {@code physicalAddress}.
     */
    private CompletableFuture<Channel> connectToAddress(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, InetSocketAddress unresolvedPhysicalAddress,
            InetSocketAddress sniHost) {
        boolean useTls = clientConfig.isUseTls();
        CompletableFuture<Channel> channelFuture = ChannelFutures.toCompletableFuture(bootstrap.register());
        if (useTls) {
            // TLS is initialized against the SNI host when one is given, else the physical address.
            channelFuture = channelFuture.thenCompose(channel ->
                    channelInitializerHandler.initTls(channel, sniHost != null ? sniHost : physicalAddress));
        }
        return channelFuture
                .thenCompose(channelInitializerHandler::initSocks5IfConfig)
                .thenCompose(ch ->
                        channelInitializerHandler.initializeClientCnx(ch, logicalAddress, unresolvedPhysicalAddress))
                .thenCompose(channel -> ChannelFutures.toCompletableFuture(channel.connect(physicalAddress)));
    }

    /**
     * Releases a connection obtained from this pool. Only has an effect when pooling is disabled,
     * in which case a still-active connection is closed.
     *
     * @param cnx connection being released
     */
    public void releaseConnection(ClientCnx cnx) {
        if (maxConnectionsPerHosts == 0 && cnx.channel().isActive()) {
            if (log.isDebugEnabled()) {
                log.debug("close connection due to pooling disabled.");
            }
            cnx.close();
        }
    }

    /**
     * Closes all pooled connections and, if this pool created the address resolver, the resolver.
     *
     * @throws Exception if closing connections or the resolver fails
     */
    @Override
    public void close() throws Exception {
        try {
            closeAllConnections();
        } finally {
            // Release the resolver even when closing a connection throws.
            if (shouldCloseDnsResolver) {
                addressResolver.close();
            }
        }
    }

    /**
     * Removes {@code connectionFuture} from the pool, but only if it is still the value stored
     * for the given address and slot.
     */
    private void cleanupConnection(InetSocketAddress address, int connectionKey,
            CompletableFuture<ClientCnx> connectionFuture) {
        ConcurrentMap<Integer, CompletableFuture<ClientCnx>> map = pool.get(address);
        if (map != null) {
            map.remove(connectionKey, connectionFuture);
        }
    }

    /** Returns the total number of connection futures currently held across all addresses. */
    @VisibleForTesting
    int getPoolSize() {
        return pool.values().stream().mapToInt(Map::size).sum();
    }

    /**
     * Returns an unmodifiable snapshot of all connection futures currently in the pool.
     *
     * @return unmodifiable set of pooled connection futures
     */
    public Set<CompletableFuture<ClientCnx>> getConnections() {
        return Collections.unmodifiableSet(
                pool.values().stream()
                        .flatMap(n -> n.values().stream())
                        .collect(Collectors.toSet()));
    }
}

