package co.cask.cdap.internal.app.runtime.monitor.proxy;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.ssh.DefaultSSHSession;
import co.cask.cdap.common.ssh.SSHConfig;
import co.cask.cdap.common.ssh.TestSSHServer;
import co.cask.cdap.runtime.spi.ssh.PortForwarding;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequests;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.BodyProducer;
import co.cask.http.HttpHandler;
import co.cask.http.HttpResponder;
import co.cask.http.NettyHttpService;
import com.google.common.base.Strings;
import com.google.common.io.ByteStreams;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.KeyPair;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/monitor/proxy/MonitorSocksProxyTest.class */
public class MonitorSocksProxyTest {
    private static final Logger LOG = LoggerFactory.getLogger(MonitorSocksProxyTest.class);

    // SSH server fixture registered as a JUnit class rule; every SSH session in this class connects to it.
    @ClassRule
    public static final TestSSHServer SSH_SERVER = new TestSSHServer();
    // Key pair generated in init() and registered with SSH_SERVER as an authorized key for user "cdap".
    private static KeyPair keyPair;
    // HTTP service hosting TestHandler; built and started in beforeTest(), stopped in afterTest().
    private NettyHttpService httpService;
    // SSH session handed to the proxy by the provider lambda in beforeTest(); individual tests close it or
    // temporarily set it to null to exercise failure paths.
    private TestSSHSession sshSession;
    // SOCKS proxy under test; started in beforeTest(), stopped in afterTest().
    private MonitorSocksProxy proxyServer;
    // JVM-wide ProxySelector captured in beforeTest() so afterTest() can restore it.
    private ProxySelector defaultProxySelector;

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/monitor/proxy/MonitorSocksProxyTest$TestHandler.class */
    public static final class TestHandler extends AbstractHttpHandler {
        @GET
        @Path("/ping")
        public void ping(HttpRequest httpRequest, HttpResponder httpResponder) {
            httpResponder.sendStatus(HttpResponseStatus.OK);
        }

        @POST
        @Path("/chunk")
        public void chunk(FullHttpRequest fullHttpRequest, HttpResponder httpResponder) {
            final ByteBuf copy = fullHttpRequest.content().copy();
            httpResponder.sendContent(HttpResponseStatus.OK, new BodyProducer() { // from class: co.cask.cdap.internal.app.runtime.monitor.proxy.MonitorSocksProxyTest.TestHandler.1
                int count = 0;

                public ByteBuf nextChunk() {
                    int i = this.count;
                    this.count = i + 1;
                    return i < 10 ? copy.copy() : Unpooled.EMPTY_BUFFER;
                }

                public void finished() {
                }

                public void handleError(@Nullable Throwable th) {
                }
            }, new DefaultHttpHeaders());
        }
    }

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/monitor/proxy/MonitorSocksProxyTest$TestSSHSession.class */
    private static final class TestSSHSession extends DefaultSSHSession {
        private final AtomicInteger portForwardCreated;
        private final AtomicInteger portForwardClosed;

        TestSSHSession(SSHConfig sSHConfig) throws IOException {
            super(sSHConfig);
            this.portForwardCreated = new AtomicInteger();
            this.portForwardClosed = new AtomicInteger();
        }

        public PortForwarding createLocalPortForward(String str, int i, int i2, PortForwarding.DataConsumer dataConsumer) throws IOException {
            final PortForwarding createLocalPortForward = super.createLocalPortForward(str, i, i2, dataConsumer);
            this.portForwardCreated.incrementAndGet();
            return new PortForwarding() { // from class: co.cask.cdap.internal.app.runtime.monitor.proxy.MonitorSocksProxyTest.TestSSHSession.1
                public int write(ByteBuffer byteBuffer) throws IOException {
                    return createLocalPortForward.write(byteBuffer);
                }

                public void flush() throws IOException {
                    createLocalPortForward.flush();
                }

                public boolean isOpen() {
                    return createLocalPortForward.isOpen();
                }

                public void close() throws IOException {
                    createLocalPortForward.close();
                    TestSSHSession.this.portForwardClosed.incrementAndGet();
                }
            };
        }
    }

    /**
     * Generates the 1024-bit RSA key pair used by all tests and registers it with the SSH server
     * as an authorized key for user {@code cdap}.
     */
    @BeforeClass
    public static void init() throws Exception {
        final JSch jsch = new JSch();
        keyPair = KeyPair.genKeyPair(jsch, KeyPair.RSA, 1024);
        SSH_SERVER.addAuthorizedKey(keyPair, "cdap");
    }

    /**
     * Starts the backend HTTP service, opens an SSH session, starts the SOCKS proxy backed by that session,
     * and installs a JVM-wide {@link ProxySelector} that routes every connection through the proxy.
     */
    @Before
    public void beforeTest() throws Exception {
        NettyHttpService service = NettyHttpService.builder("test")
            .setHttpHandlers(new HttpHandler[]{new TestHandler()})
            .build();
        this.httpService = service;
        service.start();

        this.sshSession = new TestSSHSession(getSSHConfig());

        // The provider reads the field on every lookup so tests can swap or clear the session afterwards.
        this.proxyServer = new MonitorSocksProxy(CConfiguration.create(), serverAddr -> {
            TestSSHSession current = this.sshSession;
            if (current == null) {
                throw new IllegalArgumentException("No SSH session available for " + serverAddr);
            }
            return current;
        });
        this.proxyServer.startAndWait();

        final Proxy socksProxy = new Proxy(Proxy.Type.SOCKS, this.proxyServer.getBindAddress());
        // Remember the current selector so afterTest() can put it back.
        this.defaultProxySelector = ProxySelector.getDefault();
        ProxySelector.setDefault(new ProxySelector() {
            @Override
            public List<Proxy> select(URI uri) {
                return Collections.singletonList(socksProxy);
            }

            @Override
            public void connectFailed(URI uri, SocketAddress socketAddress, IOException iOException) {
                LOG.error("Connect failed {} {}", uri, socketAddress, iOException);
            }
        });
    }

    /**
     * Restores the original {@link ProxySelector} and shuts down everything started by {@code beforeTest()}.
     *
     * <p>Each resource is released in its own {@code finally} step so a failure stopping one does not leave
     * the others running, and null checks cover the case where {@code beforeTest()} failed part-way.
     * The SSH session is closed as well, so sessions do not pile up on the shared SSH server across tests.
     */
    @After
    public void afterTest() throws Exception {
        ProxySelector.setDefault(this.defaultProxySelector);
        try {
            if (this.proxyServer != null) {
                this.proxyServer.stopAndWait();
            }
        } finally {
            try {
                if (this.httpService != null) {
                    this.httpService.stop();
                }
            } finally {
                // NOTE(review): testFailTunnel closes the session itself before teardown; this assumes a
                // second close() is a no-op — confirm against DefaultSSHSession.
                if (this.sshSession != null) {
                    this.sshSession.close();
                }
            }
        }
    }

    /**
     * Issues ten {@code /ping} requests through the proxy and asserts that only one port forward was created,
     * then sends one more request with {@code Connection: close} and asserts that still only one port forward
     * was created and that it has been closed.
     */
    @Test
    public void testSocksProxy() throws Exception {
        InetSocketAddress httpAddr = this.httpService.getBindAddress();
        String pingUrl = String.format("http://%s:%d/ping", httpAddr.getHostName(), httpAddr.getPort());
        URL url = new URL(pingUrl);

        int remaining = 10;
        while (remaining > 0) {
            Assert.assertEquals(HttpURLConnection.HTTP_OK,
                                HttpRequests.execute(co.cask.common.http.HttpRequest.get(url).build()).getResponseCode());
            remaining--;
        }
        Assert.assertEquals(1, this.sshSession.portForwardCreated.get());

        // Ask for the connection to be closed after this request.
        String connectionHeader = HttpHeaderNames.CONNECTION.toString();
        String closeValue = HttpHeaderValues.CLOSE.toString();
        Assert.assertEquals(HttpURLConnection.HTTP_OK,
                            HttpRequests.execute(co.cask.common.http.HttpRequest.builder(HttpMethod.GET, url)
                                                     .addHeader(connectionHeader, closeValue)
                                                     .build()).getResponseCode());

        Assert.assertEquals(1, this.sshSession.portForwardCreated.get());
        Assert.assertEquals(1, this.sshSession.portForwardClosed.get());
    }

    /**
     * Posts {@code "Testing"} to {@code /chunk} through the proxy and verifies that the response body is the
     * payload repeated ten times.
     *
     * <p>The request body stream is managed by try-with-resources, so it is closed exactly once and a failure
     * while closing is attached as a suppressed exception instead of masking the primary one. The connection
     * is disconnected when the test finishes, whether it passes or fails.
     */
    @Test
    public void testChunkCall() throws Exception {
        InetSocketAddress bindAddress = this.httpService.getBindAddress();
        URL url = new URL(String.format("http://%s:%d/chunk", bindAddress.getHostName(), bindAddress.getPort()));
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try {
            connection.setDoOutput(true);
            try (OutputStream requestBody = connection.getOutputStream()) {
                requestBody.write("Testing".getBytes(StandardCharsets.UTF_8));
            }

            Assert.assertEquals(HttpURLConnection.HTTP_OK, connection.getResponseCode());
            String responseBody = new String(ByteStreams.toByteArray(connection.getInputStream()),
                                             StandardCharsets.UTF_8);
            Assert.assertEquals(Strings.repeat("Testing", 10), responseBody);
        } finally {
            connection.disconnect();
        }
    }

    /**
     * Closes the SSH session before issuing a request through the proxy; the request is expected to fail
     * with an {@link IOException}.
     */
    @Test(expected = IOException.class)
    public void testFailTunnel() throws Exception {
        InetSocketAddress httpAddr = this.httpService.getBindAddress();

        // Close the session that the proxy's provider will hand out for the next connection.
        this.sshSession.close();

        String pingUrl = String.format("http://%s:%d/ping", httpAddr.getHostName(), httpAddr.getPort());
        URL url = new URL(pingUrl);
        HttpRequests.execute(co.cask.common.http.HttpRequest.get(url).build());
    }

    /**
     * Clears the SSH session so the proxy's session provider throws, then issues a request through the proxy;
     * the request is expected to fail with an {@link IOException}. The session field is restored afterwards
     * regardless of the outcome.
     */
    @Test(expected = IOException.class)
    public void testMissingSSHSession() throws IOException {
        final TestSSHSession savedSession = this.sshSession;
        this.sshSession = null;
        try {
            InetSocketAddress httpAddr = this.httpService.getBindAddress();
            String pingUrl = String.format("http://%s:%d/ping", httpAddr.getHostName(), httpAddr.getPort());
            HttpRequests.execute(co.cask.common.http.HttpRequest.get(new URL(pingUrl)).build());
        } finally {
            this.sshSession = savedSession;
        }
    }

    /**
     * Builds the SSH configuration for connecting to {@code SSH_SERVER} as user {@code cdap}, with a private
     * key supplier that serializes the shared key pair's private key each time it is invoked.
     *
     * @return the SSH configuration used to open test sessions
     */
    private SSHConfig getSSHConfig() {
        return SSHConfig.builder(SSH_SERVER.getHost())
            .setPort(SSH_SERVER.getPort())
            .setUser("cdap")
            .setPrivateKeySupplier(() -> {
                ByteArrayOutputStream serializedKey = new ByteArrayOutputStream();
                // Second argument is null — presumably "no passphrase"; verify against the JSch KeyPair API.
                keyPair.writePrivateKey(serializedKey, (byte[]) null);
                return serializedKey.toByteArray();
            })
            .build();
    }
}
