package org.apache.helix.integration.task;

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.MockController;
import org.apache.helix.manager.zk.MockParticipant;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskResult;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.Workflow;
import org.apache.helix.testutil.ZkTestBase;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/task/TestTaskRebalancerStopResume.class */
public class TestTaskRebalancerStopResume extends ZkTestBase {
    private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class);
    private static final int n = 5;
    private static final int START_PORT = 12918;
    private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
    private static final String TIMEOUT_CONFIG = "Timeout";
    private static final String TGT_DB = "TestDB";
    private static final String JOB_RESOURCE = "SomeJob";
    private static final int NUM_PARTITIONS = 20;
    private static final int NUM_REPLICAS = 3;
    private final String CLUSTER_NAME = "TestTaskRebalancerStopResume";
    private final MockParticipant[] _participants = new MockParticipant[n];
    private MockController _controller;
    private HelixManager _manager;
    private TaskDriver _driver;

    /* loaded from: input_file:org/apache/helix/integration/task/TestTaskRebalancerStopResume$ReindexTask.class */
    public static class ReindexTask implements Task {
        private final long _delay;
        private volatile boolean _canceled;

        public ReindexTask(TaskCallbackContext taskCallbackContext) {
            Map jobCommandConfigMap = taskCallbackContext.getJobConfig().getJobCommandConfigMap();
            jobCommandConfigMap = jobCommandConfigMap == null ? Collections.emptyMap() : jobCommandConfigMap;
            this._delay = jobCommandConfigMap.containsKey(TestTaskRebalancerStopResume.TIMEOUT_CONFIG) ? Long.parseLong((String) jobCommandConfigMap.get(TestTaskRebalancerStopResume.TIMEOUT_CONFIG)) : 200L;
        }

        public TaskResult run() {
            long currentTimeMillis = System.currentTimeMillis() + this._delay;
            while (System.currentTimeMillis() < currentTimeMillis) {
                if (this._canceled) {
                    long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                    return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(currentTimeMillis2 < 0 ? 0L : currentTimeMillis2));
                }
                sleep(50L);
            }
            long currentTimeMillis3 = currentTimeMillis - System.currentTimeMillis();
            return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(currentTimeMillis3 < 0 ? 0L : currentTimeMillis3));
        }

        public void cancel() {
            this._canceled = true;
        }

        private static void sleep(long j) {
            try {
                Thread.sleep(j);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @BeforeClass
    public void beforeClass() throws Exception {
        if (_zkclient.exists("/TestTaskRebalancerStopResume")) {
            _zkclient.deleteRecursive("/TestTaskRebalancerStopResume");
        }
        _setupTool.addCluster("TestTaskRebalancerStopResume", true);
        for (int i = 0; i < n; i++) {
            _setupTool.addInstanceToCluster("TestTaskRebalancerStopResume", "localhost_" + (START_PORT + i));
        }
        _setupTool.addResourceToCluster("TestTaskRebalancerStopResume", "TestDB", NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
        _setupTool.rebalanceStorageCluster("TestTaskRebalancerStopResume", "TestDB", NUM_REPLICAS);
        HashMap hashMap = new HashMap();
        hashMap.put("Reindex", new TaskFactory() { // from class: org.apache.helix.integration.task.TestTaskRebalancerStopResume.1
            public Task createNewTask(TaskCallbackContext taskCallbackContext) {
                return new ReindexTask(taskCallbackContext);
            }
        });
        for (int i2 = 0; i2 < n; i2++) {
            this._participants[i2] = new MockParticipant(_zkaddr, "TestTaskRebalancerStopResume", "localhost_" + (START_PORT + i2));
            this._participants[i2].getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("Task"), new TaskStateModelFactory(this._participants[i2], hashMap));
            this._participants[i2].syncStart();
        }
        this._controller = new MockController(_zkaddr, "TestTaskRebalancerStopResume", "controller_0");
        this._controller.syncStart();
        this._manager = HelixManagerFactory.getZKHelixManager("TestTaskRebalancerStopResume", "Admin", InstanceType.ADMINISTRATOR, _zkaddr);
        this._manager.connect();
        this._driver = new TaskDriver(this._manager);
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(_zkaddr, "TestTaskRebalancerStopResume")));
        Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(_zkaddr, "TestTaskRebalancerStopResume")));
    }

    @AfterClass
    public void afterClass() throws Exception {
        this._controller.syncStop();
        for (int i = 0; i < n; i++) {
            this._participants[i].syncStop();
        }
        this._manager.disconnect();
    }

    @Test
    public void stopAndResume() throws Exception {
        Workflow build = WorkflowGenerator.generateDefaultSingleJobWorkflowBuilderWithExtraConfigs(JOB_RESOURCE, ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(100)), new String[0]).build();
        LOG.info("Starting flow " + build.getName());
        this._driver.start(build);
        TestUtil.pollForWorkflowState(this._manager, JOB_RESOURCE, TaskState.IN_PROGRESS);
        LOG.info("Pausing job");
        this._driver.stop(JOB_RESOURCE);
        TestUtil.pollForWorkflowState(this._manager, JOB_RESOURCE, TaskState.STOPPED);
        LOG.info("Resuming job");
        this._driver.resume(JOB_RESOURCE);
        TestUtil.pollForWorkflowState(this._manager, JOB_RESOURCE, TaskState.COMPLETED);
    }

    @Test
    public void stopAndResumeWorkflow() throws Exception {
        Workflow build = WorkflowGenerator.generateDefaultRepeatedJobWorkflowBuilder("SomeWorkflow").build();
        LOG.info("Starting flow SomeWorkflow");
        this._driver.start(build);
        TestUtil.pollForWorkflowState(this._manager, "SomeWorkflow", TaskState.IN_PROGRESS);
        LOG.info("Pausing workflow");
        this._driver.stop("SomeWorkflow");
        TestUtil.pollForWorkflowState(this._manager, "SomeWorkflow", TaskState.STOPPED);
        LOG.info("Resuming workflow");
        this._driver.resume("SomeWorkflow");
        TestUtil.pollForWorkflowState(this._manager, "SomeWorkflow", TaskState.COMPLETED);
    }
}
