/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal;

import java.net.URI;
import org.apache.flink.kinesis.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.kinesis.shaded.io.netty.channel.Channel;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.EventLoop;
import org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool;
import org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPoolHandler;
import org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslContext;
import org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler;
import org.apache.flink.kinesis.shaded.io.netty.util.AttributeKey;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Future;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Promise;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkTestInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.ProxyTunnelInitHandler;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.Logger;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.StringUtils;

/**
 * {@link ChannelPool} decorator that only hands out channels on which a proxy tunnel has been set up.
 *
 * <p>Channels are acquired from (and released to) the wrapped {@code delegate} pool. A channel whose
 * {@link #TUNNEL_ESTABLISHED_KEY} attribute is not {@code true} first gets an optional {@link SslHandler}
 * (only when an {@link SslContext} is configured and the proxy scheme is {@code https}) and an init handler
 * obtained from the {@link InitHandlerSupplier}; the acquire promise is completed once the promise given to
 * that init handler completes.
 *
 * <p>Every failure while preparing a channel closes the channel, releases it to the delegate and fails the
 * acquire promise, so callers are never left waiting on a promise that will not complete.
 */
@SdkInternalApi
public class Http1TunnelConnectionPool
implements ChannelPool {
    /** Channel attribute set to {@code true} once tunnel setup has completed successfully for that channel. */
    static final AttributeKey<Boolean> TUNNEL_ESTABLISHED_KEY = NettyUtils.getOrCreateAttributeKey("aws.http.nio.netty.async.Http1TunnelConnectionPool.tunnelEstablished");
    private static final Logger log = Logger.loggerFor(Http1TunnelConnectionPool.class);
    private final EventLoop eventLoop;
    private final ChannelPool delegate;
    private final SslContext sslContext;
    private final URI proxyAddress;
    private final URI remoteAddress;
    private final ChannelPoolHandler handler;
    private final InitHandlerSupplier initHandlerSupplier;

    /**
     * Creates a pool that uses {@link ProxyTunnelInitHandler} to set up tunnels.
     *
     * @param eventLoop event loop used to create the promises returned by this pool
     * @param delegate pool that actually provides and takes back channels
     * @param sslContext SSL context for the connection to the proxy; may be {@code null} to never add an SSL handler
     * @param proxyAddress address of the proxy; its scheme, host and port are read when an SSL handler is created
     * @param remoteAddress address passed to the init handler as the tunnel target
     * @param handler pool handler notified via {@code channelCreated} once a tunnel is established
     */
    public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler) {
        this(eventLoop, delegate, sslContext, proxyAddress, remoteAddress, handler, ProxyTunnelInitHandler::new);
    }

    /**
     * Same as the public constructor, but lets the caller choose how the init handler is created.
     *
     * @param initHandlerSupplier factory for the handler that is added to channels without an established tunnel
     */
    @SdkTestInternalApi
    Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler, InitHandlerSupplier initHandlerSupplier) {
        this.eventLoop = eventLoop;
        this.delegate = delegate;
        this.sslContext = sslContext;
        this.proxyAddress = proxyAddress;
        this.remoteAddress = remoteAddress;
        this.handler = handler;
        this.initHandlerSupplier = initHandlerSupplier;
    }

    /**
     * Acquires a tunnelled channel, reporting the outcome through a promise created on this pool's event loop.
     *
     * @return future completed with the channel, or failed if acquisition or tunnel setup fails
     */
    @Override
    public Future<Channel> acquire() {
        return this.acquire(this.eventLoop.newPromise());
    }

    /**
     * Acquires a channel from the delegate and completes {@code promise} once the tunnel is in place.
     *
     * @param promise promise to complete with the tunnelled channel or with the failure cause
     * @return the given {@code promise}
     */
    @Override
    public Future<Channel> acquire(Promise<Channel> promise) {
        this.delegate.acquire(this.eventLoop.newPromise()).addListener(f -> {
            if (f.isSuccess()) {
                this.setupChannel((Channel)f.getNow(), promise);
            } else {
                promise.setFailure(f.cause());
            }
        });
        return promise;
    }

    /**
     * Releases the channel to the delegate pool using a promise created on this pool's event loop.
     *
     * @param channel channel to release
     * @return future returned by the delegate's release
     */
    @Override
    public Future<Void> release(Channel channel) {
        return this.release(channel, this.eventLoop.newPromise());
    }

    /**
     * Releases the channel to the delegate pool.
     *
     * @param channel channel to release
     * @param promise promise handed to the delegate's release
     * @return future returned by the delegate's release
     */
    @Override
    public Future<Void> release(Channel channel, Promise<Void> promise) {
        return this.delegate.release(channel, promise);
    }

    /** Closes the delegate pool. */
    @Override
    public void close() {
        this.delegate.close();
    }

    /**
     * Completes {@code acquirePromise} with {@code ch}, establishing the tunnel first if the channel does not
     * have one yet.
     *
     * @param ch channel just acquired from the delegate
     * @param acquirePromise promise of the pending {@code acquire} call
     */
    private void setupChannel(Channel ch, Promise<Channel> acquirePromise) {
        if (Http1TunnelConnectionPool.isTunnelEstablished(ch)) {
            log.debug(() -> String.format("Tunnel already established for %s", ch.id().asShortText()));
            acquirePromise.setSuccess(ch);
            return;
        }
        log.debug(() -> String.format("Tunnel not yet established for channel %s. Establishing tunnel now.", ch.id().asShortText()));
        Promise<Channel> tunnelEstablishedPromise = this.eventLoop.newPromise();
        try {
            SslHandler sslHandler = this.createSslHandlerIfNeeded(ch.alloc());
            if (sslHandler != null) {
                ch.pipeline().addLast(sslHandler);
            }
            ch.pipeline().addLast(this.initHandlerSupplier.newInitHandler(this.delegate, this.remoteAddress, tunnelEstablishedPromise));
        } catch (Throwable t) {
            // This method runs inside a future listener; an exception escaping from here would never reach
            // the caller, leaving acquirePromise incomplete and the channel checked out of the delegate.
            this.failTunnelSetup(ch, acquirePromise, t);
            return;
        }
        tunnelEstablishedPromise.addListener(f -> {
            if (!f.isSuccess()) {
                this.failTunnelSetup(ch, acquirePromise, f.cause());
                return;
            }
            Channel tunnel = (Channel)f.getNow();
            try {
                this.handler.channelCreated(tunnel);
            } catch (Throwable t) {
                // channelCreated is declared to throw; treat that like any other tunnel setup failure
                // instead of letting it escape the listener with acquirePromise still pending.
                this.failTunnelSetup(ch, acquirePromise, t);
                return;
            }
            tunnel.attr(TUNNEL_ESTABLISHED_KEY).set(true);
            acquirePromise.setSuccess(tunnel);
        });
    }

    /**
     * Cleans up after a failed tunnel setup: closes the channel, gives it back to the delegate pool, logs the
     * cause and fails the acquire promise with it.
     *
     * @param ch channel that was acquired from the delegate for this attempt
     * @param acquirePromise promise of the pending {@code acquire} call
     * @param cause reason the tunnel could not be set up
     */
    private void failTunnelSetup(Channel ch, Promise<Channel> acquirePromise, Throwable cause) {
        ch.close();
        this.delegate.release(ch);
        log.error(() -> String.format("Unable to establish tunnel for channel %s", ch.id().asShortText()), cause);
        acquirePromise.setFailure(cause);
    }

    /**
     * Builds the SSL handler for the connection to the proxy.
     *
     * @param alloc allocator passed to the new handler
     * @return a new handler, or {@code null} when no SSL context is configured or the proxy scheme is not
     *         {@code https} (compared case-insensitively)
     */
    private SslHandler createSslHandlerIfNeeded(ByteBufAllocator alloc) {
        if (this.sslContext == null) {
            return null;
        }
        String scheme = this.proxyAddress.getScheme();
        if (!"https".equals(StringUtils.lowerCase(scheme))) {
            return null;
        }
        return NettyUtils.newSslHandler(this.sslContext, alloc, this.proxyAddress.getHost(), this.proxyAddress.getPort());
    }

    /**
     * @param ch channel to inspect
     * @return {@code true} only if the channel's {@link #TUNNEL_ESTABLISHED_KEY} attribute is {@code Boolean.TRUE};
     *         an unset ({@code null}) attribute counts as not established
     */
    private static boolean isTunnelEstablished(Channel ch) {
        Boolean established = ch.attr(TUNNEL_ESTABLISHED_KEY).get();
        return Boolean.TRUE.equals(established);
    }

    /** Factory for the handler that performs tunnel setup; replaceable so tests can supply their own handler. */
    @FunctionalInterface
    @SdkTestInternalApi
    static interface InitHandlerSupplier {
        /**
         * @param var1 the delegate pool the channel came from
         * @param var2 the remote address the tunnel should target
         * @param var3 promise the handler is expected to complete when tunnel setup succeeds or fails
         * @return the handler to append to the channel pipeline
         */
        public ChannelHandler newInitHandler(ChannelPool var1, URI var2, Promise<Channel> var3);
    }
}

