/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ChunkFetchIntegrationSuite {
    static final long STREAM_ID = 1L;
    static final int BUFFER_CHUNK_INDEX = 0;
    static final int FILE_CHUNK_INDEX = 1;
    static TransportContext context;
    static TransportServer server;
    static TransportClientFactory clientFactory;
    static StreamManager streamManager;
    static File testFile;
    static ManagedBuffer bufferChunk;
    static ManagedBuffer fileChunk;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @BeforeClass
    public static void setUp() throws Exception {
        int bufSize = 100000;
        final ByteBuffer buf = ByteBuffer.allocate(bufSize);
        for (int i = 0; i < bufSize; ++i) {
            buf.put((byte)i);
        }
        buf.flip();
        bufferChunk = new NioManagedBuffer(buf);
        testFile = File.createTempFile("shuffle-test-file", "txt");
        testFile.deleteOnExit();
        RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
        boolean shouldSuppressIOException = true;
        try {
            byte[] fileContent = new byte[1024];
            new Random().nextBytes(fileContent);
            fp.write(fileContent);
            shouldSuppressIOException = false;
        }
        finally {
            Closeables.close((Closeable)fp, (boolean)shouldSuppressIOException);
        }
        final TransportConf conf = new TransportConf("shuffle", (ConfigProvider)MapConfigProvider.EMPTY);
        fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10L, testFile.length() - 25L);
        streamManager = new StreamManager(){

            public ManagedBuffer getChunk(long streamId, int chunkIndex) {
                Assert.assertEquals((long)1L, (long)streamId);
                if (chunkIndex == 0) {
                    return new NioManagedBuffer(buf);
                }
                if (chunkIndex == 1) {
                    return new FileSegmentManagedBuffer(conf, testFile, 10L, testFile.length() - 25L);
                }
                throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
            }
        };
        RpcHandler handler = new RpcHandler(){

            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                throw new UnsupportedOperationException();
            }

            public StreamManager getStreamManager() {
                return streamManager;
            }
        };
        context = new TransportContext(conf, handler);
        server = context.createServer();
        clientFactory = context.createClientFactory();
    }

    @AfterClass
    public static void tearDown() {
        bufferChunk.release();
        server.close();
        clientFactory.close();
        context.close();
        testFile.delete();
    }

    private FetchResult fetchChunks(List<Integer> chunkIndices) throws Exception {
        final FetchResult res = new FetchResult();
        try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());){
            final Semaphore sem = new Semaphore(0);
            res.successChunks = Collections.synchronizedSet(new HashSet());
            res.failedChunks = Collections.synchronizedSet(new HashSet());
            res.buffers = Collections.synchronizedList(new LinkedList());
            ChunkReceivedCallback callback = new ChunkReceivedCallback(){

                public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
                    buffer.retain();
                    res.successChunks.add(chunkIndex);
                    res.buffers.add(buffer);
                    sem.release();
                }

                public void onFailure(int chunkIndex, Throwable e) {
                    res.failedChunks.add(chunkIndex);
                    sem.release();
                }
            };
            for (int chunkIndex : chunkIndices) {
                client.fetchChunk(1L, chunkIndex, callback);
            }
            if (!sem.tryAcquire(chunkIndices.size(), 60L, TimeUnit.SECONDS)) {
                Assert.fail((String)"Timeout getting response from the server");
            }
        }
        return res;
    }

    @Test
    public void fetchBufferChunk() throws Exception {
        FetchResult res = this.fetchChunks(Arrays.asList(0));
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new Integer[]{0}), res.successChunks);
        Assert.assertTrue((boolean)res.failedChunks.isEmpty());
        ChunkFetchIntegrationSuite.assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers);
        res.releaseBuffers();
    }

    @Test
    public void fetchFileChunk() throws Exception {
        FetchResult res = this.fetchChunks(Arrays.asList(1));
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new Integer[]{1}), res.successChunks);
        Assert.assertTrue((boolean)res.failedChunks.isEmpty());
        ChunkFetchIntegrationSuite.assertBufferListsEqual(Arrays.asList(fileChunk), res.buffers);
        res.releaseBuffers();
    }

    @Test
    public void fetchNonExistentChunk() throws Exception {
        FetchResult res = this.fetchChunks(Arrays.asList(12345));
        Assert.assertTrue((boolean)res.successChunks.isEmpty());
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new Integer[]{12345}), res.failedChunks);
        Assert.assertTrue((boolean)res.buffers.isEmpty());
    }

    @Test
    public void fetchBothChunks() throws Exception {
        FetchResult res = this.fetchChunks(Arrays.asList(0, 1));
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new Integer[]{0, 1}), res.successChunks);
        Assert.assertTrue((boolean)res.failedChunks.isEmpty());
        ChunkFetchIntegrationSuite.assertBufferListsEqual(Arrays.asList(bufferChunk, fileChunk), res.buffers);
        res.releaseBuffers();
    }

    @Test
    public void fetchChunkAndNonExistent() throws Exception {
        FetchResult res = this.fetchChunks(Arrays.asList(0, 12345));
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new Integer[]{0}), res.successChunks);
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new Integer[]{12345}), res.failedChunks);
        ChunkFetchIntegrationSuite.assertBufferListsEqual(Arrays.asList(bufferChunk), res.buffers);
        res.releaseBuffers();
    }

    private static void assertBufferListsEqual(List<ManagedBuffer> list0, List<ManagedBuffer> list1) throws Exception {
        Assert.assertEquals((long)list0.size(), (long)list1.size());
        for (int i = 0; i < list0.size(); ++i) {
            ChunkFetchIntegrationSuite.assertBuffersEqual(list0.get(i), list1.get(i));
        }
    }

    private static void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
        ByteBuffer nio0 = buffer0.nioByteBuffer();
        ByteBuffer nio1 = buffer1.nioByteBuffer();
        int len = nio0.remaining();
        Assert.assertEquals((long)nio0.remaining(), (long)nio1.remaining());
        for (int i = 0; i < len; ++i) {
            Assert.assertEquals((long)nio0.get(), (long)nio1.get());
        }
    }

    static class FetchResult {
        public Set<Integer> successChunks;
        public Set<Integer> failedChunks;
        public List<ManagedBuffer> buffers;

        FetchResult() {
        }

        public void releaseBuffers() {
            for (ManagedBuffer buffer : this.buffers) {
                buffer.release();
            }
        }
    }
}

