/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.BrokersToIsrs;
import org.apache.kafka.controller.ConfigurationControlManagerTest;
import org.apache.kafka.controller.ControllerMetrics;
import org.apache.kafka.controller.MockControllerMetrics;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.controller.QuorumControllerTestEnv;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.TopicIdPartition;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=40L)
public class QuorumControllerTest {
    @Test
    public void testCreateAndClose() throws Throwable {
        MockControllerMetrics metrics = new MockControllerMetrics();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());){
            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, builder -> builder.setMetrics((ControllerMetrics)metrics));
            Throwable throwable = null;
            if (controlEnv != null) {
                if (throwable != null) {
                    try {
                        controlEnv.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                } else {
                    controlEnv.close();
                }
            }
        }
        Assertions.assertTrue((boolean)metrics.isClosed(), (String)"metrics were not closed");
    }

    @Test
    public void testConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(logEnv.clusterId())).get();
            this.testConfigurationOperations(controlEnv.activeController());
        }
    }

    private void testConfigurationOperations(QuorumController controller) throws Throwable {
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), controller.incrementalAlterConfigs(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), true).get());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.emptyMap())), controller.describeConfigs(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), controller.incrementalAlterConfigs(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false).get());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.singletonMap("baz", "123"))), controller.describeConfigs(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
    }

    @Test
    public void testDelayedConfigurationOperations() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(logEnv.clusterId())).get();
            this.testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
        }
    }

    private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv, QuorumController controller) throws Throwable {
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(0L));
        CompletableFuture future1 = controller.incrementalAlterConfigs(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.singletonMap("baz", ConfigurationControlManagerTest.entry(AlterConfigOp.OpType.SET, "123"))), false);
        Assertions.assertFalse((boolean)future1.isDone());
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, new ResultOrError(Collections.emptyMap())), controller.describeConfigs(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, Collections.emptyList())).get());
        logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2L));
        Assertions.assertEquals(Collections.singletonMap(ConfigurationControlManagerTest.BROKER0, ApiError.NONE), future1.get());
    }

    @Test
    public void testFenceMultipleBrokers() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
        List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 1000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA), OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty());){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.replicationControl().isBrokerUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(createTopicsRequestData).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            TestUtils.waitForCondition(() -> {
                this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.replicationControl().isBrokerUnfenced(brokerId.intValue()).booleanValue()) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.replicationControl().isBrokerUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.replicationControl().isBrokerUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            int[] expectedIsr = new int[]{1};
            int[] isrFoo = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).isr;
            Assertions.assertTrue((boolean)Arrays.equals(isrFoo, expectedIsr), (String)("The ISR for topic foo was " + Arrays.toString(isrFoo) + ". It is expected to be " + Arrays.toString(expectedIsr)));
            int fooLeader = active.replicationControl().getPartition((Uuid)topicIdFoo, (int)0).leader;
            Assertions.assertEquals((int)expectedIsr[0], (int)fooLeader);
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
        }
    }

    @Test
    public void testBalancePartitionLeaders() throws Throwable {
        List<Integer> allBrokers = Arrays.asList(1, 2, 3);
        List<Integer> brokersToKeepUnfenced = Arrays.asList(1, 2);
        List<Integer> brokersToFence = Arrays.asList(3);
        short replicationFactor = (short)allBrokers.size();
        short numberOfPartitions = (short)allBrokers.size();
        long sessionTimeoutMillis = 1000L;
        long leaderImbalanceCheckIntervalNs = 1000000000L;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA), OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs));){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
            for (Integer brokerId2 : allBrokers) {
                CompletableFuture reply = active.registerBroker(new BrokerRegistrationRequestData().setBrokerId(brokerId2.intValue()).setClusterId(active.clusterId()).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId2, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            allBrokers.forEach(brokerId -> Assertions.assertFalse((boolean)active.replicationControl().isBrokerUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions((int)numberOfPartitions).setReplicationFactor(replicationFactor)).iterator()));
            CreateTopicsResponseData createTopicsResponseData = (CreateTopicsResponseData)active.createTopics(createTopicsRequestData).get();
            Assertions.assertEquals((Object)Errors.NONE, (Object)Errors.forCode((short)createTopicsResponseData.topics().find("foo").errorCode()));
            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
            TestUtils.waitForCondition(() -> {
                this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
                for (Integer brokerId : brokersToFence) {
                    if (!active.replicationControl().isBrokerUnfenced(brokerId.intValue()).booleanValue()) continue;
                    return false;
                }
                return true;
            }, (long)(sessionTimeoutMillis * 3L), (String)"Fencing of brokers did not process within expected time");
            this.sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
            brokersToKeepUnfenced.forEach(brokerId -> Assertions.assertTrue((boolean)active.replicationControl().isBrokerUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been unfenced")));
            brokersToFence.forEach(brokerId -> Assertions.assertFalse((boolean)active.replicationControl().isBrokerUnfenced(brokerId.intValue()), (String)("Broker " + brokerId + " should have been fenced")));
            Assertions.assertTrue((boolean)active.replicationControl().arePartitionLeadersImbalanced());
            for (Integer brokerId3 : brokersToFence) {
                CompletableFuture reply = active.registerBroker(new BrokerRegistrationRequestData().setBrokerId(brokerId3.intValue()).setClusterId(active.clusterId()).setIncarnationId(Uuid.randomUuid()).setListeners(listeners));
                brokerEpochs.put(brokerId3, ((BrokerRegistrationReply)reply.get()).epoch());
            }
            this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
            Set imbalancedPartitions = active.replicationControl().imbalancedPartitions();
            Assertions.assertEquals((int)1, (int)imbalancedPartitions.size());
            int imbalancedPartitionId = ((TopicIdPartition)imbalancedPartitions.iterator().next()).partitionId();
            PartitionRegistration partitionRegistration = active.replicationControl().getPartition(topicIdFoo, imbalancedPartitionId);
            AlterPartitionRequestData.PartitionData partitionData = new AlterPartitionRequestData.PartitionData().setPartitionIndex(imbalancedPartitionId).setLeaderEpoch(partitionRegistration.leaderEpoch).setPartitionEpoch(partitionRegistration.partitionEpoch).setNewIsr(Arrays.asList(1, 2, 3));
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setName("foo");
            topicData.partitions().add(partitionData);
            AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData().setBrokerId(partitionRegistration.leader).setBrokerEpoch(((Long)brokerEpochs.get(partitionRegistration.leader)).longValue());
            alterPartitionRequest.topics().add(topicData);
            active.alterPartition(alterPartitionRequest).get();
            AtomicLong lastHeartbeat = new AtomicLong(active.time().milliseconds());
            TestUtils.waitForCondition(() -> {
                if (active.time().milliseconds() > lastHeartbeat.get() + sessionTimeoutMillis / 2L) {
                    lastHeartbeat.set(active.time().milliseconds());
                    this.sendBrokerheartbeat(active, allBrokers, brokerEpochs);
                }
                return !active.replicationControl().arePartitionLeadersImbalanced();
            }, (long)TimeUnit.MILLISECONDS.convert(leaderImbalanceCheckIntervalNs * 10L, TimeUnit.NANOSECONDS), (String)"Leaders where not balanced after unfencing all of the brokers");
        }
    }

    @Test
    public void testUnregisterBroker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            BrokerRegistrationRequestData.ListenerCollection listeners = new BrokerRegistrationRequestData.ListenerCollection();
            listeners.add((ImplicitLinkedHashCollection.Element)new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
            QuorumController active = controlEnv.activeController();
            CompletableFuture reply = active.registerBroker(new BrokerRegistrationRequestData().setBrokerId(0).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwBA")).setListeners(listeners));
            Assertions.assertEquals((long)0L, (long)((BrokerRegistrationReply)reply.get()).epoch());
            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(1).setReplicationFactor((short)1)).iterator()));
            Assertions.assertEquals((short)Errors.INVALID_REPLICATION_FACTOR.code(), (short)((CreateTopicsResponseData)active.createTopics(createTopicsRequestData).get()).topics().find("foo").errorCode());
            Assertions.assertEquals((Object)"Unable to replicate the partition 1 time(s): All brokers are currently fenced.", (Object)((CreateTopicsResponseData)active.createTopics(createTopicsRequestData).get()).topics().find("foo").errorMessage());
            Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(0L).setBrokerId(0).setCurrentMetadataOffset(100000L)).get());
            Assertions.assertEquals((short)Errors.NONE.code(), (short)((CreateTopicsResponseData)active.createTopics(createTopicsRequestData).get()).topics().find("foo").errorCode());
            CompletableFuture topicPartitionFuture = active.appendReadEvent("debugGetPartition", () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().iterator(0, true);
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)((TopicIdPartition)topicPartitionFuture.get()).partitionId());
            active.unregisterBroker(0).get();
            topicPartitionFuture = active.appendReadEvent("debugGetPartition", () -> {
                BrokersToIsrs.PartitionsOnReplicaIterator iterator = active.replicationControl().brokersToIsrs().partitionsWithNoLeader();
                Assertions.assertTrue((boolean)iterator.hasNext());
                return (TopicIdPartition)iterator.next();
            });
            Assertions.assertEquals((int)0, (int)((TopicIdPartition)topicPartitionFuture.get()).partitionId());
        }
    }

    @Test
    public void testSnapshotSaveAndLoad() throws Throwable {
        Uuid fooId;
        QuorumController active2;
        Throwable throwable;
        QuorumControllerTestEnv controlEnv;
        int numBrokers = 4;
        HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
        RawSnapshotReader reader = null;
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());){
            controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));
            throwable = null;
            try {
                int i;
                active2 = controlEnv.activeController();
                for (i = 0; i < 4; ++i) {
                    BrokerRegistrationReply reply = (BrokerRegistrationReply)active2.registerBroker(new BrokerRegistrationRequestData().setBrokerId(i).setRack(null).setClusterId(active2.clusterId()).setIncarnationId(Uuid.fromString((String)("kxAT73dKQsitIedpiPtwB" + i))).setListeners(new BrokerRegistrationRequestData.ListenerCollection(Arrays.asList(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + i)).iterator()))).get();
                    brokerEpochs.put(i, reply.epoch());
                }
                for (i = 0; i < 3; ++i) {
                    Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active2.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(((Long)brokerEpochs.get(i)).longValue()).setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
                }
                CreateTopicsResponseData fooData = (CreateTopicsResponseData)active2.createTopics(new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(Arrays.asList(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(Arrays.asList(0, 1, 2)), new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1).setBrokerIds(Arrays.asList(1, 2, 0))).iterator()))).iterator()))).get();
                fooId = fooData.topics().find("foo").topicId();
                active2.allocateProducerIds(new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(((Long)brokerEpochs.get(0)).longValue())).get();
                long snapshotLogOffset = (Long)active2.beginWritingSnapshot().get();
                reader = logEnv.waitForSnapshot(snapshotLogOffset);
                SnapshotReader<ApiMessageAndVersion> snapshot = this.createSnapshotReader(reader);
                Assertions.assertEquals((long)snapshotLogOffset, (long)snapshot.lastContainedLogOffset());
                this.checkSnapshotContent(this.expectedSnapshotContent(fooId, brokerEpochs), (Iterator<Batch<ApiMessageAndVersion>>)snapshot);
            }
            catch (Throwable active2) {
                throwable = active2;
                throw active2;
            }
            finally {
                if (controlEnv != null) {
                    if (throwable != null) {
                        try {
                            controlEnv.close();
                        }
                        catch (Throwable active2) {
                            throwable.addSuppressed(active2);
                        }
                    } else {
                        controlEnv.close();
                    }
                }
            }
        }
        logEnv = new LocalLogManagerTestEnv(3, Optional.of(reader));
        var6_5 = null;
        try {
            controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));
            throwable = null;
            try {
                active2 = controlEnv.activeController();
                long snapshotLogOffset = (Long)active2.beginWritingSnapshot().get();
                SnapshotReader<ApiMessageAndVersion> snapshot = this.createSnapshotReader(logEnv.waitForSnapshot(snapshotLogOffset));
                Assertions.assertEquals((long)snapshotLogOffset, (long)snapshot.lastContainedLogOffset());
                this.checkSnapshotContent(this.expectedSnapshotContent(fooId, brokerEpochs), (Iterator<Batch<ApiMessageAndVersion>>)snapshot);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (controlEnv != null) {
                    if (throwable != null) {
                        try {
                            controlEnv.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                    } else {
                        controlEnv.close();
                    }
                }
            }
        }
        catch (Throwable throwable4) {
            var6_5 = throwable4;
            throw throwable4;
        }
        finally {
            if (logEnv != null) {
                if (var6_5 != null) {
                    try {
                        logEnv.close();
                    }
                    catch (Throwable throwable5) {
                        var6_5.addSuppressed(throwable5);
                    }
                } else {
                    logEnv.close();
                }
            }
        }
    }

    @Test
    public void testSnapshotConfiguration() throws Throwable {
        int numBrokers = 4;
        int maxNewRecordBytes = 4;
        HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
            b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA);
            b.setSnapshotMaxNewRecordBytes(4L);
        });){
            int i;
            QuorumController active = controlEnv.activeController();
            for (i = 0; i < 4; ++i) {
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(new BrokerRegistrationRequestData().setBrokerId(i).setRack(null).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)("kxAT73dKQsitIedpiPtwB" + i))).setListeners(new BrokerRegistrationRequestData.ListenerCollection(Arrays.asList(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + i)).iterator()))).get();
                brokerEpochs.put(i, reply.epoch());
            }
            for (i = 0; i < 3; ++i) {
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(((Long)brokerEpochs.get(i)).longValue()).setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
            }
            CreateTopicsResponseData fooData = (CreateTopicsResponseData)active.createTopics(new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(Arrays.asList(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(Arrays.asList(0, 1, 2)), new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1).setBrokerIds(Arrays.asList(1, 2, 0))).iterator()))).iterator()))).get();
            Uuid fooId = fooData.topics().find("foo").topicId();
            active.allocateProducerIds(new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(((Long)brokerEpochs.get(0)).longValue())).get();
            SnapshotReader<ApiMessageAndVersion> snapshot = this.createSnapshotReader(logEnv.waitForLatestSnapshot());
            this.checkSnapshotSubcontent(this.expectedSnapshotContent(fooId, brokerEpochs), (Iterator<Batch<ApiMessageAndVersion>>)snapshot);
        }
    }

    @Test
    public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable {
        int numBrokers = 4;
        int maxNewRecordBytes = 1000;
        HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
            b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA);
            b.setSnapshotMaxNewRecordBytes(1000L);
        });){
            QuorumController active = controlEnv.activeController();
            for (int i = 0; i < 4; ++i) {
                BrokerRegistrationReply reply = (BrokerRegistrationReply)active.registerBroker(new BrokerRegistrationRequestData().setBrokerId(i).setRack(null).setClusterId(active.clusterId()).setIncarnationId(Uuid.fromString((String)("kxAT73dKQsitIedpiPtwB" + i))).setListeners(new BrokerRegistrationRequestData.ListenerCollection(Arrays.asList(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + i)).iterator()))).get();
                brokerEpochs.put(i, reply.epoch());
                Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(((Long)brokerEpochs.get(i)).longValue()).setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
            }
            Assertions.assertTrue((logEnv.appendedBytes() < 1000L ? 1 : 0) != 0, (String)String.format("%s appended bytes is not less than %s max new record bytes", logEnv.appendedBytes(), 1000));
            int counter = 0;
            while (logEnv.appendedBytes() < 1000L) {
                String topicName = String.format("foo-%s", ++counter);
                active.createTopics(new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName(topicName).setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(Arrays.asList(new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(0).setBrokerIds(Arrays.asList(0, 1, 2)), new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(1).setBrokerIds(Arrays.asList(1, 2, 0))).iterator()))).iterator()))).get();
            }
            logEnv.waitForLatestSnapshot();
        }
    }

    private SnapshotReader<ApiMessageAndVersion> createSnapshotReader(RawSnapshotReader reader) {
        return RecordsSnapshotReader.of((RawSnapshotReader)reader, (RecordSerde)new MetadataRecordSerde(), (BufferSupplier)BufferSupplier.create(), (int)Integer.MAX_VALUE);
    }

    private List<ApiMessageAndVersion> expectedSnapshotContent(Uuid fooId, Map<Integer, Long> brokerEpochs) {
        return Arrays.asList(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(fooId), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(0).setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setPartitionId(1).setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).setPartitionEpoch(0), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB0")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9092).setSecurityProtocol((short)0)).iterator())).setRack(null).setFenced(false), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB1")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9093).setSecurityProtocol((short)0)).iterator())).setRack(null).setFenced(false), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB2")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9094).setSecurityProtocol((short)0)).iterator())).setRack(null).setFenced(false), 0), new ApiMessageAndVersion((ApiMessage)new RegisterBrokerRecord().setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).setIncarnationId(Uuid.fromString((String)"kxAT73dKQsitIedpiPtwB3")).setEndPoints(new RegisterBrokerRecord.BrokerEndpointCollection(Arrays.asList(new RegisterBrokerRecord.BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").setPort(9095).setSecurityProtocol((short)0)).iterator())).setRack(null), 0), new ApiMessageAndVersion((ApiMessage)new ProducerIdsRecord().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).setNextProducerId(1000L), 0));
    }

    private void checkSnapshotContent(List<ApiMessageAndVersion> expected, Iterator<Batch<ApiMessageAndVersion>> iterator) throws Exception {
        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(expected), Arrays.asList(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 16), false).flatMap(batch -> batch.records().stream()).collect(Collectors.toList())).iterator());
    }

    private void checkSnapshotSubcontent(List<ApiMessageAndVersion> expected, Iterator<Batch<ApiMessageAndVersion>> iterator) throws Exception {
        RecordTestUtils.deepSortRecords(expected);
        List actual = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 16), false).flatMap(batch -> batch.records().stream()).collect(Collectors.toList());
        RecordTestUtils.deepSortRecords(actual);
        int expectedIndex = 0;
        for (ApiMessageAndVersion current : actual) {
            while (expectedIndex < expected.size() && !expected.get(expectedIndex).equals((Object)current)) {
                ++expectedIndex;
            }
            ++expectedIndex;
        }
        Assertions.assertTrue((expectedIndex <= expected.size() ? 1 : 0) != 0, (String)String.format("actual is not a subset of expected: expected = %s; actual = %s", expected, actual));
    }

    @Test
    public void testTimeouts() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = controller.pause();
            CompletableFuture createFuture = controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(0).setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo")).iterator())));
            long now = controller.time().nanoseconds();
            CompletableFuture deleteFuture = controller.deleteTopics(now, Collections.singletonList(Uuid.ZERO_UUID));
            CompletableFuture findTopicIdsFuture = controller.findTopicIds(now, Collections.singletonList("foo"));
            CompletableFuture findTopicNamesFuture = controller.findTopicNames(now, Collections.singletonList(Uuid.ZERO_UUID));
            CompletableFuture createPartitionsFuture = controller.createPartitions(now, Collections.singletonList(new CreatePartitionsRequestData.CreatePartitionsTopic()));
            CompletableFuture electLeadersFuture = controller.electLeaders(new ElectLeadersRequestData().setTimeoutMs(0).setTopicPartitions(null));
            CompletableFuture alterReassignmentsFuture = controller.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTimeoutMs(0).setTopics(Collections.singletonList(new AlterPartitionReassignmentsRequestData.ReassignableTopic())));
            CompletableFuture listReassignmentsFuture = controller.listPartitionReassignments(new ListPartitionReassignmentsRequestData().setTopics(null).setTimeoutMs(0));
            while (controller.time().nanoseconds() == now) {
                Thread.sleep(0L, 10);
            }
            countDownLatch.countDown();
            QuorumControllerTest.assertYieldsTimeout(createFuture);
            QuorumControllerTest.assertYieldsTimeout(deleteFuture);
            QuorumControllerTest.assertYieldsTimeout(findTopicIdsFuture);
            QuorumControllerTest.assertYieldsTimeout(findTopicNamesFuture);
            QuorumControllerTest.assertYieldsTimeout(createPartitionsFuture);
            QuorumControllerTest.assertYieldsTimeout(electLeadersFuture);
            QuorumControllerTest.assertYieldsTimeout(alterReassignmentsFuture);
            QuorumControllerTest.assertYieldsTimeout(listReassignmentsFuture);
        }
    }

    private static void assertYieldsTimeout(Future<?> future) {
        Assertions.assertEquals(TimeoutException.class, ((ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> future.get())).getCause().getClass());
    }

    @Test
    public void testEarlyControllerResults() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            QuorumController controller = controlEnv.activeController();
            CountDownLatch countDownLatch = controller.pause();
            CompletableFuture createFuture = controller.createTopics(new CreateTopicsRequestData().setTimeoutMs(120000));
            long deadlineMs = controller.time().nanoseconds() + TimeUnit.HOURS.toNanos(1L);
            CompletableFuture deleteFuture = controller.deleteTopics(deadlineMs, Collections.emptyList());
            CompletableFuture findTopicIdsFuture = controller.findTopicIds(deadlineMs, Collections.emptyList());
            CompletableFuture findTopicNamesFuture = controller.findTopicNames(deadlineMs, Collections.emptyList());
            CompletableFuture createPartitionsFuture = controller.createPartitions(deadlineMs, Collections.emptyList());
            CompletableFuture electLeadersFuture = controller.electLeaders(new ElectLeadersRequestData().setTimeoutMs(120000));
            CompletableFuture alterReassignmentsFuture = controller.alterPartitionReassignments(new AlterPartitionReassignmentsRequestData().setTimeoutMs(12000));
            createFuture.get();
            deleteFuture.get();
            findTopicIdsFuture.get();
            findTopicNamesFuture.get();
            createPartitionsFuture.get();
            electLeadersFuture.get();
            alterReassignmentsFuture.get();
            countDownLatch.countDown();
        }
    }

    @Test
    public void testMissingInMemorySnapshot() throws Exception {
        int numBrokers = 3;
        int numPartitions = 3;
        String topicName = "topic-name";
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            QuorumController controller = controlEnv.activeController();
            Map<Integer, Long> brokerEpochs = this.registerBrokers(controller, numBrokers);
            List partitions = IntStream.range(0, numPartitions).mapToObj(partitionIndex -> new CreateTopicsRequestData.CreatableReplicaAssignment().setPartitionIndex(partitionIndex).setBrokerIds(Arrays.asList(0, 1, 2))).collect(Collectors.toList());
            Uuid topicId = ((CreateTopicsResponseData)controller.createTopics(new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName(topicName).setNumPartitions(-1).setReplicationFactor((short)-1).setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(partitions.iterator()))).iterator()))).get()).topics().find(topicName).topicId();
            List alterPartitions = IntStream.range(0, numPartitions).mapToObj(partitionIndex -> {
                PartitionRegistration partitionRegistration = controller.replicationControl().getPartition(topicId, partitionIndex);
                return new AlterPartitionRequestData.PartitionData().setPartitionIndex(partitionIndex).setLeaderEpoch(partitionRegistration.leaderEpoch).setPartitionEpoch(partitionRegistration.partitionEpoch).setNewIsr(Arrays.asList(0, 1));
            }).collect(Collectors.toList());
            AlterPartitionRequestData.TopicData topicData = new AlterPartitionRequestData.TopicData().setName(topicName);
            topicData.partitions().addAll(alterPartitions);
            int leaderId = 0;
            AlterPartitionRequestData alterPartitionRequest = new AlterPartitionRequestData().setBrokerId(leaderId).setBrokerEpoch(brokerEpochs.get(leaderId).longValue());
            alterPartitionRequest.topics().add(topicData);
            logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
            int oldClaimEpoch = controller.curClaimEpoch();
            Assertions.assertThrows(ExecutionException.class, () -> {
                AlterPartitionResponseData cfr_ignored_0 = (AlterPartitionResponseData)controller.alterPartition(alterPartitionRequest).get();
            });
            Assertions.assertSame((Object)controller, (Object)controlEnv.activeController());
            Assertions.assertTrue((oldClaimEpoch < controller.curClaimEpoch() ? 1 : 0) != 0, (String)String.format("oldClaimEpoch = %s, newClaimEpoch = %s", oldClaimEpoch, controller.curClaimEpoch()));
            int partitionsWithReplica2 = Utils.toList((Iterator)controller.replicationControl().brokersToIsrs().partitionsWithBrokerInIsr(2)).size();
            int partitionsWithReplica0 = Utils.toList((Iterator)controller.replicationControl().brokersToIsrs().partitionsWithBrokerInIsr(0)).size();
            Assertions.assertEquals((int)numPartitions, (int)partitionsWithReplica0);
            Assertions.assertNotEquals((int)0, (int)partitionsWithReplica2);
            Assertions.assertTrue((partitionsWithReplica0 > partitionsWithReplica2 ? 1 : 0) != 0, (String)String.format("partitionsWithReplica0 = %s, partitionsWithReplica2 = %s", partitionsWithReplica0, partitionsWithReplica2));
        }
    }

    private Map<Integer, Long> registerBrokers(QuorumController controller, int numBrokers) throws Exception {
        HashMap<Integer, Long> brokerEpochs = new HashMap<Integer, Long>();
        for (int brokerId = 0; brokerId < numBrokers; ++brokerId) {
            BrokerRegistrationReply reply = (BrokerRegistrationReply)controller.registerBroker(new BrokerRegistrationRequestData().setBrokerId(brokerId).setRack(null).setClusterId(controller.clusterId()).setIncarnationId(Uuid.fromString((String)("kxAT73dKQsitIedpiPtwB" + brokerId))).setListeners(new BrokerRegistrationRequestData.ListenerCollection(Arrays.asList(new BrokerRegistrationRequestData.Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092 + brokerId)).iterator()))).get();
            brokerEpochs.put(brokerId, reply.epoch());
            controller.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(((Long)brokerEpochs.get(brokerId)).longValue()).setBrokerId(brokerId).setCurrentMetadataOffset(100000L)).get();
        }
        return brokerEpochs;
    }

    private void sendBrokerheartbeat(QuorumController controller, List<Integer> brokers, Map<Integer, Long> brokerEpochs) throws Exception {
        if (brokers.isEmpty()) {
            return;
        }
        for (Integer brokerId : brokers) {
            BrokerHeartbeatReply reply = (BrokerHeartbeatReply)controller.processBrokerHeartbeat(new BrokerHeartbeatRequestData().setWantFence(false).setBrokerEpoch(brokerEpochs.get(brokerId).longValue()).setBrokerId(brokerId.intValue()).setCurrentMetadataOffset(100000L)).get();
            Assertions.assertEquals((Object)new BrokerHeartbeatReply(true, false, false, false), (Object)reply);
        }
    }

    @Test
    public void testConfigResourceExistenceChecker() throws Throwable {
        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigSchema(ConfigurationControlManagerTest.SCHEMA));){
            QuorumController active = controlEnv.activeController();
            this.registerBrokers(active, 5);
            active.createTopics(new CreateTopicsRequestData().setTopics(new CreateTopicsRequestData.CreatableTopicCollection(Collections.singleton(new CreateTopicsRequestData.CreatableTopic().setName("foo").setReplicationFactor((short)3).setNumPartitions(1)).iterator()))).get();
            QuorumController.ConfigResourceExistenceChecker checker = new QuorumController.ConfigResourceExistenceChecker(active);
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, ""));
            checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "3"));
            Assertions.assertThrows(BrokerIdNotRegisteredException.class, () -> checker.accept(new ConfigResource(ConfigResource.Type.BROKER, "10")));
            checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "foo"));
            Assertions.assertThrows(UnknownTopicOrPartitionException.class, () -> checker.accept(new ConfigResource(ConfigResource.Type.TOPIC, "bar")));
        }
    }
}

