package org.apache.hugegraph.computer.core.worker;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.value.DoubleValue;
import org.apache.hugegraph.computer.core.master.MasterService;
import org.apache.hugegraph.computer.core.output.LimitedLogOutput;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.config.RpcOptions;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.util.Log;
import org.junit.Test;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/hugegraph/computer/core/worker/WorkerServiceTest.class */
public class WorkerServiceTest extends UnitTestBase {
    private static final Logger LOG = Log.logger(WorkerServiceTest.class);

    @Test
    public void testServiceWith1Worker() throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Throwable[] thArr = new Throwable[2];
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(ComputerOptions.JOB_ID, "local_002", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.TRANSPORT_SERVER_PORT, "8086", ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName(), ComputerOptions.OUTPUT_CLASS, LimitedLogOutput.class.getName());
            try {
                try {
                    MockWorkerService mockWorkerService = new MockWorkerService();
                    try {
                        mockWorkerService.init(updateWithRequiredOptions);
                        mockWorkerService.execute();
                        mockWorkerService.close();
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        try {
                            mockWorkerService.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    LOG.error("Failed to start worker", th3);
                    thArr[0] = th3;
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                countDownLatch.countDown();
                throw th4;
            }
        });
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_SERVER_HOST, "localhost", ComputerOptions.JOB_ID, "local_002", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.MASTER_COMPUTATION_CLASS, MockMasterComputation.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            try {
                try {
                    MasterService masterService = new MasterService();
                    try {
                        masterService.init(updateWithRequiredOptions);
                        masterService.execute();
                        masterService.close();
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        try {
                            masterService.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    LOG.error("Failed to start master", th3);
                    thArr[1] = th3;
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                countDownLatch.countDown();
                throw th4;
            }
        });
        countDownLatch.await();
        newFixedThreadPool.shutdownNow();
        Assert.assertFalse(Arrays.asList(thArr).toString(), existError(thArr));
    }

    @Test
    public void testServiceWith2Workers() throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        Throwable[] thArr = new Throwable[3];
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(ComputerOptions.JOB_ID, "local_003", ComputerOptions.JOB_WORKERS_COUNT, "2", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.TRANSPORT_SERVER_PORT, "8086", ComputerOptions.WORKER_DATA_DIRS, "[job_8086]", ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation2.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            try {
                try {
                    MockWorkerService mockWorkerService = new MockWorkerService();
                    try {
                        mockWorkerService.init(updateWithRequiredOptions);
                        mockWorkerService.execute();
                        mockWorkerService.close();
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        try {
                            mockWorkerService.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    LOG.error("Failed to start worker", th3);
                    thArr[0] = th3;
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                countDownLatch.countDown();
                throw th4;
            }
        });
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(ComputerOptions.JOB_ID, "local_003", ComputerOptions.JOB_WORKERS_COUNT, "2", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.TRANSPORT_SERVER_PORT, "8087", ComputerOptions.WORKER_DATA_DIRS, "[job_8087]", ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation2.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            try {
                try {
                    MockWorkerService mockWorkerService = new MockWorkerService();
                    try {
                        mockWorkerService.init(updateWithRequiredOptions);
                        mockWorkerService.execute();
                        mockWorkerService.close();
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        try {
                            mockWorkerService.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    LOG.error("Failed to start worker", th3);
                    thArr[1] = th3;
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                countDownLatch.countDown();
                throw th4;
            }
        });
        newFixedThreadPool.submit(() -> {
            Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(RpcOptions.RPC_SERVER_HOST, "localhost", ComputerOptions.JOB_ID, "local_003", ComputerOptions.JOB_WORKERS_COUNT, "2", ComputerOptions.JOB_PARTITIONS_COUNT, "2", ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", ComputerOptions.BSP_LOG_INTERVAL, "10000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.MASTER_COMPUTATION_CLASS, MockMasterComputation2.class.getName(), ComputerOptions.ALGORITHM_RESULT_CLASS, DoubleValue.class.getName(), ComputerOptions.ALGORITHM_MESSAGE_CLASS, DoubleValue.class.getName());
            try {
                try {
                    MasterService masterService = new MasterService();
                    try {
                        masterService.init(updateWithRequiredOptions);
                        masterService.execute();
                        masterService.close();
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        try {
                            masterService.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    LOG.error("Failed to start master", th3);
                    thArr[2] = th3;
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                countDownLatch.countDown();
                throw th4;
            }
        });
        countDownLatch.await();
        newFixedThreadPool.shutdownNow();
        Assert.assertFalse(Arrays.asList(thArr).toString(), existError(thArr));
    }

    @Test
    public void testFailToConnectEtcd() {
        Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(ComputerOptions.BSP_ETCD_ENDPOINTS, "http://invalid-ip:8098", ComputerOptions.JOB_ID, "local_004", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2", ComputerOptions.WORKER_COMPUTATION_CLASS, MockComputation.class.getName());
        MockWorkerService mockWorkerService = new MockWorkerService();
        try {
            Assert.assertThrows(ComputerException.class, () -> {
                mockWorkerService.init(updateWithRequiredOptions);
                mockWorkerService.execute();
            }, th -> {
                Assert.assertContains("Error while getting with key='BSP_MASTER_INIT_DONE'", th.getMessage());
                Assert.assertContains("UNAVAILABLE: unresolved address", th.getCause().getMessage());
            });
            mockWorkerService.close();
        } catch (Throwable th2) {
            try {
                mockWorkerService.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    @Test
    public void testDataTransportManagerFail() {
    }
}
