package co.cask.cdap.internal.app.runtime.monitor.proxy;

import co.cask.cdap.common.http.Channels;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.internal.app.runtime.monitor.SSHSessionProvider;
import co.cask.cdap.runtime.spi.ssh.PortForwarding;
import com.google.common.io.Closeables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.socksx.SocksMessage;
import io.netty.handler.codec.socksx.v4.DefaultSocks4CommandResponse;
import io.netty.handler.codec.socksx.v4.Socks4CommandRequest;
import io.netty.handler.codec.socksx.v4.Socks4CommandStatus;
import io.netty.handler.codec.socksx.v4.Socks4CommandType;
import io.netty.handler.codec.socksx.v5.DefaultSocks5CommandResponse;
import io.netty.handler.codec.socksx.v5.Socks5CommandRequest;
import io.netty.handler.codec.socksx.v5.Socks5CommandStatus;
import io.netty.handler.codec.socksx.v5.Socks5CommandType;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty handler that serves SOCKS4 and SOCKS5 {@code CONNECT} command requests.
 *
 * <p>For a {@code CONNECT} request it asks the {@link SSHSessionProvider} for a session to the requested
 * destination, opens a local port forward on that session, replies to the client with a success response and
 * then swaps itself out of the pipeline for a handler that relays raw bytes between the client channel and
 * the port forward. Any other command type is answered with a failure response and the channel is closed.
 */
final class SocksServerConnectHandler extends SimpleChannelInboundHandler<SocksMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(SocksServerConnectHandler.class);
    // Sampling logger configured with a one-minute limitRate sampler per message; used for tunnelling
    // failures so that repeated failures do not flood the log.
    private static final Logger OUTAGE_LOG = Loggers.sampling(
        LOG, LogSamplers.perMessage(() -> LogSamplers.limitRate(TimeUnit.MINUTES.toMillis(1L))));

    private final SSHSessionProvider sshSessionProvider;

    /**
     * Exception that carries the SOCKS failure response to send to the client before the channel is closed.
     * It is thrown from {@link #channelRead0} and consumed in {@link #exceptionCaught}.
     */
    public static final class SocksException extends Exception {
        private final SocksMessage response;

        private SocksException(SocksMessage response, Throwable cause) {
            super(cause);
            this.response = response;
        }

        /** Returns the SOCKS response that should be written to the client for this failure. */
        SocksMessage getResponse() {
            return this.response;
        }
    }

    /**
     * Creates a handler that obtains SSH sessions from the given provider.
     *
     * @param sshSessionProvider source of the SSH session used to create the port forward for each request
     */
    public SocksServerConnectHandler(SSHSessionProvider sshSessionProvider) {
        this.sshSessionProvider = sshSessionProvider;
    }

    /**
     * Dispatches a decoded SOCKS message. SOCKS4 and SOCKS5 command requests are handled; any other message
     * type causes the channel to be closed once pending writes are flushed.
     *
     * @throws SocksException if setting up the tunnel for a {@code CONNECT} request fails; the exception
     *     carries the protocol-specific failure response
     */
    public void channelRead0(ChannelHandlerContext ctx, SocksMessage msg) throws SocksException {
        if (msg instanceof Socks4CommandRequest) {
            handleSocks4Request(ctx, (Socks4CommandRequest) msg);
        } else if (msg instanceof Socks5CommandRequest) {
            handleSocks5Request(ctx, (Socks5CommandRequest) msg);
        } else {
            Channels.closeOnFlush(ctx.channel());
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx) {
        // handleConnectRequest only write()s the success response; this flush is what pushes it out.
        ctx.flush();
    }

    /**
     * Logs the failure (rate limited). For a {@link SocksException} the embedded failure response is sent
     * before closing; for anything else the channel is closed right away.
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        OUTAGE_LOG.warn("Failed to tunnel traffic for channel {}", ctx.channel(), cause);
        if (cause instanceof SocksException) {
            sendAndClose(ctx, ((SocksException) cause).getResponse());
        } else {
            ctx.close();
        }
    }

    /** Handles a SOCKS4 command request: rejects non-CONNECT commands, otherwise sets up the tunnel. */
    private void handleSocks4Request(ChannelHandlerContext ctx, Socks4CommandRequest request)
        throws SocksException {
        if (!request.type().equals(Socks4CommandType.CONNECT)) {
            sendAndClose(ctx, new DefaultSocks4CommandResponse(Socks4CommandStatus.REJECTED_OR_FAILED));
            return;
        }
        try {
            // The success response is built inside the try so a failure to construct it is reported
            // to the client the same way as a tunnelling failure.
            SocksMessage success = new DefaultSocks4CommandResponse(
                Socks4CommandStatus.SUCCESS, request.dstAddr(), request.dstPort());
            handleConnectRequest(ctx, request.dstAddr(), request.dstPort(), success);
        } catch (Exception e) {
            throw new SocksException(new DefaultSocks4CommandResponse(Socks4CommandStatus.REJECTED_OR_FAILED), e);
        }
    }

    /** Handles a SOCKS5 command request: rejects non-CONNECT commands, otherwise sets up the tunnel. */
    private void handleSocks5Request(ChannelHandlerContext ctx, Socks5CommandRequest request)
        throws SocksException {
        if (!request.type().equals(Socks5CommandType.CONNECT)) {
            sendAndClose(ctx, new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType()));
            return;
        }
        try {
            // Built inside the try for the same reason as in the SOCKS4 path.
            SocksMessage success = new DefaultSocks5CommandResponse(
                Socks5CommandStatus.SUCCESS, request.dstAddrType(), request.dstAddr(), request.dstPort());
            handleConnectRequest(ctx, request.dstAddr(), request.dstPort(), success);
        } catch (Exception e) {
            throw new SocksException(
                new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType()), e);
        }
    }

    /**
     * Creates the relay handler for the destination, writes the success response and, once that write
     * succeeds, replaces this handler in the pipeline with the relay handler. If the write fails the channel
     * is closed.
     *
     * @param ctx context of the client channel
     * @param destAddress destination address taken from the SOCKS request
     * @param destPort destination port taken from the SOCKS request
     * @param successResponse response to send to the client when the port forward has been created
     * @throws IOException if the port forward cannot be created
     */
    private void handleConnectRequest(ChannelHandlerContext ctx, String destAddress, int destPort,
                                      SocksMessage successResponse) throws IOException {
        // Create the forwarding first so that a failure surfaces before any success response is written.
        ChannelHandler forwardingHandler = createForwardingChannelHandler(ctx.channel(), destAddress, destPort);
        ctx.channel().write(successResponse).addListener(future -> {
            if (future.isSuccess()) {
                // SOCKS negotiation is done; from now on the channel carries raw bytes to relay.
                ctx.pipeline().remove(this);
                ctx.pipeline().addLast(forwardingHandler);
            } else {
                Channels.closeOnFlush(ctx.channel());
            }
        });
    }

    /** Writes and flushes the given message, then closes the channel when the write completes. */
    private void sendAndClose(ChannelHandlerContext ctx, Object message) {
        ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Opens a local port forward for the destination and returns a handler that relays inbound bytes from the
     * client channel into it. Data delivered by the port forward is written back to the client channel. The
     * port forward is closed when the client channel closes.
     *
     * @param channel the client channel
     * @param destAddress destination address used to look up the SSH session
     * @param destPort destination port; also passed twice to {@code createLocalPortForward} together with
     *     {@code "localhost"} (NOTE(review): presumably target port and originating port - confirm against
     *     the SSH session API)
     * @return the relay handler to install in the client channel pipeline
     * @throws IOException if the session or the port forward cannot be created
     */
    private ChannelHandler createForwardingChannelHandler(final Channel channel, String destAddress,
                                                          int destPort) throws IOException {
        final PortForwarding portForwarding = this.sshSessionProvider
            .getSession(new InetSocketAddress(destAddress, destPort))
            .createLocalPortForward("localhost", destPort, destPort, new PortForwarding.DataConsumer() {
                public void received(ByteBuffer buffer) {
                    // copy() detaches the data from the supplied ByteBuffer (presumably because the
                    // caller may reuse it after this method returns).
                    channel.write(Unpooled.wrappedBuffer(buffer).copy());
                }

                public void flushed() {
                    channel.flush();
                }

                public void finished() {
                    Channels.closeOnFlush(channel);
                }
            });

        // Tie the lifetime of the port forward to the client channel.
        channel.closeFuture().addListener(future -> {
            Closeables.closeQuietly(portForwarding);
        });

        return new ChannelInboundHandlerAdapter() {
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (!(msg instanceof ByteBuf)) {
                    // Not relayable; hand it on instead of failing on the cast and leaking the message.
                    ctx.fireChannelRead(msg);
                    return;
                }
                ByteBuf buf = (ByteBuf) msg;
                try {
                    portForwarding.write(buf.nioBuffer());
                } finally {
                    // This handler consumes the message, so it owns the release.
                    buf.release();
                }
            }

            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                portForwarding.flush();
            }

            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                SocksServerConnectHandler.OUTAGE_LOG.warn("Exception raised when relaying messages", cause);
                ctx.close();
            }
        };
    }
}
