package org.apache.hugegraph.computer.core.bsp;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hugegraph.computer.core.common.ContainerInfo;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.SuperstepStat;
import org.apache.hugegraph.computer.core.graph.partition.PartitionStat;
import org.apache.hugegraph.computer.core.worker.WorkerStat;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.testutil.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hugegraph/computer/core/bsp/EtcdBspTest.class */
public class EtcdBspTest extends UnitTestBase {
    private Bsp4Master bsp4Master;
    private Bsp4Worker bsp4Worker;
    private ContainerInfo masterInfo;
    private ContainerInfo workerInfo;
    private ExecutorService executorService = Executors.newFixedThreadPool(2);
    private int maxSuperStep;

    @Before
    public void setup() {
        Config updateWithRequiredOptions = UnitTestBase.updateWithRequiredOptions(ComputerOptions.JOB_ID, "local_001", ComputerOptions.JOB_WORKERS_COUNT, "1", ComputerOptions.BSP_LOG_INTERVAL, "30000", ComputerOptions.BSP_MAX_SUPER_STEP, "2");
        this.bsp4Master = new Bsp4Master(updateWithRequiredOptions);
        this.bsp4Master.clean();
        this.masterInfo = new ContainerInfo(-1, "localhost", 8001, 8002);
        this.workerInfo = new ContainerInfo(0, "localhost", 8003, 8004);
        this.bsp4Worker = new Bsp4Worker(updateWithRequiredOptions, this.workerInfo);
        this.maxSuperStep = ((Integer) updateWithRequiredOptions.get(ComputerOptions.BSP_MAX_SUPER_STEP)).intValue();
    }

    @After
    public void teardown() {
        this.bsp4Worker.clean();
        this.bsp4Worker.close();
        this.bsp4Master.clean();
        this.bsp4Master.close();
    }

    @Test
    public void testInit() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.executorService.submit(() -> {
            this.bsp4Master.masterInitDone(this.masterInfo);
            Assert.assertEquals(1L, this.bsp4Master.waitWorkersInitDone().size());
            countDownLatch.countDown();
        });
        this.executorService.submit(() -> {
            this.bsp4Worker.workerInitDone();
            Assert.assertEquals(this.masterInfo, this.bsp4Worker.waitMasterInitDone());
            List waitMasterAllInitDone = this.bsp4Worker.waitMasterAllInitDone();
            Assert.assertEquals(1L, waitMasterAllInitDone.size());
            Assert.assertEquals(this.workerInfo, waitMasterAllInitDone.get(0));
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    @Test
    public void testInput() throws InterruptedException {
        WorkerStat workerStat = new WorkerStat();
        workerStat.add(new PartitionStat(0, 100L, 200L, 0L));
        workerStat.add(new PartitionStat(1, 200L, 300L, 0L));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.executorService.submit(() -> {
            this.bsp4Master.masterResumeDone(-1);
            this.bsp4Master.waitWorkersInputDone();
            this.bsp4Master.masterInputDone();
            List waitWorkersStepDone = this.bsp4Master.waitWorkersStepDone(-1);
            Assert.assertEquals(1L, waitWorkersStepDone.size());
            Assert.assertEquals(workerStat, waitWorkersStepDone.get(0));
            countDownLatch.countDown();
        });
        this.executorService.submit(() -> {
            Assert.assertEquals(-1L, this.bsp4Worker.waitMasterResumeDone());
            this.bsp4Worker.workerInputDone();
            this.bsp4Worker.waitMasterInputDone();
            this.bsp4Worker.workerStepDone(-1, workerStat);
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }

    @Test
    public void testIterate() throws InterruptedException {
        WorkerStat workerStat = new WorkerStat();
        workerStat.add(new PartitionStat(0, 100L, 200L, 0L));
        workerStat.add(new PartitionStat(1, 200L, 300L, 0L));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.executorService.submit(() -> {
            for (int i = 0; i < this.maxSuperStep; i++) {
                this.bsp4Master.waitWorkersStepPrepareDone(i);
                this.bsp4Master.masterStepPrepareDone(i);
                this.bsp4Master.waitWorkersStepComputeDone(i);
                this.bsp4Master.masterStepComputeDone(i);
                List waitWorkersStepDone = this.bsp4Master.waitWorkersStepDone(i);
                SuperstepStat superstepStat = new SuperstepStat();
                Iterator it = waitWorkersStepDone.iterator();
                while (it.hasNext()) {
                    superstepStat.increase((WorkerStat) it.next());
                }
                if (i == this.maxSuperStep - 1) {
                    superstepStat.inactivate();
                }
                this.bsp4Master.masterStepDone(i, superstepStat);
            }
            countDownLatch.countDown();
        });
        this.executorService.submit(() -> {
            int i = -1;
            SuperstepStat superstepStat = null;
            while (true) {
                SuperstepStat superstepStat2 = superstepStat;
                if (superstepStat2 != null && !superstepStat2.active()) {
                    countDownLatch.countDown();
                    return;
                }
                i++;
                this.bsp4Worker.workerStepPrepareDone(i);
                this.bsp4Worker.waitMasterStepPrepareDone(i);
                this.bsp4Worker.workerStepComputeDone(i);
                this.bsp4Worker.waitMasterStepComputeDone(i);
                PartitionStat partitionStat = new PartitionStat(0, 100L, 200L, 50L);
                PartitionStat partitionStat2 = new PartitionStat(1, 200L, 300L, 80L);
                WorkerStat workerStat2 = new WorkerStat();
                workerStat2.add(partitionStat);
                workerStat2.add(partitionStat2);
                UnitTestBase.sleep(100L);
                this.bsp4Worker.workerStepDone(i, workerStat2);
                superstepStat = this.bsp4Worker.waitMasterStepDone(i);
            }
        });
        countDownLatch.await();
    }

    @Test
    public void testOutput() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.executorService.submit(() -> {
            this.bsp4Master.waitWorkersOutputDone();
            this.bsp4Master.clean();
            countDownLatch.countDown();
        });
        this.executorService.submit(() -> {
            this.bsp4Worker.workerOutputDone();
            countDownLatch.countDown();
        });
        countDownLatch.await();
    }
}
