package org.apache.hadoop.hive.llap.shufflehandler;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hive.common.util.Retry;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/TestShuffleHandler.class */
public class TestShuffleHandler {

    @Rule
    public Retry retry = new Retry(2);
    private static final File TEST_DIR = new File(System.getProperty("test.build.data"), TestShuffleHandler.class.getName()).getAbsoluteFile();
    private static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";

    /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/TestShuffleHandler$LastSocketAddress.class */
    /**
     * Mutable holder for the most recently recorded remote socket address.
     *
     * <p>In this test the address is stored from inside the shuffle handler's
     * {@code sendMapOutput} callback and read back by the test method, i.e. the
     * writer and the reader are presumably different threads (server I/O thread
     * vs. JUnit thread).
     */
    static class LastSocketAddress {
        // volatile so that a value stored by the server-side callback is guaranteed
        // to be visible to the test thread that later reads it.
        volatile SocketAddress lastAddress;

        /**
         * Records {@code socketAddress} as the latest address, replacing any previous one.
         *
         * @param socketAddress the address to remember; may be {@code null}
         */
        void setAddress(SocketAddress socketAddress) {
            this.lastAddress = socketAddress;
        }

        /**
         * @return the last address passed to {@link #setAddress(SocketAddress)},
         *         or {@code null} if none has been recorded yet
         */
        SocketAddress getSocketAddress() {
            return this.lastAddress;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/TestShuffleHandler$MockShuffleHandler2.class */
    private static class MockShuffleHandler2 extends ShuffleHandler {
        boolean socketKeepAlive;

        MockShuffleHandler2(Configuration configuration) {
            super(configuration);
            this.socketKeepAlive = false;
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration configuration) {
            return new ShuffleHandler.Shuffle(configuration) { // from class: org.apache.hadoop.hive.llap.shufflehandler.TestShuffleHandler.MockShuffleHandler2.1
                protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                    SocketChannel channel = channelHandlerContext.channel();
                    MockShuffleHandler2.this.socketKeepAlive = channel.config().isKeepAlive();
                }
            };
        }

        protected boolean isSocketKeepAlive() {
            return this.socketKeepAlive;
        }
    }

    /**
     * Issues two consecutive map-output requests against a locally started
     * {@link ShuffleHandler} with connection keep-alive enabled and checks that
     * both responses advertise keep-alive and that the server saw both requests
     * coming from the same remote socket address (i.e. the connection was reused).
     *
     * <p>The handler's shuffle logic is replaced by stubs that skip verification
     * and send synthetic {@link ShuffleHeader} data.
     *
     * @throws Exception if the handler cannot be started/stopped or the HTTP exchange fails
     */
    @Test(timeout = 10000)
    public void testKeepAlive() throws Exception {
        final List<Throwable> failures = new ArrayList<Throwable>(1);
        Configuration configuration = new Configuration();
        configuration.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        configuration.setInt("llap.shuffle.port", 0);
        configuration.setBoolean("llap.shuffle.connection-keep-alive.enable", true);
        // Deliberately negative: the assertions below expect the server to still
        // advertise "timeout=1" in the keep-alive response header.
        configuration.setInt("llap.shuffle.connection-keep-alive.timeout", -100);
        final LastSocketAddress lastSocketAddress = new LastSocketAddress();
        ShuffleHandler shuffleHandler = new ShuffleHandler(configuration) {
            @Override
            protected ShuffleHandler.Shuffle getShuffle(Configuration configuration2) {
                return new ShuffleHandler.Shuffle(configuration2) {
                    @Override
                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String str, int i, String str2, int i2, String str3) throws IOException {
                        return null;
                    }

                    @Override
                    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
                        // Intentionally empty: request verification is skipped in this test.
                    }

                    @Override
                    protected void populateHeaders(List<String> list, String str, int i, String str2, int i2, HttpResponse httpResponse, boolean z, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> map) throws IOException {
                        ShuffleHeader shuffleHeader = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        // Written into a throw-away buffer; kept as in the original test.
                        shuffleHeader.write(new DataOutputBuffer());
                        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
                        for (int i3 = 0; i3 < 100000; i3++) {
                            shuffleHeader.write(dataOutputBuffer);
                        }
                        // NOTE(review): the advertised length covers only the 100000 repeated
                        // headers, while sendMapOutput below also writes one leading header
                        // first - confirm this mismatch is intentional.
                        super.setResponseHeaders(httpResponse, z, dataOutputBuffer.getLength());
                    }

                    @Override
                    protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, int i, ShuffleHandler.Shuffle.MapOutputInfo mapOutputInfo) throws IOException {
                        // Remember which client socket this response goes to.
                        lastSocketAddress.setAddress(channel.remoteAddress());
                        ShuffleHeader shuffleHeader = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
                        shuffleHeader.write(dataOutputBuffer);
                        channel.writeAndFlush(Unpooled.wrappedBuffer(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength()));
                        DataOutputBuffer dataOutputBuffer2 = new DataOutputBuffer();
                        for (int i2 = 0; i2 < 100000; i2++) {
                            shuffleHeader.write(dataOutputBuffer2);
                        }
                        return channel.writeAndFlush(Unpooled.wrappedBuffer(dataOutputBuffer2.getData(), 0, dataOutputBuffer2.getLength()));
                    }

                    @Override
                    protected void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
                        // Only the first error is recorded; it also closes the channel.
                        if (failures.size() == 0) {
                            failures.add(new Error());
                            channelHandlerContext.channel().close();
                        }
                    }

                    @Override
                    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
                        if (failures.size() == 0) {
                            failures.add(new Error());
                            channelHandlerContext.channel().close();
                        }
                    }
                };
            }
        };
        HttpURLConnection firstConnection = null;
        HttpURLConnection secondConnection = null;
        try {
            shuffleHandler.start();
            // The port is read back from the configuration after start(), as port 0 was requested.
            String baseUrl = "http://127.0.0.1:" + configuration.get("llap.shuffle.port");

            // First request: no keepAlive query parameter.
            firstConnection = (HttpURLConnection) new URL(baseUrl + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0").openConnection();
            firstConnection.setRequestProperty("name", "mapreduce");
            firstConnection.setRequestProperty("version", "1.0.0");
            firstConnection.connect();
            SocketAddress firstAddress;
            try (DataInputStream firstInput = new DataInputStream(firstConnection.getInputStream())) {
                Assert.assertEquals("keep-alive", firstConnection.getHeaderField("Connection"));
                Assert.assertEquals("timeout=1", firstConnection.getHeaderField("keep-alive"));
                Assert.assertEquals(200L, firstConnection.getResponseCode());
                new ShuffleHeader().readFields(firstInput);
                // Drain the rest of the body so the client can hand the socket back for reuse.
                byte[] drainBuffer = new byte[1024];
                while (firstInput.read(drainBuffer) != -1) {
                    // discard
                }
                firstAddress = lastSocketAddress.getSocketAddress();
            }

            // Second request: explicitly asks for keep-alive.
            secondConnection = (HttpURLConnection) new URL(baseUrl + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0&keepAlive=true").openConnection();
            secondConnection.setRequestProperty("name", "mapreduce");
            secondConnection.setRequestProperty("version", "1.0.0");
            secondConnection.connect();
            try (DataInputStream secondInput = new DataInputStream(secondConnection.getInputStream())) {
                Assert.assertEquals("keep-alive", secondConnection.getHeaderField("Connection"));
                Assert.assertEquals("timeout=1", secondConnection.getHeaderField("keep-alive"));
                Assert.assertEquals(200L, secondConnection.getResponseCode());
                new ShuffleHeader().readFields(secondInput);
            }
            SocketAddress secondAddress = lastSocketAddress.getSocketAddress();

            Assert.assertNotNull("Initial shuffle address should not be null", firstAddress);
            Assert.assertNotNull("Keep-Alive shuffle address should not be null", secondAddress);
            Assert.assertEquals("Initial shuffle address and keep-alive shuffle address should be the same", firstAddress, secondAddress);
        } finally {
            // Release client sockets first, then shut the server down so the port and
            // its threads do not outlive the test (also on assertion failure).
            if (firstConnection != null) {
                firstConnection.disconnect();
            }
            if (secondConnection != null) {
                secondConnection.disconnect();
            }
            shuffleHandler.stop();
        }
    }

    /**
     * Starts a {@link MockShuffleHandler2} with connection keep-alive enabled, performs
     * one map-output request and asserts that the handler observed the keep-alive
     * option set on the socket that served the request.
     *
     * @throws Exception if the handler cannot be started/stopped or the HTTP exchange fails
     */
    @Test
    public void testSocketKeepAlive() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        configuration.setInt("llap.shuffle.port", 0);
        configuration.setBoolean("llap.shuffle.connection-keep-alive.enable", true);
        configuration.setInt("llap.shuffle.connection-keep-alive.timeout", -100);
        HttpURLConnection httpURLConnection = null;
        MockShuffleHandler2 mockShuffleHandler2 = new MockShuffleHandler2(configuration);
        try {
            mockShuffleHandler2.start();
            // The port is read back from the configuration after start(), as port 0 was requested.
            String baseUrl = "http://127.0.0.1:" + configuration.get("llap.shuffle.port");
            httpURLConnection = (HttpURLConnection) new URL(baseUrl + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0").openConnection();
            httpURLConnection.setRequestProperty("name", "mapreduce");
            httpURLConnection.setRequestProperty("version", "1.0.0");
            httpURLConnection.connect();
            // Forces the request to be sent and the response to be received, so that
            // the handler's verifyRequest stub has run before the assertion.
            httpURLConnection.getInputStream();
            Assert.assertTrue("socket should be set KEEP_ALIVE", mockShuffleHandler2.isSocketKeepAlive());
        } finally {
            // Single cleanup path for both success and failure.
            if (httpURLConnection != null) {
                httpURLConnection.disconnect();
            }
            mockShuffleHandler2.stop();
        }
    }

    /**
     * Configures an explicit shuffle port (picked at random from 50000-50009) and
     * asserts that the started handler reports exactly that port.
     *
     * @throws Exception if the handler cannot be started or stopped
     */
    @Test
    public void testConfigPortStatic() throws Exception {
        int expectedPort = new Random().nextInt(10) + 50000;
        Configuration configuration = new Configuration();
        configuration.setInt("llap.shuffle.port", expectedPort);
        MockShuffleHandler2 mockShuffleHandler2 = new MockShuffleHandler2(configuration);
        try {
            mockShuffleHandler2.start();
            Assert.assertEquals(expectedPort, mockShuffleHandler2.getPort());
        } finally {
            // Always stop the handler exactly once, whether or not the assertion held.
            mockShuffleHandler2.stop();
        }
    }

    /**
     * Requests port 0 in the configuration and asserts that the started handler
     * reports a positive port number.
     *
     * @throws Exception if the handler cannot be started or stopped
     */
    @Test
    public void testConfigPortDynamic() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("llap.shuffle.port", 0);
        MockShuffleHandler2 handler = new MockShuffleHandler2(conf);
        try {
            handler.start();
            boolean portAssigned = handler.getPort() > 0;
            Assert.assertTrue("ShuffleHandler should use a random chosen port", portAssigned);
        } finally {
            handler.stop();
        }
    }
}
