/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.procedure;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.confignode.procedure.NoopProcedureStore;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.TestProcedureBase;
import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
import org.apache.iotdb.confignode.procedure.entity.NoopProcedure;
import org.apache.iotdb.confignode.procedure.entity.StuckProcedure;
import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
import org.junit.Assert;
import org.junit.Test;

public class TestProcedureExecutor
extends TestProcedureBase {
    @Override
    protected void initExecutor() {
        this.env = new TestProcEnv();
        this.procStore = new NoopProcedureStore();
        this.procExecutor = new ProcedureExecutor((Object)this.env, this.procStore);
        this.env.setScheduler(this.procExecutor.getScheduler());
        this.procExecutor.init(2);
    }

    @Test
    public void testSubmitProcedure() {
        IncProcedure incProcedure = new IncProcedure();
        long procId = this.procExecutor.submitProcedure((Procedure)incProcedure);
        ProcedureTestUtil.waitForProcedure(this.procExecutor, procId);
        TestProcEnv env = this.getEnv();
        AtomicInteger acc = env.getAcc();
        Assert.assertEquals((long)acc.get(), (long)1L);
    }

    @Test
    public void testWorkerThreadStuck() throws InterruptedException {
        Semaphore latch1 = new Semaphore(2);
        latch1.acquire(2);
        StuckProcedure busyProc1 = new StuckProcedure(latch1);
        Semaphore latch2 = new Semaphore(2);
        latch2.acquire(2);
        StuckProcedure busyProc2 = new StuckProcedure(latch2);
        long busyProcId1 = this.procExecutor.submitProcedure((Procedure)busyProc1);
        long busyProcId2 = this.procExecutor.submitProcedure((Procedure)busyProc2);
        long otherProcId = this.procExecutor.submitProcedure((Procedure)new NoopProcedure());
        int threads1 = this.waitThreadCount(3);
        LOG.info("new threads got created: " + (threads1 - 2));
        Assert.assertEquals((long)3L, (long)threads1);
        ProcedureTestUtil.waitForProcedure(this.procExecutor, otherProcId);
        Assert.assertEquals((Object)true, (Object)this.procExecutor.isFinished(otherProcId));
        Assert.assertEquals((Object)true, (Object)this.procExecutor.isRunning());
        Assert.assertEquals((Object)false, (Object)this.procExecutor.isFinished(busyProcId1));
        Assert.assertEquals((Object)false, (Object)this.procExecutor.isFinished(busyProcId2));
        latch1.release();
        latch2.release();
        LOG.info("set keep alive and wait threads being removed");
        int threads2 = this.waitThreadCount(2);
        LOG.info("threads got removed: " + (threads1 - threads2));
        Assert.assertEquals((long)2L, (long)threads2);
        latch1.release();
        latch2.release();
        ProcedureTestUtil.waitForProcedure(this.procExecutor, busyProcId1);
        ProcedureTestUtil.waitForProcedure(this.procExecutor, busyProcId2);
    }

    private int waitThreadCount(int expectedThreads) {
        long startTime = System.currentTimeMillis();
        while (this.procExecutor.isRunning() && TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime) <= 180L && this.procExecutor.getWorkerThreadCount() != expectedThreads) {
            ProcedureTestUtil.sleepWithoutInterrupt(250L);
        }
        return this.procExecutor.getWorkerThreadCount();
    }
}

