package org.apache.helix.integration;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.TaskTestUtil;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskState;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/TestZkConnectionLost.class */
public class TestZkConnectionLost extends TaskTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(TestZkConnectionLost.class);
    private final AtomicReference<ZkServer> _zkServerRef = new AtomicReference<>();
    private String _zkAddr = "localhost:21893";
    ClusterSetup _setupTool;
    HelixZkClient _zkClient;

    @Override // org.apache.helix.integration.task.TaskTestBase, org.apache.helix.task.TaskSynchronizedTestBase
    @BeforeClass
    public void beforeClass() throws Exception {
        this._zkServerRef.set(TestHelper.startZkServer(this._zkAddr));
        this._zkClient = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(this._zkAddr));
        this._zkClient.setZkSerializer(new ZNRecordSerializer());
        this._setupTool = new ClusterSetup(this._zkClient);
        this._participants = new MockParticipantManager[this._numNodes];
        this._setupTool.addCluster(this.CLUSTER_NAME, true);
        setupParticipants(this._setupTool);
        setupDBs(this._setupTool);
        createManagers(this._zkAddr, this.CLUSTER_NAME);
        this._controller = new ClusterControllerManager(this._zkAddr, this.CLUSTER_NAME, "controller_0");
        this._controller.syncStart();
        Assert.assertTrue(new BestPossibleExternalViewVerifier.Builder(this.CLUSTER_NAME).setZkAddr(this._zkAddr).build().verifyByPolling());
    }

    @Override // org.apache.helix.task.TaskSynchronizedTestBase
    @AfterClass
    public void afterClass() throws Exception {
        if (this._controller != null && this._controller.isConnected()) {
            this._controller.syncStop();
        }
        if (this._manager != null && this._manager.isConnected()) {
            this._manager.disconnect();
        }
        stopParticipants();
        TestHelper.dropCluster(this.CLUSTER_NAME, this._zkClient, this._setupTool);
        this._zkClient.close();
        TestHelper.stopZkServer(this._zkServerRef.get());
    }

    @Test
    public void testLostZkConnection() throws Exception {
        System.setProperty("helixmanager.waitForConnectedTimeout", "1000");
        System.setProperty("zk.session.timeout", "1000");
        try {
            String testMethodName = TestHelper.getTestMethodName();
            startParticipants(this._zkAddr);
            LOG.info("Starting job-queue: " + testMethodName);
            JobQueue.Builder buildRecurrentJobQueue = TaskTestUtil.buildRecurrentJobQueue(testMethodName, 0, 6000);
            createAndEnqueueJob(buildRecurrentJobQueue, 3);
            this._driver.start(buildRecurrentJobQueue.build());
            restartZkServer();
            this._driver.pollForWorkflowState(TaskTestUtil.pollForWorkflowContext(this._driver, testMethodName).getLastScheduledSingleWorkflow(), 30000L, new TaskState[]{TaskState.COMPLETED});
            System.clearProperty("helixmanager.waitForConnectedTimeout");
            System.clearProperty("zk.session.timeout");
        } catch (Throwable th) {
            System.clearProperty("helixmanager.waitForConnectedTimeout");
            System.clearProperty("zk.session.timeout");
            throw th;
        }
    }

    @Test(dependsOnMethods = {"testLostZkConnection"}, enabled = false)
    public void testLostZkConnectionNegative() throws Exception {
        System.setProperty("helixmanager.waitForConnectedTimeout", "10");
        System.setProperty("zk.session.timeout", "1000");
        try {
            String testMethodName = TestHelper.getTestMethodName();
            stopParticipants();
            startParticipants(this._zkAddr);
            LOG.info("Starting job-queue: " + testMethodName);
            JobQueue.Builder buildRecurrentJobQueue = TaskTestUtil.buildRecurrentJobQueue(testMethodName, 0, 6000);
            createAndEnqueueJob(buildRecurrentJobQueue, 3);
            this._driver.start(buildRecurrentJobQueue.build());
            restartZkServer();
            try {
                this._driver.pollForWorkflowState(TaskTestUtil.pollForWorkflowContext(this._driver, testMethodName).getLastScheduledSingleWorkflow(), 30000L, new TaskState[]{TaskState.COMPLETED});
                Assert.fail("Test failure!");
            } catch (HelixException e) {
            }
        } finally {
            System.clearProperty("helixmanager.waitForConnectedTimeout");
            System.clearProperty("zk.session.timeout");
        }
    }

    private void restartZkServer() throws ExecutionException, InterruptedException {
        for (int i = 0; i < 4; i++) {
            Executors.newSingleThreadExecutor().submit(new Runnable() { // from class: org.apache.helix.integration.TestZkConnectionLost.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        Thread.sleep(300L);
                        System.out.println(System.currentTimeMillis() + ": Shutdown ZK server.");
                        TestHelper.stopZkServer((ZkServer) TestZkConnectionLost.this._zkServerRef.get());
                        Thread.sleep(300L);
                        System.out.println("Restart ZK server");
                        TestZkConnectionLost.this._zkServerRef.set(TestHelper.startZkServer(TestZkConnectionLost.this._zkAddr, null, false));
                    } catch (Exception e) {
                        TestZkConnectionLost.LOG.error(e.getMessage(), e);
                    }
                }
            }).get();
        }
    }

    private List<String> createAndEnqueueJob(JobQueue.Builder builder, int i) {
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        while (i2 < i) {
            String str = i2 == 0 ? "MASTER" : "SLAVE";
            JobConfig.Builder jobCommandConfigMap = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(Sets.newHashSet(new String[]{str})).setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "100"));
            String str2 = str.toLowerCase() + "Job" + i2;
            builder.enqueueJob(str2, jobCommandConfigMap);
            arrayList.add(str2);
            i2++;
        }
        Assert.assertEquals(arrayList.size(), i);
        return arrayList;
    }
}
