package org.apache.helix.integration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.manager.zk.HelixManagerStateListener;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.messaging.handling.TestResourceThreadpoolSize;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestZkReconnect.class */
/**
 * Integration tests covering how Helix managers behave when the ZooKeeper server they are
 * connected to goes away and (optionally) comes back.
 *
 * <p>Each test starts its own ZooKeeper server on a random local port and is responsible for
 * releasing everything it created (managers, executor, server, system properties) in a
 * {@code finally} block, whether the test passes or fails.
 */
public class TestZkReconnect {
    private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);

    /** System property keys overridden by {@link #testZKDisconnectCallback()}. */
    private static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
    private static final String ZK_REESTABLISHMENT_TIMEOUT = "zk.connectionReEstablishment.timeout";

    /**
     * State model whose only job is to signal, through a latch, that the
     * OFFLINE-&gt;ONLINE transition callback has been invoked.
     */
    public static final class SimpleStateModel extends StateModel {
        private final CountDownLatch latch;

        /**
         * @param countDownLatch latch counted down once per OFFLINE-&gt;ONLINE transition
         */
        public SimpleStateModel(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        /**
         * Logs the transition message and counts the latch down.
         *
         * @param message the state transition message
         * @param notificationContext the notification context (unused)
         */
        public void onBecomeOnlineFromOffline(Message message, NotificationContext notificationContext) {
            LOG.info("message: {}", message);
            this.latch.countDown();
        }
    }

    /**
     * Stops the ZooKeeper server, schedules a restart two seconds later, updates the ideal
     * state in the meantime, and expects the participant to receive the OFFLINE-&gt;ONLINE
     * transition within 15 seconds and the cluster state to verify afterwards.
     *
     * <p>The test is currently disabled via {@code enabled = false}.
     *
     * @throws Exception if cluster setup, connection or verification fails
     */
    @Test(enabled = false)
    public void testZKReconnect() throws Exception {
        final String zkAddr = "localhost:" + TestHelper.getRandomPort();
        // Holds whichever server instance is current: the restart task replaces it.
        final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
        zkServerRef.set(TestHelper.startZkServer(zkAddr));
        final String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();

        HelixManager controller = null;
        HelixManager participant = null;
        ScheduledExecutorService restartExecutor = null;
        try {
            LOG.info("Setup clusters");
            ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
            clusterSetup.addCluster(clusterName, true);

            LOG.info("Starts controller");
            controller = HelixManagerFactory.getZKHelixManager(clusterName, (String) null, InstanceType.CONTROLLER, zkAddr);
            controller.connect();

            LOG.info("Starts participant");
            final String instanceName = "localhost_1";
            clusterSetup.addInstanceToCluster(clusterName, instanceName);
            participant = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
            participant.connect();

            LOG.info("Register state machine");
            final CountDownLatch latch = new CountDownLatch(1);
            participant.getStateMachineEngine().registerStateModelFactory(TestResourceThreadpoolSize.ONLINE_OFFLINE, new StateModelFactory<StateModel>() {
                public StateModel createNewStateModel(String resourceName, String partitionName) {
                    return new SimpleStateModel(latch);
                }
            }, "test");

            LOG.info("Ideal state assignment");
            HelixAdmin admin = participant.getClusterManagmentTool();
            admin.addResource(clusterName, "test-resource", 1, TestResourceThreadpoolSize.ONLINE_OFFLINE, IdealState.RebalanceMode.CUSTOMIZED.toString());
            IdealState idealState = admin.getResourceIdealState(clusterName, "test-resource");
            idealState.setReplicas("1");
            idealState.setStateModelFactoryName("test");
            idealState.setPartitionState("test-resource_0", instanceName, "ONLINE");

            LOG.info("Shutdown ZK server");
            TestHelper.stopZkServer(zkServerRef.get());

            // Bring the server back on a separate thread while the ideal state update below
            // is attempted against the stopped server.
            restartExecutor = Executors.newSingleThreadScheduledExecutor();
            restartExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        LOG.info("Restart ZK server");
                        zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
                    } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                    }
                }
            }, 2L, TimeUnit.SECONDS);

            LOG.info("Before update ideal state");
            admin.setResourceIdealState(clusterName, "test-resource", idealState);
            LOG.info("After update ideal state");

            LOG.info("Wait for OFFLINE->ONLINE state transition");
            Assert.assertTrue(latch.await(15L, TimeUnit.SECONDS));
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, clusterName)));
        } finally {
            // Cancel a still-pending restart first so no server is started after cleanup,
            // and release the executor thread.
            if (restartExecutor != null) {
                restartExecutor.shutdownNow();
            }
            // Nested finally blocks: every resource is released even if an earlier
            // release step throws.
            try {
                if (participant != null) {
                    participant.disconnect();
                }
            } finally {
                try {
                    if (controller != null) {
                        controller.disconnect();
                    }
                } finally {
                    zkServerRef.get().shutdown();
                }
            }
        }
    }

    /**
     * Checks the {@link HelixManagerStateListener} disconnect callback.
     *
     * <p>With the ZooKeeper server shut down, handling a session establishment error must
     * invoke {@code onDisconnected} for this cluster, and the property store obtained before
     * the shutdown must reject reads with {@link IllegalStateException}. After the server is
     * started again, handling the same error must not invoke {@code onDisconnected}, and a
     * freshly obtained property store must be readable.
     *
     * @throws Exception if cluster setup or connection fails
     */
    @Test
    public void testZKDisconnectCallback() throws Exception {
        final String zkAddr = "localhost:" + TestHelper.getRandomPort();
        ZkServer zkServer = TestHelper.startZkServer(zkAddr);
        final String clusterName = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        final AtomicBoolean disconnected = new AtomicBoolean(false);

        // Remember pre-existing values so the test does not clobber an outer configuration.
        final String previousConnectionTimeout = System.getProperty(ZK_CONNECTION_TIMEOUT);
        final String previousReestablishmentTimeout = System.getProperty(ZK_REESTABLISHMENT_TIMEOUT);

        ZKHelixManager controller = null;
        try {
            LOG.info("Setup clusters");
            new ClusterSetup(zkAddr).addCluster(clusterName, true);

            System.setProperty(ZK_CONNECTION_TIMEOUT, "2000");
            System.setProperty(ZK_REESTABLISHMENT_TIMEOUT, "1000");

            LOG.info("Starts controller");
            // handleSessionEstablishmentError is invoked on the ZKHelixManager type below, so
            // the factory result is cast explicitly (redundant if the factory already
            // declares that type).
            controller = (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, (String) null, InstanceType.CONTROLLER, zkAddr, new HelixManagerStateListener() {
                public void onConnected(HelixManager manager) throws Exception {
                }

                public void onDisconnected(HelixManager manager, Throwable error) throws Exception {
                    Assert.assertEquals(manager.getClusterName(), clusterName);
                    disconnected.set(true);
                }
            });
            controller.connect();

            // Grab the store while connected; it is expected to become unusable below.
            ZkHelixPropertyStore<?> propertyStore = controller.getHelixPropertyStore();

            zkServer.shutdown();
            controller.handleSessionEstablishmentError(new Exception("For testing"));
            Assert.assertTrue(disconnected.get());
            try {
                propertyStore.get("/", (Stat) null, 0);
                Assert.fail("propertyStore should be disconnected.");
            } catch (IllegalStateException expected) {
                LOG.info("Property store rejected read as expected: {}", expected.getMessage());
            }

            // With the server back, the same error must not trigger the disconnect callback.
            disconnected.set(false);
            zkServer.start();
            controller.handleSessionEstablishmentError(new Exception("For testing"));
            Assert.assertFalse(disconnected.get());
            controller.getHelixPropertyStore().get("/", (Stat) null, 0);
        } finally {
            try {
                if (controller != null) {
                    controller.disconnect();
                }
            } finally {
                try {
                    zkServer.shutdown();
                } finally {
                    restoreProperty(ZK_CONNECTION_TIMEOUT, previousConnectionTimeout);
                    restoreProperty(ZK_REESTABLISHMENT_TIMEOUT, previousReestablishmentTimeout);
                }
            }
        }
    }

    /** Restores a system property to its previous value, clearing it if it was unset. */
    private static void restoreProperty(String key, String previousValue) {
        if (previousValue == null) {
            System.clearProperty(key);
        } else {
            System.setProperty(key, previousValue);
        }
    }
}
