package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.TopicPartition;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.collection.Iterable;
import scala.collection.JavaConverters;
import scala.collection.mutable.Set;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/KafkaSampleStoreTest.class */
public class KafkaSampleStoreTest extends CCKafkaIntegrationTestHarness {
    private static final String PARTITION_TOPIC = "__KafkaCruiseControlPartitionMetricSamples";
    private static final String BROKER_TOPIC = "__KafkaCruiseControlModelTrainingSamples";

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness
    public int clusterSize() {
        return 3;
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @After
    public void tearDown() {
        super.tearDown();
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testSampleStoreCreatesTopicWithCorrectConfigsWhenItDoesNotExist() {
        KafkaZkClient createKafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "LoadMonitorTaskRunnerGroup", "LoadMonitorTaskRunnerSetup", false);
        try {
            Map mergedConfigValues = new KafkaCruiseControlConfig(getConfig()).mergedConfigValues();
            KafkaSampleStore kafkaSampleStore = new KafkaSampleStore();
            kafkaSampleStore.configure(mergedConfigValues);
            try {
                Map<TopicPartition, Collection<Integer>> topicPartitionAssignments = getTopicPartitionAssignments(createKafkaZkClient);
                Assert.assertEquals(64L, topicPartitionAssignments.size());
                Iterator<Map.Entry<TopicPartition, Collection<Integer>>> it = topicPartitionAssignments.entrySet().iterator();
                while (it.hasNext()) {
                    Assert.assertEquals(3L, it.next().getValue().size());
                }
                kafkaSampleStore.close();
            } catch (Throwable th) {
                kafkaSampleStore.close();
                throw th;
            }
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(createKafkaZkClient);
        }
    }

    private Map<String, String> getConfig() {
        HashMap hashMap = new HashMap();
        hashMap.put("partition.metric.sample.store.topic", PARTITION_TOPIC);
        hashMap.put("broker.metric.sample.store.topic", BROKER_TOPIC);
        hashMap.put("skip.sample.store.topic.rack.awareness.check", "true");
        hashMap.put("bootstrap.servers", bootstrapServers());
        hashMap.put("zookeeper.connect", zookeeper().connectionString());
        hashMap.put("partition.metrics.window.ms", "300000");
        hashMap.put("broker.metrics.window.ms", "30000");
        hashMap.put("num.partition.metrics.windows", "1");
        hashMap.put("num.broker.metrics.windows", "1");
        hashMap.put("reconnect.backoff.ms", "5000");
        hashMap.put("zookeeper.security.enabled", "false");
        hashMap.put("default.goals", "io.confluent.cruisecontrol.analyzer.goals.CrossRackMovementGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal");
        hashMap.put("anomaly.detection.goals", "io.confluent.cruisecontrol.analyzer.goals.CrossRackMovementGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal");
        return hashMap;
    }

    private Map<TopicPartition, Collection<Integer>> getTopicPartitionAssignments(KafkaZkClient kafkaZkClient) {
        HashSet hashSet = new HashSet();
        hashSet.add(PARTITION_TOPIC);
        hashSet.add(BROKER_TOPIC);
        return (Map) ((Map) JavaConverters.mapAsJavaMapConverter(kafkaZkClient.getReplicaAssignmentForTopics(((Set) JavaConverters.asScalaSetConverter(hashSet).asScala()).toSet())).asJava()).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return (List) JavaConverters.asJavaCollectionConverter((Iterable) entry.getValue()).asJavaCollection().stream().map(obj -> {
                return (Integer) obj;
            }).collect(Collectors.toList());
        }));
    }

    @Test
    public void testCheckTopicsCreated() throws InterruptedException {
        KafkaZkClient createKafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "LoadMonitorTaskRunnerGroup", "LoadMonitorTaskRunnerSetup", false);
        try {
            Map mergedConfigValues = new KafkaCruiseControlConfig(getConfig()).mergedConfigValues();
            Assert.assertFalse(KafkaSampleStore.checkTopicsCreated(mergedConfigValues));
            java.util.Set asJavaSet = JavaConverters.setAsJavaSet(createKafkaZkClient.getAllTopicsInCluster(false));
            Assert.assertTrue(asJavaSet.contains(PARTITION_TOPIC));
            Assert.assertTrue(asJavaSet.contains(BROKER_TOPIC));
            int i = 0;
            while (i < 10 && !KafkaSampleStore.checkTopicsCreated(mergedConfigValues)) {
                Thread.sleep(2000L);
                i++;
            }
            Assert.assertTrue("Topics not created after " + (i * 2000) + " ms", i < 10);
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(createKafkaZkClient);
        } catch (Throwable th) {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(createKafkaZkClient);
            throw th;
        }
    }
}
