/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal;

import java.io.IOException;
import java.net.URI;
import java.util.function.Supplier;
import org.apache.flink.kinesis.shaded.io.netty.buffer.Unpooled;
import org.apache.flink.kinesis.shaded.io.netty.channel.Channel;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelDuplexHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelPipeline;
import org.apache.flink.kinesis.shaded.io.netty.channel.pool.ChannelPool;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.kinesis.shaded.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Future;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.Promise;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.annotations.SdkTestInternalApi;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.Logger;

/**
 * Channel handler that sets up a tunnel through an HTTP proxy by sending a {@code CONNECT}
 * request for {@code remoteHost} as soon as the handler is added to a pipeline.
 *
 * <p>The outcome is reported through {@code initPromise}: it is completed with the channel when
 * the proxy answers with status 200, and failed with an {@link IOException} otherwise (non-200
 * response, unexpected message, write failure, exception, or the channel going inactive). On
 * failure the channel is closed and released back to {@code sourcePool}.
 */
@SdkInternalApi
public final class ProxyTunnelInitHandler extends ChannelDuplexHandler {
    public static final Logger log = Logger.loggerFor(ProxyTunnelInitHandler.class);
    private final ChannelPool sourcePool;
    private final URI remoteHost;
    private final Promise<Channel> initPromise;
    private final Supplier<HttpClientCodec> httpCodecSupplier;

    /**
     * Creates a handler that uses a freshly constructed {@link HttpClientCodec} for the CONNECT exchange.
     *
     * @param sourcePool pool the channel is released to when the tunnel cannot be established
     * @param remoteHost URI whose host and port are used as the CONNECT target
     * @param initPromise promise completed with the channel on success, or failed on error
     */
    public ProxyTunnelInitHandler(ChannelPool sourcePool, URI remoteHost, Promise<Channel> initPromise) {
        this(sourcePool, remoteHost, initPromise, HttpClientCodec::new);
    }

    /**
     * Same as the three-argument constructor, but lets the caller supply the codec instance.
     *
     * @param sourcePool pool the channel is released to when the tunnel cannot be established
     * @param remoteHost URI whose host and port are used as the CONNECT target
     * @param initPromise promise completed with the channel on success, or failed on error
     * @param httpCodecSupplier supplies the {@link HttpClientCodec} inserted in front of this handler
     */
    @SdkTestInternalApi
    public ProxyTunnelInitHandler(ChannelPool sourcePool, URI remoteHost, Promise<Channel> initPromise,
                                  Supplier<HttpClientCodec> httpCodecSupplier) {
        this.sourcePool = sourcePool;
        this.remoteHost = remoteHost;
        this.initPromise = initPromise;
        this.httpCodecSupplier = httpCodecSupplier;
    }

    /**
     * Inserts the HTTP codec directly before this handler and writes the CONNECT request.
     * A failed write is reported through {@code initPromise}.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline pipeline = ctx.pipeline();
        // A null name is passed for the codec; the pipeline is left to pick one.
        pipeline.addBefore(ctx.name(), null, this.httpCodecSupplier.get());
        HttpRequest connectRequest = this.connectRequest();
        ctx.channel().writeAndFlush(connectRequest).addListener(f -> {
            if (!f.isSuccess()) {
                this.handleConnectRequestFailure(ctx, f.cause());
            }
        });
    }

    /**
     * Removes the {@link HttpClientCodec} from the pipeline, if one is still present.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        if (ctx.pipeline().get(HttpClientCodec.class) != null) {
            ctx.pipeline().remove(HttpClientCodec.class);
        }
    }

    /**
     * Evaluates the first message read on the channel. An {@link HttpResponse} with status 200
     * completes {@code initPromise} with the channel; any other message closes and releases the
     * channel and fails the promise. In both cases this handler removes itself first.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpResponse && ((HttpResponse) msg).status().code() == 200) {
            ctx.pipeline().remove(this);
            this.initPromise.setSuccess(ctx.channel());
            return;
        }

        // Anything other than a 200 response means the tunnel was not established.
        ctx.pipeline().remove(this);
        ctx.close();
        this.sourcePool.release(ctx.channel());
        // tryFailure: another path (write listener, exceptionCaught) may already have completed
        // the promise, in which case setFailure would throw IllegalStateException.
        this.initPromise.tryFailure(new IOException("Could not connect to proxy"));
    }

    /**
     * Fails the tunnel setup if the channel goes inactive before {@code initPromise} is done;
     * otherwise logs the event and closes and releases the channel.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (!this.initPromise.isDone()) {
            this.handleConnectRequestFailure(ctx, null);
        } else {
            log.debug(() -> "The proxy channel (" + ctx.channel().id() + ") is inactive");
            this.closeAndRelease(ctx);
        }
    }

    /**
     * Fails the tunnel setup with {@code cause} if {@code initPromise} is not yet done;
     * otherwise logs the exception and closes and releases the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (!this.initPromise.isDone()) {
            this.handleConnectRequestFailure(ctx, cause);
        } else {
            log.debug(() -> "An exception occurred on the proxy tunnel channel (" + ctx.channel().id() + "). The channel has been closed to prevent any ongoing issues.", cause);
            this.closeAndRelease(ctx);
        }
    }

    /**
     * Closes and releases the channel, then fails {@code initPromise} with an {@link IOException}
     * that wraps {@code cause} when one is given.
     *
     * @param ctx context of the channel being torn down
     * @param cause underlying failure, or {@code null} when none is available
     */
    private void handleConnectRequestFailure(ChannelHandlerContext ctx, Throwable cause) {
        this.closeAndRelease(ctx);
        String errorMsg = "Unable to send CONNECT request to proxy";
        IOException ioException = cause == null ? new IOException(errorMsg) : new IOException(errorMsg, cause);
        // This method is reachable from several callbacks (write listener, channelInactive,
        // exceptionCaught). tryFailure keeps a second failure signal from throwing
        // IllegalStateException on an already-completed promise.
        this.initPromise.tryFailure(ioException);
    }

    /**
     * Closes the channel and hands it back to {@code sourcePool}.
     */
    private void closeAndRelease(ChannelHandlerContext ctx) {
        ctx.close();
        this.sourcePool.release(ctx.channel());
    }

    /**
     * Builds the HTTP/1.1 CONNECT request. The {@code host:port} string is used both as the
     * request target and as the {@code Host} header value.
     */
    private HttpRequest connectRequest() {
        String uri = this.getUri();
        DefaultFullHttpRequest request =
            new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.CONNECT, uri, Unpooled.EMPTY_BUFFER, false);
        request.headers().add(HttpHeaderNames.HOST, uri);
        return request;
    }

    /**
     * Returns the CONNECT target in {@code host:port} form.
     */
    private String getUri() {
        // NOTE(review): URI.getPort() returns -1 when the URI has no explicit port; this assumes
        // callers always pass a URI with the port set - confirm against the call sites.
        return this.remoteHost.getHost() + ":" + this.remoteHost.getPort();
    }
}

