package org.apache.helix.controller.stages;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.helix.TestHelper;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/controller/stages/TestAsyncBaseStage.class */
public class TestAsyncBaseStage {
    private static AsyncWorkerType DEFAULT_WORKER_TYPE = AsyncWorkerType.ExternalViewComputeWorker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/helix/controller/stages/TestAsyncBaseStage$BlockingAsyncStage.class */
    public class BlockingAsyncStage extends AbstractAsyncBaseStage {
        public boolean _isFinished = false;
        public boolean _isStarted = false;
        private CountDownLatch _countDownLatch = new CountDownLatch(1);

        private BlockingAsyncStage() {
        }

        public void reset() {
            this._isFinished = false;
            this._isStarted = false;
            this._countDownLatch = new CountDownLatch(1);
        }

        public void proceed() {
            this._countDownLatch.countDown();
        }

        public AsyncWorkerType getAsyncWorkerType() {
            return TestAsyncBaseStage.DEFAULT_WORKER_TYPE;
        }

        public void execute(ClusterEvent clusterEvent) throws Exception {
            this._isStarted = true;
            this._countDownLatch.await();
            this._isFinished = true;
        }
    }

    @Test
    public void testAsyncStageCleanup() throws Exception {
        BlockingAsyncStage blockingAsyncStage = new BlockingAsyncStage();
        HashMap hashMap = new HashMap();
        DedupEventProcessor<String, Runnable> dedupEventProcessor = new DedupEventProcessor<String, Runnable>("ClusterName", DEFAULT_WORKER_TYPE.name()) { // from class: org.apache.helix.controller.stages.TestAsyncBaseStage.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void handleEvent(Runnable runnable) {
                runnable.run();
            }
        };
        dedupEventProcessor.start();
        hashMap.put(DEFAULT_WORKER_TYPE, dedupEventProcessor);
        ClusterEvent clusterEvent = new ClusterEvent("ClusterName", ClusterEventType.OnDemandRebalance);
        clusterEvent.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), hashMap);
        blockingAsyncStage.process(clusterEvent);
        Assert.assertTrue(TestHelper.verify(() -> {
            return blockingAsyncStage._isStarted;
        }, 500L));
        Assert.assertFalse(blockingAsyncStage._isFinished);
        blockingAsyncStage.proceed();
        Assert.assertTrue(TestHelper.verify(() -> {
            return blockingAsyncStage._isFinished;
        }, 500L));
        blockingAsyncStage.reset();
        blockingAsyncStage.process(clusterEvent);
        TestHelper.verify(() -> {
            return blockingAsyncStage._isStarted;
        }, 500L);
        Assert.assertFalse(blockingAsyncStage._isFinished);
        dedupEventProcessor.shutdown();
        Assert.assertFalse(TestHelper.verify(() -> {
            return blockingAsyncStage._isFinished;
        }, 1000L));
        Assert.assertFalse(dedupEventProcessor.isAlive());
        blockingAsyncStage.reset();
    }
}
