package io.confluent.databalancer.persistence;

import com.linkedin.cruisecontrol.exception.CruiseControlException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.SbkTopicUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Semaphore;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.BrokerRemovalDescription;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout;

/**
 * Integration tests for {@link ApiStatePersistenceStore}.
 *
 * <p>Every test runs against a freshly started {@link EmbeddedKafkaCluster} (ZooKeeper plus
 * {@link #numBrokers()} brokers) that is shut down again after the test. The store is pointed at
 * the topic named by {@link #API_STATE_TOPIC}.
 */
@Category({IntegrationTest.class})
public class ApiStatePersistenceStoreTest {
    /** Topic name handed to the store through the {@code confluent.balancer.api.state.topic} setting. */
    private static final String API_STATE_TOPIC = "_ApiStatePersistenceTestStore";

    /** Upper bound, in milliseconds, for any single test (enforced by {@link #globalTimeout}). */
    private static final long TEST_TIMEOUT = 60000;

    /** How long, in milliseconds, the topic-creation checks poll before giving up. */
    private static final long TOPIC_WAIT_MS = 30000L;

    private EmbeddedKafkaCluster kafkaCluster;

    @Rule
    public final Timeout globalTimeout = Timeout.millis(TEST_TIMEOUT);

    private final MockTime time = new MockTime();

    /**
     * Number of brokers started for each test.
     *
     * @return the broker count; subclasses may override it
     */
    protected int numBrokers() {
        return 3;
    }

    /** Starts ZooKeeper and the brokers of the embedded cluster. */
    @Before
    public void setUp() {
        this.kafkaCluster = new EmbeddedKafkaCluster();
        this.kafkaCluster.startZooKeeper();
        this.kafkaCluster.startBrokers(numBrokers(), new Properties());
    }

    /** Shuts the embedded cluster down; tolerates a cluster that was never created. */
    @After
    public void tearDown() {
        if (this.kafkaCluster != null) {
            this.kafkaCluster.shutdown();
        }
    }

    /**
     * Saves a single removal record and verifies that the record read back matches it, including
     * the exception message, both status fields and the timestamps.
     */
    @Test
    public void testProduceConsumeOneRecord() throws Exception {
        try (ApiStatePersistenceStore store = newStore(getKafkaConfig())) {
            BrokerRemovalStateMachine.BrokerRemovalState state =
                    BrokerRemovalStateMachine.BrokerRemovalState.BROKER_SHUTDOWN_FAILED;
            BrokerRemovalStateRecord saved =
                    new BrokerRemovalStateRecord(1, state, new CruiseControlException("test message"));
            store.save(saved, true);

            BrokerRemovalStateRecord loaded = store.getBrokerRemovalStateRecord(1);
            Assert.assertEquals(saved, loaded);
            Assert.assertEquals("test message", loaded.exception().getMessage());
            Assert.assertEquals(state.brokerShutdownStatus(), loaded.brokerShutdownStatus());
            Assert.assertEquals(state.partitionReassignmentsStatus(), loaded.partitionReassignmentsStatus());
            Assert.assertTrue("Start time should be set.", loaded.startTime() > 0);
            Assert.assertTrue("Last update time should be set", loaded.lastUpdateTime() > 0);
            // A record saved only once is expected to carry identical start and update times.
            Assert.assertEquals(loaded.startTime(), loaded.lastUpdateTime());
        }
    }

    /** Saves records for two different brokers and verifies both can be read back independently. */
    @Test
    public void testProduceConsumeMultipleRecords() throws Exception {
        try (ApiStatePersistenceStore store = newStore(getKafkaConfig())) {
            CruiseControlException failure = new CruiseControlException("test message");

            BrokerRemovalStateRecord firstBroker = new BrokerRemovalStateRecord(
                    1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, failure);
            store.save(firstBroker, true);

            BrokerRemovalStateRecord secondBroker = new BrokerRemovalStateRecord(
                    2, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_FAILED, failure);
            store.save(secondBroker, true);

            Assert.assertEquals(firstBroker, store.getBrokerRemovalStateRecord(1));
            Assert.assertEquals(secondBroker, store.getBrokerRemovalStateRecord(2));
            Assert.assertEquals(2L, store.getAllBrokerRemovalStateRecords().size());
        }
    }

    /**
     * Saves a record for one broker twice and verifies the second save replaces the first while the
     * explicitly carried-over start time is preserved and the last update time moves on.
     */
    @Test
    public void testProduceConsumeBrokerStateRecordMultipleTime() throws Exception {
        try (ApiStatePersistenceStore store = newStore(getKafkaConfig())) {
            BrokerRemovalStateRecord initial = new BrokerRemovalStateRecord(
                    1,
                    BrokerRemovalStateMachine.BrokerRemovalState.BROKER_SHUTDOWN_INITIATED,
                    new CruiseControlException("test message"));
            store.save(initial, true);

            BrokerRemovalStateRecord loadedInitial = store.getBrokerRemovalStateRecord(1);
            Assert.assertEquals(initial, loadedInitial);
            long startTime = loadedInitial.startTime();

            BrokerRemovalStateRecord update = new BrokerRemovalStateRecord(
                    1,
                    BrokerRemovalStateMachine.BrokerRemovalState.BROKER_SHUTDOWN_FAILED,
                    new CruiseControlException("updated message"));
            update.setStartTime(startTime);
            // NOTE(review): the second argument presumably distinguishes a brand-new record (true)
            // from an update (false) - confirm against ApiStatePersistenceStore#save.
            store.save(update, false);

            BrokerRemovalStateRecord loadedUpdate = store.getBrokerRemovalStateRecord(1);
            Assert.assertEquals(update, loadedUpdate);
            Assert.assertTrue("Start time should be set.", loadedUpdate.startTime() > 0);
            Assert.assertEquals(update.startTime(), loadedUpdate.startTime());
            Assert.assertTrue("Last update time should be set", loadedUpdate.lastUpdateTime() > 0);
            Assert.assertNotEquals(loadedUpdate.startTime(), loadedUpdate.lastUpdateTime());
            Assert.assertEquals(startTime, loadedUpdate.startTime());
        }
    }

    /**
     * Writes records through one store instance, closes it, and verifies that a second instance
     * built from the same configuration sees the same records.
     *
     * <p>Both stores are managed with try-with-resources so a failing assertion cannot leave a
     * store open; the first store is still closed before the second one is created.
     */
    @Test
    public void testApiStatusExistAfterRestart() throws Exception {
        KafkaConfig kafkaConfig = getKafkaConfig();
        long startTime;
        long updateTimeBeforeRestart;
        BrokerRemovalStateRecord beforeRestart;

        try (ApiStatePersistenceStore store = newStore(kafkaConfig)) {
            BrokerRemovalStateRecord first = new BrokerRemovalStateRecord(
                    1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_INITIATED, (Exception) null);
            store.save(first, true);
            // The timestamps are read from the record instance that was passed to save().
            startTime = first.startTime();
            long firstUpdateTime = first.lastUpdateTime();

            store.save(
                    new BrokerRemovalStateRecord(
                            2, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_INITIATED, (Exception) null),
                    true);

            BrokerRemovalStateRecord update = new BrokerRemovalStateRecord(
                    1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null);
            update.setStartTime(startTime);
            store.save(update, false);

            beforeRestart = store.getBrokerRemovalStateRecord(1);
            Assert.assertEquals(startTime, beforeRestart.startTime());
            Assert.assertNotEquals(firstUpdateTime, beforeRestart.lastUpdateTime());
            updateTimeBeforeRestart = beforeRestart.lastUpdateTime();
        }

        // A second store on the same configuration stands in for a restart.
        try (ApiStatePersistenceStore restarted = newStore(kafkaConfig)) {
            Assert.assertEquals(
                    "There should only be two api states.", 2L, restarted.getAllBrokerRemovalStateRecords().size());

            BrokerRemovalStateRecord afterRestart = restarted.getBrokerRemovalStateRecord(1);
            Assert.assertEquals(beforeRestart, afterRestart);
            Assert.assertTrue("Start time should be set.", afterRestart.startTime() > 0);
            Assert.assertEquals(startTime, afterRestart.startTime());
            Assert.assertTrue("Last update time should be set", afterRestart.lastUpdateTime() > 0);
            Assert.assertNotEquals(afterRestart.startTime(), afterRestart.lastUpdateTime());
            Assert.assertEquals(updateTimeBeforeRestart, afterRestart.lastUpdateTime());

            BrokerRemovalStateRecord secondBroker = restarted.getBrokerRemovalStateRecord(2);
            Assert.assertNull("Exception is not null: " + secondBroker.exception(), secondBroker.exception());
            Assert.assertEquals(
                    BrokerRemovalDescription.BrokerShutdownStatus.COMPLETE, secondBroker.brokerShutdownStatus());
            Assert.assertEquals(
                    BrokerRemovalDescription.PartitionReassignmentsStatus.IN_PROGRESS,
                    secondBroker.partitionReassignmentsStatus());
            Assert.assertEquals(secondBroker.startTime(), secondBroker.lastUpdateTime());
        }
    }

    /**
     * Runs {@code ApiStatePersistenceStore.checkStartupCondition} and verifies that the API state
     * topic is listed by the cluster afterwards.
     */
    @Test
    public void testCheckStartupCondition() throws Exception {
        ConfluentAdmin admin = createAdmin();
        try {
            ApiStatePersistenceStore.checkStartupCondition(new KafkaCruiseControlConfig(getConfig()), new Semaphore(0));
            Assert.assertTrue(admin.listTopics().names().get().contains(API_STATE_TOPIC));
        } finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(admin);
        }
    }

    /**
     * Verifies that the first {@code checkTopicCreated} call reports {@code false}, that the topic
     * then shows up in the cluster, and that a later call reports {@code true}.
     */
    @Test
    public void testCheckTopicCreated() throws Exception {
        ConfluentAdmin admin = createAdmin();
        try {
            // NOTE(review): kept as a raw Map because the generic signature of mergedConfigValues()
            // is not visible from this file - tighten it once confirmed.
            Map mergedConfig = new KafkaCruiseControlConfig(getConfig()).mergedConfigValues();
            SbkTopicUtils.SbkTopicConfig topicConfig = ApiStatePersistenceStore.getTopicConfig(
                    ApiStatePersistenceStore.getApiStatePersistenceStoreTopicName(mergedConfig), mergedConfig);

            Assert.assertFalse(ApiStatePersistenceStore.checkTopicCreated(mergedConfig, topicConfig));
            TestUtils.waitForCondition(
                    () -> admin.listTopics().names().get().contains(API_STATE_TOPIC),
                    TOPIC_WAIT_MS,
                    API_STATE_TOPIC + " can't be created.");
            TestUtils.waitForCondition(
                    () -> ApiStatePersistenceStore.checkTopicCreated(mergedConfig, topicConfig),
                    TOPIC_WAIT_MS,
                    "Check " + API_STATE_TOPIC + " exists.");
        } finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(admin);
        }
    }

    /** Creates a store on the given broker configuration using the shared mock clock. */
    private ApiStatePersistenceStore newStore(KafkaConfig kafkaConfig) {
        return new ApiStatePersistenceStore(kafkaConfig, this.time, Collections.emptyMap());
    }

    /** Creates an admin client from the admin-relevant subset of {@link #getConfig()}. */
    private ConfluentAdmin createAdmin() {
        return KafkaCruiseControlUtils.createAdmin(KafkaCruiseControlUtils.filterAdminClientConfigs(getConfig()));
    }

    /** Wraps {@link #getConfig()} in a {@link KafkaConfig}. */
    private KafkaConfig getKafkaConfig() {
        return new KafkaConfig(getConfig());
    }

    /**
     * Builds the configuration shared by all tests: the API state topic name plus the bootstrap
     * servers and ZooKeeper connection string of the embedded cluster.
     *
     * @return a new mutable map; the cluster must already be started
     */
    private Map<String, String> getConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("confluent.balancer.api.state.topic", API_STATE_TOPIC);
        config.put("bootstrap.servers", this.kafkaCluster.bootstrapServers());
        config.put("zookeeper.connect", this.kafkaCluster.zkConnect());
        config.put("zookeeper.security.enabled", "false");
        return config;
    }
}
