package org.apache.spark.network;

import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/spark/network/RpcIntegrationSuite.class */
public class RpcIntegrationSuite {
    static TransportServer server;
    static TransportClientFactory clientFactory;
    static RpcHandler rpcHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/spark/network/RpcIntegrationSuite$RpcResult.class */
    public class RpcResult {
        public Set<String> successMessages;
        public Set<String> errorMessages;

        RpcResult() {
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        TransportConf transportConf = new TransportConf(new SystemPropertyConfigProvider());
        rpcHandler = new RpcHandler() { // from class: org.apache.spark.network.RpcIntegrationSuite.1
            public void receive(TransportClient transportClient, byte[] bArr, RpcResponseCallback rpcResponseCallback) {
                String[] split = new String(bArr, Charsets.UTF_8).split("/");
                if (split[0].equals("hello")) {
                    rpcResponseCallback.onSuccess(("Hello, " + split[1] + "!").getBytes(Charsets.UTF_8));
                } else if (split[0].equals("return error")) {
                    rpcResponseCallback.onFailure(new RuntimeException("Returned: " + split[1]));
                } else if (split[0].equals("throw error")) {
                    throw new RuntimeException("Thrown: " + split[1]);
                }
            }

            public StreamManager getStreamManager() {
                return new OneForOneStreamManager();
            }
        };
        TransportContext transportContext = new TransportContext(transportConf, rpcHandler);
        server = transportContext.createServer();
        clientFactory = transportContext.createClientFactory();
    }

    @AfterClass
    public static void tearDown() {
        server.close();
        clientFactory.close();
    }

    private RpcResult sendRPC(String... strArr) throws Exception {
        TransportClient createClient = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
        final Semaphore semaphore = new Semaphore(0);
        final RpcResult rpcResult = new RpcResult();
        rpcResult.successMessages = Collections.synchronizedSet(new HashSet());
        rpcResult.errorMessages = Collections.synchronizedSet(new HashSet());
        RpcResponseCallback rpcResponseCallback = new RpcResponseCallback() { // from class: org.apache.spark.network.RpcIntegrationSuite.2
            public void onSuccess(byte[] bArr) {
                rpcResult.successMessages.add(new String(bArr, Charsets.UTF_8));
                semaphore.release();
            }

            public void onFailure(Throwable th) {
                rpcResult.errorMessages.add(th.getMessage());
                semaphore.release();
            }
        };
        for (String str : strArr) {
            createClient.sendRpc(str.getBytes(Charsets.UTF_8), rpcResponseCallback);
        }
        if (!semaphore.tryAcquire(strArr.length, 5L, TimeUnit.SECONDS)) {
            Assert.fail("Timeout getting response from the server");
        }
        createClient.close();
        return rpcResult;
    }

    @Test
    public void singleRPC() throws Exception {
        RpcResult sendRPC = sendRPC("hello/Aaron");
        Assert.assertEquals(sendRPC.successMessages, Sets.newHashSet(new String[]{"Hello, Aaron!"}));
        Assert.assertTrue(sendRPC.errorMessages.isEmpty());
    }

    @Test
    public void doubleRPC() throws Exception {
        RpcResult sendRPC = sendRPC("hello/Aaron", "hello/Reynold");
        Assert.assertEquals(sendRPC.successMessages, Sets.newHashSet(new String[]{"Hello, Aaron!", "Hello, Reynold!"}));
        Assert.assertTrue(sendRPC.errorMessages.isEmpty());
    }

    @Test
    public void returnErrorRPC() throws Exception {
        RpcResult sendRPC = sendRPC("return error/OK");
        Assert.assertTrue(sendRPC.successMessages.isEmpty());
        assertErrorsContain(sendRPC.errorMessages, Sets.newHashSet(new String[]{"Returned: OK"}));
    }

    @Test
    public void throwErrorRPC() throws Exception {
        RpcResult sendRPC = sendRPC("throw error/uh-oh");
        Assert.assertTrue(sendRPC.successMessages.isEmpty());
        assertErrorsContain(sendRPC.errorMessages, Sets.newHashSet(new String[]{"Thrown: uh-oh"}));
    }

    @Test
    public void doubleTrouble() throws Exception {
        RpcResult sendRPC = sendRPC("return error/OK", "throw error/uh-oh");
        Assert.assertTrue(sendRPC.successMessages.isEmpty());
        assertErrorsContain(sendRPC.errorMessages, Sets.newHashSet(new String[]{"Returned: OK", "Thrown: uh-oh"}));
    }

    @Test
    public void sendSuccessAndFailure() throws Exception {
        RpcResult sendRPC = sendRPC("hello/Bob", "throw error/the", "hello/Builder", "return error/!");
        Assert.assertEquals(sendRPC.successMessages, Sets.newHashSet(new String[]{"Hello, Bob!", "Hello, Builder!"}));
        assertErrorsContain(sendRPC.errorMessages, Sets.newHashSet(new String[]{"Thrown: the", "Returned: !"}));
    }

    private void assertErrorsContain(Set<String> set, Set<String> set2) {
        Assert.assertEquals(set2.size(), set.size());
        HashSet newHashSet = Sets.newHashSet(set);
        for (String str : set2) {
            Iterator it = newHashSet.iterator();
            boolean z = false;
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                if (((String) it.next()).contains(str)) {
                    it.remove();
                    z = true;
                    break;
                }
            }
            Assert.assertTrue("Could not find error containing " + str + "; errors: " + set, z);
        }
        Assert.assertTrue(newHashSet.isEmpty());
    }
}
