/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.migration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.migration.KRaftMigrationDriver;
import org.apache.kafka.metadata.migration.LegacyPropagator;
import org.apache.kafka.metadata.migration.MigrationClient;
import org.apache.kafka.metadata.migration.MigrationState;
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Test for {@link KRaftMigrationDriver} that counts how often the driver asks the
 * {@link LegacyPropagator} to send RPCs built from a full image versus from a delta.
 *
 * <p>The driver is wired to in-memory test doubles declared at the bottom of this class:
 * {@link CountingMetadataPropagator}, {@link CapturingMigrationClient} and
 * {@link NoOpRecordConsumer}.
 */
public class KRaftMigrationDriverTest {
    /**
     * Builds a broker registration record for the given broker id that is flagged as a
     * migrating ZK broker and is not fenced.
     *
     * @param id the broker id to put in the record
     * @return a new {@link RegisterBrokerRecord}
     */
    RegisterBrokerRecord zkBrokerRecord(int id) {
        RegisterBrokerRecord record = new RegisterBrokerRecord();
        record.setBrokerId(id);
        record.setIsMigratingZkBroker(true);
        record.setFenced(false);
        return record;
    }

    /**
     * Enqueues a metadata change event on the driver and exposes the driver's completion
     * callback as a future.
     *
     * @param driver     the driver to enqueue the event on
     * @param delta      the metadata delta to hand to the driver
     * @param newImage   the image that results from applying {@code delta}
     * @param provenance the provenance of {@code newImage}
     * @return a future that completes normally when the driver invokes the completion handler
     *         with {@code null}, and exceptionally when it invokes it with a throwable
     */
    CompletableFuture<Void> enqueueMetadataChangeEventWithFuture(KRaftMigrationDriver driver, MetadataDelta delta, MetadataImage newImage, MetadataProvenance provenance) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        Consumer<Throwable> completionHandler = ex -> {
            if (ex == null) {
                future.complete(null);
            } else {
                future.completeExceptionally(ex);
            }
        };
        // NOTE(review): the meaning of the boolean flag is defined by KRaftMigrationDriver and is
        // not visible from this test; it is always passed as false here.
        driver.enqueueMetadataChangeEvent(delta, newImage, provenance, false, completionHandler);
        return future;
    }

    /**
     * Drives the migration driver through three metadata updates and checks the propagator
     * counters after each one:
     * <ol>
     *   <li>registering three ZK brokers and waiting for {@code DUAL_WRITE}: one image-based
     *       send, no delta-based send;</li>
     *   <li>a broker config change: the config is written through the migration client and the
     *       counters do not move;</li>
     *   <li>a broker registration change: exactly one delta-based send.</li>
     * </ol>
     */
    @Test
    public void testOnlySendNeededRPCsToBrokers() throws Exception {
        // The same id is given to the driver and used as the leader in the manifest below;
        // presumably this is what makes the driver treat itself as the active controller
        // (confirm against KRaftMigrationDriver).
        int nodeId = 3000;
        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
        CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)));
        ZkRecordConsumer recordConsumer = new NoOpRecordConsumer();
        FaultHandler faultHandler = new MockFaultHandler("test");
        KRaftMigrationDriver driver = new KRaftMigrationDriver(nodeId, recordConsumer, migrationClient, metadataPropagator, metadataPublisher -> { }, faultHandler);

        MetadataImage image = MetadataImage.EMPTY;
        MetadataDelta delta = new MetadataDelta.Builder().setImage(image).build();
        driver.start();
        // Once started, the driver must be closed even when an assertion or a wait below fails;
        // otherwise a failing run leaves the started driver behind.
        try {
            // Step 1: register brokers 1-3 (the same ids the migration client reports) and
            // publish the resulting image as a log delta.
            delta.replay(this.zkBrokerRecord(1));
            delta.replay(this.zkBrokerRecord(2));
            delta.replay(this.zkBrokerRecord(3));
            MetadataProvenance provenance = new MetadataProvenance(100L, 1, 1L);
            image = delta.apply(provenance);
            driver.publishLogDelta(delta, image, new LogDeltaManifest(provenance, new LeaderAndEpoch(OptionalInt.of(nodeId), 1), 1, 100L, 42L));
            TestUtils.waitForCondition(
                () -> driver.migrationState().get(1L, TimeUnit.MINUTES).equals(MigrationState.DUAL_WRITE),
                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
            Assertions.assertEquals(1, metadataPropagator.images);
            Assertions.assertEquals(0, metadataPropagator.deltas);

            // Step 2: a broker config change is expected to reach the migration client without
            // moving either propagator counter.
            delta = new MetadataDelta.Builder().setImage(image).build();
            delta.replay(new ConfigRecord()
                .setResourceType(ConfigResource.Type.BROKER.id())
                .setResourceName("1")
                .setName("foo")
                .setValue("bar"));
            provenance = new MetadataProvenance(120L, 1, 2L);
            image = delta.apply(provenance);
            this.enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, migrationClient.capturedConfigs.size());
            Assertions.assertEquals(1, metadataPropagator.images);
            Assertions.assertEquals(0, metadataPropagator.deltas);

            // Step 3: a broker registration change is expected to produce exactly one
            // delta-based send.
            delta = new MetadataDelta.Builder().setImage(image).build();
            delta.replay(new BrokerRegistrationChangeRecord()
                .setBrokerId(1)
                .setBrokerEpoch(0L)
                .setFenced(BrokerRegistrationFencingChange.NONE.value())
                .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()));
            provenance = new MetadataProvenance(130L, 1, 3L);
            image = delta.apply(provenance);
            this.enqueueMetadataChangeEventWithFuture(driver, delta, image, provenance).get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, metadataPropagator.images);
            Assertions.assertEquals(1, metadataPropagator.deltas);
        } finally {
            driver.close();
        }
    }

    /**
     * {@link LegacyPropagator} double that only counts how many times each of the two
     * "send RPCs" entry points is invoked; every other method is a no-op.
     */
    class CountingMetadataPropagator
    implements LegacyPropagator {
        /** Number of calls to {@link #sendRPCsToBrokersFromMetadataDelta}. */
        public int deltas = 0;
        /** Number of calls to {@link #sendRPCsToBrokersFromMetadataImage}. */
        public int images = 0;

        public void startup() {
        }

        public void shutdown() {
        }

        public void publishMetadata(MetadataImage image) {
        }

        public void sendRPCsToBrokersFromMetadataDelta(MetadataDelta delta, MetadataImage image, int zkControllerEpoch) {
            ++this.deltas;
        }

        public void sendRPCsToBrokersFromMetadataImage(MetadataImage image, int zkControllerEpoch) {
            ++this.images;
        }

        public void clear() {
        }

        public void setMetadataVersion(MetadataVersion metadataVersion) {
        }
    }

    /**
     * {@link MigrationClient} double. State-returning operations hand back the state they were
     * given unchanged, config writes are recorded in {@link #capturedConfigs}, and both broker-id
     * queries return the fixed set supplied at construction.
     */
    class CapturingMigrationClient
    implements MigrationClient {
        private final Set<Integer> brokerIds;
        /** Configs passed to {@link #writeConfigs}, merged per resource. */
        public final Map<ConfigResource, Map<String, String>> capturedConfigs = new HashMap<>();

        /**
         * @param brokerIdsInZk the broker ids returned by {@link #readBrokerIds()} and
         *                      {@link #readBrokerIdsFromTopicAssignments()}
         */
        public CapturingMigrationClient(Set<Integer> brokerIdsInZk) {
            this.brokerIds = brokerIdsInZk;
        }

        public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
            return initialState;
        }

        public ZkMigrationLeadershipState setMigrationRecoveryState(ZkMigrationLeadershipState state) {
            return state;
        }

        public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershipState state) {
            return state;
        }

        public ZkMigrationLeadershipState releaseControllerLeadership(ZkMigrationLeadershipState state) {
            return state;
        }

        public ZkMigrationLeadershipState createTopic(String topicName, Uuid topicId, Map<Integer, PartitionRegistration> topicPartitions, ZkMigrationLeadershipState state) {
            return state;
        }

        public ZkMigrationLeadershipState updateTopicPartitions(Map<String, Map<Integer, PartitionRegistration>> topicPartitions, ZkMigrationLeadershipState state) {
            return state;
        }

        /**
         * Records the written configs: entries for the same resource are merged, with later
         * values overwriting earlier ones for the same key.
         */
        public ZkMigrationLeadershipState writeConfigs(ConfigResource configResource, Map<String, String> configMap, ZkMigrationLeadershipState state) {
            this.capturedConfigs.computeIfAbsent(configResource, resource -> new HashMap<>()).putAll(configMap);
            return state;
        }

        public ZkMigrationLeadershipState writeClientQuotas(Map<String, String> clientQuotaEntity, Map<String, Double> quotas, ZkMigrationLeadershipState state) {
            return state;
        }

        public ZkMigrationLeadershipState writeProducerId(long nextProducerId, ZkMigrationLeadershipState state) {
            return state;
        }

        /** Intentionally empty: neither consumer is ever invoked. */
        public void readAllMetadata(Consumer<List<ApiMessageAndVersion>> batchConsumer, Consumer<Integer> brokerIdConsumer) {
        }

        public Set<Integer> readBrokerIds() {
            return this.brokerIds;
        }

        public Set<Integer> readBrokerIdsFromTopicAssignments() {
            return this.brokerIds;
        }

        public ZkMigrationLeadershipState writeMetadataDeltaToZookeeper(MetadataDelta delta, MetadataImage image, ZkMigrationLeadershipState state) {
            return state;
        }
    }

    /**
     * {@link ZkRecordConsumer} double that ignores every batch and reports a fixed offset and
     * epoch when the migration is completed.
     */
    class NoOpRecordConsumer
    implements ZkRecordConsumer {
        public void beginMigration() {
        }

        /**
         * Discards the batch.
         *
         * @return an already-completed future (never {@code null}), so a caller that waits on or
         *         chains the result does not fail with a {@link NullPointerException}
         */
        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
            return CompletableFuture.completedFuture(null);
        }

        /** @return a fixed {@code OffsetAndEpoch(100, 1)} */
        public OffsetAndEpoch completeMigration() {
            return new OffsetAndEpoch(100L, 1);
        }

        public void abortMigration() {
        }
    }
}

