/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.image.loader;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.metadata.AbortTransactionRecord;
import org.apache.kafka.common.metadata.BeginTransactionRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EndTransactionRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LoaderManifest;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.MetadataLoader;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

@Timeout(value=40L)
public class MetadataLoaderTest {
    @Test
    public void testCreateAndClose() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testCreateAndClose");
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.empty()).build();){
            Assertions.assertEquals((long)-1L, (long)loader.lastAppliedOffset());
        }
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testInstallPublishers() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testInstallPublishers");
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher("a"), new MockPublisher("b"), new MockPublisher("c"));
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.empty()).build();){
            loader.installPublishers(publishers.subList(0, 2)).get();
        }
        Assertions.assertTrue((boolean)publishers.get((int)0).closed);
        Assertions.assertNull((Object)publishers.get((int)0).latestImage);
        Assertions.assertTrue((boolean)publishers.get((int)1).closed);
        Assertions.assertNull((Object)publishers.get((int)1).latestImage);
        Assertions.assertFalse((boolean)publishers.get((int)2).closed);
        Assertions.assertNull((Object)publishers.get((int)2).latestImage);
        faultHandler.maybeRethrowFirstException();
    }

    @ParameterizedTest
    @CsvSource(value={"false,false", "false,true", "true,false", "true,true"})
    public void testPublisherCannotBeInstalledMoreThanOnce(boolean loadSnapshot, boolean sameObject) throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testPublisherCannotBeInstalledMoreThanOnce");
        MockPublisher publisher = new MockPublisher();
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).build();){
            loader.installPublishers(Arrays.asList(publisher)).get();
            if (loadSnapshot) {
                MockSnapshotReader snapshotReader = new MockSnapshotReader(new MetadataProvenance(200L, 100, 4000L), Arrays.asList(Batch.control((long)200L, (int)100, (long)4000L, (int)10, Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, (ApiMessage)new SnapshotHeaderRecord())))));
                loader.handleLoadSnapshot((SnapshotReader)snapshotReader);
                TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertEquals((long)1L, (long)loader.metrics().handleLoadSnapshotCount()));
            } else {
                TestUtils.retryOnExceptionWithTimeout((long)30000L, () -> Assertions.assertEquals((long)0L, (long)loader.metrics().handleLoadSnapshotCount()));
            }
            loader.waitForAllEventsToBeHandled();
            if (sameObject) {
                Assertions.assertEquals((Object)"testPublisherCannotBeInstalledMoreThanOnce: Attempted to install publisher MockPublisher, which is already installed.", (Object)((ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
                    Void cfr_ignored_0 = (Void)loader.installPublishers(Arrays.asList(publisher)).get();
                })).getCause().getMessage());
            } else {
                Assertions.assertEquals((Object)"testPublisherCannotBeInstalledMoreThanOnce: Attempted to install a new publisher named MockPublisher, but there is already a publisher with that name.", (Object)((ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> {
                    Void cfr_ignored_0 = (Void)loader.installPublishers(Arrays.asList(new MockPublisher())).get();
                })).getCause().getMessage());
            }
        }
    }

    @Test
    public void testRemovePublisher() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testRemovePublisher");
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher("a"), new MockPublisher("b"), new MockPublisher("c"));
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).build();){
            loader.installPublishers(publishers.subList(0, 2)).get();
            loader.removeAndClosePublisher((MetadataPublisher)publishers.get(1)).get();
            MockSnapshotReader snapshotReader = MockSnapshotReader.fromRecordLists(new MetadataProvenance(100L, 50, 2000L), Arrays.asList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_3_IV2.featureLevel()), 0))));
            Assertions.assertFalse((boolean)snapshotReader.closed);
            loader.handleLoadSnapshot((SnapshotReader)snapshotReader);
            loader.waitForAllEventsToBeHandled();
            Assertions.assertTrue((boolean)snapshotReader.closed);
            publishers.get((int)0).firstPublish.get(1L, TimeUnit.MINUTES);
            loader.removeAndClosePublisher((MetadataPublisher)publishers.get(0)).get();
        }
        Assertions.assertTrue((boolean)publishers.get((int)0).closed);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_3_IV2, (Object)publishers.get((int)0).latestImage.features().metadataVersion());
        Assertions.assertTrue((boolean)publishers.get((int)1).closed);
        Assertions.assertNull((Object)publishers.get((int)1).latestImage);
        Assertions.assertFalse((boolean)publishers.get((int)2).closed);
        Assertions.assertNull((Object)publishers.get((int)2).latestImage);
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testLoadEmptySnapshot() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptySnapshot");
        MockTime time = new MockTime();
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher());
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setTime((Time)time).setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).build();){
            loader.installPublishers(publishers).get();
            this.loadEmptySnapshot(loader, 200L);
            publishers.get((int)0).firstPublish.get(10L, TimeUnit.SECONDS);
            Assertions.assertEquals((long)200L, (long)loader.lastAppliedOffset());
            this.loadEmptySnapshot(loader, 300L);
            Assertions.assertEquals((long)300L, (long)loader.lastAppliedOffset());
            Assertions.assertEquals((Object)new SnapshotManifest(new MetadataProvenance(300L, 100, 4000L), 3000000L), (Object)publishers.get((int)0).latestSnapshotManifest);
            Assertions.assertEquals((Object)MetadataVersion.MINIMUM_KRAFT_VERSION, (Object)loader.metrics().currentMetadataVersion());
        }
        Assertions.assertTrue((boolean)publishers.get((int)0).closed);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_0_IV1, (Object)publishers.get((int)0).latestImage.features().metadataVersion());
        Assertions.assertTrue((boolean)publishers.get((int)0).latestImage.isEmpty());
        faultHandler.maybeRethrowFirstException();
    }

    private void loadEmptySnapshot(MetadataLoader loader, long offset) throws Exception {
        MockSnapshotReader snapshotReader = new MockSnapshotReader(new MetadataProvenance(offset, 100, 4000L), Arrays.asList(Batch.control((long)200L, (int)100, (long)4000L, (int)10, Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, (ApiMessage)new SnapshotHeaderRecord())))));
        if (loader.time() instanceof MockTime) {
            snapshotReader.setTime((MockTime)loader.time());
        }
        loader.handleLoadSnapshot((SnapshotReader)snapshotReader);
        loader.waitForAllEventsToBeHandled();
    }

    @Test
    public void testLoadEmptyBatch() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
        MockTime time = new MockTime();
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher());
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setTime((Time)time).setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).build();){
            loader.installPublishers(publishers).get();
            this.loadTestSnapshot(loader, 200L);
            publishers.get((int)0).firstPublish.get(10L, TimeUnit.SECONDS);
            MockBatchReader batchReader = new MockBatchReader(300L, Arrays.asList(Batch.control((long)300L, (int)100, (long)4000L, (int)10, Arrays.asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, (ApiMessage)new SnapshotHeaderRecord()))))).setTime(time);
            loader.handleCommit((BatchReader)batchReader);
            loader.waitForAllEventsToBeHandled();
            Assertions.assertTrue((boolean)batchReader.closed);
            Assertions.assertEquals((long)300L, (long)loader.lastAppliedOffset());
        }
        Assertions.assertTrue((boolean)publishers.get((int)0).closed);
        Assertions.assertEquals((Object)LogDeltaManifest.newBuilder().provenance(new MetadataProvenance(300L, 100, 4000L)).leaderAndEpoch(LeaderAndEpoch.UNKNOWN).numBatches(1).elapsedNs(0L).numBytes(10L).build(), (Object)publishers.get((int)0).latestLogDeltaManifest);
        Assertions.assertEquals((Object)MetadataVersion.IBP_3_3_IV1, (Object)publishers.get((int)0).latestImage.features().metadataVersion());
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testLastAppliedOffset() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher("a"), new MockPublisher("b"));
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).build();){
            loader.installPublishers(publishers).get();
            loader.handleLoadSnapshot((SnapshotReader)MockSnapshotReader.fromRecordLists(new MetadataProvenance(200L, 100, 4000L), Arrays.asList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_3_IV1.featureLevel()), 0)), Arrays.asList(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"Uum7sfhHQP-obSvfywmNUA")), 0)))));
            for (MockPublisher publisher : publishers) {
                publisher.firstPublish.get(1L, TimeUnit.MINUTES);
            }
            loader.waitForAllEventsToBeHandled();
            Assertions.assertEquals((long)200L, (long)loader.lastAppliedOffset());
            loader.handleCommit((BatchReader)new MockBatchReader(201L, Arrays.asList(MockBatchReader.newBatch(201L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new RemoveTopicRecord().setTopicId(Uuid.fromString((String)"Uum7sfhHQP-obSvfywmNUA")), 0))))));
            loader.waitForAllEventsToBeHandled();
            Assertions.assertEquals((long)201L, (long)loader.lastAppliedOffset());
        }
        for (int i = 0; i < 2; ++i) {
            Assertions.assertTrue((boolean)publishers.get((int)i).closed);
            Assertions.assertTrue((boolean)publishers.get((int)i).closed);
            Assertions.assertEquals((Object)MetadataVersion.IBP_3_3_IV1, (Object)publishers.get((int)i).latestImage.features().metadataVersion());
        }
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testCatchingUpState() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher("a"), new MockPublisher("b"));
        AtomicReference<OptionalLong> highWaterMark = new AtomicReference<OptionalLong>(OptionalLong.empty());
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> (OptionalLong)highWaterMark.get()).build();){
            loader.installPublishers(publishers).get();
            this.loadTestSnapshot(loader, 200L);
            Assertions.assertEquals((long)-1L, (long)loader.lastAppliedOffset());
            Assertions.assertFalse((boolean)publishers.get((int)0).firstPublish.isDone());
            highWaterMark.set(OptionalLong.of(221L));
            this.loadTestSnapshot(loader, 210L);
            Assertions.assertEquals((long)-1L, (long)loader.lastAppliedOffset());
            this.loadTestSnapshot(loader, 220L);
            Assertions.assertEquals((long)220L, (long)loader.lastAppliedOffset());
            publishers.get((int)0).firstPublish.get(1L, TimeUnit.MINUTES);
        }
        faultHandler.maybeRethrowFirstException();
    }

    private void loadTestSnapshot(MetadataLoader loader, long offset) throws Exception {
        loader.handleLoadSnapshot((SnapshotReader)MockSnapshotReader.fromRecordLists(new MetadataProvenance(offset, 100, 4000L), Arrays.asList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_3_IV1.featureLevel()), 0)), Arrays.asList(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"Uum7sfhHQP-obSvfywmNUA")), 0)))));
        loader.waitForAllEventsToBeHandled();
    }

    private void loadTestSnapshot2(MetadataLoader loader, long offset) throws Exception {
        loader.handleLoadSnapshot((SnapshotReader)MockSnapshotReader.fromRecordLists(new MetadataProvenance(offset, 100, 4000L), Arrays.asList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_3_IV2.featureLevel()), 0)), Arrays.asList(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("bar").setTopicId(Uuid.fromString((String)"VcL2Mw-cT4aL6XV9VujzoQ")), 0)))));
        loader.waitForAllEventsToBeHandled();
    }

    @Test
    public void testReloadSnapshot() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
        List<MockPublisher> publishers = Arrays.asList(new MockPublisher("a"));
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).build();){
            this.loadTestSnapshot(loader, 100L);
            loader.installPublishers(publishers).get();
            loader.waitForAllEventsToBeHandled();
            Assertions.assertTrue((boolean)publishers.get((int)0).firstPublish.isDone());
            Assertions.assertTrue((boolean)publishers.get((int)0).latestDelta.image().isEmpty());
            Assertions.assertEquals((long)100L, (long)publishers.get((int)0).latestImage.provenance().lastContainedOffset());
            this.loadTestSnapshot(loader, 200L);
            Assertions.assertEquals((long)200L, (long)loader.lastAppliedOffset());
            Assertions.assertEquals((short)MetadataVersion.IBP_3_3_IV1.featureLevel(), (short)loader.metrics().currentMetadataVersion().featureLevel());
            Assertions.assertFalse((boolean)publishers.get((int)0).latestDelta.image().isEmpty());
            this.loadTestSnapshot2(loader, 400L);
            Assertions.assertEquals((long)400L, (long)loader.lastAppliedOffset());
            Assertions.assertEquals((short)MetadataVersion.IBP_3_3_IV2.featureLevel(), (short)loader.metrics().currentMetadataVersion().featureLevel());
            Assertions.assertFalse((boolean)publishers.get((int)0).latestImage.topics().topicsByName().containsKey((Object)"foo"));
            Assertions.assertTrue((boolean)publishers.get((int)0).latestImage.topics().topicsByName().containsKey((Object)"bar"));
            loader.handleCommit((BatchReader)new MockBatchReader(500L, Arrays.asList(MockBatchReader.newBatch(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new FeatureLevelRecord().setName("metadata.version").setFeatureLevel(MetadataVersion.IBP_3_5_IV0.featureLevel()), 0))))));
            loader.waitForAllEventsToBeHandled();
            Assertions.assertEquals((short)MetadataVersion.IBP_3_5_IV0.featureLevel(), (short)loader.metrics().currentMetadataVersion().featureLevel());
        }
        faultHandler.maybeRethrowFirstException();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testPublishTransaction(boolean abortTxn) throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testTransactions");
        MockPublisher publisher = new MockPublisher("testTransactions");
        List<MockPublisher> publishers = Collections.singletonList(publisher);
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).build();){
            loader.installPublishers(publishers).get();
            loader.waitForAllEventsToBeHandled();
            loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord().setName("testTransactions"), 0), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"dMCqhcK4T5miGH5wEX7NsQ")), 0))));
            loader.waitForAllEventsToBeHandled();
            publisher.firstPublish.get(30L, TimeUnit.SECONDS);
            Assertions.assertNull((Object)publisher.latestImage.topics().getTopic("foo"), (String)"Topic should not be visible since we started transaction");
            loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setTopicId(Uuid.fromString((String)"dMCqhcK4T5miGH5wEX7NsQ")).setPartitionId(0), 0), new ApiMessageAndVersion((ApiMessage)new PartitionRecord().setTopicId(Uuid.fromString((String)"dMCqhcK4T5miGH5wEX7NsQ")).setPartitionId(1), 0))));
            loader.waitForAllEventsToBeHandled();
            Assertions.assertNull((Object)publisher.latestImage.topics().getTopic("foo"), (String)"Topic should not be visible after subsequent batch");
            if (abortTxn) {
                loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new AbortTransactionRecord(), 0))));
                loader.waitForAllEventsToBeHandled();
                Assertions.assertNull((Object)publisher.latestImage.topics().getTopic("foo"), (String)"Topic should not be visible since the transaction was aborted");
            } else {
                loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new EndTransactionRecord(), 0))));
                loader.waitForAllEventsToBeHandled();
                Assertions.assertNotNull((Object)publisher.latestImage.topics().getTopic("foo"), (String)"Topic should be visible now that transaction has ended");
            }
        }
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testPublishTransactionWithinBatch() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testPublishTransactionWithinBatch");
        MockPublisher publisher = new MockPublisher("testPublishTransactionWithinBatch");
        List<MockPublisher> publishers = Collections.singletonList(publisher);
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).build();){
            loader.installPublishers(publishers).get();
            loader.waitForAllEventsToBeHandled();
            loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord().setName("txn-1"), 0), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"HQSM3ccPQISrHqYK_C8GpA")), 0), new ApiMessageAndVersion((ApiMessage)new EndTransactionRecord(), 0))));
            loader.waitForAllEventsToBeHandled();
            publisher.firstPublish.get(30L, TimeUnit.SECONDS);
            Assertions.assertNotNull((Object)publisher.latestImage.topics().getTopic("foo"));
        }
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testSnapshotDuringTransaction() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testSnapshotDuringTransaction");
        MockPublisher publisher = new MockPublisher("testSnapshotDuringTransaction");
        List<MockPublisher> publishers = Collections.singletonList(publisher);
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).build();){
            loader.installPublishers(publishers).get();
            loader.waitForAllEventsToBeHandled();
            loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(500L, 100, Arrays.asList(new ApiMessageAndVersion((ApiMessage)new BeginTransactionRecord().setName("txn-1"), 0), new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"HQSM3ccPQISrHqYK_C8GpA")), 0))));
            loader.waitForAllEventsToBeHandled();
            publisher.firstPublish.get(30L, TimeUnit.SECONDS);
            Assertions.assertNull((Object)publisher.latestImage.topics().getTopic("foo"));
            loader.handleLoadSnapshot((SnapshotReader)MockSnapshotReader.fromRecordLists(new MetadataProvenance(600L, 101, 4000L), Arrays.asList(Arrays.asList(new ApiMessageAndVersion((ApiMessage)new TopicRecord().setName("foo").setTopicId(Uuid.fromString((String)"Uum7sfhHQP-obSvfywmNUA")), 0)))));
            loader.waitForAllEventsToBeHandled();
            Assertions.assertEquals((Object)"Uum7sfhHQP-obSvfywmNUA", (Object)publisher.latestImage.topics().getTopic("foo").id().toString());
        }
        faultHandler.maybeRethrowFirstException();
    }

    @Test
    public void testNoPublishEmptyImage() throws Exception {
        MockFaultHandler faultHandler = new MockFaultHandler("testNoPublishEmptyImage");
        final ArrayList capturedImages = new ArrayList();
        final CompletableFuture firstPublish = new CompletableFuture();
        MetadataPublisher capturingPublisher = new MetadataPublisher(){

            public String name() {
                return "testNoPublishEmptyImage";
            }

            public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
                if (!firstPublish.isDone()) {
                    firstPublish.complete(null);
                }
                capturedImages.add(newImage);
            }
        };
        try (MetadataLoader loader = new MetadataLoader.Builder().setFaultHandler((FaultHandler)faultHandler).setHighWaterMarkAccessor(() -> OptionalLong.of(1L)).build();){
            loader.installPublishers(Collections.singletonList(capturingPublisher)).get();
            loader.handleCommit((BatchReader)MockBatchReader.newSingleBatchReader(0L, 1, Collections.singletonList(new ApiMessageAndVersion((ApiMessage)new ConfigRecord().setResourceType(ConfigResource.Type.BROKER.id()).setResourceName("3000").setName("foo").setValue("bar"), 0))));
            firstPublish.get(30L, TimeUnit.SECONDS);
            Assertions.assertFalse((boolean)capturedImages.isEmpty());
            capturedImages.forEach(metadataImage -> Assertions.assertFalse((boolean)metadataImage.isEmpty()));
        }
        faultHandler.maybeRethrowFirstException();
    }

    static class MockBatchReader
    implements BatchReader<ApiMessageAndVersion> {
        private final long baseOffset;
        private final Iterator<Batch<ApiMessageAndVersion>> iterator;
        private boolean closed = false;
        private MockTime time = null;

        static MockBatchReader newSingleBatchReader(long batchBaseOffset, int epoch, List<ApiMessageAndVersion> records) {
            return new MockBatchReader(batchBaseOffset, Collections.singletonList(MockBatchReader.newBatch(batchBaseOffset, epoch, records)));
        }

        static Batch<ApiMessageAndVersion> newBatch(long batchBaseOffset, int epoch, List<ApiMessageAndVersion> records) {
            return Batch.data((long)batchBaseOffset, (int)epoch, (long)0L, (int)0, records);
        }

        MockBatchReader(long baseOffset, List<Batch<ApiMessageAndVersion>> batches) {
            this.baseOffset = baseOffset;
            this.iterator = batches.iterator();
        }

        private MockBatchReader setTime(MockTime time) {
            this.time = time;
            return this;
        }

        public long baseOffset() {
            return this.baseOffset;
        }

        public OptionalLong lastOffset() {
            return OptionalLong.empty();
        }

        public void close() {
            this.closed = true;
        }

        public boolean hasNext() {
            if (this.time != null) {
                this.time.sleep(1L);
            }
            return this.iterator.hasNext();
        }

        public Batch<ApiMessageAndVersion> next() {
            if (this.time != null) {
                this.time.sleep(1L);
            }
            return this.iterator.next();
        }
    }

    static class MockSnapshotReader
    implements SnapshotReader<ApiMessageAndVersion> {
        private final MetadataProvenance provenance;
        private final Iterator<Batch<ApiMessageAndVersion>> iterator;
        private MockTime time = null;
        boolean closed = false;

        static MockSnapshotReader fromRecordLists(MetadataProvenance provenance, List<List<ApiMessageAndVersion>> lists) {
            List<Batch<ApiMessageAndVersion>> batches = lists.stream().map(records -> Batch.data((long)0L, (int)0, (long)0L, (int)0, (List)records)).collect(Collectors.toList());
            return new MockSnapshotReader(provenance, batches);
        }

        MockSnapshotReader(MetadataProvenance provenance, List<Batch<ApiMessageAndVersion>> batches) {
            this.provenance = provenance;
            this.iterator = batches.iterator();
        }

        MockSnapshotReader setTime(MockTime time) {
            this.time = time;
            return this;
        }

        public OffsetAndEpoch snapshotId() {
            return this.provenance.snapshotId();
        }

        public long lastContainedLogOffset() {
            return this.provenance.lastContainedOffset();
        }

        public int lastContainedLogEpoch() {
            return this.provenance.lastContainedEpoch();
        }

        public long lastContainedLogTimestamp() {
            return this.provenance.lastContainedLogTimeMs();
        }

        public void close() {
            this.closed = true;
        }

        public boolean hasNext() {
            if (this.time != null) {
                this.time.sleep(1L);
            }
            return this.iterator.hasNext();
        }

        public Batch<ApiMessageAndVersion> next() {
            if (this.time != null) {
                this.time.sleep(1L);
            }
            return this.iterator.next();
        }
    }

    static class MockPublisher
    implements MetadataPublisher {
        final CompletableFuture<Void> firstPublish = new CompletableFuture();
        private final String name;
        volatile MetadataDelta latestDelta = null;
        volatile MetadataImage latestImage = null;
        volatile LogDeltaManifest latestLogDeltaManifest = null;
        volatile SnapshotManifest latestSnapshotManifest = null;
        volatile boolean closed = false;

        MockPublisher() {
            this("MockPublisher");
        }

        MockPublisher(String name) {
            this.name = name;
        }

        public String name() {
            return this.name;
        }

        public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
            this.latestDelta = delta;
            this.latestImage = newImage;
            switch (manifest.type()) {
                case LOG_DELTA: {
                    this.latestLogDeltaManifest = (LogDeltaManifest)manifest;
                    break;
                }
                case SNAPSHOT: {
                    this.latestSnapshotManifest = (SnapshotManifest)manifest;
                    break;
                }
                default: {
                    throw new RuntimeException("Invalid manifest type " + manifest.type());
                }
            }
            this.firstPublish.complete(null);
        }

        public void close() throws Exception {
            this.firstPublish.completeExceptionally(new RejectedExecutionException());
            this.closed = true;
        }
    }
}

