package org.apache.helix.messaging.handling;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.helix.HelixManager;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.mock.participant.DummyProcess;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;

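/* loaded from: input_file:org/apache/helix/messaging/handling/TestResourceThreadpoolSize.class */
/**
 * Integration tests for the thread pools a participant uses to run state-transition messages.
 *
 * <p>The tests cover pools sized through the per-resource {@code maxThreads} config, pools
 * supplied by a custom state model factory (per resource and per transition type), and the
 * executor used for batch messages.
 */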
public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
    /** Factory name under which the custom state model factories in this test are registered. */
    public static final String TEST_FACTORY = "TestFactory";
    /** State model name used by the OnlineOffline test resources. */
    public static final String ONLINE_OFFLINE = "OnlineOffline";
    /** Executor-map key ({@code "<from>.<to>"}) for the OFFLINE to SLAVE transition. */
    public static final String OFFLINE_TO_SLAVE = "OFFLINE.SLAVE";
    /** Executor-map key ({@code "<from>.<to>"}) for the SLAVE to MASTER transition. */
    public static final String SLAVE_TO_MASTER = "SLAVE.MASTER";

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestResourceThreadpoolSize$TestMasterSlaveStateModelFactory.class */
    /**
     * MasterSlave state model factory that hands out a separate executor per transition type.
     *
     * <p>Executors are stored under keys of the form {@code "<fromState>.<toState>"}. When the
     * factory is built with a positive size, it creates a pool of that size for
     * {@link TestResourceThreadpoolSize#OFFLINE_TO_SLAVE} and a pool five threads larger for
     * {@link TestResourceThreadpoolSize#SLAVE_TO_MASTER}, so the two pools can be told apart by
     * their maximum pool size.
     */
    public static class TestMasterSlaveStateModelFactory extends DummyProcess.DummyMasterSlaveStateModelFactory {
        int _startThreadPoolSize;
        Map<String, ExecutorService> _threadPoolExecutorMap;

        /**
         * @param startThreadPoolSize size of the OFFLINE.SLAVE pool; the SLAVE.MASTER pool gets
         *     five more threads. No executors are created when the value is not positive.
         */
        public TestMasterSlaveStateModelFactory(int startThreadPoolSize) {
            super(0);
            this._startThreadPoolSize = startThreadPoolSize;
            // Diamond instead of a raw HashMap so the map stays type-checked.
            this._threadPoolExecutorMap = new HashMap<>();
            if (this._startThreadPoolSize > 0) {
                this._threadPoolExecutorMap.put(
                    TestResourceThreadpoolSize.OFFLINE_TO_SLAVE,
                    Executors.newFixedThreadPool(this._startThreadPoolSize));
                this._threadPoolExecutorMap.put(
                    TestResourceThreadpoolSize.SLAVE_TO_MASTER,
                    Executors.newFixedThreadPool(this._startThreadPoolSize + 5));
            }
        }

        /**
         * Returns the executor stored under {@code fromState + "." + toState}.
         *
         * @param resourceName not used for the lookup (presumably the resource the transition
         *     belongs to; confirm against the superclass)
         * @param fromState first half of the lookup key
         * @param toState second half of the lookup key
         * @return the matching executor, or {@code null} when none was created for that key
         */
        public ExecutorService getExecutorService(String resourceName, String fromState, String toState) {
            return this._threadPoolExecutorMap.get(fromState + "." + toState);
        }
    }

    /* loaded from: input_file:org/apache/helix/messaging/handling/TestResourceThreadpoolSize$TestOnlineOfflineStateModelFactory.class */
    /**
     * OnlineOffline state model factory that returns one fixed-size executor for every resource.
     *
     * <p>The executor is only created when a positive pool size is requested; otherwise
     * {@link #getExecutorService(String)} returns {@code null}.
     */
    public static class TestOnlineOfflineStateModelFactory extends DummyProcess.DummyOnlineOfflineStateModelFactory {
        int _threadPoolSize;
        ExecutorService _threadPoolExecutor;

        /**
         * @param threadPoolSize number of threads in the executor; no executor is created when
         *     the value is not positive
         * @param ignored currently unused: the superclass is always constructed with {@code 0}.
         *     NOTE(review): callers in this file pass 0 and 2000 here; confirm whether the value
         *     was meant to be forwarded to the superclass.
         */
        public TestOnlineOfflineStateModelFactory(int threadPoolSize, int ignored) {
            super(0);
            // Record the requested size; previously this field was declared but never assigned.
            this._threadPoolSize = threadPoolSize;
            if (threadPoolSize > 0) {
                this._threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
            }
        }

        /**
         * Returns the single executor owned by this factory, regardless of the argument.
         *
         * @param resourceName not used (presumably the resource name; confirm against the
         *     superclass)
         * @return the shared executor, or {@code null} if the factory was built with a
         *     non-positive pool size
         */
        public ExecutorService getExecutorService(String resourceName) {
            return this._threadPoolExecutor;
        }
    }

    /**
     * Checks that the per-resource {@code maxThreads} config sizes the state-transition pool.
     *
     * <p>The config is written before the resource is added. After the cluster converges, every
     * participant must own a state-transition executor for the resource with the configured
     * maximum pool size, each must have completed at least one task, and the completed-task
     * counts must add up to the expected number of transitions.
     */
    @Test
    public void TestThreadPoolSizeConfig() {
        final String resourceName = "NextDB";
        final int configuredPoolSize = 12;
        final int partitions = 64;
        final int replicas = 3;

        setResourceThreadPoolSize(resourceName, configuredPoolSize);
        _gSetupTool.addResourceToCluster(this.CLUSTER_NAME, resourceName, partitions, "MasterSlave");
        _gSetupTool.rebalanceStorageCluster(this.CLUSTER_NAME, resourceName, replicas);
        Assert.assertTrue(this._clusterVerifier.verifyByPolling());

        // 64 x 3 replicas brought up plus one promotion per partition gives 256, the total the
        // test has always expected. NOTE(review): presumably one executor task per transition.
        final long expectedCompletedTasks = (long) partitions * replicas + partitions;

        final String executorKey = Message.MessageType.STATE_TRANSITION + "." + resourceName;
        long totalCompletedTasks = 0;
        for (MockParticipantManager participant : this._participants) {
            ThreadPoolExecutor pool =
                (ThreadPoolExecutor) participant.getMessagingService().getExecutor()._executorMap.get(executorKey);
            Assert.assertNotNull(pool);
            // TestNG argument order is (actual, expected).
            Assert.assertEquals(pool.getMaximumPoolSize(), configuredPoolSize);
            long completedTasks = pool.getCompletedTaskCount();
            Assert.assertTrue(completedTasks > 0);
            totalCompletedTasks += completedTasks;
        }
        Assert.assertEquals(totalCompletedTasks, expectedCompletedTasks);
    }

    /**
     * Checks which executor a participant uses for three differently configured resources.
     *
     * <ul>
     *   <li>{@code TestDB1}: MasterSlave, no custom factory and no config, so no dedicated
     *       per-resource executor is expected.</li>
     *   <li>{@code TestDB2}: uses the custom OnlineOffline factory, so the factory's 7-thread
     *       executor is expected.</li>
     *   <li>{@code TestDB3}: uses the same factory but also sets {@code maxThreads} to 9; the
     *       test expects the configured size to win over the factory's executor.</li>
     * </ul>
     */
    @Test
    public void TestCustomizedResourceThreadPool() {
        final int factoryPoolSize = 7;
        final int configuredPoolSize = 9;
        final String defaultPoolResource = "TestDB1";
        final String factoryPoolResource = "TestDB2";
        final String configuredPoolResource = "TestDB3";

        for (MockParticipantManager participant : this._participants) {
            participant.getStateMachineEngine().registerStateModelFactory(
                ONLINE_OFFLINE, new TestOnlineOfflineStateModelFactory(factoryPoolSize, 0), TEST_FACTORY);
        }

        _gSetupTool.addResourceToCluster(this.CLUSTER_NAME, defaultPoolResource, 64, "MasterSlave");
        _gSetupTool.rebalanceStorageCluster(this.CLUSTER_NAME, defaultPoolResource, 3);

        _gSetupTool.getClusterManagementTool().addResource(
            this.CLUSTER_NAME,
            factoryPoolResource,
            new FullAutoModeISBuilder(factoryPoolResource)
                .setStateModel(ONLINE_OFFLINE)
                .setStateModelFactoryName(TEST_FACTORY)
                .setNumPartitions(10)
                .setNumReplica(1)
                .build());
        _gSetupTool.rebalanceStorageCluster(this.CLUSTER_NAME, factoryPoolResource, 1);

        _gSetupTool.getClusterManagementTool().addResource(
            this.CLUSTER_NAME,
            configuredPoolResource,
            new FullAutoModeISBuilder(configuredPoolResource)
                .setStateModel(ONLINE_OFFLINE)
                .setStateModelFactoryName(TEST_FACTORY)
                .setNumPartitions(10)
                .setNumReplica(1)
                .build());
        setResourceThreadPoolSize(configuredPoolResource, configuredPoolSize);
        _gSetupTool.rebalanceStorageCluster(this.CLUSTER_NAME, configuredPoolResource, 1);

        Assert.assertTrue(this._clusterVerifier.verifyByPolling());

        // Look the executors up by the same names the resources were created with, so that the
        // assertNull below cannot pass merely because the key was built from a different constant.
        final String keyPrefix = Message.MessageType.STATE_TRANSITION + ".";
        for (MockParticipantManager participant : this._participants) {
            HelixTaskExecutor helixExecutor = participant.getMessagingService().getExecutor();

            Assert.assertNull((ThreadPoolExecutor) helixExecutor._executorMap.get(keyPrefix + defaultPoolResource));

            ThreadPoolExecutor factoryPool =
                (ThreadPoolExecutor) helixExecutor._executorMap.get(keyPrefix + factoryPoolResource);
            Assert.assertNotNull(factoryPool);
            // TestNG argument order is (actual, expected).
            Assert.assertEquals(factoryPool.getMaximumPoolSize(), factoryPoolSize);

            ThreadPoolExecutor configuredPool =
                (ThreadPoolExecutor) helixExecutor._executorMap.get(keyPrefix + configuredPoolResource);
            Assert.assertNotNull(configuredPool);
            Assert.assertEquals(configuredPool.getMaximumPoolSize(), configuredPoolSize);
        }
    }

    /**
     * Checks that executors supplied per transition type by a state model factory are used.
     *
     * <p>Every participant registers a {@link TestMasterSlaveStateModelFactory} with a base size
     * of 22. After the resource converges, each participant must hold one executor keyed by
     * {@code STATE_TRANSITION.<resource>.OFFLINE.SLAVE} with 22 threads and one keyed by
     * {@code STATE_TRANSITION.<resource>.SLAVE.MASTER} with 27 threads.
     */
    @Test
    public void TestPerStateTransitionTypeThreadPool() throws InterruptedException {
        final int basePoolSize = 22;
        final String resourceName = "TestDB4";

        for (MockParticipantManager participant : this._participants) {
            participant.getStateMachineEngine().registerStateModelFactory(
                "MasterSlave", new TestMasterSlaveStateModelFactory(basePoolSize), TEST_FACTORY);
        }

        _gSetupTool.getClusterManagementTool().addResource(
            this.CLUSTER_NAME,
            resourceName,
            new FullAutoModeISBuilder(resourceName)
                .setStateModel("MasterSlave")
                .setStateModelFactoryName(TEST_FACTORY)
                .setNumPartitions(10)
                .setNumReplica(1)
                .build());
        _gSetupTool.rebalanceStorageCluster(this.CLUSTER_NAME, resourceName, 1);
        Assert.assertTrue(this._clusterVerifier.verifyByPolling());

        // Build the keys from the name the resource was created with.
        final String keyPrefix = Message.MessageType.STATE_TRANSITION + "." + resourceName + ".";
        for (MockParticipantManager participant : this._participants) {
            HelixTaskExecutor helixExecutor = participant.getMessagingService().getExecutor();

            ThreadPoolExecutor offlineToSlavePool =
                (ThreadPoolExecutor) helixExecutor._executorMap.get(keyPrefix + OFFLINE_TO_SLAVE);
            Assert.assertNotNull(offlineToSlavePool);
            // TestNG argument order is (actual, expected).
            Assert.assertEquals(offlineToSlavePool.getMaximumPoolSize(), basePoolSize);

            ThreadPoolExecutor slaveToMasterPool =
                (ThreadPoolExecutor) helixExecutor._executorMap.get(keyPrefix + SLAVE_TO_MASTER);
            Assert.assertNotNull(slaveToMasterPool);
            // The factory sizes the SLAVE.MASTER pool five threads above the base size.
            Assert.assertEquals(slaveToMasterPool.getMaximumPoolSize(), basePoolSize + 5);
        }
    }

    /**
     * Checks that the batch-message executor of a participant grows under batch-mode load.
     *
     * <p>The test registers the custom OnlineOffline factory on participant 0 only, stops every
     * other participant, adds ten batch-mode resources and waits for convergence. It then expects
     * the batch-message executor of participant 0 to hold at least ten threads.
     *
     * <p>Because the other participants are stopped and not restarted here, this test must run
     * after the tests that inspect all participants; the dependency is declared explicitly
     * instead of relying on method-name ordering.
     */
    @Test(dependsOnMethods = {
        "TestThreadPoolSizeConfig",
        "TestCustomizedResourceThreadPool",
        "TestPerStateTransitionTypeThreadPool"
    })
    public void testBatchMessageThreadPoolSize() throws InterruptedException {
        final int batchResourceCount = 10;
        final int minExpectedPoolSize = 10;

        // The second factory argument (2000) is ignored by the factory as currently written.
        this._participants[0].getStateMachineEngine().registerStateModelFactory(
            ONLINE_OFFLINE, new TestOnlineOfflineStateModelFactory(5, 2000), TEST_FACTORY);

        // Leave only participant 0 running so that it receives every transition.
        for (int idx = 1; idx < this._participants.length; idx++) {
            this._participants[idx].syncStop();
        }
        Assert.assertTrue(this._clusterVerifier.verifyByPolling());

        for (int n = 0; n < batchResourceCount; n++) {
            String resourceName = "TestDBABatch" + n;
            IdealState idealState = new FullAutoModeISBuilder(resourceName)
                .setStateModel(ONLINE_OFFLINE)
                .setStateModelFactoryName(TEST_FACTORY)
                .setNumPartitions(10)
                .setNumReplica(1)
                .build();
            idealState.setBatchMessageMode(true);
            _gSetupTool.getClusterManagementTool().addResource(this.CLUSTER_NAME, resourceName, idealState);
            _gSetupTool.rebalanceStorageCluster(this.CLUSTER_NAME, resourceName, 1);
        }
        Assert.assertTrue(this._clusterVerifier.verifyByPolling());

        ThreadPoolExecutor batchPool =
            (ThreadPoolExecutor) this._participants[0].getMessagingService().getExecutor()._batchMessageExecutorService;
        Assert.assertNotNull(batchPool);
        // NOTE(review): presumably at least one batch task per resource; confirm the threshold.
        Assert.assertTrue(
            batchPool.getPoolSize() >= minExpectedPoolSize,
            "batch message pool size was " + batchPool.getPoolSize());

        BestPossibleExternalViewVerifier finalVerifier =
            new BestPossibleExternalViewVerifier.Builder(this.CLUSTER_NAME).setZkAddr(ZkTestBase.ZK_ADDR).build();
        Assert.assertTrue(finalVerifier.verifyByPolling());
    }

    /**
     * Writes the {@code maxThreads} resource-level config entry for the given resource.
     *
     * <p>The cluster name and the config accessor are both taken from participant 0.
     *
     * @param str name of the resource whose config is written
     * @param i value stored under {@code maxThreads}, written as a decimal string
     */
    private void setResourceThreadPoolSize(String str, int i) {
        HelixManager manager = this._participants[0];
        HelixConfigScope resourceScope =
            new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
                .forCluster(manager.getClusterName())
                .forResource(str)
                .build();
        manager.getConfigAccessor().set(resourceScope, "maxThreads", String.valueOf(i));
    }
}
