package org.apache.rocketmq.test.autoswitchrole;

import java.io.File;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.controller.ControllerManager;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoResponseHeader;
import org.apache.rocketmq.store.MappedFileQueue;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.ha.HAClient;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
/* loaded from: input_file:org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.class */
public class AutoSwitchRoleIntegrationTest extends AutoSwitchRoleBase {
    private static final int DEFAULT_FILE_SIZE = 1048576;
    private static NamesrvController namesrvController;
    private static ControllerManager controllerManager;
    private static String nameserverAddress;
    private static String controllerAddress;
    private static ControllerConfig controllerConfig;
    private BrokerController brokerController1;
    private BrokerController brokerController2;
    private Random random = new Random();

    @BeforeClass
    public static void init() throws Exception {
        initialize();
        int nextPort = nextPort();
        String format = String.format("n0-localhost:%d", Integer.valueOf(nextPort));
        NettyServerConfig nettyServerConfig = new NettyServerConfig();
        int nextPort2 = nextPort();
        nettyServerConfig.setListenPort(nextPort2);
        controllerConfig = buildControllerConfig("n0", format);
        namesrvController = new NamesrvController(new NamesrvConfig(), nettyServerConfig, new NettyClientConfig());
        Assert.assertTrue(namesrvController.initialize());
        namesrvController.start();
        initAndStartControllerManager();
        nameserverAddress = "127.0.0.1:" + nextPort2 + ";";
        controllerAddress = "127.0.0.1:" + nextPort + ";";
    }

    private static void initAndStartControllerManager() {
        controllerManager = new ControllerManager(controllerConfig, new NettyServerConfig(), new NettyClientConfig());
        Assert.assertTrue(controllerManager.initialize());
        controllerManager.start();
    }

    public void initBroker(int i, String str) throws Exception {
        this.brokerController1 = startBroker(nameserverAddress, controllerAddress, str, 1, nextPort(), nextPort(), nextPort(), BrokerRole.SYNC_MASTER, i);
        this.brokerController2 = startBroker(nameserverAddress, controllerAddress, str, 2, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, i);
        Assert.assertTrue(waitSlaveReady(this.brokerController2.getMessageStore()));
        Thread.sleep(1000L);
    }

    public void mockData(String str) throws Exception {
        putMessage(this.brokerController1.getMessageStore(), str);
        checkMessage(this.brokerController2.getMessageStore(), str, 10, 0);
    }

    public boolean waitSlaveReady(MessageStore messageStore) throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            HAClient hAClient = messageStore.getHaService().getHAClient();
            if (hAClient != null && hAClient.getCurrentState().equals(HAConnectionState.TRANSFER)) {
                return true;
            }
            Thread.sleep(2000L);
        }
        return false;
    }

    @Test
    public void testCheckSyncStateSet() throws Exception {
        String str = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        initBroker(DEFAULT_FILE_SIZE, "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535));
        mockData(str);
        ReplicasManager replicasManager = this.brokerController1.getReplicasManager();
        Assert.assertEquals(2L, replicasManager.getSyncStateSet().getSyncStateSet().size());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        while (!newSingleThreadScheduledExecutor.awaitTermination(6000L, TimeUnit.MILLISECONDS)) {
            this.brokerController2.shutdown();
            newSingleThreadScheduledExecutor.shutdown();
        }
        SyncStateSet syncStateSet = replicasManager.getSyncStateSet();
        shutdownAndClearBroker();
        Assert.assertEquals(1L, syncStateSet.getSyncStateSet().size());
    }

    @Test
    public void testChangeMaster() throws Exception {
        String str = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        String str2 = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        initBroker(DEFAULT_FILE_SIZE, str2);
        int listenPort = this.brokerController1.getBrokerConfig().getListenPort();
        int listenPort2 = this.brokerController1.getNettyServerConfig().getListenPort();
        mockData(str);
        this.brokerController1.shutdown();
        brokerList.remove(this.brokerController1);
        Thread.sleep(6000L);
        Assert.assertTrue(this.brokerController2.getReplicasManager().isMasterState());
        Assert.assertEquals(this.brokerController2.getReplicasManager().getMasterEpoch(), 2L);
        this.brokerController1 = startBroker(nameserverAddress, controllerAddress, str2, 1, nextPort(), listenPort, listenPort2, BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
        waitSlaveReady(this.brokerController1.getMessageStore());
        Assert.assertFalse(this.brokerController1.getReplicasManager().isMasterState());
        Assert.assertEquals(this.brokerController1.getReplicasManager().getMasterAddress(), this.brokerController2.getReplicasManager().getBrokerAddress());
        putMessage(this.brokerController2.getMessageStore(), str);
        checkMessage(this.brokerController1.getMessageStore(), str, 20, 0);
        shutdownAndClearBroker();
    }

    @Test
    public void testRestartWithChangedAddress() throws Exception {
        String str = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        String str2 = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        int nextPort = nextPort();
        this.brokerController1 = startBroker(nameserverAddress, controllerAddress, str2, 1, nextPort(), nextPort, nextPort, BrokerRole.SYNC_MASTER, DEFAULT_FILE_SIZE);
        Thread.sleep(1000L);
        Assert.assertTrue(this.brokerController1.getReplicasManager().isMasterState());
        Assert.assertEquals(this.brokerController1.getReplicasManager().getMasterEpoch(), 1L);
        this.brokerController1.shutdown();
        brokerList.remove(this.brokerController1);
        Thread.sleep(6000L);
        int nextPort2 = nextPort();
        this.brokerController1 = startBroker(nameserverAddress, controllerAddress, str2, 1, nextPort(), nextPort2, nextPort2, BrokerRole.SYNC_MASTER, DEFAULT_FILE_SIZE);
        Thread.sleep(1000L);
        Assert.assertEquals(1L, this.brokerController1.getReplicasManager().getBrokerControllerId().longValue());
        Assert.assertTrue(this.brokerController1.getReplicasManager().isMasterState());
        GetReplicaInfoResponseHeader readCustomHeader = ((RemotingCommand) controllerManager.getController().getReplicaInfo(new GetReplicaInfoRequestHeader(str2)).get(500L, TimeUnit.MILLISECONDS)).readCustomHeader();
        Assert.assertEquals(1L, readCustomHeader.getMasterBrokerId().longValue());
        Assert.assertTrue(readCustomHeader.getMasterAddress().contains(String.valueOf(nextPort2)));
        shutdownAndClearBroker();
    }

    @Test
    public void testBasicWorkWhenControllerShutdown() throws Exception {
        initBroker(DEFAULT_FILE_SIZE, "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt());
        putMessage(this.brokerController1.getMessageStore(), "Foobar");
        checkMessage(this.brokerController2.getMessageStore(), "Foobar", 10, 0);
        controllerManager.shutdown();
        putMessage(this.brokerController1.getMessageStore(), "Foobar");
        checkMessage(this.brokerController2.getMessageStore(), "Foobar", 20, 0);
        initAndStartControllerManager();
    }

    @Test
    public void testAddBroker() throws Exception {
        String str = "Topic-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        String str2 = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        initBroker(DEFAULT_FILE_SIZE, str2);
        mockData(str);
        BrokerController startBroker = startBroker(nameserverAddress, controllerAddress, str2, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, DEFAULT_FILE_SIZE);
        waitSlaveReady(startBroker.getMessageStore());
        checkMessage(startBroker.getMessageStore(), str, 10, 0);
        putMessage(this.brokerController1.getMessageStore(), str);
        checkMessage(startBroker.getMessageStore(), str, 20, 0);
        shutdownAndClearBroker();
    }

    @Test
    public void testTruncateEpochLogAndChangeMaster() throws Exception {
        shutdownAndClearBroker();
        String str = "Broker-" + AutoSwitchRoleIntegrationTest.class.getSimpleName() + this.random.nextInt(65535);
        initBroker(1700, str);
        putMessage(this.brokerController1.getMessageStore(), "FooBar");
        checkMessage(this.brokerController2.getMessageStore(), "FooBar", 10, 0);
        this.brokerController1.shutdown();
        brokerList.remove(this.brokerController1);
        Thread.sleep(5000L);
        Assert.assertTrue(this.brokerController2.getReplicasManager().isMasterState());
        Assert.assertEquals(this.brokerController2.getReplicasManager().getMasterEpoch(), 2L);
        BrokerController startBroker = startBroker(nameserverAddress, controllerAddress, str, 3, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
        waitSlaveReady(startBroker.getMessageStore());
        checkMessage(startBroker.getMessageStore(), "FooBar", 10, 0);
        putMessage(this.brokerController2.getMessageStore(), "FooBar");
        checkMessage(startBroker.getMessageStore(), "FooBar", 20, 0);
        MessageStore messageStore = this.brokerController2.getMessageStore();
        MappedFileQueue mappedFileQueue = messageStore.getCommitLog().getMappedFileQueue();
        Assert.assertEquals(2L, mappedFileQueue.getTotalFileSize() / 1700);
        messageStore.getCommitLog().getMappedFileQueue().getFirstMappedFile().shutdown(1000L);
        mappedFileQueue.retryDeleteFirstFile(1000L);
        Assert.assertEquals(messageStore.getCommitLog().getMinOffset(), 1700L);
        this.brokerController2.getMessageStore().getHaService().truncateEpochFilePrefix(1570L);
        checkMessage(messageStore, "FooBar", 10, 10);
        BrokerController startBroker2 = startBroker(nameserverAddress, controllerAddress, str, 4, nextPort(), nextPort(), nextPort(), BrokerRole.SLAVE, 1700);
        waitSlaveReady(startBroker2.getMessageStore());
        checkMessage(startBroker2.getMessageStore(), "FooBar", 10, 10);
        shutdownAndClearBroker();
    }

    public void shutdownAndClearBroker() throws InterruptedException {
        for (BrokerController brokerController : brokerList) {
            brokerController.shutdown();
            UtilAll.deleteFile(new File(brokerController.getMessageStoreConfig().getStorePathRootDir()));
        }
        brokerList.clear();
    }

    @AfterClass
    public static void destroy() {
        if (namesrvController != null) {
            namesrvController.shutdown();
        }
        if (controllerManager != null) {
            controllerManager.shutdown();
        }
        UtilAll.deleteFile(new File(STORE_PATH_ROOT_PARENT_DIR));
    }
}
