/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.Function;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.ControllerMetricsManager;
import org.apache.kafka.controller.MockControllerMetrics;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

final class ControllerMetricsManagerTest {
    /**
     * Package-private no-argument constructor; it performs no setup of its own.
     */
    ControllerMetricsManagerTest() {
    }

    /**
     * Replaying a registration built with {@code fenced == false} is expected to
     * yield one active broker and no fenced brokers.
     */
    @Test
    public void testActiveBrokerRegistration() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        underTest.replay((ApiMessage) brokerRegistration(1, 1L, false));

        Assertions.assertEquals(1, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * Replaying a registration built with {@code fenced == true} is expected to
     * yield one fenced broker and no active brokers.
     */
    @Test
    public void testFenceBrokerRegistration() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        underTest.replay((ApiMessage) brokerRegistration(1, 1L, true));

        Assertions.assertEquals(0, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(1, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * A fenced broker followed by a registration change carrying
     * {@link BrokerRegistrationFencingChange#UNFENCE} is expected to end up active.
     */
    @Test
    public void testBrokerChangedToActive() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        // Start fenced, then unfence through the change record.
        underTest.replay((ApiMessage) brokerRegistration(1, 1L, true));
        underTest.replay((ApiMessage) brokerChange(1, 1L, BrokerRegistrationFencingChange.UNFENCE));

        Assertions.assertEquals(1, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * A fenced broker followed by an {@link UnfenceBrokerRecord} is expected to end up active.
     */
    @Test
    public void testBrokerLegacyChangedToActive() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        // Start fenced, then unfence through the dedicated unfence record.
        underTest.replay((ApiMessage) brokerRegistration(1, 1L, true));
        underTest.replay((ApiMessage) brokerUnfence(1, 1L));

        Assertions.assertEquals(1, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * An unfenced broker followed by a registration change carrying
     * {@link BrokerRegistrationFencingChange#FENCE} is expected to end up fenced.
     */
    @Test
    public void testBrokerChangedToFence() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        // Start active, then fence through the change record.
        underTest.replay((ApiMessage) brokerRegistration(1, 1L, false));
        underTest.replay((ApiMessage) brokerChange(1, 1L, BrokerRegistrationFencingChange.FENCE));

        Assertions.assertEquals(0, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(1, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * An unfenced broker followed by a {@link FenceBrokerRecord} is expected to end up fenced.
     */
    @Test
    public void testBrokerLegacyChangedToFence() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        // Start active, then fence through the dedicated fence record.
        underTest.replay((ApiMessage) brokerRegistration(1, 1L, false));
        underTest.replay((ApiMessage) brokerFence(1, 1L));

        Assertions.assertEquals(0, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(1, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * A registration change carrying {@link BrokerRegistrationFencingChange#NONE}
     * is expected to leave a fenced broker fenced.
     */
    @Test
    public void testBrokerUnchanged() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        underTest.replay((ApiMessage) brokerRegistration(1, 1L, true));
        underTest.replay((ApiMessage) brokerChange(1, 1L, BrokerRegistrationFencingChange.NONE));

        Assertions.assertEquals(0, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(1, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * Registers one fenced and one unfenced broker, then unregisters them one at
     * a time, checking both counters after every step.
     */
    @Test
    public void testBrokerUnregister() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        // Broker 1 is fenced, broker 2 is not.
        underTest.replay((ApiMessage) brokerRegistration(1, 1L, true));
        underTest.replay((ApiMessage) brokerRegistration(2, 1L, false));
        Assertions.assertEquals(1, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(1, (int) mockMetrics.fencedBrokerCount());

        // Dropping the fenced broker only affects the fenced counter.
        underTest.replay((ApiMessage) brokerUnregistration(1, 1L));
        Assertions.assertEquals(1, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.fencedBrokerCount());

        // Dropping the remaining broker brings both counters to zero.
        underTest.replay((ApiMessage) brokerUnregistration(2, 1L));
        Assertions.assertEquals(0, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.fencedBrokerCount());
    }

    /**
     * Replays a fenced registration and an unfencing change as a single batch and
     * expects the broker to be counted as active afterwards.
     */
    @Test
    public void testReplayBatch() {
        MockControllerMetrics metrics = new MockControllerMetrics();
        ControllerMetricsManager manager = ControllerMetricsManagerTest.metricsManager(metrics);

        // The record version is passed as an explicit short: an int literal is not
        // implicitly narrowed to short in a method/constructor invocation.
        List<ApiMessageAndVersion> batch = Arrays.asList(
            new ApiMessageAndVersion((ApiMessage) ControllerMetricsManagerTest.brokerRegistration(1, 1L, true), (short) 0),
            new ApiMessageAndVersion((ApiMessage) ControllerMetricsManagerTest.brokerChange(1, 1L, BrokerRegistrationFencingChange.UNFENCE), (short) 0));
        manager.replayBatch(0L, batch);

        Assertions.assertEquals((int) 1, (int) metrics.activeBrokerCount());
        Assertions.assertEquals((int) 0, (int) metrics.fencedBrokerCount());
    }

    /**
     * Replaying a single topic record is expected to raise the global topic count to one.
     */
    @Test
    public void testTopicCountIncreased() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);

        underTest.replay((ApiMessage) topicRecord("test"));

        Assertions.assertEquals(1, (int) mockMetrics.globalTopicCount());
    }

    /**
     * Creating a topic and then removing it by id is expected to leave the global
     * topic count at zero.
     */
    @Test
    public void testTopicCountDecreased() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);
        Uuid topicId = Uuid.randomUuid();

        underTest.replay((ApiMessage) topicRecord("test", topicId));
        underTest.replay((ApiMessage) removeTopicRecord(topicId));

        Assertions.assertEquals(0, (int) mockMetrics.globalTopicCount());
    }

    /**
     * The global partition count is expected to start at zero for a fresh topic
     * and grow by one with each partition record replayed.
     */
    @Test
    public void testPartitionCountIncreased() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);
        Uuid topicId = Uuid.randomUuid();

        // A topic without partitions contributes nothing to the partition count.
        underTest.replay((ApiMessage) topicRecord("test", topicId));
        Assertions.assertEquals(0, (int) mockMetrics.globalPartitionCount());

        underTest.replay((ApiMessage) partitionRecord(topicId, 0, 0, Arrays.asList(0, 1, 2)));
        Assertions.assertEquals(1, (int) mockMetrics.globalPartitionCount());

        underTest.replay((ApiMessage) partitionRecord(topicId, 1, 0, Arrays.asList(0, 1, 2)));
        Assertions.assertEquals(2, (int) mockMetrics.globalPartitionCount());
    }

    /**
     * Creates a topic with two partitions and removes the topic again, expecting
     * the global partition count to go from two back to zero.
     *
     * <p>The topic, partition and removal records each carry a distinct
     * {@link Uuid} instance built from the same bits, so the bookkeeping under
     * test has to match topic ids by value rather than by object identity.
     */
    @Test
    public void testPartitionCountDecreased() {
        MockControllerMetrics metrics = new MockControllerMetrics();
        ControllerMetricsManager manager = ControllerMetricsManagerTest.metricsManager(metrics);

        // Three equal-but-not-identical ids for the same topic.
        Uuid createTopicId = Uuid.randomUuid();
        Uuid createPartitionTopicId = new Uuid(createTopicId.getMostSignificantBits(), createTopicId.getLeastSignificantBits());
        Uuid removeTopicId = new Uuid(createTopicId.getMostSignificantBits(), createTopicId.getLeastSignificantBits());

        manager.replay((ApiMessage) ControllerMetricsManagerTest.topicRecord("test", createTopicId));
        manager.replay((ApiMessage) ControllerMetricsManagerTest.partitionRecord(createPartitionTopicId, 0, 0, Arrays.asList(0, 1, 2)));
        manager.replay((ApiMessage) ControllerMetricsManagerTest.partitionRecord(createPartitionTopicId, 1, 0, Arrays.asList(0, 1, 2)));
        // Previously this test asserted nothing; pin the count before and after removal.
        Assertions.assertEquals(2, (int) metrics.globalPartitionCount());

        manager.replay((ApiMessage) ControllerMetricsManagerTest.removeTopicRecord(removeTopicId));
        Assertions.assertEquals(0, (int) metrics.globalPartitionCount());
    }

    /**
     * A partition record created with {@code -1} as the third helper argument
     * (presumably the leader id; the helper is defined outside this view) is
     * expected to be counted as one offline partition.
     */
    @Test
    public void testOfflinePartition() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);
        Uuid topicId = Uuid.randomUuid();

        underTest.replay((ApiMessage) topicRecord("test", topicId));
        underTest.replay((ApiMessage) partitionRecord(topicId, 0, -1, Arrays.asList(0, 1, 2)));

        Assertions.assertEquals(1, (int) mockMetrics.globalOfflinePartitionCount());
    }

    /**
     * A partition record whose third helper argument is {@code 1} while the list
     * starts with {@code 0} is expected to count as one preferred-replica
     * imbalance (presumably leader vs. first replica; the helper is defined
     * outside this view).
     */
    @Test
    public void testImbalancedPartition() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);
        Uuid topicId = Uuid.randomUuid();

        underTest.replay((ApiMessage) topicRecord("test", topicId));
        underTest.replay((ApiMessage) partitionRecord(topicId, 0, 1, Arrays.asList(0, 1, 2)));

        Assertions.assertEquals(1, (int) mockMetrics.preferredReplicaImbalanceCount());
    }

    /**
     * Applies a sequence of partition change records to a single partition and
     * checks the offline and preferred-replica-imbalance counters after each one.
     *
     * <p>NOTE(review): the change-record helper is defined outside this view; its
     * third argument is presumably the new leader and its fourth the new replica
     * list - confirm against the helper.
     */
    @Test
    public void testPartitionChange() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);
        Uuid topicId = Uuid.randomUuid();

        underTest.replay((ApiMessage) topicRecord("test", topicId));
        underTest.replay((ApiMessage) partitionRecord(topicId, 0, 0, Arrays.asList(0, 1, 2)));

        // Change to -1: the partition is expected to be counted offline.
        underTest.replay((ApiMessage) partitionChangeRecord(topicId, 0, OptionalInt.of(-1), Optional.empty()));
        Assertions.assertEquals(1, (int) mockMetrics.globalOfflinePartitionCount());

        // Change to 1: no longer offline, but now counted as imbalanced.
        underTest.replay((ApiMessage) partitionChangeRecord(topicId, 0, OptionalInt.of(1), Optional.empty()));
        Assertions.assertEquals(0, (int) mockMetrics.globalOfflinePartitionCount());
        Assertions.assertEquals(1, (int) mockMetrics.preferredReplicaImbalanceCount());

        // Change back to 0: the imbalance is expected to clear.
        underTest.replay((ApiMessage) partitionChangeRecord(topicId, 0, OptionalInt.of(0), Optional.empty()));
        Assertions.assertEquals(0, (int) mockMetrics.preferredReplicaImbalanceCount());

        // Only the list changes (now starting with 1): imbalance is expected again.
        underTest.replay((ApiMessage) partitionChangeRecord(topicId, 0, OptionalInt.empty(), Optional.of(Arrays.asList(1, 2, 0))));
        Assertions.assertEquals(1, (int) mockMetrics.preferredReplicaImbalanceCount());

        // Both change together (2 and a list starting with 2): imbalance clears.
        underTest.replay((ApiMessage) partitionChangeRecord(topicId, 0, OptionalInt.of(2), Optional.of(Arrays.asList(2, 0, 1))));
        Assertions.assertEquals(0, (int) mockMetrics.preferredReplicaImbalanceCount());
    }

    /**
     * Sets and clears {@code min.insync.replicas} first through a topic config
     * record and then through a cluster config record, expecting the
     * under-min-ISR count to toggle between one and zero each time.
     */
    @Test
    public void testMinIsrConfig() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest = metricsManager(mockMetrics);
        Uuid topicId = Uuid.randomUuid();
        String topic = "test";
        String minIsrKey = "min.insync.replicas";

        underTest.replay((ApiMessage) topicRecord(topic, topicId));
        underTest.replay((ApiMessage) partitionRecord(topicId, 0, 0, Arrays.asList(0, 1, 2), Arrays.asList(0, 1)));
        Assertions.assertEquals(0, (int) mockMetrics.globalUnderMinIsrCount());

        // Topic-level setting of "3", then a null value for the same key.
        underTest.replay((ApiMessage) topicConfigRecord(topic, minIsrKey, "3"));
        Assertions.assertEquals(1, (int) mockMetrics.globalUnderMinIsrCount());
        underTest.replay((ApiMessage) topicConfigRecord(topic, minIsrKey, null));
        Assertions.assertEquals(0, (int) mockMetrics.globalUnderMinIsrCount());

        // Same exercise through the cluster-level config record.
        underTest.replay((ApiMessage) clusterConfigRecord(minIsrKey, "3"));
        Assertions.assertEquals(1, (int) mockMetrics.globalUnderMinIsrCount());
        underTest.replay((ApiMessage) clusterConfigRecord(minIsrKey, null));
        Assertions.assertEquals(0, (int) mockMetrics.globalUnderMinIsrCount());
    }

    /**
     * Replays the topic's {@code min.insync.replicas} config before the topic
     * record itself and expects the setting to still be reflected once the
     * partition arrives; removing the topic is expected to clear global and
     * tenant metrics.
     */
    @Test
    public void testMinIsrConfigBeforeTopic() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest =
            metricsManager(mockMetrics, Optional.of(ControllerMetricsManagerTest::topicNameToTenant));
        Uuid topicId = Uuid.randomUuid();
        String topic = "lkc-123_test";

        // Config record first, topic and partition afterwards.
        underTest.replay((ApiMessage) topicConfigRecord(topic, "min.insync.replicas", "3"));
        underTest.replay((ApiMessage) topicRecord(topic, topicId));
        underTest.replay((ApiMessage) partitionRecord(topicId, 0, 0, Arrays.asList(0, 1, 2), Arrays.asList(0, 1)));

        Assertions.assertEquals(1, (int) mockMetrics.globalUnderMinIsrCount());
        Assertions.assertEquals(1, (int) mockMetrics.tenantMetrics().size());
        Assertions.assertEquals(1, (int) mockMetrics.tenantMetrics().get("lkc-123").getPartitionCount());

        underTest.replay((ApiMessage) removeTopicRecord(topicId));

        Assertions.assertEquals(0, (int) mockMetrics.globalTopicCount());
        Assertions.assertEquals(0, (int) mockMetrics.globalPartitionCount());
        Assertions.assertEquals(0, (int) mockMetrics.tenantMetrics().size());
    }

    /**
     * With a manager constructed but nothing replayed, every counter checked
     * here is expected to read zero.
     */
    @Test
    public void testStartingMetrics() {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        // The manager is still constructed around the metrics object; the
        // instance itself is not used by this test.
        metricsManager(mockMetrics);

        Assertions.assertEquals(0, (int) mockMetrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.fencedBrokerCount());
        Assertions.assertEquals(0, (int) mockMetrics.globalTopicCount());
        Assertions.assertEquals(0, (int) mockMetrics.globalPartitionCount());
        Assertions.assertEquals(0, (int) mockMetrics.globalOfflinePartitionCount());
        Assertions.assertEquals(0, (int) mockMetrics.preferredReplicaImbalanceCount());
        Assertions.assertEquals(0, (int) mockMetrics.globalUnderMinIsrCount());
    }

    /**
     * Populates broker, topic and partition state, calls
     * {@code ControllerMetricsManager.reset()} and expects every counter checked
     * here to read zero afterwards.
     *
     * <p>The counters touched by the replayed records are asserted to be non-zero
     * before the reset, so the test cannot pass merely because the replays had no
     * effect.
     */
    @Test
    public void testReset() {
        MockControllerMetrics metrics = new MockControllerMetrics();
        ControllerMetricsManager manager = ControllerMetricsManagerTest.metricsManager(metrics);

        manager.replay((ApiMessage) ControllerMetricsManagerTest.brokerRegistration(1, 1L, true));
        Uuid id = Uuid.randomUuid();
        manager.replay((ApiMessage) ControllerMetricsManagerTest.topicRecord("test", id));
        manager.replay((ApiMessage) ControllerMetricsManagerTest.partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2), Arrays.asList(0)));

        // Sanity check: there must be something for reset() to clear.
        Assertions.assertEquals(1, (int) metrics.fencedBrokerCount());
        Assertions.assertEquals(1, (int) metrics.globalTopicCount());
        Assertions.assertEquals(1, (int) metrics.globalPartitionCount());

        manager.reset();

        Assertions.assertEquals(0, (int) metrics.activeBrokerCount());
        Assertions.assertEquals(0, (int) metrics.fencedBrokerCount());
        Assertions.assertEquals(0, (int) metrics.globalTopicCount());
        Assertions.assertEquals(0, (int) metrics.globalPartitionCount());
        Assertions.assertEquals(0, (int) metrics.globalOfflinePartitionCount());
        Assertions.assertEquals(0, (int) metrics.preferredReplicaImbalanceCount());
        Assertions.assertEquals(0, (int) metrics.globalUnderMinIsrCount());
    }

    /**
     * Creates one topic named {@code "test"} and one named {@code "lkc-123_bar"},
     * then removes them in the same order, checking global counts and the
     * per-tenant metrics map after every step. Only the second topic is expected
     * to show up under tenant {@code "lkc-123"}.
     */
    @Test
    public void testTenantPartitionMetrics1() throws Exception {
        MockControllerMetrics mockMetrics = new MockControllerMetrics();
        ControllerMetricsManager underTest =
            metricsManager(mockMetrics, Optional.of(ControllerMetricsManagerTest::topicNameToTenant));

        // Topic "test": counted globally, no tenant entry expected.
        Uuid plainTopicId = Uuid.randomUuid();
        underTest.replay((ApiMessage) topicRecord("test", plainTopicId));
        underTest.replay((ApiMessage) partitionRecord(plainTopicId, 0, 0, Arrays.asList(0, 1, 2)));
        Assertions.assertEquals(1, (int) mockMetrics.globalTopicCount());
        Assertions.assertEquals(0, (int) mockMetrics.tenantMetrics().size());

        // Topic "lkc-123_bar": counted globally and under tenant "lkc-123".
        Uuid tenantTopicId = Uuid.randomUuid();
        underTest.replay((ApiMessage) topicRecord("lkc-123_bar", tenantTopicId));
        underTest.replay((ApiMessage) partitionRecord(tenantTopicId, 0, 0, Arrays.asList(0, 1, 2)));
        Assertions.assertEquals(2, (int) mockMetrics.globalTopicCount());
        Assertions.assertEquals(2, (int) mockMetrics.globalPartitionCount());
        Assertions.assertEquals(1, (int) mockMetrics.tenantMetrics().size());
        Assertions.assertEquals(1, (int) mockMetrics.tenantMetrics().get("lkc-123").getPartitionCount());

        // Removing "test" must not disturb the tenant's partition count.
        underTest.replay((ApiMessage) removeTopicRecord(plainTopicId));
        Assertions.assertEquals(1, (int) mockMetrics.globalTopicCount());
        Assertions.assertEquals(1, (int) mockMetrics.globalPartitionCount());
        Assertions.assertEquals(1, (int) mockMetrics.tenantMetrics().get("lkc-123").getPartitionCount());

        // Removing the tenant topic is expected to empty the tenant metrics map.
        underTest.replay((ApiMessage) removeTopicRecord(tenantTopicId));
        Assertions.assertEquals(0, (int) mockMetrics.globalTopicCount());
        Assertions.assertEquals(0, (int) mockMetrics.globalPartitionCount());
        Assertions.assertEquals(0, (int) mockMetrics.tenantMetrics().size());
    }

    /**
     * Drives two topics whose names both start with the tenant string
     * {@code "lkc-xyz"} through several rounds of partition change records and
     * checks the global and per-tenant counters after each round, finishing with
     * the removal of both topics.
     *
     * <p>NOTE(review): the {@code partitionRecord}, {@code partitionChangeRecord}
     * and {@code topicNameToTenant} helpers are defined outside this view. The
     * comments below assume the change-record arguments are (topic id, partition,
     * leader, ISR, replicas) and that a leader of {@code -1} marks the partition
     * offline - confirm against the helpers.
     */
    @Test
    public void testTenantPartitionMetrics2() throws Exception {
        MockControllerMetrics metrics = new MockControllerMetrics();
        ControllerMetricsManager manager = ControllerMetricsManagerTest.metricsManager(metrics, Optional.of(ControllerMetricsManagerTest::topicNameToTenant));
        String tenant = "lkc-xyz";
        String fooTopicName = tenant + "_foo";
        Uuid fooTopicId = Uuid.randomUuid();
        String zarTopicName = tenant + "_zar";
        Uuid zarTopicId = Uuid.randomUuid();
        // Setup: foo gets two partitions, zar gets three; nothing is expected to be offline.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.topicRecord(fooTopicName, fooTopicId));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(fooTopicId, 0, 0, Arrays.asList(0, 2)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(fooTopicId, 1, 0, Arrays.asList(0, 1)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.topicRecord(zarTopicName, zarTopicId));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(zarTopicId, 0, 0, Arrays.asList(0, 1, 2)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(zarTopicId, 1, 1, Arrays.asList(1, 2, 3)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(zarTopicId, 2, 1, Arrays.asList(1, 2, 0)));
        Assertions.assertEquals((int)5, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)5, (int)metrics.tenantMetrics().get(tenant).getPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        // Round 1: change records for four partitions; still nothing offline, two
        // partitions expected under min ISR and three counted as imbalanced.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 0, OptionalInt.of(2), Optional.of(Collections.singletonList(2)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 1, OptionalInt.of(1), Optional.of(Collections.singletonList(1)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 0, OptionalInt.of(1), Optional.of(Arrays.asList(1, 2)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 2, OptionalInt.empty(), Optional.of(Arrays.asList(1, 2)), Optional.empty()));
        Assertions.assertEquals((int)0, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        Assertions.assertEquals((int)3, (int)metrics.preferredReplicaImbalanceCount());
        // Round 2: foo-1 receives -1 (expected offline); the three zar partitions are changed again.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 1, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 0, OptionalInt.of(2), Optional.of(Collections.singletonList(2)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 1, OptionalInt.of(2), Optional.of(Arrays.asList(2, 3)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 2, OptionalInt.of(2), Optional.of(Collections.singletonList(2)), Optional.empty()));
        Assertions.assertEquals((int)1, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)3, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)1, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)3, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        Assertions.assertEquals((int)5, (int)metrics.preferredReplicaImbalanceCount());
        // Round 3: foo-0, zar-0 and zar-2 receive -1; zar-1 is changed to 3 with a single-element list.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 0, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 0, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 1, OptionalInt.of(3), Optional.of(Collections.singletonList(3)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 2, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        Assertions.assertEquals((int)4, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)1, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)4, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)1, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        // Round 4: the last remaining partition (zar-1) receives -1; all five are expected offline.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 1, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        Assertions.assertEquals((int)5, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)5, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        // Removing foo is expected to take its two partitions out of every counter.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.removeTopicRecord(fooTopicId));
        Assertions.assertEquals((int)3, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)3, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)3, (int)metrics.tenantMetrics().get(tenant).getPartitionCount());
        Assertions.assertEquals((int)3, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        // Removing zar as well is expected to zero the globals and empty the tenant map.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.removeTopicRecord(zarTopicId));
        Assertions.assertEquals((int)0, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().size());
    }

    /**
     * Same sequence of records as the two-tenant-topic test, except that the
     * second topic is named {@code "_zar"} without the tenant string in front.
     * The assertions expect all five partitions in the global counters but only
     * foo's two partitions in the metrics of tenant {@code "lkc-xyz"}.
     *
     * <p>NOTE(review): the {@code partitionRecord}, {@code partitionChangeRecord}
     * and {@code topicNameToTenant} helpers are defined outside this view. The
     * comments below assume the change-record arguments are (topic id, partition,
     * leader, ISR, replicas) and that a leader of {@code -1} marks the partition
     * offline - confirm against the helpers.
     */
    @Test
    public void testTenantGlobalPartitionMetrics() throws Exception {
        MockControllerMetrics metrics = new MockControllerMetrics();
        ControllerMetricsManager manager = ControllerMetricsManagerTest.metricsManager(metrics, Optional.of(ControllerMetricsManagerTest::topicNameToTenant));
        String tenant = "lkc-xyz";
        String fooTopicName = tenant + "_foo";
        Uuid fooTopicId = Uuid.randomUuid();
        String zarTopicName = "_zar";
        Uuid zarTopicId = Uuid.randomUuid();
        // Setup: foo (tenant-named) gets two partitions, zar gets three.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.topicRecord(fooTopicName, fooTopicId));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(fooTopicId, 0, 0, Arrays.asList(0, 2)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(fooTopicId, 1, 0, Arrays.asList(0, 1)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.topicRecord(zarTopicName, zarTopicId));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(zarTopicId, 0, 0, Arrays.asList(0, 1, 2)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(zarTopicId, 1, 1, Arrays.asList(1, 2, 3)));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionRecord(zarTopicId, 2, 1, Arrays.asList(1, 2, 0)));
        Assertions.assertEquals((int)5, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        // Round 1: change records for four partitions; nothing offline yet.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 0, OptionalInt.of(2), Optional.of(Collections.singletonList(2)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 1, OptionalInt.of(1), Optional.of(Collections.singletonList(1)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 0, OptionalInt.of(1), Optional.of(Arrays.asList(1, 2)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 2, OptionalInt.empty(), Optional.of(Arrays.asList(1, 2)), Optional.empty()));
        Assertions.assertEquals((int)5, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)2, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        Assertions.assertEquals((int)3, (int)metrics.preferredReplicaImbalanceCount());
        // Round 2: foo-1 receives -1; the zar changes are expected to move only the global counters.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 1, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 0, OptionalInt.of(2), Optional.of(Collections.singletonList(2)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 1, OptionalInt.of(2), Optional.of(Arrays.asList(2, 3)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 2, OptionalInt.of(2), Optional.of(Collections.singletonList(2)), Optional.empty()));
        Assertions.assertEquals((int)1, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)3, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)1, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)1, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        Assertions.assertEquals((int)5, (int)metrics.preferredReplicaImbalanceCount());
        // Round 3: foo-0, zar-0 and zar-2 receive -1; zar-1 is changed to 3 with a single-element list.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(fooTopicId, 0, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 0, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 1, OptionalInt.of(3), Optional.of(Collections.singletonList(3)), Optional.empty()));
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 2, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        Assertions.assertEquals((int)5, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)4, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)1, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getPartitionCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        // Round 4: zar-1 receives -1; all five are expected offline globally, the tenant stays at two.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.partitionChangeRecord(zarTopicId, 1, OptionalInt.of(-1), Optional.empty(), Optional.empty()));
        Assertions.assertEquals((int)5, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)2, (int)metrics.tenantMetrics().get(tenant).getOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().get(tenant).getUnderMinIsrPartitionsCount());
        // Removing foo, the tenant's only topic here, is expected to empty the tenant map.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.removeTopicRecord(fooTopicId));
        Assertions.assertEquals((int)3, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)3, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().size());
        // Removing zar is expected to zero the remaining global counters.
        manager.replay((ApiMessage)ControllerMetricsManagerTest.removeTopicRecord(zarTopicId));
        Assertions.assertEquals((int)0, (int)metrics.globalPartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalOfflinePartitionCount());
        Assertions.assertEquals((int)0, (int)metrics.globalUnderMinIsrCount());
        Assertions.assertEquals((int)0, (int)metrics.tenantMetrics().size());
    }

    /**
     * Builds a {@link RegisterBrokerRecord} for use in replay tests.
     *
     * <p>Each call assigns a newly generated random incarnation id.
     *
     * @param brokerId id stored in the record
     * @param epoch    broker epoch stored in the record
     * @param fenced   value stored in the record's fenced flag
     * @return the populated registration record
     */
    private static RegisterBrokerRecord brokerRegistration(int brokerId, long epoch, boolean fenced) {
        RegisterBrokerRecord registration = new RegisterBrokerRecord();
        registration.setBrokerId(brokerId);
        registration.setIncarnationId(Uuid.randomUuid());
        registration.setBrokerEpoch(epoch);
        registration.setFenced(fenced);
        return registration;
    }

    /**
     * Builds an {@link UnregisterBrokerRecord} for the given broker.
     *
     * @param brokerId id stored in the record
     * @param epoch    broker epoch stored in the record
     * @return the populated unregistration record
     */
    private static UnregisterBrokerRecord brokerUnregistration(int brokerId, long epoch) {
        UnregisterBrokerRecord unregistration = new UnregisterBrokerRecord();
        unregistration.setBrokerId(brokerId);
        unregistration.setBrokerEpoch(epoch);
        return unregistration;
    }

    /**
     * Builds a {@link BrokerRegistrationChangeRecord} carrying a fencing change.
     *
     * @param brokerId id stored in the record
     * @param epoch    broker epoch stored in the record
     * @param fencing  fencing change whose {@code value()} is written to the record's fenced field
     * @return the populated change record
     */
    private static BrokerRegistrationChangeRecord brokerChange(int brokerId, long epoch, BrokerRegistrationFencingChange fencing) {
        BrokerRegistrationChangeRecord change = new BrokerRegistrationChangeRecord();
        change.setBrokerId(brokerId);
        change.setBrokerEpoch(epoch);
        change.setFenced(fencing.value());
        return change;
    }

    /**
     * Builds an {@link UnfenceBrokerRecord} for the given broker.
     *
     * @param brokerId value stored in the record's id field
     * @param epoch    value stored in the record's epoch field
     * @return the populated unfence record
     */
    private static UnfenceBrokerRecord brokerUnfence(int brokerId, long epoch) {
        UnfenceBrokerRecord unfence = new UnfenceBrokerRecord();
        unfence.setId(brokerId);
        unfence.setEpoch(epoch);
        return unfence;
    }

    /**
     * Builds a {@link FenceBrokerRecord} for the given broker.
     *
     * @param brokerId value stored in the record's id field
     * @param epoch    value stored in the record's epoch field
     * @return the populated fence record
     */
    private static FenceBrokerRecord brokerFence(int brokerId, long epoch) {
        FenceBrokerRecord fence = new FenceBrokerRecord();
        fence.setId(brokerId);
        fence.setEpoch(epoch);
        return fence;
    }

    /**
     * Builds a {@link TopicRecord} with the given name and a newly generated random topic id.
     *
     * @param name topic name stored in the record
     * @return the populated topic record
     */
    private static TopicRecord topicRecord(String name) {
        TopicRecord topic = new TopicRecord();
        topic.setName(name);
        Uuid generatedId = Uuid.randomUuid();
        topic.setTopicId(generatedId);
        return topic;
    }

    /**
     * Builds a {@link TopicRecord} with an explicit name and topic id.
     *
     * @param name topic name stored in the record
     * @param id   topic id stored in the record
     * @return the populated topic record
     */
    private static TopicRecord topicRecord(String name, Uuid id) {
        TopicRecord topic = new TopicRecord();
        topic.setName(name);
        topic.setTopicId(id);
        return topic;
    }

    /**
     * Builds a {@link RemoveTopicRecord} for the topic with the given id.
     *
     * @param id topic id stored in the record
     * @return the populated removal record
     */
    private static RemoveTopicRecord removeTopicRecord(Uuid id) {
        RemoveTopicRecord removal = new RemoveTopicRecord();
        removal.setTopicId(id);
        return removal;
    }

    /**
     * Builds a {@link PartitionRecord} whose ISR is the same list as its replicas.
     *
     * <p>Convenience overload that delegates to the five-argument variant.
     *
     * @param id        topic id stored in the record
     * @param partition partition id stored in the record
     * @param leader    leader stored in the record
     * @param replicas  list used for both the replicas and the ISR
     * @return the populated partition record
     */
    private static PartitionRecord partitionRecord(Uuid id, int partition, int leader, List<Integer> replicas) {
        // The same list instance is passed for both arguments.
        List<Integer> isr = replicas;
        return partitionRecord(id, partition, leader, replicas, isr);
    }

    /**
     * Builds a {@link PartitionRecord} with explicit replicas and ISR.
     *
     * @param id        topic id stored in the record
     * @param partition partition id stored in the record
     * @param leader    leader stored in the record
     * @param replicas  replica list stored in the record
     * @param isr       ISR list stored in the record
     * @return the populated partition record
     */
    private static PartitionRecord partitionRecord(Uuid id, int partition, int leader, List<Integer> replicas, List<Integer> isr) {
        PartitionRecord assignment = new PartitionRecord();
        assignment.setPartitionId(partition);
        assignment.setTopicId(id);
        assignment.setReplicas(replicas);
        assignment.setIsr(isr);
        assignment.setLeader(leader);
        return assignment;
    }

    /**
     * Builds a {@link PartitionChangeRecord} in which the same optional list is used for
     * both the ISR and the replicas.
     *
     * <p>Convenience overload that delegates to the five-argument variant.
     *
     * @param id        topic id stored in the record
     * @param partition partition id stored in the record
     * @param leader    new leader, applied only when present
     * @param replicas  optional list applied to both the ISR and the replicas when present
     * @return the populated change record
     */
    private static PartitionChangeRecord partitionChangeRecord(Uuid id, int partition, OptionalInt leader, Optional<List<Integer>> replicas) {
        // The five-argument overload takes the ISR before the replicas; both receive the same value.
        Optional<List<Integer>> isr = replicas;
        return partitionChangeRecord(id, partition, leader, isr, replicas);
    }

    /**
     * Builds a {@link PartitionChangeRecord} that sets only the fields supplied.
     *
     * <p>The partition id and topic id are always set. The leader, ISR and replicas are
     * set only when the corresponding optional holds a value; otherwise the record keeps
     * whatever a newly constructed {@code PartitionChangeRecord} holds for that field.
     *
     * @param id        topic id stored in the record
     * @param partition partition id stored in the record
     * @param leader    new leader, applied only when present
     * @param isr       new ISR, applied only when present
     * @param replicas  new replica list, applied only when present
     * @return the populated change record
     */
    private static PartitionChangeRecord partitionChangeRecord(Uuid id, int partition, OptionalInt leader, Optional<List<Integer>> isr, Optional<List<Integer>> replicas) {
        PartitionChangeRecord change = new PartitionChangeRecord();
        if (leader.isPresent()) {
            change.setLeader(leader.getAsInt());
        }
        if (replicas.isPresent()) {
            change.setReplicas(replicas.get());
        }
        if (isr.isPresent()) {
            change.setIsr(isr.get());
        }
        change.setPartitionId(partition);
        change.setTopicId(id);
        return change;
    }

    /**
     * Builds a {@link ConfigRecord} for a single topic-level configuration entry.
     *
     * <p>The resource type is the id of {@code ConfigResource.Type.TOPIC}.
     *
     * @param topicName resource name stored in the record
     * @param name      configuration key stored in the record
     * @param value     configuration value stored in the record
     * @return the populated config record
     */
    private static ConfigRecord topicConfigRecord(String topicName, String name, String value) {
        ConfigRecord config = new ConfigRecord();
        config.setResourceType(ConfigResource.Type.TOPIC.id());
        config.setResourceName(topicName);
        config.setName(name);
        config.setValue(value);
        return config;
    }

    /**
     * Builds a {@link ConfigRecord} whose resource type is the id of
     * {@code ConfigResource.Type.BROKER} and whose resource name is the empty string.
     *
     * @param name  configuration key stored in the record
     * @param value configuration value stored in the record
     * @return the populated config record
     */
    private static ConfigRecord clusterConfigRecord(String name, String value) {
        ConfigRecord config = new ConfigRecord();
        config.setResourceType(ConfigResource.Type.BROKER.id());
        config.setResourceName("");
        config.setName(name);
        config.setValue(value);
        return config;
    }

    /**
     * Extracts a tenant prefix from a topic name.
     *
     * <p>A name yields a tenant only when it starts with {@code "lkc-"} and contains an
     * underscore; the tenant is everything before the first underscore. Any other name
     * yields {@code null}.
     *
     * @param name topic name to inspect; must not be {@code null}
     * @return the text before the first underscore, or {@code null} when the name does not
     *         match the expected shape
     */
    private static String topicNameToTenant(String name) {
        final int delimIndex = name.indexOf("_");
        if (name.startsWith("lkc-") && delimIndex >= 0) {
            return name.substring(0, delimIndex);
        }
        return null;
    }

    /**
     * Creates a {@link ControllerMetricsManager} with no topic-name-to-tenant mapping.
     *
     * <p>Convenience overload that delegates to the two-argument variant with an empty optional.
     *
     * @param metrics metrics sink handed to the manager
     * @return the new manager
     */
    private static ControllerMetricsManager metricsManager(ControllerMetrics metrics) {
        Optional<Function<String, String>> noTenantMapping = Optional.empty();
        return metricsManager(metrics, noTenantMapping);
    }

    /**
     * Creates a {@link ControllerMetricsManager} with an optional topic-name-to-tenant mapping.
     *
     * @param metrics           metrics sink handed to the manager
     * @param topicNameToTenant optional function passed through to the manager's constructor
     * @return the new manager
     */
    private static ControllerMetricsManager metricsManager(ControllerMetrics metrics, Optional<Function<String, String>> topicNameToTenant) {
        // NOTE(review): the meaning of the literal 2 is defined by the ControllerMetricsManager
        // constructor, which is not visible from this file - confirm before changing it.
        ControllerMetricsManager manager = new ControllerMetricsManager(metrics, topicNameToTenant, 2);
        return manager;
    }
}

