package org.apache.helix.integration.messaging;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.integration.DelayedTransitionBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.messaging.handling.MockHelixTaskExecutor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.class */
public class TestP2PNoDuplicatedMessage extends ZkTestBase {
    static final int PARTICIPANT_NUMBER = 10;
    static final int PARTICIPANT_START_PORT = 12918;
    static final int DB_COUNT = 2;
    static final int PARTITION_NUMBER = 100;
    static final int REPLICA_NUMBER = 3;
    ClusterControllerManager _controller;
    ZkHelixClusterVerifier _clusterVerifier;
    ConfigAccessor _configAccessor;
    HelixDataAccessor _accessor;
    private static Logger logger = LoggerFactory.getLogger(TestP2PNoDuplicatedMessage.class);
    static int total = 0;
    static int p2pTrigged = 0;
    final String CLASS_NAME = getShortClassName();
    final String CLUSTER_NAME = "CLUSTER_" + this.CLASS_NAME;
    final String _controllerName = "controller_0";
    List<MockParticipantManager> _participants = new ArrayList();
    List<String> _instances = new ArrayList();

    /* loaded from: input_file:org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage$MockMessagingService.class */
    static class MockMessagingService extends DefaultMessagingService {
        private final HelixTaskExecutor _taskExecutor;
        ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded;
        private final HelixManager _manager;

        public MockMessagingService(HelixManager helixManager) {
            super(helixManager);
            this._messageHandlerFactoriestobeAdded = new ConcurrentHashMap<>();
            this._manager = helixManager;
            this._taskExecutor = new MockHelixTaskExecutor(new ParticipantStatusMonitor(helixManager.getInstanceType() == InstanceType.PARTICIPANT || helixManager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT, helixManager.getInstanceName()), new MessageQueueMonitor(helixManager.getClusterName(), helixManager.getInstanceName()));
        }

        public synchronized void registerMessageHandlerFactory(String str, MessageHandlerFactory messageHandlerFactory) {
            registerMessageHandlerFactory(Collections.singletonList(str), messageHandlerFactory);
        }

        public synchronized void registerMessageHandlerFactory(List<String> list, MessageHandlerFactory messageHandlerFactory) {
            if (this._manager.isConnected()) {
                Iterator<String> it = list.iterator();
                while (it.hasNext()) {
                    registerMessageHandlerFactoryExtended(it.next(), messageHandlerFactory);
                }
            } else {
                Iterator<String> it2 = list.iterator();
                while (it2.hasNext()) {
                    this._messageHandlerFactoriestobeAdded.put(it2.next(), messageHandlerFactory);
                }
            }
        }

        public synchronized void onConnected() {
            Iterator it = this._messageHandlerFactoriestobeAdded.keySet().iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                registerMessageHandlerFactoryExtended(str, this._messageHandlerFactoriestobeAdded.get(str));
            }
            this._messageHandlerFactoriestobeAdded.clear();
        }

        public HelixTaskExecutor getExecutor() {
            return this._taskExecutor;
        }

        void registerMessageHandlerFactoryExtended(String str, MessageHandlerFactory messageHandlerFactory) {
            this._taskExecutor.registerMessageHandlerFactory(str, messageHandlerFactory, 40);
            super.sendNopMessage();
        }
    }

    /* loaded from: input_file:org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage$TestParticipantManager.class */
    static class TestParticipantManager extends MockParticipantManager {
        private final DefaultMessagingService _messagingService;

        public TestParticipantManager(String str, String str2, String str3) {
            super(str, str2, str3);
            this._messagingService = new MockMessagingService(this);
        }

        public ClusterMessagingService getMessagingService() {
            return this._messagingService;
        }

        @Override // org.apache.helix.integration.manager.MockParticipantManager, org.apache.helix.integration.manager.ClusterManager
        public void finalize() {
            super.finalize();
        }
    }

    @Override // org.apache.helix.common.ZkTestBase
    @BeforeClass
    public void beforeClass() {
        System.out.println("START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
        _gSetupTool.addCluster(this.CLUSTER_NAME, true);
        for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
            String str = BaseStageTest.HOSTNAME_PREFIX + (PARTICIPANT_START_PORT + i);
            _gSetupTool.addInstanceToCluster(this.CLUSTER_NAME, str);
            this._instances.add(str);
        }
        for (int i2 = 0; i2 < PARTICIPANT_NUMBER; i2++) {
            TestParticipantManager testParticipantManager = new TestParticipantManager(ZkTestBase.ZK_ADDR, this.CLUSTER_NAME, this._instances.get(i2));
            testParticipantManager.setTransition(new DelayedTransitionBase(100L));
            testParticipantManager.syncStart();
            this._participants.add(testParticipantManager);
        }
        enableDelayRebalanceInCluster(_gZkClient, this.CLUSTER_NAME, true);
        enablePersistBestPossibleAssignment(_gZkClient, this.CLUSTER_NAME, true);
        for (int i3 = 0; i3 < DB_COUNT; i3++) {
            createResourceWithDelayedRebalance(this.CLUSTER_NAME, "TestDB_" + i3, BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER, DB_COUNT, 1000000L, CrushEdRebalanceStrategy.class.getName());
        }
        this._controller = new ClusterControllerManager(ZkTestBase.ZK_ADDR, this.CLUSTER_NAME, "controller_0");
        this._controller.syncStart();
        this._clusterVerifier = new BestPossibleExternalViewVerifier.Builder(this.CLUSTER_NAME).setZkClient(_gZkClient).setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
        Assert.assertTrue(this._clusterVerifier.verifyByPolling());
        this._configAccessor = new ConfigAccessor(_gZkClient);
        this._accessor = new ZKHelixDataAccessor(this.CLUSTER_NAME, _baseAccessor);
    }

    @AfterClass
    public void afterClass() throws Exception {
        this._controller.syncStop();
        Iterator<MockParticipantManager> it = this._participants.iterator();
        while (it.hasNext()) {
            it.next().syncStop();
        }
        deleteCluster(this.CLUSTER_NAME);
        System.out.println("END " + this.CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testP2PStateTransitionDisabled() {
        enableP2PInCluster(this.CLUSTER_NAME, this._configAccessor, false);
        MockHelixTaskExecutor.resetStats();
        for (String str : this._instances) {
            _gSetupTool.getClusterManagementTool().enableInstance(this.CLUSTER_NAME, str, false);
            Assert.assertTrue(this._clusterVerifier.verifyByPolling());
            verifyP2PDisabled();
            _gSetupTool.getClusterManagementTool().enableInstance(this.CLUSTER_NAME, str, true);
            Assert.assertTrue(this._clusterVerifier.verifyByPolling());
            verifyP2PDisabled();
        }
        Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0, "There are duplicated transition messages sent while participant is handling the state-transition!");
        Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0, "There are duplicated transition messages sent at same time!");
    }

    @Test(dependsOnMethods = {"testP2PStateTransitionDisabled"})
    public void testP2PStateTransitionEnabled() {
        enableP2PInCluster(this.CLUSTER_NAME, this._configAccessor, true);
        long currentTimeMillis = System.currentTimeMillis();
        MockHelixTaskExecutor.resetStats();
        for (String str : this._instances) {
            _gSetupTool.getClusterManagementTool().enableInstance(this.CLUSTER_NAME, str, false);
            Assert.assertTrue(this._clusterVerifier.verifyByPolling());
            verifyP2PEnabled(currentTimeMillis);
            _gSetupTool.getClusterManagementTool().enableInstance(this.CLUSTER_NAME, str, true);
            Assert.assertTrue(this._clusterVerifier.verifyByPolling());
            verifyP2PEnabled(currentTimeMillis);
        }
        Math.round(total * 0.9d);
        Assert.assertTrue(((long) p2pTrigged) > Math.round(((double) total) * 0.9d));
        Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0, "There are duplicated transition messages sent while participant is handling the state-transition!");
        Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0, "There are duplicated transition messages sent at same time!");
    }

    private void verifyP2PDisabled() {
        ResourceControllerDataProvider resourceControllerDataProvider = new ResourceControllerDataProvider(this.CLUSTER_NAME);
        resourceControllerDataProvider.refresh(this._accessor);
        for (LiveInstance liveInstance : resourceControllerDataProvider.getLiveInstances().values()) {
            Map currentState = resourceControllerDataProvider.getCurrentState(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
            Assert.assertNotNull(currentState);
            for (CurrentState currentState2 : currentState.values()) {
                for (String str : currentState2.getPartitionStateMap().keySet()) {
                    String state = currentState2.getState(str);
                    if (state.equalsIgnoreCase("MASTER")) {
                        String triggerHost = currentState2.getTriggerHost(str);
                        Assert.assertEquals(triggerHost, "controller_0", state + " of " + str + " on " + liveInstance.getInstanceName() + " was triggered by " + triggerHost);
                    }
                }
            }
        }
    }

    private void verifyP2PEnabled(long j) {
        ResourceControllerDataProvider resourceControllerDataProvider = new ResourceControllerDataProvider(this.CLUSTER_NAME);
        resourceControllerDataProvider.refresh(this._accessor);
        for (LiveInstance liveInstance : resourceControllerDataProvider.getLiveInstances().values()) {
            Map currentState = resourceControllerDataProvider.getCurrentState(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
            Assert.assertNotNull(currentState);
            for (CurrentState currentState2 : currentState.values()) {
                for (String str : currentState2.getPartitionStateMap().keySet()) {
                    String state = currentState2.getState(str);
                    long startTime = currentState2.getStartTime(str);
                    if (state.equalsIgnoreCase("MASTER") && startTime > j) {
                        if (!currentState2.getTriggerHost(str).equals("controller_0")) {
                            p2pTrigged++;
                        }
                        total++;
                    }
                }
            }
        }
    }
}
