/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network;

import com.google.common.collect.Sets;
import com.google.common.io.Files;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.network.StreamSuite;
import org.apache.spark.network.StreamTestHelper;
import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class RpcIntegrationSuite {
    static TransportConf conf;
    static TransportContext context;
    static TransportServer server;
    static TransportClientFactory clientFactory;
    static RpcHandler rpcHandler;
    static List<String> oneWayMsgs;
    static StreamTestHelper testData;
    static ConcurrentHashMap<String, VerifyingStreamCallback> streamCallbacks;

    @BeforeClass
    public static void setUp() throws Exception {
        conf = new TransportConf("shuffle", (ConfigProvider)MapConfigProvider.EMPTY);
        testData = new StreamTestHelper();
        rpcHandler = new RpcHandler(){

            public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
                String msg = JavaUtils.bytesToString((ByteBuffer)message);
                String[] parts = msg.split("/");
                switch (parts[0]) {
                    case "hello": {
                        callback.onSuccess(JavaUtils.stringToBytes((String)("Hello, " + parts[1] + "!")));
                        break;
                    }
                    case "return error": {
                        callback.onFailure((Throwable)new RuntimeException("Returned: " + parts[1]));
                        break;
                    }
                    case "throw error": {
                        throw new RuntimeException("Thrown: " + parts[1]);
                    }
                }
            }

            public StreamCallbackWithID receiveStream(TransportClient client, ByteBuffer messageHeader, RpcResponseCallback callback) {
                return RpcIntegrationSuite.receiveStreamHelper(JavaUtils.bytesToString((ByteBuffer)messageHeader));
            }

            public void receive(TransportClient client, ByteBuffer message) {
                oneWayMsgs.add(JavaUtils.bytesToString((ByteBuffer)message));
            }

            public StreamManager getStreamManager() {
                return new OneForOneStreamManager();
            }
        };
        context = new TransportContext(conf, rpcHandler);
        server = context.createServer();
        clientFactory = context.createClientFactory();
        oneWayMsgs = new ArrayList<String>();
    }

    private static StreamCallbackWithID receiveStreamHelper(final String msg) {
        try {
            if (msg.startsWith("fail/")) {
                String[] parts = msg.split("/");
                switch (parts[1]) {
                    case "exception-ondata": {
                        return new StreamCallbackWithID(){

                            public void onData(String streamId, ByteBuffer buf) throws IOException {
                                throw new IOException("failed to read stream data!");
                            }

                            public void onComplete(String streamId) throws IOException {
                            }

                            public void onFailure(String streamId, Throwable cause) throws IOException {
                            }

                            public String getID() {
                                return msg;
                            }
                        };
                    }
                    case "exception-oncomplete": {
                        return new StreamCallbackWithID(){

                            public void onData(String streamId, ByteBuffer buf) throws IOException {
                            }

                            public void onComplete(String streamId) throws IOException {
                                throw new IOException("exception in onComplete");
                            }

                            public void onFailure(String streamId, Throwable cause) throws IOException {
                            }

                            public String getID() {
                                return msg;
                            }
                        };
                    }
                    case "null": {
                        return null;
                    }
                }
                throw new IllegalArgumentException("unexpected msg: " + msg);
            }
            VerifyingStreamCallback streamCallback = new VerifyingStreamCallback(msg);
            streamCallbacks.put(msg, streamCallback);
            return streamCallback;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @AfterClass
    public static void tearDown() {
        server.close();
        clientFactory.close();
        context.close();
        testData.cleanup();
    }

    private RpcResult sendRPC(String ... commands) throws Exception {
        TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        final Semaphore sem = new Semaphore(0);
        final RpcResult res = new RpcResult();
        res.successMessages = Collections.synchronizedSet(new HashSet());
        res.errorMessages = Collections.synchronizedSet(new HashSet());
        RpcResponseCallback callback = new RpcResponseCallback(){

            public void onSuccess(ByteBuffer message) {
                String response = JavaUtils.bytesToString((ByteBuffer)message);
                res.successMessages.add(response);
                sem.release();
            }

            public void onFailure(Throwable e) {
                res.errorMessages.add(e.getMessage());
                sem.release();
            }
        };
        for (String command : commands) {
            client.sendRpc(JavaUtils.stringToBytes((String)command), callback);
        }
        if (!sem.tryAcquire(commands.length, 5L, TimeUnit.SECONDS)) {
            Assert.fail((String)"Timeout getting response from the server");
        }
        client.close();
        return res;
    }

    private RpcResult sendRpcWithStream(String ... streams) throws Exception {
        TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        Semaphore sem = new Semaphore(0);
        RpcResult res = new RpcResult();
        res.successMessages = Collections.synchronizedSet(new HashSet());
        res.errorMessages = Collections.synchronizedSet(new HashSet());
        for (String stream : streams) {
            int idx = stream.lastIndexOf(47);
            NioManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes((String)stream));
            String streamName = idx == -1 ? stream : stream.substring(idx + 1);
            ManagedBuffer data = testData.openStream(conf, streamName);
            client.uploadStream((ManagedBuffer)meta, data, (RpcResponseCallback)new RpcStreamCallback(stream, res, sem));
        }
        if (!sem.tryAcquire(streams.length, 5L, TimeUnit.SECONDS)) {
            Assert.fail((String)"Timeout getting response from the server");
        }
        streamCallbacks.values().forEach(streamCallback -> {
            try {
                streamCallback.verify();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        client.close();
        return res;
    }

    @Test
    public void singleRPC() throws Exception {
        RpcResult res = this.sendRPC("hello/Aaron");
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"Hello, Aaron!"}), res.successMessages);
        Assert.assertTrue((boolean)res.errorMessages.isEmpty());
    }

    @Test
    public void doubleRPC() throws Exception {
        RpcResult res = this.sendRPC("hello/Aaron", "hello/Reynold");
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"Hello, Aaron!", "Hello, Reynold!"}), res.successMessages);
        Assert.assertTrue((boolean)res.errorMessages.isEmpty());
    }

    @Test
    public void returnErrorRPC() throws Exception {
        RpcResult res = this.sendRPC("return error/OK");
        Assert.assertTrue((boolean)res.successMessages.isEmpty());
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Returned: OK"}));
    }

    @Test
    public void throwErrorRPC() throws Exception {
        RpcResult res = this.sendRPC("throw error/uh-oh");
        Assert.assertTrue((boolean)res.successMessages.isEmpty());
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Thrown: uh-oh"}));
    }

    @Test
    public void doubleTrouble() throws Exception {
        RpcResult res = this.sendRPC("return error/OK", "throw error/uh-oh");
        Assert.assertTrue((boolean)res.successMessages.isEmpty());
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Returned: OK", "Thrown: uh-oh"}));
    }

    @Test
    public void sendSuccessAndFailure() throws Exception {
        RpcResult res = this.sendRPC("hello/Bob", "throw error/the", "hello/Builder", "return error/!");
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"Hello, Bob!", "Hello, Builder!"}), res.successMessages);
        this.assertErrorsContain(res.errorMessages, Sets.newHashSet((Object[])new String[]{"Thrown: the", "Returned: !"}));
    }

    @Test
    public void sendOneWayMessage() throws Exception {
        String message = "no reply";
        try (TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());){
            client.send(JavaUtils.stringToBytes((String)"no reply"));
            Assert.assertEquals((long)0L, (long)client.getHandler().numOutstandingRequests());
            long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(10L, TimeUnit.SECONDS);
            while (System.nanoTime() < deadline && oneWayMsgs.size() == 0) {
                TimeUnit.MILLISECONDS.sleep(10L);
            }
            Assert.assertEquals((long)1L, (long)oneWayMsgs.size());
            Assert.assertEquals((Object)"no reply", (Object)oneWayMsgs.get(0));
        }
    }

    @Test
    public void sendRpcWithStreamOneAtATime() throws Exception {
        for (String stream : StreamTestHelper.STREAMS) {
            RpcResult res = this.sendRpcWithStream(stream);
            Assert.assertTrue((String)("there were error messages!" + res.errorMessages), (boolean)res.errorMessages.isEmpty());
            Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{stream}), res.successMessages);
        }
    }

    @Test
    public void sendRpcWithStreamConcurrently() throws Exception {
        String[] streams = new String[10];
        for (int i = 0; i < 10; ++i) {
            streams[i] = StreamTestHelper.STREAMS[i % StreamTestHelper.STREAMS.length];
        }
        RpcResult res = this.sendRpcWithStream(streams);
        Assert.assertEquals((Object)Sets.newHashSet((Object[])StreamTestHelper.STREAMS), res.successMessages);
        Assert.assertTrue((boolean)res.errorMessages.isEmpty());
    }

    @Test
    public void sendRpcWithStreamFailures() throws Exception {
        RpcResult exceptionInCallbackResult = this.sendRpcWithStream("fail/exception-ondata/smallBuffer", "smallBuffer");
        this.assertErrorAndClosed(exceptionInCallbackResult, "Destination failed while reading stream");
        RpcResult nullStreamHandler = this.sendRpcWithStream("fail/null/smallBuffer", "smallBuffer");
        this.assertErrorAndClosed(exceptionInCallbackResult, "Destination failed while reading stream");
        RpcResult exceptionInOnComplete = this.sendRpcWithStream("fail/exception-oncomplete/smallBuffer", "smallBuffer");
        this.assertErrorsContain(exceptionInOnComplete.errorMessages, Sets.newHashSet((Object[])new String[]{"Failure post-processing"}));
        Assert.assertEquals((Object)Sets.newHashSet((Object[])new String[]{"smallBuffer"}), exceptionInOnComplete.successMessages);
    }

    private void assertErrorsContain(Set<String> errors, Set<String> contains) {
        Assert.assertEquals((String)("Expected " + contains.size() + " errors, got " + errors.size() + "errors: " + errors), (long)contains.size(), (long)errors.size());
        Pair<Set<String>, Set<String>> r = this.checkErrorsContain(errors, contains);
        Assert.assertTrue((String)("Could not find error containing " + r.getRight() + "; errors: " + errors), (boolean)((Set)r.getRight()).isEmpty());
        Assert.assertTrue((boolean)((Set)r.getLeft()).isEmpty());
    }

    private void assertErrorAndClosed(RpcResult result, String expectedError) {
        Assert.assertTrue((String)("unexpected success: " + result.successMessages), (boolean)result.successMessages.isEmpty());
        Set<String> errors = result.errorMessages;
        Assert.assertEquals((String)("Expected 2 errors, got " + errors.size() + "errors: " + errors), (long)2L, (long)errors.size());
        HashSet possibleClosedErrors = Sets.newHashSet((Object[])new String[]{"closed", "Connection reset", "java.nio.channels.ClosedChannelException", "io.netty.channel.StacklessClosedChannelException", "java.io.IOException: Broken pipe"});
        HashSet containsAndClosed = Sets.newHashSet((Object[])new String[]{expectedError});
        containsAndClosed.addAll(possibleClosedErrors);
        Pair<Set<String>, Set<String>> r = this.checkErrorsContain(errors, containsAndClosed);
        Assert.assertTrue((String)("Got a non-empty set " + r.getLeft()), (boolean)((Set)r.getLeft()).isEmpty());
        Set errorsNotFound = (Set)r.getRight();
        Assert.assertEquals((String)("The size of " + errorsNotFound + " was not " + (possibleClosedErrors.size() - 1)), (long)(possibleClosedErrors.size() - 1), (long)errorsNotFound.size());
        for (String err : errorsNotFound) {
            Assert.assertTrue((String)("Found a wrong error " + err), (boolean)containsAndClosed.contains(err));
        }
    }

    private Pair<Set<String>, Set<String>> checkErrorsContain(Set<String> errors, Set<String> contains) {
        HashSet remainingErrors = Sets.newHashSet(errors);
        HashSet notFound = Sets.newHashSet();
        for (String contain : contains) {
            Iterator it = remainingErrors.iterator();
            boolean foundMatch = false;
            while (it.hasNext()) {
                if (!((String)it.next()).contains(contain)) continue;
                it.remove();
                foundMatch = true;
                break;
            }
            if (foundMatch) continue;
            notFound.add(contain);
        }
        return new ImmutablePair((Object)remainingErrors, (Object)notFound);
    }

    static {
        streamCallbacks = new ConcurrentHashMap();
    }

    private static class VerifyingStreamCallback
    implements StreamCallbackWithID {
        final String streamId;
        final StreamSuite.TestCallback helper;
        final OutputStream out;
        final File outFile;

        VerifyingStreamCallback(String streamId) throws IOException {
            if (streamId.equals("file")) {
                this.outFile = File.createTempFile("data", ".tmp", RpcIntegrationSuite.testData.tempDir);
                this.out = new FileOutputStream(this.outFile);
            } else {
                this.out = new ByteArrayOutputStream();
                this.outFile = null;
            }
            this.streamId = streamId;
            this.helper = new StreamSuite.TestCallback(this.out);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void verify() throws IOException {
            if (this.streamId.equals("file")) {
                Assert.assertTrue((String)"File stream did not match.", (boolean)Files.equal((File)RpcIntegrationSuite.testData.testFile, (File)this.outFile));
            } else {
                ByteBuffer base;
                ByteBuffer srcBuffer;
                byte[] result = ((ByteArrayOutputStream)this.out).toByteArray();
                ByteBuffer byteBuffer = srcBuffer = testData.srcBuffer(this.streamId);
                synchronized (byteBuffer) {
                    base = srcBuffer.duplicate();
                }
                byte[] expected = new byte[base.remaining()];
                base.get(expected);
                Assert.assertEquals((long)expected.length, (long)result.length);
                Assert.assertArrayEquals((String)"buffers don't match", (byte[])expected, (byte[])result);
            }
        }

        public void onData(String streamId, ByteBuffer buf) throws IOException {
            this.helper.onData(streamId, buf);
        }

        public void onComplete(String streamId) throws IOException {
            this.helper.onComplete(streamId);
        }

        public void onFailure(String streamId, Throwable cause) throws IOException {
            this.helper.onFailure(streamId, cause);
        }

        public String getID() {
            return this.streamId;
        }
    }

    private static class RpcStreamCallback
    implements RpcResponseCallback {
        final String streamId;
        final RpcResult res;
        final Semaphore sem;

        RpcStreamCallback(String streamId, RpcResult res, Semaphore sem) {
            this.streamId = streamId;
            this.res = res;
            this.sem = sem;
        }

        public void onSuccess(ByteBuffer message) {
            this.res.successMessages.add(this.streamId);
            this.sem.release();
        }

        public void onFailure(Throwable e) {
            this.res.errorMessages.add(e.getMessage());
            this.sem.release();
        }
    }

    static class RpcResult {
        public Set<String> successMessages;
        public Set<String> errorMessages;

        RpcResult() {
        }
    }
}

