/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.DelegationTokenImage;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsImageTest;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.migration.CapturingAclMigrationClient;
import org.apache.kafka.metadata.migration.CapturingConfigMigrationClient;
import org.apache.kafka.metadata.migration.CapturingDelegationTokenMigrationClient;
import org.apache.kafka.metadata.migration.CapturingMigrationClient;
import org.apache.kafka.metadata.migration.CapturingTopicMigrationClient;
import org.apache.kafka.metadata.migration.KRaftMigrationDriver;
import org.apache.kafka.metadata.migration.LegacyPropagator;
import org.apache.kafka.metadata.migration.MigrationClient;
import org.apache.kafka.metadata.migration.MigrationClientAuthException;
import org.apache.kafka.metadata.migration.MigrationClientException;
import org.apache.kafka.metadata.migration.MigrationDriverState;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

public class KRaftMigrationDriverTest {
    // Quorum features shared by every test: built from 4 (presumably the local node id -- confirm
    // against QuorumFeatures), the default feature map, and the controller ids 4, 5 and 6.
    private static final QuorumFeatures QUORUM_FEATURES =
        new QuorumFeatures(4, QuorumFeatures.defaultFeatureMap(true), Arrays.asList(4, 5, 6));

    // Metrics object handed to every driver created through defaultTestBuilder().
    MockControllerMetrics metrics = new MockControllerMetrics();

    // Clock whose nanoseconds() reading is the real System.nanoTime() shifted 990 ms into the past.
    Time mockTime = new MockTime(1L) {

        public long nanoseconds() {
            long shiftNs = TimeUnit.MILLISECONDS.toNanos(990L);
            return System.nanoTime() - shiftNs;
        }
    };

    /**
     * Creates the baseline {@link KRaftMigrationDriver.Builder} used by the tests in this class.
     *
     * <p>The builder is configured with node id 3000, a {@code NoOpRecordConsumer}, a no-op initial
     * ZK load handler, a {@link MockFaultHandler} named "test", the shared quorum features, the
     * empty config schema, and this instance's metrics and clock. It does not set a migration
     * client or a propagator; callers add those before calling {@code build()}.
     *
     * @return a partially configured driver builder
     */
    KRaftMigrationDriver.Builder defaultTestBuilder() {
        return KRaftMigrationDriver.newBuilder()
            .setNodeId(3000)
            .setZkRecordConsumer(new NoOpRecordConsumer())
            .setInitialZkLoadHandler(ignoredPublisher -> { })
            .setFaultHandler(new MockFaultHandler("test"))
            .setQuorumFeatures(QUORUM_FEATURES)
            .setConfigSchema(KafkaConfigSchema.EMPTY)
            .setControllerMetrics(this.metrics)
            .setTime(this.mockTime);
    }

    /**
     * Starts a {@link LogDeltaManifest.Builder} for the given provenance and leader, pre-filled
     * with fixed test values: one batch, 100 ns elapsed and 42 bytes.
     *
     * @param provenance the provenance to record in the manifest
     * @param newLeader  the leader and epoch to record in the manifest
     * @return the builder, so callers can adjust it further before calling {@code build()}
     */
    static LogDeltaManifest.Builder logDeltaManifestBuilder(MetadataProvenance provenance, LeaderAndEpoch newLeader) {
        return LogDeltaManifest.newBuilder()
            .provenance(provenance)
            .leaderAndEpoch(newLeader)
            .numBatches(1)
            .elapsedNs(100L)
            .numBytes(42L);
    }

    /**
     * Creates a registration record for an unfenced broker that is flagged as a migrating ZK broker.
     *
     * @param id the broker id to put in the record
     * @return the populated {@link RegisterBrokerRecord}
     */
    RegisterBrokerRecord zkBrokerRecord(int id) {
        return new RegisterBrokerRecord()
            .setBrokerId(id)
            .setIsMigratingZkBroker(true)
            .setFenced(false);
    }

    /**
     * Enqueues a metadata change event on the driver and exposes its completion as a future.
     *
     * <p>The completion handler passed to the driver completes the returned future normally when
     * it is invoked with {@code null}, and exceptionally with the given throwable otherwise.
     *
     * @param driver     the driver to enqueue the event on
     * @param delta      the metadata delta for the event
     * @param newImage   the image resulting from the delta
     * @param provenance the provenance of the new image
     * @return a future tied to the event's completion handler
     */
    CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(KRaftMigrationDriver driver, MetadataDelta delta, MetadataImage newImage, MetadataProvenance provenance) {
        CompletableFuture<Void> eventDone = new CompletableFuture<>();
        driver.enqueueMetadataChangeEvent(delta, newImage, provenance, false, failure -> {
            if (failure != null) {
                eventDone.completeExceptionally(failure);
                return;
            }
            eventDone.complete(null);
        });
        return eventDone;
    }

    /**
     * Notifies the driver of a controller change (node 3000, epoch 1) before the driver is started
     * and before any metadata update is delivered, then waits for it to report
     * {@link MigrationDriverState#WAIT_FOR_CONTROLLER_QUORUM}.
     */
    @Test
    public void testOnControllerChangeWhenUninitialized() throws InterruptedException {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        // A single client is enough; the original also built a second instance and discarded it.
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
        MockFaultHandler faultHandler = new MockFaultHandler("testBecomeLeaderUninitialized");
        KRaftMigrationDriver.Builder builder = this.defaultTestBuilder()
            .setZkMigrationClient(migrationClient)
            .setPropagator(metadataPropagator)
            .setFaultHandler(faultHandler);
        try (KRaftMigrationDriver driver = builder.build()) {
            // Seed the recovery state the client will report (KRaft offset 100, epoch 1).
            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));

            // The leadership notification is deliberately issued before start().
            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.start();

            TestUtils.waitForCondition(
                () -> driver.migrationState().get(30L, TimeUnit.SECONDS).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
        }
    }

    /**
     * Drives a migration into DUAL_WRITE and then asserts on the propagator's {@code images} and
     * {@code deltas} counters after each follow-up metadata change:
     * <ul>
     *   <li>after reaching DUAL_WRITE: {@code images == 1}, {@code deltas == 0};</li>
     *   <li>after a broker config record: one written config, counters unchanged;</li>
     *   <li>after a broker registration change (in controlled shutdown): {@code deltas == 1}.</li>
     * </ul>
     *
     * @param registerControllers forwarded to {@code setupDeltaForMigration}
     */
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testOnlySendNeededRPCsToBrokers(boolean registerControllers) throws Exception {
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
        CapturingMigrationClient zkClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(1, 2, 3)
            .setConfigMigrationClient(configClient)
            .build();
        KRaftMigrationDriver.Builder driverBuilder = this.defaultTestBuilder()
            .setZkMigrationClient(zkClient)
            .setPropagator(propagator)
            .setInitialZkLoadHandler(ignoredPublisher -> { });
        try (KRaftMigrationDriver driver = driverBuilder.build()) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);

            // Step 1: publish the pre-migration state plus ZK brokers 1..3 and become the leader.
            this.setupDeltaForMigration(delta, registerControllers);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 1; brokerId <= 3; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            Assertions.assertEquals(1, propagator.images);
            Assertions.assertEquals(0, propagator.deltas);

            // Step 2: a broker config change is written through the config client only.
            delta = new MetadataDelta(image);
            ConfigRecord brokerConfig = new ConfigRecord()
                .setResourceType(ConfigResource.Type.BROKER.id())
                .setResourceName("1")
                .setName("foo")
                .setValue("bar");
            delta.replay(brokerConfig);
            provenance = new MetadataProvenance(120L, 1, 2L);
            image = delta.apply(provenance);
            this.enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, configClient.writtenConfigs.size());
            Assertions.assertEquals(1, propagator.images);
            Assertions.assertEquals(0, propagator.deltas);

            // Step 3: a broker registration change bumps the propagator's delta counter.
            delta = new MetadataDelta(image);
            BrokerRegistrationChangeRecord shutdownChange = new BrokerRegistrationChangeRecord()
                .setBrokerId(1)
                .setBrokerEpoch(0L)
                .setFenced(BrokerRegistrationFencingChange.NONE.value())
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
            delta.replay(shutdownChange);
            provenance = new MetadataProvenance(130L, 1, 3L);
            image = delta.apply(provenance);
            this.enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, propagator.images);
            Assertions.assertEquals(1, propagator.deltas);
        }
    }

    /**
     * Makes the first three {@code claimControllerLeadership} calls on the migration client throw,
     * then checks that the driver still reaches DUAL_WRITE.
     *
     * <p>With {@code authException == true} the client throws a {@link MigrationClientAuthException}
     * and the fault handler's first exception is expected to have that type as its cause;
     * otherwise a {@link MigrationClientException} is thrown and the fault handler is expected to
     * have recorded nothing.
     *
     * @param authException selects which exception type the client throws
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMigrationWithClientException(final boolean authException) throws Exception {
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        // Counts down once per failed claim attempt; at zero the client stops failing.
        final CountDownLatch claimLeaderAttempts = new CountDownLatch(3);
        CapturingMigrationClient zkClient = new CapturingMigrationClient(
                new HashSet<>(Arrays.asList(1, 2, 3)),
                new CapturingTopicMigrationClient(),
                new CapturingConfigMigrationClient(),
                new CapturingAclMigrationClient(),
                new CapturingDelegationTokenMigrationClient(),
                CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {

            @Override
            public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
                boolean stillFailing = claimLeaderAttempts.getCount() != 0L;
                if (!stillFailing) {
                    return super.claimControllerLeadership(state);
                }
                claimLeaderAttempts.countDown();
                if (!authException) {
                    throw new MigrationClientException("Some kind of ZK error!");
                }
                throw new MigrationClientAuthException(new RuntimeException("Some kind of ZK auth error!"));
            }
        };
        MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
        KRaftMigrationDriver.Builder driverBuilder = this.defaultTestBuilder()
            .setZkMigrationClient(zkClient)
            .setFaultHandler(faultHandler)
            .setPropagator(propagator);
        try (KRaftMigrationDriver driver = driverBuilder.build()) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            this.setupDeltaForMigration(delta, true);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 1; brokerId <= 3; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);

            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(
                delta,
                image,
                KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

            // All three injected failures must have been consumed before the driver can proceed.
            Assertions.assertTrue(claimLeaderAttempts.await(1L, TimeUnit.MINUTES));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            if (!authException) {
                Assertions.assertNull(faultHandler.firstException());
            } else {
                Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
            }
        }
    }

    /**
     * Makes the first three {@code getOrCreateMigrationRecoveryState} calls on the migration client
     * throw a {@link MigrationClientException}, then checks that the driver still reaches
     * DUAL_WRITE and that the fault handler recorded no exception.
     */
    @Test
    public void testMigrationWithClientExceptionWhileMigratingZnodeCreation() throws Exception {
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        // Counts down once per failed attempt; at zero the client starts succeeding.
        final CountDownLatch createZnodeAttempts = new CountDownLatch(3);
        CapturingMigrationClient zkClient = new CapturingMigrationClient(
                new HashSet<>(Arrays.asList(1, 2, 3)),
                new CapturingTopicMigrationClient(),
                new CapturingConfigMigrationClient(),
                new CapturingAclMigrationClient(),
                new CapturingDelegationTokenMigrationClient(),
                CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {

            @Override
            public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
                if (createZnodeAttempts.getCount() != 0L) {
                    createZnodeAttempts.countDown();
                    throw new MigrationClientException("Some kind of ZK error!");
                }
                // Failures exhausted: store and return the requested initial state.
                this.setMigrationRecoveryState(initialState);
                return initialState;
            }
        };
        MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
        KRaftMigrationDriver.Builder driverBuilder = this.defaultTestBuilder()
            .setZkMigrationClient(zkClient)
            .setFaultHandler(faultHandler)
            .setPropagator(propagator);
        try (KRaftMigrationDriver driver = driverBuilder.build()) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            this.setupDeltaForMigration(delta, true);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 1; brokerId <= 3; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);

            // Wait until all three injected failures were hit before announcing the leader.
            Assertions.assertTrue(createZnodeAttempts.await(1L, TimeUnit.MINUTES));

            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(
                delta,
                image,
                KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            Assertions.assertNull(faultHandler.firstException());
        }
    }

    /**
     * Replays the metadata version record, and optionally the controller registrations, that the
     * tests use as the starting point for a migration.
     *
     * <p>When {@code registerControllers} is true the metadata version is set to
     * {@code IBP_3_7_IV0} and a controller registration (created with {@code true} as its second
     * argument) is replayed for every id in the shared quorum features. Otherwise only the
     * metadata version {@code IBP_3_6_IV2} is replayed.
     *
     * @param delta               the delta to replay the records into
     * @param registerControllers whether to replay controller registrations as well
     */
    private void setupDeltaForMigration(MetadataDelta delta, boolean registerControllers) {
        MetadataVersion metadataVersion = registerControllers
            ? MetadataVersion.IBP_3_7_IV0
            : MetadataVersion.IBP_3_6_IV2;
        // The feature level record is always replayed first, before any registration.
        delta.replay(new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(metadataVersion.featureLevel()));
        if (registerControllers) {
            for (int id : QUORUM_FEATURES.quorumNodeIds()) {
                delta.replay(RecordTestUtils.createTestControllerRegistration(id, true));
            }
        }
    }

    /**
     * Replays metadata version {@code IBP_3_7_IV0}, the PRE_MIGRATION state record, and one
     * controller registration per supplied id.
     *
     * <p>Ids in {@code notReadyIds} are registered with {@code false} as the second argument to
     * {@code createTestControllerRegistration}; ids in {@code readyIds} are registered with
     * {@code true}. The not-ready ids are replayed first.
     *
     * @param delta       the delta to replay the records into
     * @param notReadyIds controller ids to register with the flag set to {@code false}
     * @param readyIds    controller ids to register with the flag set to {@code true}
     */
    private void setupDeltaWithControllerRegistrations(MetadataDelta delta, List<Integer> notReadyIds, List<Integer> readyIds) {
        FeatureLevelRecord metadataVersionRecord = new FeatureLevelRecord()
            .setName("metadata.version")
            .setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel());
        delta.replay(metadataVersionRecord);
        delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
        notReadyIds.forEach(controllerId ->
            delta.replay(RecordTestUtils.createTestControllerRegistration(controllerId, false)));
        readyIds.forEach(controllerId ->
            delta.replay(RecordTestUtils.createTestControllerRegistration(controllerId, true)));
    }

    /**
     * Checks that the driver stays in WAIT_FOR_CONTROLLER_QUORUM until controllers 4, 5 and 6 are
     * all registered via the "ready" list, and only then moves on to DUAL_WRITE.
     *
     * @param allNodePresent when true, controllers 4, 5 and 6 are first all registered via the
     *                       "not ready" list; when false, only 4 and 5 are registered (via the
     *                       "ready" list) and 6 is left out
     */
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate(boolean allNodePresent) throws Exception {
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        CapturingMigrationClient zkClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(1)
            .build();
        KRaftMigrationDriver.Builder driverBuilder = this.defaultTestBuilder()
            .setZkMigrationClient(zkClient)
            .setPropagator(propagator);
        try (KRaftMigrationDriver driver = driverBuilder.build()) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);

            // Initial registrations: either an incomplete quorum or a complete but not-ready one.
            if (!allNodePresent) {
                this.setupDeltaWithControllerRegistrations(delta, Collections.<Integer>emptyList(), Arrays.asList(4, 5));
            } else {
                this.setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 5, 6), Collections.<Integer>emptyList());
            }
            delta.replay(this.zkBrokerRecord(1));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());

            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
            Assertions.assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1L, TimeUnit.MINUTES));

            // Second update: every controller is now registered through the "ready" list.
            delta = new MetadataDelta(image);
            this.setupDeltaWithControllerRegistrations(delta, Collections.<Integer>emptyList(), Arrays.asList(4, 5, 6));
            image = delta.apply(new MetadataProvenance(200L, 1, 2L));
            LogDeltaManifest readyManifest = new LogDeltaManifest.Builder()
                .provenance(image.provenance())
                .leaderAndEpoch(leader)
                .numBatches(1)
                .elapsedNs(100L)
                .numBytes(42L)
                .build();
            driver.onMetadataUpdate(delta, image, readyManifest);
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        }
    }

    /**
     * Publishes an image whose delta replays both the PRE_MIGRATION and the MIGRATION state
     * records, with recovery state already seeded on the client, and waits for the driver to
     * reach DUAL_WRITE. The migration client is built with no brokers in ZK.
     */
    @Test
    public void testSkipWaitForBrokersInDualWrite() throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        // A single client is enough; the original also built a second instance and discarded it.
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build();
        MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
        KRaftMigrationDriver.Builder builder = this.defaultTestBuilder()
            .setZkMigrationClient(migrationClient)
            .setPropagator(metadataPropagator)
            .setFaultHandler(faultHandler);
        try (KRaftMigrationDriver driver = builder.build()) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);

            // Seed the recovery state (KRaft offset 100, epoch 1) before the driver starts.
            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);

            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(this.zkBrokerRecord(1));
            delta.replay(this.zkBrokerRecord(2));
            delta.replay(this.zkBrokerRecord(3));
            // The same delta also carries the MIGRATION state record.
            delta.replay(ZkMigrationState.MIGRATION.toRecord().message());
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);

            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(
                delta,
                image,
                KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        }
    }

    /**
     * Builds a driver wired to capturing ZK clients and hands it to the given verifier.
     *
     * <p>The topic migration client's {@code iterateTopics} is overridden to visit every topic of
     * {@code TopicsImageTest.IMAGE1}: first the topic with its partition-to-replicas assignment,
     * then each of its partitions. The migration client is created with brokers 0 through 5 in ZK.
     * The driver is closed when the verifier returns or throws.
     *
     * @param verifier callback that receives the driver and the capturing clients
     * @throws Exception whatever the verifier or closing the driver throws
     */
    public void setupTopicDualWrite(TopicDualWriteVerifier verifier) throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {

            @Override
            public void iterateTopics(EnumSet<TopicMigrationClient.TopicVisitorInterest> interests, TopicMigrationClient.TopicVisitor visitor) {
                TopicsImageTest.IMAGE1.topicsByName().forEach((topicName, topicImage) -> {
                    // Partition id -> replica ids, typed instead of the raw HashMap used before.
                    HashMap<Integer, List<Integer>> assignment = new HashMap<>();
                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
                        assignment.put(partitionId, IntStream.of(partitionRegistration.replicas).boxed().collect(Collectors.toList())));
                    visitor.visitTopic(topicName, topicImage.id(), assignment);
                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
                        visitor.visitPartition(
                            new TopicIdPartition(topicImage.id(), new TopicPartition(topicName, partitionId.intValue())),
                            partitionRegistration));
                });
            }
        };
        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(0, 1, 2, 3, 4, 5)
            .setTopicMigrationClient(topicClient)
            .setConfigMigrationClient(configClient)
            .build();
        KRaftMigrationDriver.Builder builder = this.defaultTestBuilder()
            .setZkMigrationClient(migrationClient)
            .setPropagator(metadataPropagator);
        try (KRaftMigrationDriver driver = builder.build()) {
            verifier.verify(driver, migrationClient, topicClient, configClient);
        }
    }

    /**
     * Brings the driver to DUAL_WRITE starting from {@code TopicsImageTest.IMAGE1}, then delivers
     * {@code TopicsImageTest.DELTA1_RECORDS} with a {@link SnapshotManifest} and asserts on what
     * the capturing clients recorded: topic "foo" deleted, topic "baz" created, partition 0 of
     * "bar" updated, and the config resource of topic "foo" deleted.
     */
    @Test
    public void testTopicDualWriteSnapshot() throws Exception {
        this.setupTopicDualWrite((driver, unusedMigrationClient, topicClient, configClient) -> {
            MetadataImage image = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY,
                DelegationTokenImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);

            // Pre-migration image with ZK brokers 0..5.
            this.setupDeltaForMigration(delta, true);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            // Topic changes, delivered as a snapshot.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100L));
            // Block on the state future before inspecting the clients (presumably this lets the
            // queued update finish first -- confirm against the driver's event queue).
            driver.migrationState().get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals((Object) "foo", (Object) topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals((Object) "baz", (Object) topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Brings the driver to DUAL_WRITE starting from {@code TopicsImageTest.IMAGE1}, then delivers
     * {@code TopicsImageTest.DELTA1_RECORDS} with a log delta manifest and asserts on what the
     * capturing clients recorded: topic "foo" deleted, topic "baz" created, partition 0 of "bar"
     * updated, and the config resource of topic "foo" deleted.
     */
    @Test
    public void testTopicDualWriteDelta() throws Exception {
        this.setupTopicDualWrite((driver, unusedMigrationClient, topicClient, configClient) -> {
            MetadataImage image = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY,
                DelegationTokenImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);

            // Pre-migration image with ZK brokers 0..5.
            this.setupDeltaForMigration(delta, true);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            // Topic changes, delivered as a log delta.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());
            // Block on the state future before inspecting the clients (presumably this lets the
            // queued update finish first -- confirm against the driver's event queue).
            driver.migrationState().get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals((Object) "foo", (Object) topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals((Object) "baz", (Object) topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Waits for WAIT_FOR_CONTROLLER_QUORUM, delivers the pre-migration image, and then pushes the
     * driver through WAIT_FOR_BROKERS, BECOME_CONTROLLER, ZK_MIGRATION and SYNC_KRAFT_TO_ZK by
     * calling {@code transitionTo} directly. A snapshot containing
     * {@code TopicsImageTest.DELTA1_RECORDS} is delivered afterwards and the test waits for the
     * driver to reach DUAL_WRITE.
     */
    @Test
    public void testNoDualWriteBeforeMigration() throws Exception {
        this.setupTopicDualWrite((driver, unusedMigrationClient, unusedTopicClient, unusedConfigClient) -> {
            MetadataImage image = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY,
                DelegationTokenImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);

            // Pre-migration image with ZK brokers 0..5.
            this.setupDeltaForMigration(delta, true);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());

            // Force the state machine forward by hand, one state at a time.
            driver.transitionTo(MigrationDriverState.WAIT_FOR_BROKERS);
            driver.transitionTo(MigrationDriverState.BECOME_CONTROLLER);
            driver.transitionTo(MigrationDriverState.ZK_MIGRATION);
            driver.transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);

            // Topic changes, delivered as a snapshot while in SYNC_KRAFT_TO_ZK.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100L));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        });
    }

    /**
     * Exercises a controller failover while KRaft metadata changes are outstanding.
     *
     * <p>Sequence driven by this test:
     * <ol>
     *   <li>Node 3001 is announced as leader and two metadata updates (offsets 100 and 200) are
     *       delivered while it leads.</li>
     *   <li>The migration client's recovery state is pinned to offset 100 / epoch 1.</li>
     *   <li>Leadership moves to node 3000 (presumably the node id of the driver under test;
     *       confirm against {@code setupTopicDualWrite}).</li>
     *   <li>Once the driver reports {@code DUAL_WRITE}, the captured topic/config client calls
     *       are checked against the changes in {@code TopicsImageTest.DELTA1_RECORDS}.</li>
     * </ol>
     */
    @Test
    public void testControllerFailover() throws Exception {
        this.setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage image = new MetadataImage(
                MetadataProvenance.EMPTY,
                FeaturesImage.EMPTY,
                ClusterImage.EMPTY,
                TopicsImageTest.IMAGE1,
                ConfigurationsImage.EMPTY,
                ClientQuotasImage.EMPTY,
                ProducerIdsImage.EMPTY,
                AclsImage.EMPTY,
                ScramImage.EMPTY,
                DelegationTokenImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);

            this.startAndWaitForRecoveringMigrationStateFromZK(driver);
            this.setupDeltaForMigration(delta, true);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            // Register ZK brokers 0 through 5.
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);

            // First leader is node 3001; deliver the initial image while it leads.
            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3001), 1);
            driver.onControllerChange(newLeader);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, newLeader).build());

            // Record offset 100 / epoch 1 as the recovery point held by the migration client.
            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));

            // Apply the topic changes at offset 200, still under leader 3001.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, newLeader).build());

            // Fail over to node 3000.
            newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(newLeader);

            // A descriptive message makes a timeout here diagnosable (it used to be empty).
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Checks that {@code beginMigration()} is invoked on the record consumer exactly once even
     * though {@code onMetadataUpdate} is delivered twice, back to back, with the same delta and
     * image before the driver reaches {@code DUAL_WRITE}.
     */
    @Test
    public void testBeginMigrationOnce() throws Exception {
        final AtomicInteger migrationBeginCalls = new AtomicInteger(0);
        // Consumer that only counts how often the migration is started.
        NoOpRecordConsumer countingConsumer = new NoOpRecordConsumer() {
            @Override
            public CompletableFuture<?> beginMigration() {
                migrationBeginCalls.incrementAndGet();
                return CompletableFuture.completedFuture(null);
            }
        };
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        CapturingMigrationClient zkClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(1, 2, 3)
            .build();
        MockFaultHandler faults = new MockFaultHandler("testBeginMigrationOnce");
        KRaftMigrationDriver.Builder driverBuilder = this.defaultTestBuilder()
            .setZkMigrationClient(zkClient)
            .setZkRecordConsumer(countingConsumer)
            .setPropagator(propagator)
            .setFaultHandler(faults);

        try (KRaftMigrationDriver driver = driverBuilder.build()) {
            MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);
            this.setupDeltaForMigration(delta, true);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 1; brokerId <= 3; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage image = delta.apply(provenance);

            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            // Two identical updates in a row: the assertion below verifies they do not
            // cause the migration to be started twice.
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, leader).build());

            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            Assertions.assertEquals(1, migrationBeginCalls.get());
        }
    }

    /**
     * Builds a fixed-size batch of {@code size} records for feeding the migration driver.
     *
     * <p>A single {@code TopicRecord} (name {@code "topic-fill"}, one random topic id) is created
     * and the same instance is placed in every slot, so all entries of the returned list are
     * identical references.
     *
     * @param size number of entries in the batch; {@code 0} yields an empty list
     * @return a fixed-size list view over the filled array
     */
    private List<ApiMessageAndVersion> fillBatch(int size) {
        // The array must be typed: an Object[] would make Arrays.asList produce a List<Object>.
        ApiMessageAndVersion[] batch = new ApiMessageAndVersion[size];
        ApiMessageAndVersion filler = new ApiMessageAndVersion(
            new TopicRecord().setName("topic-fill").setTopicId(Uuid.randomUuid()),
            (short) 0);
        Arrays.fill(batch, filler);
        return Arrays.asList(batch);
    }

    /**
     * Argument source for {@code testCoalesceMigrationRecords}.
     *
     * <p>Each entry is: the optional configured minimum batch size, the sizes of the batches the
     * migration client supplies, the expected number of batches handed to the record consumer,
     * and the expected total number of records across those batches.
     */
    static Stream<Arguments> batchSizes() {
        final int defaultBatchSize = 200;
        return Stream.of(
            Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 0, 0), 0, 0),
            Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 1, 0), 1, 1),
            Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(1, 1, 1, 1), 1, 4),
            Arguments.of(Optional.of(1000), Collections.singletonList(999), 1, 999),
            Arguments.of(Optional.of(1000), Collections.singletonList(1000), 1, 1000),
            Arguments.of(Optional.of(1000), Collections.singletonList(1001), 1, 1001),
            Arguments.of(Optional.of(1000), Arrays.asList(1000, 1), 2, 1001),
            // Same inputs as the first case; kept so the set of generated invocations is unchanged.
            Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(0, 0, 0, 0), 0, 0),
            Arguments.of(Optional.of(1000), Arrays.asList(1000, 1000, 1000), 3, 3000),
            Arguments.of(Optional.of(defaultBatchSize), Collections.singletonList(defaultBatchSize + 1), 1, 201),
            Arguments.of(Optional.of(defaultBatchSize), Arrays.asList(defaultBatchSize, 1), 2, 201),
            Arguments.of(Optional.empty(), Collections.singletonList(defaultBatchSize + 1), 1, 201),
            Arguments.of(Optional.empty(), Arrays.asList(defaultBatchSize, 1), 2, 201));
    }

    /**
     * Verifies how the driver groups the record batches read from ZooKeeper before handing them
     * to the record consumer.
     *
     * @param configBatchSize     minimum migration batch size to configure on the driver builder;
     *                            when empty the builder's own default is left in place
     * @param batchSizes          sizes of the batches the capturing migration client will supply
     * @param expectedBatchCount  number of {@code acceptBatch} calls expected on the consumer
     * @param expectedRecordCount total number of records expected across all accepted batches
     */
    @ParameterizedTest
    @MethodSource(value={"batchSizes"})
    public void testCoalesceMigrationRecords(Optional<Integer> configBatchSize, final List<Integer> batchSizes, int expectedBatchCount, int expectedRecordCount) throws Exception {
        // Typed (not raw) so the stream over it below is a Stream<List<...>>.
        final List<List<ApiMessageAndVersion>> batchesPassedToController = new ArrayList<>();
        NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
            @Override
            public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
                batchesPassedToController.add(recordBatch);
                return CompletableFuture.completedFuture(null);
            }
        };
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(1, 2, 3)
            .setBatchSupplier(new CapturingMigrationClient.MigrationBatchSupplier() {
                @Override
                public List<List<ApiMessageAndVersion>> recordBatches() {
                    // One generated batch per requested size, in the order given.
                    List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
                    for (int batchSize : batchSizes) {
                        batches.add(KRaftMigrationDriverTest.this.fillBatch(batchSize));
                    }
                    return batches;
                }
            })
            .build();
        // Label matches this test's name so reported faults point at the right test.
        MockFaultHandler faultHandler = new MockFaultHandler("testCoalesceMigrationRecords");
        KRaftMigrationDriver.Builder builder = this.defaultTestBuilder()
            .setZkMigrationClient(migrationClient)
            .setZkRecordConsumer(recordConsumer)
            .setPropagator(metadataPropagator)
            .setFaultHandler(faultHandler);
        configBatchSize.ifPresent(builder::setMinMigrationBatchSize);

        try (KRaftMigrationDriver driver = builder.build()) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            this.startAndWaitForRecoveringMigrationStateFromZK(driver);
            this.setupDeltaForMigration(delta, true);
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 1; brokerId <= 3; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);

            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            // The update is delivered twice with identical arguments, as in testBeginMigrationOnce.
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
            driver.onMetadataUpdate(delta, image, KRaftMigrationDriverTest.logDeltaManifestBuilder(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            Assertions.assertEquals(expectedBatchCount, batchesPassedToController.size());
            Assertions.assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
        }
    }

    /**
     * Starts the given driver and blocks until it reports the {@code INACTIVE} state.
     *
     * @param driver the driver to start
     * @throws InterruptedException if the wait is interrupted
     */
    private void startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) throws InterruptedException {
        driver.start();
        TestUtils.waitForCondition(
            () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE),
            "Waiting for KRaftMigrationDriver to enter INACTIVE state");
    }

    /**
     * Callback passed to {@code setupTopicDualWrite}: receives the driver together with the
     * capturing migration, topic and config clients so a test can drive the scenario and
     * inspect what was captured.
     */
    @FunctionalInterface
    interface TopicDualWriteVerifier {
        void verify(
            KRaftMigrationDriver driver,
            CapturingMigrationClient migrationClient,
            CapturingTopicMigrationClient topicClient,
            CapturingConfigMigrationClient configClient
        ) throws Exception;
    }

    /**
     * {@code LegacyPropagator} test double that sends nothing and only counts how many
     * delta-based and image-based RPC requests it was asked to make.
     */
    static class CountingMetadataPropagator implements LegacyPropagator {
        /** Number of {@code sendRPCsToBrokersFromMetadataDelta} invocations. */
        public int deltas;
        /** Number of {@code sendRPCsToBrokersFromMetadataImage} invocations. */
        public int images;

        public void startup() {
            // nothing to start
        }

        public void shutdown() {
            // nothing to stop
        }

        public void publishMetadata(MetadataImage image) {
            // published images are ignored
        }

        public void sendRPCsToBrokersFromMetadataDelta(MetadataDelta delta, MetadataImage image, int zkControllerEpoch) {
            deltas += 1;
        }

        public void sendRPCsToBrokersFromMetadataImage(MetadataImage image, int zkControllerEpoch) {
            images += 1;
        }

        public void clear() {
            // counters are intentionally left untouched
        }
    }

    /**
     * {@code ZkRecordConsumer} stub whose operations complete immediately. Tests subclass it
     * anonymously to observe individual calls.
     */
    static class NoOpRecordConsumer implements ZkRecordConsumer {

        /** Returns an already-completed future carrying {@code null}. */
        public CompletableFuture<?> beginMigration() {
            return CompletableFuture.completedFuture(null);
        }

        /** Discards the batch and returns an already-completed future carrying {@code null}. */
        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
            return CompletableFuture.completedFuture(null);
        }

        /** Reports a fixed completion point of offset 100, epoch 1. */
        public CompletableFuture<OffsetAndEpoch> completeMigration() {
            OffsetAndEpoch fixedResult = new OffsetAndEpoch(100L, 1);
            return CompletableFuture.completedFuture(fixedResult);
        }

        public void abortMigration() {
            // nothing to abort
        }
    }

    /**
     * {@code QuorumControllerMetrics} subclass that records whether {@link #close()} was called,
     * so tests can assert on it through {@link #closed}.
     */
    static class MockControllerMetrics extends QuorumControllerMetrics {
        /** Flipped to {@code true} after the superclass {@code close()} has returned. */
        final AtomicBoolean closed = new AtomicBoolean();

        MockControllerMetrics() {
            super(Optional.empty(), Time.SYSTEM, false);
        }

        public void close() {
            super.close();
            closed.set(true);
        }
    }
}

