package org.apache.storm.daemon.drpc;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.storm.drpc.DRPCInvocationsClient;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.SimpleTransportPlugin;
import org.apache.storm.utils.DRPCClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/drpc/DRPCServerTest.class */
/**
 * Tests for {@link DRPCServer} that submit a DRPC call (through the {@link DRPCClient} or through an
 * HTTP GET), pick the pending request up with a {@link DRPCInvocationsClient}, and then either
 * answer it or fail it.
 *
 * <p>Every server and client is opened in a try-with-resources statement so that it is closed
 * exactly once, in reverse order of creation, whether the test passes or fails.
 */
public class DRPCServerTest {
    private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class);

    /** Runs the blocking caller side of each DRPC call while the test thread plays the worker. */
    private static final ExecutorService exec = Executors.newCachedThreadPool();

    /** How long {@link #getNextAvailableRequest} polls before failing the test. */
    private static final long FETCH_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(5000L);

    /** How long a test waits for the caller's future once the request was answered or failed. */
    private static final long RESULT_TIMEOUT_MS = 1000L;

    /** Stops the shared executor once all tests in this class have run. */
    @AfterAll
    public static void close() {
        exec.shutdownNow();
    }

    /**
     * Polls the invocations client until a request with a non-empty id is available.
     *
     * @param invoke client used to fetch pending requests
     * @param func   name of the DRPC function to fetch requests for
     * @return the first request whose id is neither {@code null} nor empty
     * @throws Exception if fetching fails or the thread is interrupted while waiting
     */
    private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception {
        // nanoTime is monotonic, so a wall-clock adjustment cannot shorten or extend the wait.
        final long deadline = System.nanoTime() + FETCH_TIMEOUT_NANOS;
        while (System.nanoTime() - deadline < 0) {
            DRPCRequest request = invoke.getClient().fetchRequest(func);
            if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) {
                return request;
            }
            Thread.sleep(1L);
        }
        Assertions.fail("Test timed out waiting for a request on " + func);
        return null; // not reached: fail() always throws
    }

    /**
     * Fetches the pending request for function {@code "testing"} and checks that it carries the
     * argument {@code "test"} and a request id.
     *
     * @param invoke client used to fetch the request
     * @return the verified request
     * @throws Exception if fetching fails
     */
    private static DRPCRequest awaitTestRequest(DRPCInvocationsClient invoke) throws Exception {
        DRPCRequest request = getNextAvailableRequest(invoke, "testing");
        Assertions.assertNotNull(request);
        Assertions.assertEquals("test", request.get_func_args());
        Assertions.assertNotNull(request.get_request_id());
        return request;
    }

    /**
     * Builds the configuration map used to start the server and to create the clients.
     *
     * @param drpcPort    value stored under {@code drpc.port}
     * @param invokePort  value stored under {@code drpc.invocations.port}
     * @param httpPort    value stored under {@code drpc.http.port}; the key is omitted when {@code null}
     * @return a new mutable configuration map
     */
    private Map<String, Object> getConf(int drpcPort, int invokePort, Integer httpPort) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("drpc.port", drpcPort);
        conf.put("drpc.invocations.port", invokePort);
        conf.put("storm.thrift.transport", SimpleTransportPlugin.class.getName());
        conf.put("drpc.worker.threads", 5);
        conf.put("drpc.invocations.threads", 5);
        conf.put("drpc.max_buffer_size", 1048576);
        conf.put("storm.nimbus.retry.times", 2);
        conf.put("storm.nimbus.retry.interval.millis", 10);
        conf.put("storm.nimbus.retry.intervalceiling.millis", 100);
        if (httpPort != null) {
            conf.put("drpc.http.port", httpPort);
        }
        return conf;
    }

    /** A call made through the DRPC client returns the result posted by the invocations client. */
    @Test
    public void testGoodThrift() throws Exception {
        Map<String, Object> conf = getConf(0, 0, null);
        try (DRPCServer server = new DRPCServer(conf, new StormMetricsRegistry())) {
            server.start();
            try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDrpcPort());
                 DRPCInvocationsClient invoke =
                     new DRPCInvocationsClient(conf, "localhost", server.getDrpcInvokePort())) {
                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
                DRPCRequest request = awaitTestRequest(invoke);
                invoke.result(request.get_request_id(), "tested");
                Assertions.assertEquals("tested", found.get(RESULT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
            }
        }
    }

    /** A call made through the DRPC client fails with a DRPCExecutionException when the request is failed. */
    @Test
    public void testFailedThrift() throws Exception {
        Map<String, Object> conf = getConf(0, 0, null);
        try (DRPCServer server = new DRPCServer(conf, new StormMetricsRegistry())) {
            server.start();
            try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDrpcPort());
                 DRPCInvocationsClient invoke =
                     new DRPCInvocationsClient(conf, "localhost", server.getDrpcInvokePort())) {
                Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
                DRPCRequest request = awaitTestRequest(invoke);
                invoke.failRequest(request.get_request_id());
                ExecutionException e = Assertions.assertThrows(
                    ExecutionException.class,
                    () -> found.get(RESULT_TIMEOUT_MS, TimeUnit.MILLISECONDS),
                    "exec did not throw an exception");
                Throwable cause = e.getCause();
                Assertions.assertNotNull(cause);
                Assertions.assertEquals(DRPCExecutionException.class, cause.getClass());
                Assertions.assertEquals("Request failed", ((DRPCExecutionException) cause).get_msg());
            }
        }
    }

    /**
     * Issues an HTTP GET against {@code http://localhost:<port>/drpc/<func>/<args>} and returns the
     * whole response body decoded as UTF-8.
     *
     * @param port port of the HTTP server
     * @param func DRPC function name placed in the URL path
     * @param args function argument placed in the URL path
     * @return the response body; empty if the server sent no content
     * @throws RuntimeException wrapping any exception raised while connecting or reading
     */
    public static String doGet(int port, String func, String args) {
        try {
            URL url = new URL("http://localhost:" + port + "/drpc/" + func + "/" + args);
            try (InputStream in = url.openStream()) {
                ByteArrayOutputStream body = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                // A single read() may deliver only part of the body, so drain until end of stream.
                while ((read = in.read(buffer)) != -1) {
                    body.write(buffer, 0, read);
                }
                return new String(body.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** An HTTP GET returns the result posted by the invocations client. */
    @Test
    public void testGoodHttpGet() throws Exception {
        LOG.info("STARTING HTTP GET TEST...");
        Map<String, Object> conf = getConf(0, 0, 0);
        try (DRPCServer server = new DRPCServer(conf, new StormMetricsRegistry())) {
            server.start();
            // NOTE(review): fixed pause before the first request; presumably it gives the HTTP
            // endpoint time to come up. Confirm, and replace with a readiness check if one exists.
            Thread.sleep(2000L);
            try (DRPCInvocationsClient invoke =
                     new DRPCInvocationsClient(conf, "localhost", server.getDrpcInvokePort())) {
                Future<String> found = exec.submit(() -> doGet(server.getHttpServerPort(), "testing", "test"));
                DRPCRequest request = awaitTestRequest(invoke);
                invoke.result(request.get_request_id(), "tested");
                Assertions.assertEquals("tested", found.get(RESULT_TIMEOUT_MS, TimeUnit.MILLISECONDS));
            }
        }
    }

    /** An HTTP GET ends in an exception when the invocations client fails the request. */
    @Test
    public void testFailedHttpGet() throws Exception {
        LOG.info("STARTING HTTP GET (FAIL) TEST...");
        Map<String, Object> conf = getConf(0, 0, 0);
        try (DRPCServer server = new DRPCServer(conf, new StormMetricsRegistry())) {
            server.start();
            // NOTE(review): fixed pause before the first request; presumably it gives the HTTP
            // endpoint time to come up. Confirm, and replace with a readiness check if one exists.
            Thread.sleep(2000L);
            try (DRPCInvocationsClient invoke =
                     new DRPCInvocationsClient(conf, "localhost", server.getDrpcInvokePort())) {
                Future<String> found = exec.submit(() -> doGet(server.getHttpServerPort(), "testing", "test"));
                DRPCRequest request = awaitTestRequest(invoke);
                invoke.getClient().failRequest(request.get_request_id());
                ExecutionException e = Assertions.assertThrows(
                    ExecutionException.class,
                    () -> found.get(RESULT_TIMEOUT_MS, TimeUnit.MILLISECONDS),
                    "exec did not throw an exception");
                LOG.warn("Got Expected Exception", e);
            }
        }
    }
}
