package org.apache.helix.integration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.api.State;
import org.apache.helix.api.StateTransitionHandlerFactory;
import org.apache.helix.api.TransitionHandler;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.api.id.StateModelFactoryId;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.testutil.ZkTestUtil;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestZkReconnect.class */
public class TestZkReconnect {
    private static final Logger LOG = Logger.getLogger(TestZkReconnect.class);

    /* loaded from: input_file:org/apache/helix/integration/TestZkReconnect$SimpleStateModel.class */
    public static final class SimpleStateModel extends TransitionHandler {
        private final CountDownLatch latch;

        public SimpleStateModel(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void onBecomeOnlineFromOffline(Message message, NotificationContext notificationContext) {
            TestZkReconnect.LOG.info("message: " + message);
            this.latch.countDown();
        }
    }

    @Test
    public void testZKReconnect() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        final String format = String.format("localhost:%d", Integer.valueOf(ZkTestUtil.availableTcpPort()));
        atomicReference.set(TestHelper.startZkServer(format));
        String str = TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
        LOG.info("Setup clusters");
        ClusterSetup clusterSetup = new ClusterSetup(format);
        clusterSetup.addCluster(str, true);
        LOG.info("Starts controller");
        HelixManagerFactory.getZKHelixManager(str, (String) null, InstanceType.CONTROLLER, format).connect();
        LOG.info("Starts participant");
        String format2 = String.format("%s_%d", "localhost", 1);
        clusterSetup.addInstanceToCluster(str, format2);
        HelixManager zKHelixManager = HelixManagerFactory.getZKHelixManager(str, format2, InstanceType.PARTICIPANT, format);
        zKHelixManager.connect();
        LOG.info("Register state machine");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        zKHelixManager.getStateMachineEngine().registerStateModelFactory(StateModelDefId.OnlineOffline, "test", new StateTransitionHandlerFactory<TransitionHandler>() { // from class: org.apache.helix.integration.TestZkReconnect.1
            public TransitionHandler createStateTransitionHandler(PartitionId partitionId) {
                return new SimpleStateModel(countDownLatch);
            }
        });
        LOG.info("Ideal state assignment");
        HelixAdmin clusterManagmentTool = zKHelixManager.getClusterManagmentTool();
        clusterManagmentTool.addResource(str, "test-resource", 1, "OnlineOffline", IdealState.RebalanceMode.CUSTOMIZED.toString());
        IdealState resourceIdealState = clusterManagmentTool.getResourceIdealState(str, "test-resource");
        resourceIdealState.setReplicas("1");
        resourceIdealState.setStateModelFactoryId(StateModelFactoryId.from("test"));
        resourceIdealState.setPartitionState(PartitionId.from(ResourceId.from("test-resource"), "0"), ParticipantId.from(format2), State.from("ONLINE"));
        LOG.info("Shutdown ZK server");
        TestHelper.stopZkServer((ZkServer) atomicReference.get());
        Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() { // from class: org.apache.helix.integration.TestZkReconnect.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    TestZkReconnect.LOG.info("Restart ZK server");
                    atomicReference.set(TestHelper.startZkServer(format, null, false));
                } catch (Exception e) {
                    TestZkReconnect.LOG.error(e.getMessage(), e);
                }
            }
        }, 2L, TimeUnit.SECONDS);
        LOG.info("Before update ideal state");
        clusterManagmentTool.setResourceIdealState(str, "test-resource", resourceIdealState);
        LOG.info("After update ideal state");
        LOG.info("Wait for OFFLINE->ONLINE state transition");
        try {
            Assert.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
            Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(format, str)));
            zKHelixManager.disconnect();
            ((ZkServer) atomicReference.get()).shutdown();
        } catch (Throwable th) {
            zKHelixManager.disconnect();
            ((ZkServer) atomicReference.get()).shutdown();
            throw th;
        }
    }
}
