package org.apache.helix.integration.messaging;

import java.util.Date;
import java.util.UUID;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.messaging.handling.TestResourceThreadpoolSize;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/helix/integration/messaging/TestGroupCommitAddBackData.class */
public class TestGroupCommitAddBackData extends ZkTestBase {
    private static Logger LOG = LoggerFactory.getLogger(TestGroupCommitAddBackData.class);
    private static final int START_PORT = 12918;
    private static final int DEFAULT_TIMEOUT = 30000;
    private HelixManager _manager;
    private final String CLASS_NAME = getShortClassName();
    private final String CLUSTER_NAME = "CLUSTER_" + this.CLASS_NAME;
    private MockParticipantManager _participant;

    @Override // org.apache.helix.common.ZkTestBase
    @BeforeClass
    public void beforeClass() throws Exception {
        System.out.println("START " + this.CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
        _gSetupTool.addCluster(this.CLUSTER_NAME, true);
        _gSetupTool.addInstanceToCluster(this.CLUSTER_NAME, "localhost_12918");
        this._participant = new MockParticipantManager(ZkTestBase.ZK_ADDR, this.CLUSTER_NAME, "localhost_12918");
        this._participant.syncStart();
        this._manager = HelixManagerFactory.getZKHelixManager(this.CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZkTestBase.ZK_ADDR);
        this._manager.connect();
    }

    @AfterClass
    public void afterClass() throws Exception {
        if (this._participant != null && this._participant.isConnected()) {
            this._participant.syncStop();
        }
        if (this._manager != null && this._manager.isConnected()) {
            this._manager.disconnect();
        }
        if (_gZkClient.exists("/" + this.CLUSTER_NAME)) {
            try {
                _gSetupTool.deleteCluster(this.CLUSTER_NAME);
            } catch (Exception e) {
                System.err.println("Failed to delete cluster " + this.CLUSTER_NAME + ", error: " + e.getLocalizedMessage());
                _gSetupTool.deleteCluster(this.CLUSTER_NAME);
            }
        }
        System.out.println("END " + this.CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
    }

    @Test
    public void testGroupCommitAddCurrentStateBack() throws InterruptedException {
        HelixDataAccessor helixDataAccessor = this._manager.getHelixDataAccessor();
        Message generateMessage = generateMessage("OFFLINE", "ONLINE");
        helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().message(this._participant.getInstanceName(), generateMessage.getMsgId()), generateMessage);
        Assert.assertTrue(waitForMessageProcessed(helixDataAccessor, generateMessage.getMsgId()));
        Message generateMessage2 = generateMessage("ONLINE", "OFFLINE");
        helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().message(this._participant.getInstanceName(), generateMessage2.getMsgId()), generateMessage2);
        Assert.assertTrue(waitForMessageProcessed(helixDataAccessor, generateMessage2.getMsgId()));
        for (int i = 0; i < 10; i++) {
            Message generateMessage3 = generateMessage("OFFLINE", "DROPPED");
            helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().message(this._participant.getInstanceName(), generateMessage3.getMsgId()), generateMessage3);
            Assert.assertTrue(waitForMessageProcessed(helixDataAccessor, generateMessage3.getMsgId()));
            Assert.assertFalse(helixDataAccessor.getBaseDataAccessor().exists(helixDataAccessor.keyBuilder().currentState(this._participant.getInstanceName(), this._participant.getSessionId(), WorkflowGenerator.DEFAULT_TGT_DB).getPath(), 0));
        }
    }

    private Message generateMessage(String str, String str2) {
        Message message = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
        message.setSrcName("ADMIN");
        message.setTgtName(this._participant.getInstanceName());
        message.setMsgState(Message.MessageState.NEW);
        message.setPartitionName("P");
        message.setResourceName(WorkflowGenerator.DEFAULT_TGT_DB);
        message.setFromState(str);
        message.setToState(str2);
        message.setTgtSessionId(this._participant.getSessionId());
        message.setSrcSessionId(this._manager.getSessionId());
        message.setStateModelDef(TestResourceThreadpoolSize.ONLINE_OFFLINE);
        message.setStateModelFactoryName("DEFAULT");
        return message;
    }

    private boolean waitForMessageProcessed(HelixDataAccessor helixDataAccessor, String str) throws InterruptedException {
        String path = helixDataAccessor.keyBuilder().message(this._participant.getInstanceName(), str).getPath();
        long currentTimeMillis = System.currentTimeMillis();
        while (helixDataAccessor.getBaseDataAccessor().exists(path, 0)) {
            if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                return false;
            }
            Thread.sleep(200L);
        }
        return true;
    }
}
