package io.confluent.databalancer.persistence;

import com.linkedin.cruisecontrol.exception.CruiseControlException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.SbkTopicUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.databalancer.ImmutableSet;
import io.confluent.databalancer.TestConstants;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.EvenClusterLoadStateMachine;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.BrokerShutdownStatus;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.PartitionReassignmentsStatus;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Tag(TestConstants.INTEGRATION_TEST)
@Timeout(value = 1, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/databalancer/persistence/ApiStatePersistenceStoreTest.class */
public class ApiStatePersistenceStoreTest {
    private static final String API_STATE_TOPIC = "_ApiStatePersistenceTestStore";
    private static final long TEST_TIMEOUT = 60000;
    private EmbeddedKafkaCluster kafkaCluster;
    private MockTime time = new MockTime();

    /* loaded from: input_file:io/confluent/databalancer/persistence/ApiStatePersistenceStoreTest$TestException.class */
    static class TestException extends Exception {
        public TestException(String str) {
            super(str);
        }

        public boolean equals(Object obj) {
            return obj instanceof TestException;
        }

        public int hashCode() {
            return super.hashCode();
        }
    }

    protected int numBrokers() {
        return 3;
    }

    @BeforeEach
    public void setUp() {
        this.kafkaCluster = new EmbeddedKafkaCluster();
        this.kafkaCluster.startZooKeeper();
        this.kafkaCluster.startBrokers(numBrokers(), new Properties());
    }

    @AfterEach
    public void tearDown() {
        if (this.kafkaCluster != null) {
            this.kafkaCluster.shutdown();
        }
    }

    @Test
    public void testProduceConsumeOneRecord() throws Exception {
        ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(getKafkaConfig(), this.time, Collections.emptyMap());
        Throwable th = null;
        try {
            try {
                HashSet hashSet = new HashSet(Arrays.asList(1, 2));
                BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(hashSet, BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_FAILED, new CruiseControlException("test message"), false);
                apiStatePersistenceStore.save(brokerRemovalStateRecord, true);
                BrokerRemovalStateRecord brokerRemovalStateRecord2 = apiStatePersistenceStore.getBrokerRemovalStateRecord(new ImmutableSet(hashSet));
                Assertions.assertEquals(brokerRemovalStateRecord, brokerRemovalStateRecord2);
                Assertions.assertEquals("test message", brokerRemovalStateRecord2.exception().getMessage());
                Assertions.assertEquals(BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_FAILED.brokerShutdownStatus(), brokerRemovalStateRecord2.brokerShutdownStatus());
                Assertions.assertEquals(BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_FAILED.partitionReassignmentsStatus(), brokerRemovalStateRecord2.partitionReassignmentsStatus());
                Assertions.assertTrue(brokerRemovalStateRecord2.startTime() > 0, "Start time should be set.");
                Assertions.assertTrue(brokerRemovalStateRecord2.lastUpdateTime() > 0, "Last update time should be set");
                Assertions.assertEquals(brokerRemovalStateRecord2.startTime(), brokerRemovalStateRecord2.lastUpdateTime());
                if (apiStatePersistenceStore != null) {
                    if (0 == 0) {
                        apiStatePersistenceStore.close();
                        return;
                    }
                    try {
                        apiStatePersistenceStore.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (apiStatePersistenceStore != null) {
                if (th != null) {
                    try {
                        apiStatePersistenceStore.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    apiStatePersistenceStore.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testProduceConsumeMultipleRecords() throws Exception {
        ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(getKafkaConfig(), this.time, Collections.emptyMap());
        Throwable th = null;
        try {
            try {
                HashSet hashSet = new HashSet(Arrays.asList(1, 2));
                CruiseControlException cruiseControlException = new CruiseControlException("test message");
                BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(hashSet, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, cruiseControlException, false);
                apiStatePersistenceStore.save(brokerRemovalStateRecord, true);
                HashSet hashSet2 = new HashSet(Arrays.asList(1, 2, 3));
                BrokerRemovalStateRecord brokerRemovalStateRecord2 = new BrokerRemovalStateRecord(hashSet2, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_FAILED, cruiseControlException, true);
                apiStatePersistenceStore.save(brokerRemovalStateRecord2, true);
                Assertions.assertEquals(brokerRemovalStateRecord, apiStatePersistenceStore.getBrokerRemovalStateRecord(new ImmutableSet(hashSet)));
                Assertions.assertEquals(brokerRemovalStateRecord2, apiStatePersistenceStore.getBrokerRemovalStateRecord(new ImmutableSet(hashSet2)));
                Assertions.assertEquals(2, apiStatePersistenceStore.getAllBrokerRemovalStateRecords().size());
                if (apiStatePersistenceStore != null) {
                    if (0 == 0) {
                        apiStatePersistenceStore.close();
                        return;
                    }
                    try {
                        apiStatePersistenceStore.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (apiStatePersistenceStore != null) {
                if (th != null) {
                    try {
                        apiStatePersistenceStore.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    apiStatePersistenceStore.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testProduceConsumeBrokerStateRecordMultipleTime() throws Exception {
        ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(getKafkaConfig(), this.time, Collections.emptyMap());
        Throwable th = null;
        try {
            HashSet hashSet = new HashSet(Arrays.asList(1, 2));
            BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(hashSet, BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_INITIATED, new CruiseControlException("test message"), false);
            apiStatePersistenceStore.save(brokerRemovalStateRecord, true);
            BrokerRemovalStateRecord brokerRemovalStateRecord2 = apiStatePersistenceStore.getBrokerRemovalStateRecord(new ImmutableSet(hashSet));
            Assertions.assertEquals(brokerRemovalStateRecord, brokerRemovalStateRecord2);
            long startTime = brokerRemovalStateRecord2.startTime();
            BrokerRemovalStateRecord brokerRemovalStateRecord3 = new BrokerRemovalStateRecord(hashSet, BrokerRemovalStateMachine.BrokerRemovalState.EXCLUSION_FAILED, new CruiseControlException("updated message"), false);
            brokerRemovalStateRecord3.setStartTime(brokerRemovalStateRecord2.startTime());
            apiStatePersistenceStore.save(brokerRemovalStateRecord3, false);
            BrokerRemovalStateRecord brokerRemovalStateRecord4 = apiStatePersistenceStore.getBrokerRemovalStateRecord(new ImmutableSet(hashSet));
            Assertions.assertEquals(brokerRemovalStateRecord3, brokerRemovalStateRecord4);
            Assertions.assertTrue(brokerRemovalStateRecord4.startTime() > 0, "Start time should be set.");
            Assertions.assertEquals(brokerRemovalStateRecord3.startTime(), brokerRemovalStateRecord4.startTime());
            Assertions.assertTrue(brokerRemovalStateRecord4.lastUpdateTime() > 0, "Last update time should be set");
            Assertions.assertNotEquals(brokerRemovalStateRecord4.startTime(), brokerRemovalStateRecord4.lastUpdateTime());
            Assertions.assertEquals(brokerRemovalStateRecord4.startTime(), startTime);
            if (apiStatePersistenceStore != null) {
                if (0 == 0) {
                    apiStatePersistenceStore.close();
                    return;
                }
                try {
                    apiStatePersistenceStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (apiStatePersistenceStore != null) {
                if (0 != 0) {
                    try {
                        apiStatePersistenceStore.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    apiStatePersistenceStore.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testApiStatusExistAfterRestart() throws Exception {
        KafkaConfig kafkaConfig = getKafkaConfig();
        ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(kafkaConfig, this.time, Collections.emptyMap());
        HashSet hashSet = new HashSet(Arrays.asList(1, 2));
        HashSet hashSet2 = new HashSet(Arrays.asList(1, 2, 3));
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(hashSet, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_INITIATED, (Exception) null, false);
        apiStatePersistenceStore.save(brokerRemovalStateRecord, true);
        long startTime = brokerRemovalStateRecord.startTime();
        long lastUpdateTime = brokerRemovalStateRecord.lastUpdateTime();
        apiStatePersistenceStore.save(new BrokerRemovalStateRecord(hashSet2, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_INITIATED, (Exception) null, true), true);
        BrokerRemovalStateRecord brokerRemovalStateRecord2 = new BrokerRemovalStateRecord(hashSet, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null, false);
        brokerRemovalStateRecord2.setStartTime(startTime);
        apiStatePersistenceStore.save(brokerRemovalStateRecord2, false);
        BrokerRemovalStateRecord brokerRemovalStateRecord3 = apiStatePersistenceStore.getBrokerRemovalStateRecord(new ImmutableSet(hashSet));
        Assertions.assertEquals(startTime, brokerRemovalStateRecord3.startTime());
        Assertions.assertNotEquals(lastUpdateTime, brokerRemovalStateRecord3.lastUpdateTime());
        long lastUpdateTime2 = brokerRemovalStateRecord3.lastUpdateTime();
        apiStatePersistenceStore.close();
        ApiStatePersistenceStore apiStatePersistenceStore2 = new ApiStatePersistenceStore(kafkaConfig, this.time, Collections.emptyMap());
        Assertions.assertEquals(2, apiStatePersistenceStore2.getAllBrokerRemovalStateRecords().size(), "There should only be two api states.");
        BrokerRemovalStateRecord brokerRemovalStateRecord4 = apiStatePersistenceStore2.getBrokerRemovalStateRecord(new ImmutableSet(hashSet));
        Assertions.assertEquals(brokerRemovalStateRecord3, brokerRemovalStateRecord4);
        Assertions.assertFalse(brokerRemovalStateRecord4.shouldShutdown(), "Expected the first broker removal to have its shutdown scheduled flag set to false");
        Assertions.assertTrue(brokerRemovalStateRecord4.startTime() > 0, "Start time should be set.");
        Assertions.assertEquals(brokerRemovalStateRecord4.startTime(), startTime);
        Assertions.assertTrue(brokerRemovalStateRecord4.lastUpdateTime() > 0, "Last update time should be set");
        Assertions.assertNotEquals(brokerRemovalStateRecord4.startTime(), brokerRemovalStateRecord4.lastUpdateTime());
        Assertions.assertEquals(lastUpdateTime2, brokerRemovalStateRecord4.lastUpdateTime());
        BrokerRemovalStateRecord brokerRemovalStateRecord5 = apiStatePersistenceStore2.getBrokerRemovalStateRecord(new ImmutableSet(hashSet2));
        Assertions.assertTrue(brokerRemovalStateRecord5.shouldShutdown(), "Expected the second broker removal to have its shutdown scheduled flag set to true");
        Assertions.assertNull(brokerRemovalStateRecord5.exception(), "Exception is not null: " + brokerRemovalStateRecord5.exception());
        Assertions.assertEquals(BrokerShutdownStatus.PENDING, brokerRemovalStateRecord5.brokerShutdownStatus());
        Assertions.assertEquals(PartitionReassignmentsStatus.IN_PROGRESS, brokerRemovalStateRecord5.partitionReassignmentsStatus());
        Assertions.assertEquals(brokerRemovalStateRecord5.startTime(), brokerRemovalStateRecord5.lastUpdateTime());
        apiStatePersistenceStore2.close();
    }

    @Test
    public void testCheckStartupCondition() throws Exception {
        ConfluentAdmin createAdmin = KafkaCruiseControlUtils.createAdmin(KafkaCruiseControlUtils.filterAdminClientConfigs(getConfig()));
        try {
            ApiStatePersistenceStore.checkStartupCondition(new KafkaCruiseControlConfig(getConfig()), new Semaphore(0));
            Assertions.assertTrue(((Set) createAdmin.listTopics().names().get()).contains(API_STATE_TOPIC));
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(createAdmin);
        } catch (Throwable th) {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(createAdmin);
            throw th;
        }
    }

    @Test
    public void testCheckTopicCreated() throws Exception {
        ConfluentAdmin createAdmin = KafkaCruiseControlUtils.createAdmin(KafkaCruiseControlUtils.filterAdminClientConfigs(getConfig()));
        try {
            Map mergedConfigValues = new KafkaCruiseControlConfig(getConfig()).mergedConfigValues();
            SbkTopicUtils.SbkTopicConfig topicConfig = ApiStatePersistenceStore.getTopicConfig(ApiStatePersistenceStore.getApiStatePersistenceStoreTopicName(mergedConfigValues), mergedConfigValues);
            Assertions.assertFalse(ApiStatePersistenceStore.checkTopicCreated(mergedConfigValues, topicConfig));
            TestUtils.waitForCondition(() -> {
                return ((Set) createAdmin.listTopics().names().get()).contains(API_STATE_TOPIC);
            }, 30000L, "_ApiStatePersistenceTestStore can't be created.");
            TestUtils.waitForCondition(() -> {
                return ApiStatePersistenceStore.checkTopicCreated(mergedConfigValues, topicConfig);
            }, 30000L, "Check _ApiStatePersistenceTestStore exists.");
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(createAdmin);
        } catch (Throwable th) {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(createAdmin);
            throw th;
        }
    }

    @Test
    public void testFailedBrokers() throws Exception {
        ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(getKafkaConfig(), this.time, Collections.emptyMap());
        HashMap hashMap = new HashMap();
        hashMap.put(1, Long.valueOf(System.currentTimeMillis()));
        apiStatePersistenceStore.save(hashMap);
        Assertions.assertEquals(hashMap, apiStatePersistenceStore.getFailedBrokers());
        apiStatePersistenceStore.save(Collections.emptyMap());
        Map failedBrokers = apiStatePersistenceStore.getFailedBrokers();
        Assertions.assertTrue(failedBrokers.isEmpty(), "Emtpy list of brokers cannot be stored: " + failedBrokers);
        hashMap.put(2, Long.valueOf(System.currentTimeMillis()));
        hashMap.put(3, Long.valueOf(System.currentTimeMillis()));
        apiStatePersistenceStore.save(hashMap);
        Assertions.assertEquals(hashMap, apiStatePersistenceStore.getFailedBrokers());
        hashMap.clear();
        hashMap.put(4, Long.valueOf(System.currentTimeMillis()));
        hashMap.put(5, Long.valueOf(System.currentTimeMillis()));
        apiStatePersistenceStore.save(hashMap);
        Assertions.assertEquals(hashMap, apiStatePersistenceStore.getFailedBrokers());
        hashMap.remove(5);
        apiStatePersistenceStore.save(hashMap);
        Assertions.assertEquals(hashMap, apiStatePersistenceStore.getFailedBrokers());
    }

    @Test
    public void testSerializeDeserializeException() {
        TestException testException = new TestException("Test error");
        Assertions.assertEquals(testException, ApiStatePersistenceStore.deserializeException(ApiStatePersistenceStore.serializeException(testException)));
        Assertions.assertNull(ApiStatePersistenceStore.deserializeException(ApiStatePersistenceStore.serializeException((Exception) null)));
    }

    @Test
    public void testEvenClusterLoadState() throws Exception {
        KafkaConfig kafkaConfig = getKafkaConfig();
        ApiStatePersistenceStore apiStatePersistenceStore = new ApiStatePersistenceStore(kafkaConfig, this.time, Collections.emptyMap());
        EvenClusterLoadStateRecord evenClusterLoadStateRecord = new EvenClusterLoadStateRecord(EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCED, 50L, (Long) null, (Exception) null, (EvenClusterLoadStateMachine.EvenClusterLoadState) null, (Long) null, (Long) null, (Exception) null);
        apiStatePersistenceStore.save(evenClusterLoadStateRecord);
        Assertions.assertEquals(evenClusterLoadStateRecord, apiStatePersistenceStore.getEvenClusterLoadStateRecord());
        TestException testException = new TestException("Balancing failed.");
        EvenClusterLoadStateRecord evenClusterLoadStateRecord2 = new EvenClusterLoadStateRecord(EvenClusterLoadStateMachine.EvenClusterLoadState.ABORTED, 100L, 110L, new TestException("Balancer component shutting down."), EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCING_FAILED, 90L, 100L, testException);
        apiStatePersistenceStore.save(evenClusterLoadStateRecord2);
        apiStatePersistenceStore.close();
        ApiStatePersistenceStore apiStatePersistenceStore2 = new ApiStatePersistenceStore(kafkaConfig, this.time, Collections.emptyMap());
        Assertions.assertEquals(evenClusterLoadStateRecord2, apiStatePersistenceStore2.getEvenClusterLoadStateRecord());
        EvenClusterLoadStateRecord evenClusterLoadStateRecord3 = new EvenClusterLoadStateRecord(EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCED, 100L, 115L, (Exception) null, EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCING_FAILED, 90L, 100L, testException);
        apiStatePersistenceStore2.save(evenClusterLoadStateRecord3);
        Assertions.assertEquals(evenClusterLoadStateRecord3, apiStatePersistenceStore2.getEvenClusterLoadStateRecord());
        EvenClusterLoadStateRecord evenClusterLoadStateRecord4 = new EvenClusterLoadStateRecord(EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCING, 120L, 120L, (Exception) null, EvenClusterLoadStateMachine.EvenClusterLoadState.BALANCED, 100L, 115L, (Exception) null);
        apiStatePersistenceStore2.save(evenClusterLoadStateRecord4);
        apiStatePersistenceStore2.close();
        Assertions.assertEquals(evenClusterLoadStateRecord4, new ApiStatePersistenceStore(kafkaConfig, this.time, Collections.emptyMap()).getEvenClusterLoadStateRecord());
    }

    private KafkaConfig getKafkaConfig() {
        return new KafkaConfig(getConfig());
    }

    private Map<String, String> getConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.balancer.api.state.topic", API_STATE_TOPIC);
        hashMap.put("bootstrap.servers", this.kafkaCluster.bootstrapServers());
        hashMap.put("zookeeper.connect", this.kafkaCluster.zkConnect());
        hashMap.put("zookeeper.security.enabled", "false");
        return hashMap;
    }
}
