/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.migration;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.FeaturesImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicsImageTest;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.migration.CapturingAclMigrationClient;
import org.apache.kafka.metadata.migration.CapturingConfigMigrationClient;
import org.apache.kafka.metadata.migration.CapturingMigrationClient;
import org.apache.kafka.metadata.migration.CapturingTopicMigrationClient;
import org.apache.kafka.metadata.migration.KRaftMigrationDriver;
import org.apache.kafka.metadata.migration.LegacyPropagator;
import org.apache.kafka.metadata.migration.MigrationClient;
import org.apache.kafka.metadata.migration.MigrationClientAuthException;
import org.apache.kafka.metadata.migration.MigrationClientException;
import org.apache.kafka.metadata.migration.MigrationDriverState;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class KRaftMigrationDriverTest {
    // Fixed three-node controller quorum (ids 4, 5 and 6); used below to build quorumFeatures.
    List<Node> controllerNodes = Arrays.asList(new Node(4, "host4", 0), new Node(5, "host5", 0), new Node(6, "host6", 0));
    // Per-node API version registry; populated in setup() and mutated by individual tests.
    ApiVersions apiVersions = new ApiVersions();
    // Built from apiVersions and controllerNodes above.
    // NOTE(review): the leading 4 is presumably the local controller's node id — confirm against QuorumFeatures.create.
    QuorumFeatures quorumFeatures = QuorumFeatures.create((int)4, (ApiVersions)this.apiVersions, (Map)QuorumFeatures.defaultFeatureMap(), this.controllerNodes);
    // MockTime whose nanosecond clock is the real System.nanoTime() shifted 990 ms into the past.
    // NOTE(review): presumably this shortens the driver's periodic poll so the tests finish quickly — confirm.
    Time mockTime = new MockTime(1L){

        public long nanoseconds() {
            return System.nanoTime() - TimeUnit.NANOSECONDS.convert(990L, TimeUnit.MILLISECONDS);
        }
    };

    /**
     * Registers a {@link NodeApiVersions} entry for each of the controller ids "4", "5" and "6"
     * before every test.
     *
     * NOTE(review): the trailing {@code true} is presumably the "ZK migration ready" flag; the
     * tests below only reach DUAL_WRITE when all three entries are created this way — confirm
     * against the NodeApiVersions constructor.
     */
    @BeforeEach
    public void setup() {
        for (String controllerId : Arrays.asList("4", "5", "6")) {
            this.apiVersions.update(controllerId, new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
        }
    }

    /**
     * Builds a registration record for an unfenced broker that is flagged as a migrating ZK broker.
     *
     * @param id the broker id to put in the record
     * @return a new {@link RegisterBrokerRecord} for that broker
     */
    RegisterBrokerRecord zkBrokerRecord(int id) {
        return new RegisterBrokerRecord()
            .setBrokerId(id)
            .setIsMigratingZkBroker(true)
            .setFenced(false);
    }

    /**
     * Enqueues a metadata change event on the driver and exposes its completion as a future.
     *
     * @param driver     the driver that receives the event
     * @param delta      the metadata delta to hand to the driver
     * @param newImage   the image that results from applying {@code delta}
     * @param provenance the provenance of {@code newImage}
     * @return a future that completes normally when the driver's completion callback is invoked
     *         with {@code null}, and exceptionally with the reported throwable otherwise
     */
    CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(KRaftMigrationDriver driver, MetadataDelta delta, MetadataImage newImage, MetadataProvenance provenance) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        Consumer<Throwable> onCompletion = failure -> {
            if (failure != null) {
                done.completeExceptionally(failure);
                return;
            }
            done.complete(null);
        };
        // NOTE(review): the literal false is presumably the "is snapshot" flag — confirm against the driver.
        driver.enqueueMetadataChangeEvent(delta, newImage, provenance, false, onCompletion);
        return done;
    }

    /**
     * Checks that the driver only asks the propagator to send broker RPCs when a change needs them.
     *
     * Three stages are exercised: reaching DUAL_WRITE (one image-based send), a broker config
     * change (written through the config client, no additional sends) and a controlled-shutdown
     * broker registration change (one delta-based send).
     */
    @Test
    public void testOnlySendNeededRPCsToBrokers() throws Exception {
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        CapturingConfigMigrationClient configMigrationClient = new CapturingConfigMigrationClient();
        CapturingMigrationClient zkClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(1, 2, 3)
            .setConfigMigrationClient(configMigrationClient)
            .build();
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(3000, new NoOpRecordConsumer(), zkClient, propagator, publisher -> { }, new MockFaultHandler("test"), this.quorumFeatures, this.mockTime)) {
            // Stage 1: register the three ZK brokers and make node 3000 the active controller.
            MetadataDelta registrationDelta = new MetadataDelta(MetadataImage.EMPTY);
            driver.start();
            registrationDelta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 1; brokerId <= 3; brokerId++) {
                registrationDelta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance registrationProvenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage registrationImage = registrationDelta.apply(registrationProvenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(registrationDelta, registrationImage, new LogDeltaManifest(registrationProvenance, leader, 1, 100L, 42L));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            Assertions.assertEquals(1, propagator.images);
            Assertions.assertEquals(0, propagator.deltas);

            // Stage 2: a broker config change is written via the config client but sends no RPCs.
            MetadataDelta configDelta = new MetadataDelta(registrationImage);
            configDelta.replay(new ConfigRecord()
                .setResourceType(ConfigResource.Type.BROKER.id())
                .setResourceName("1")
                .setName("foo")
                .setValue("bar"));
            MetadataProvenance configProvenance = new MetadataProvenance(120L, 1, 2L);
            MetadataImage configImage = configDelta.apply(configProvenance);
            this.enqueueMetadataChangeEventWithFuture(driver, configDelta, configImage, configProvenance).get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, configMigrationClient.writtenConfigs.size());
            Assertions.assertEquals(1, propagator.images);
            Assertions.assertEquals(0, propagator.deltas);

            // Stage 3: a controlled-shutdown registration change triggers exactly one delta send.
            MetadataDelta shutdownDelta = new MetadataDelta(configImage);
            shutdownDelta.replay(new BrokerRegistrationChangeRecord()
                .setBrokerId(1)
                .setBrokerEpoch(0L)
                .setFenced(BrokerRegistrationFencingChange.NONE.value())
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
            MetadataProvenance shutdownProvenance = new MetadataProvenance(130L, 1, 3L);
            MetadataImage shutdownImage = shutdownDelta.apply(shutdownProvenance);
            this.enqueueMetadataChangeEventWithFuture(driver, shutdownDelta, shutdownImage, shutdownProvenance).get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, propagator.images);
            Assertions.assertEquals(1, propagator.deltas);
        }
    }

    /**
     * Checks that the driver still reaches DUAL_WRITE when claiming ZK controller leadership
     * fails three times before succeeding.
     *
     * @param authException when {@code true} the first three claim attempts throw
     *                      {@link MigrationClientAuthException} and the fault handler is expected
     *                      to have recorded it; when {@code false} they throw
     *                      {@link MigrationClientException} and no fault is expected
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testMigrationWithClientException(final boolean authException) throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        final CountDownLatch claimLeaderAttempts = new CountDownLatch(3);
        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<Integer>(Arrays.asList(1, 2, 3)), new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient()) {

            @Override
            public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
                // Once the latch is exhausted, fall through to the real (capturing) behaviour.
                if (claimLeaderAttempts.getCount() == 0L) {
                    return super.claimControllerLeadership(state);
                }
                claimLeaderAttempts.countDown();
                if (authException) {
                    throw new MigrationClientAuthException(new RuntimeException("Some kind of ZK auth error!"));
                }
                throw new MigrationClientException("Some kind of ZK error!");
            }
        };
        MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(3000, new NoOpRecordConsumer(), migrationClient, metadataPropagator, metadataPublisher -> { }, faultHandler, this.quorumFeatures, this.mockTime)) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(this.zkBrokerRecord(1));
            delta.replay(this.zkBrokerRecord(2));
            delta.replay(this.zkBrokerRecord(3));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100L, 42L));
            // All three failing attempts must have happened before the state is checked.
            Assertions.assertTrue(claimLeaderAttempts.await(1L, TimeUnit.MINUTES));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            if (authException) {
                // Fail with a readable message instead of a bare NullPointerException if nothing was recorded.
                Assertions.assertNotNull(faultHandler.firstException(), "Expected the fault handler to have recorded the ZK auth failure");
                Assertions.assertEquals(MigrationClientAuthException.class, faultHandler.firstException().getCause().getClass());
            } else {
                Assertions.assertNull(faultHandler.firstException());
            }
        }
    }

    /**
     * Checks that the driver stays in WAIT_FOR_CONTROLLER_QUORUM until every controller node has
     * a suitable API versions entry.
     *
     * Controller "6" is first removed from the registry, then re-added with
     * {@code NodeApiVersions.create()} (the driver must keep waiting), and finally re-added the
     * same way {@code setup()} does, after which the driver is expected to reach DUAL_WRITE.
     */
    @Test
    public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate() throws Exception {
        CountingMetadataPropagator propagator = new CountingMetadataPropagator();
        CapturingMigrationClient zkClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1).build();
        // Controller 6 has no API versions entry at all when the driver starts.
        this.apiVersions.remove("6");
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(3000, new NoOpRecordConsumer(), zkClient, propagator, publisher -> { }, new MockFaultHandler("test"), this.quorumFeatures, this.mockTime)) {
            MetadataDelta initialDelta = new MetadataDelta(MetadataImage.EMPTY);
            driver.start();
            initialDelta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            initialDelta.replay(this.zkBrokerRecord(1));
            MetadataProvenance initialProvenance = new MetadataProvenance(100L, 1, 1L);
            MetadataImage initialImage = initialDelta.apply(initialProvenance);
            LeaderAndEpoch leader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(leader);
            driver.onMetadataUpdate(initialDelta, initialImage, new LogDeltaManifest(initialProvenance, leader, 1, 100L, 42L));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
                "Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");

            // An entry created with NodeApiVersions.create() must not unblock the driver.
            this.apiVersions.update("6", NodeApiVersions.create());
            Assertions.assertEquals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM, driver.migrationState().get(1L, TimeUnit.MINUTES));

            // Registering controller 6 the same way setup() does lets the driver proceed.
            this.apiVersions.update("6", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        }
    }

    /**
     * Checks that the driver reaches DUAL_WRITE when the ZK client reports no brokers in ZK but
     * already holds a migration recovery state and the metadata image is in the MIGRATION state.
     *
     * NOTE(review): judging by the test name, the driver is expected to skip its wait-for-brokers
     * step in this situation — confirm against KRaftMigrationDriver.
     */
    @Test
    public void testSkipWaitForBrokersInDualWrite() throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(), new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
        MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(3000, new NoOpRecordConsumer(), migrationClient, metadataPropagator, metadataPublisher -> { }, faultHandler, this.quorumFeatures, this.mockTime)) {
            MetadataImage image = MetadataImage.EMPTY;
            MetadataDelta delta = new MetadataDelta(image);
            // Seed the ZK client with a recovery state at KRaft offset 100, epoch 1 before the driver starts.
            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            delta.replay(this.zkBrokerRecord(1));
            delta.replay(this.zkBrokerRecord(2));
            delta.replay(this.zkBrokerRecord(3));
            delta.replay(ZkMigrationState.MIGRATION.toRecord().message());
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100L, 42L));
            // The timeout message now names the state the condition actually checks.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
        }
    }

    /**
     * Shared fixture for the topic dual-write tests.
     *
     * Builds a topic migration client whose {@code iterateTopics} replays every topic and
     * partition of {@code TopicsImageTest.IMAGE1} to the visitor, wires it together with a config
     * client into a migration client that reports brokers 0-5 in ZK, creates a driver for node
     * 3000 and hands everything to {@code verifier}. The driver is closed when the verifier returns
     * or throws.
     *
     * @param verifier the test body to run against the prepared driver and clients
     * @throws Exception whatever the verifier or closing the driver throws
     */
    public void setupTopicDualWrite(TopicDualWriteVerifier verifier) throws Exception {
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {

            @Override
            public void iterateTopics(EnumSet<TopicMigrationClient.TopicVisitorInterest> interests, TopicMigrationClient.TopicVisitor visitor) {
                TopicsImageTest.IMAGE1.topicsByName().forEach((topicName, topicImage) -> {
                    // partition id -> replica ids, typed instead of the raw HashMap left by the decompiler
                    Map<Integer, List<Integer>> assignment = new HashMap<>();
                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
                        assignment.put(partitionId, IntStream.of(partitionRegistration.replicas).boxed().collect(Collectors.toList())));
                    visitor.visitTopic(topicName, topicImage.id(), assignment);
                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
                        visitor.visitPartition(new TopicIdPartition(topicImage.id(), new TopicPartition(topicName, partitionId)), partitionRegistration));
                });
            }
        };
        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
            .setBrokersInZk(0, 1, 2, 3, 4, 5)
            .setTopicMigrationClient(topicClient)
            .setConfigMigrationClient(configClient)
            .build();
        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(3000, new NoOpRecordConsumer(), migrationClient, metadataPropagator, metadataPublisher -> { }, new MockFaultHandler("test"), this.quorumFeatures, this.mockTime)) {
            verifier.verify(driver, migrationClient, topicClient, configClient);
        }
    }

    /**
     * Checks that topic changes delivered to the driver as a snapshot are written through the
     * topic and config migration clients once the driver is in DUAL_WRITE.
     *
     * After applying {@code TopicsImageTest.DELTA1_RECORDS} on top of {@code IMAGE1}, the test
     * expects topic "foo" to be deleted (together with its config resource), topic "baz" to be
     * created and partition 0 of topic "bar" to be updated.
     */
    @Test
    public void testTopicDualWriteSnapshot() throws Exception {
        this.setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage image = new MetadataImage(MetadataProvenance.EMPTY, FeaturesImage.EMPTY, ClusterImage.EMPTY, TopicsImageTest.IMAGE1, ConfigurationsImage.EMPTY, ClientQuotasImage.EMPTY, ProducerIdsImage.EMPTY, AclsImage.EMPTY, ScramImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(newLeader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100L, 42L));
            // The timeout message now names the state the condition actually checks.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            // Deliver the topic changes as a snapshot.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100L));
            // NOTE(review): presumably this round-trip waits for the update above to be processed — confirm.
            driver.migrationState().get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Checks that topic changes delivered to the driver as a log delta are written through the
     * topic and config migration clients once the driver is in DUAL_WRITE.
     *
     * After applying {@code TopicsImageTest.DELTA1_RECORDS} on top of {@code IMAGE1}, the test
     * expects topic "foo" to be deleted (together with its config resource), topic "baz" to be
     * created and partition 0 of topic "bar" to be updated.
     */
    @Test
    public void testTopicDualWriteDelta() throws Exception {
        this.setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage image = new MetadataImage(MetadataProvenance.EMPTY, FeaturesImage.EMPTY, ClusterImage.EMPTY, TopicsImageTest.IMAGE1, ConfigurationsImage.EMPTY, ClientQuotasImage.EMPTY, ProducerIdsImage.EMPTY, AclsImage.EMPTY, ScramImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(newLeader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100L, 42L));
            // The timeout message now names the state the condition actually checks.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            // Deliver the topic changes as a log delta.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100L, 42L));
            // NOTE(review): presumably this round-trip waits for the update above to be processed — confirm.
            driver.migrationState().get(1L, TimeUnit.MINUTES);

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Checks that topic changes seen while another node (3001) was the leader are written
     * through the migration clients after the driver's own node (3000) becomes the leader.
     *
     * The metadata updates are delivered while 3001 leads, the ZK client is given a recovery state
     * at KRaft offset 100 / epoch 1, and only then is leadership switched to 3000. The test then
     * expects the same topic and config writes as the other dual-write tests.
     *
     * NOTE(review): presumably the driver reconciles ZK against the latest image on becoming
     * active — confirm against KRaftMigrationDriver.
     */
    @Test
    public void testControllerFailover() throws Exception {
        this.setupTopicDualWrite((driver, migrationClient, topicClient, configClient) -> {
            MetadataImage image = new MetadataImage(MetadataProvenance.EMPTY, FeaturesImage.EMPTY, ClusterImage.EMPTY, TopicsImageTest.IMAGE1, ConfigurationsImage.EMPTY, ClientQuotasImage.EMPTY, ProducerIdsImage.EMPTY, AclsImage.EMPTY, ScramImage.EMPTY);
            MetadataDelta delta = new MetadataDelta(image);
            driver.start();
            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
            for (int brokerId = 0; brokerId <= 5; brokerId++) {
                delta.replay(this.zkBrokerRecord(brokerId));
            }
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);

            // Node 3001 leads first; the driver under test is created for node 3000 by setupTopicDualWrite.
            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3001), 1);
            driver.onControllerChange(newLeader);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100L, 42L));
            migrationClient.setMigrationRecoveryState(ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100L, 1));

            // Topic changes arrive while 3001 is still the leader.
            provenance = new MetadataProvenance(200L, 1, 1L);
            delta = new MetadataDelta(image);
            RecordTestUtils.replayAll(delta, TopicsImageTest.DELTA1_RECORDS);
            image = delta.apply(provenance);
            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100L, 42L));

            // Fail over to node 3000.
            newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
            driver.onControllerChange(newLeader);
            // A descriptive message replaces the empty string so a timeout is diagnosable.
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

            Assertions.assertEquals(1, topicClient.deletedTopics.size());
            Assertions.assertEquals("foo", topicClient.deletedTopics.get(0));
            Assertions.assertEquals(1, topicClient.createdTopics.size());
            Assertions.assertEquals("baz", topicClient.createdTopics.get(0));
            Assertions.assertTrue(topicClient.updatedTopicPartitions.get("bar").contains(0));
            Assertions.assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
        });
    }

    /**
     * Test body passed to {@code setupTopicDualWrite}; it receives the driver and the capturing
     * clients that were wired into it.
     */
    @FunctionalInterface
    interface TopicDualWriteVerifier {
        /**
         * Runs the test body.
         *
         * @param driver          the migration driver
         * @param migrationClient the top-level capturing migration client
         * @param topicClient     the capturing topic migration client
         * @param configClient    the capturing config migration client
         * @throws Exception any failure raised by the test body
         */
        void verify(KRaftMigrationDriver driver, CapturingMigrationClient migrationClient, CapturingTopicMigrationClient topicClient, CapturingConfigMigrationClient configClient) throws Exception;
    }

    /**
     * {@link LegacyPropagator} test double that does nothing except count how many times each of
     * the two "send RPCs to brokers" methods is called.
     */
    static class CountingMetadataPropagator implements LegacyPropagator {
        /** Number of calls to {@code sendRPCsToBrokersFromMetadataImage}. */
        public int images = 0;
        /** Number of calls to {@code sendRPCsToBrokersFromMetadataDelta}. */
        public int deltas = 0;

        public void startup() {
            // intentionally empty
        }

        public void shutdown() {
            // intentionally empty
        }

        public void publishMetadata(MetadataImage image) {
            // intentionally empty
        }

        public void sendRPCsToBrokersFromMetadataDelta(MetadataDelta delta, MetadataImage image, int zkControllerEpoch) {
            this.deltas += 1;
        }

        public void sendRPCsToBrokersFromMetadataImage(MetadataImage image, int zkControllerEpoch) {
            this.images += 1;
        }

        public void clear() {
            // intentionally empty; the counters are NOT reset here
        }
    }

    /**
     * {@link ZkRecordConsumer} test double that discards every batch and reports the migration
     * as completed at offset 100, epoch 1.
     */
    static class NoOpRecordConsumer implements ZkRecordConsumer {

        public void beginMigration() {
            // intentionally empty
        }

        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
            // The batch is ignored; the returned future is already complete.
            return CompletableFuture.completedFuture(null);
        }

        public CompletableFuture<OffsetAndEpoch> completeMigration() {
            OffsetAndEpoch completedAt = new OffsetAndEpoch(100L, 1);
            return CompletableFuture.completedFuture(completedAt);
        }

        public void abortMigration() {
            // intentionally empty
        }
    }
}

