/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.log.remote.metadata.storage;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataManagerTestUtils;
import org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;

@ClusterTestDefaults(brokers=3)
@ExtendWith(value={ClusterTestExtensions.class})
@Tag(value="integration")
public class RemoteLogSegmentLifecycleTest {
    private final int segSize = 0x100000;
    private final int brokerId0 = 0;
    private final int brokerId1 = 1;
    private final Uuid topicId = Uuid.randomUuid();
    private final TopicPartition tp = new TopicPartition("foo", 0);
    private final TopicIdPartition topicIdPartition = new TopicIdPartition(this.topicId, this.tp);
    private final Time time = new SystemTime();
    private final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = (RemotePartitionMetadataStore)Mockito.spy((Object)new RemotePartitionMetadataStore());
    private final ClusterInstance clusterInstance;

    RemoteLogSegmentLifecycleTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
        return RemoteLogMetadataManagerTestUtils.builder().bootstrapServers(this.clusterInstance.bootstrapServers()).startConsumerThread(true).remotePartitionMetadataStore(() -> this.spyRemotePartitionMetadataStore).build();
    }

    @ClusterTest
    public void testRemoteLogSegmentLifeCycle() throws Exception {
        try (RemoteLogMetadataManager metadataManager = this.createTopicBasedRemoteLogMetadataManager();){
            metadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            HashMap<Integer, Long> leaderEpochSegment0 = new HashMap<Integer, Long>();
            leaderEpochSegment0.put(0, 0L);
            leaderEpochSegment0.put(1, 20L);
            leaderEpochSegment0.put(2, 80L);
            RemoteLogSegmentId segmentId0 = new RemoteLogSegmentId(this.topicIdPartition, Uuid.randomUuid());
            RemoteLogSegmentMetadata metadataSegment0 = new RemoteLogSegmentMetadata(segmentId0, 0L, 100L, -1L, 0, this.time.milliseconds(), 0x100000, leaderEpochSegment0);
            metadataManager.addRemoteLogSegmentMetadata(metadataSegment0).get();
            ((RemotePartitionMetadataStore)Mockito.verify((Object)this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(metadataSegment0);
            Assertions.assertFalse((boolean)metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 1, 40L).isPresent());
            for (int leaderEpoch = 0; leaderEpoch <= 2; ++leaderEpoch) {
                Assertions.assertFalse((boolean)metadataManager.highestOffsetForEpoch(this.topicIdPartition, leaderEpoch).isPresent());
            }
            RemoteLogSegmentMetadataUpdate metadataUpdateSegment0 = new RemoteLogSegmentMetadataUpdate(segmentId0, this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
            metadataManager.updateRemoteLogSegmentMetadata(metadataUpdateSegment0).get();
            ((RemotePartitionMetadataStore)Mockito.verify((Object)this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadataUpdate(metadataUpdateSegment0);
            metadataSegment0 = metadataSegment0.createWithUpdates(metadataUpdateSegment0);
            Map<Integer, Long> leaderEpochSegment1 = Collections.singletonMap(2, 101L);
            RemoteLogSegmentMetadata metadataSegment1 = this.upsertSegmentState(metadataManager, leaderEpochSegment1, 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap<Integer, Long> leaderEpochSegment2 = new HashMap<Integer, Long>();
            leaderEpochSegment2.put(2, 201L);
            leaderEpochSegment2.put(3, 240L);
            RemoteLogSegmentMetadata metadataSegment2 = this.upsertSegmentState(metadataManager, leaderEpochSegment2, 201L, 300L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap<Integer, Long> leaderEpochSegment3 = new HashMap<Integer, Long>();
            leaderEpochSegment3.put(3, 250L);
            leaderEpochSegment3.put(4, 370L);
            RemoteLogSegmentMetadata metadataSegment3 = this.upsertSegmentState(metadataManager, leaderEpochSegment3, 250L, 400L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap<EpochEntry, RemoteLogSegmentMetadata> expectedEpochEntryToMetadata = new HashMap<EpochEntry, RemoteLogSegmentMetadata>();
            expectedEpochEntryToMetadata.put(new EpochEntry(1, 40L), metadataSegment0);
            expectedEpochEntryToMetadata.put(new EpochEntry(2, 110L), metadataSegment1);
            expectedEpochEntryToMetadata.put(new EpochEntry(3, 240L), metadataSegment2);
            expectedEpochEntryToMetadata.put(new EpochEntry(3, 250L), metadataSegment3);
            expectedEpochEntryToMetadata.put(new EpochEntry(4, 375L), metadataSegment3);
            expectedEpochEntryToMetadata.put(new EpochEntry(1, 110L), null);
            expectedEpochEntryToMetadata.put(new EpochEntry(4, 401L), null);
            expectedEpochEntryToMetadata.put(new EpochEntry(5, 301L), null);
            for (Map.Entry entry : expectedEpochEntryToMetadata.entrySet()) {
                EpochEntry epochEntry = (EpochEntry)entry.getKey();
                Optional actualMetadataOpt = metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, epochEntry.epoch, epochEntry.startOffset);
                RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata)entry.getValue();
                if (remoteLogSegmentMetadata != null) {
                    Assertions.assertEquals(Optional.of(remoteLogSegmentMetadata), (Object)actualMetadataOpt);
                    continue;
                }
                Assertions.assertFalse((boolean)actualMetadataOpt.isPresent());
            }
            RemoteLogSegmentMetadataUpdate metadataDeleteStartedSegment0 = new RemoteLogSegmentMetadataUpdate(metadataSegment0.remoteLogSegmentId(), this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, 1);
            metadataManager.updateRemoteLogSegmentMetadata(metadataDeleteStartedSegment0).get();
            Assertions.assertFalse((boolean)metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 10L).isPresent());
            RemoteLogSegmentMetadataUpdate metadataDeleteFinishedSegment0 = new RemoteLogSegmentMetadataUpdate(metadataSegment0.remoteLogSegmentId(), this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, 1);
            metadataManager.updateRemoteLogSegmentMetadata(metadataDeleteFinishedSegment0).get();
            Assertions.assertFalse((boolean)metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 10L).isPresent());
            HashMap<Integer, Long> expectedEpochToHighestOffset = new HashMap<Integer, Long>();
            expectedEpochToHighestOffset.put(0, 19L);
            expectedEpochToHighestOffset.put(1, 79L);
            expectedEpochToHighestOffset.put(2, 239L);
            expectedEpochToHighestOffset.put(3, 369L);
            expectedEpochToHighestOffset.put(4, 400L);
            for (Map.Entry entry : expectedEpochToHighestOffset.entrySet()) {
                Integer epoch = (Integer)entry.getKey();
                Long expectedOffset = (Long)entry.getValue();
                Optional offset = metadataManager.highestOffsetForEpoch(this.topicIdPartition, epoch.intValue());
                Assertions.assertEquals(Optional.of(expectedOffset), (Object)offset);
            }
            Optional highestOffsetForEpoch5 = metadataManager.highestOffsetForEpoch(this.topicIdPartition, 5);
            Assertions.assertFalse((boolean)highestOffsetForEpoch5.isPresent());
        }
    }

    private RemoteLogSegmentMetadata upsertSegmentState(RemoteLogMetadataManager metadataManager, Map<Integer, Long> segmentLeaderEpochs, long startOffset, long endOffset, RemoteLogSegmentState state) throws RemoteStorageException, ExecutionException, InterruptedException {
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(this.topicIdPartition, Uuid.randomUuid());
        RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset, -1L, 0, this.time.milliseconds(), 0x100000, segmentLeaderEpochs);
        metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
        ((RemotePartitionMetadataStore)Mockito.verify((Object)this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(segmentMetadata);
        RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, this.time.milliseconds(), Optional.empty(), state, 1);
        metadataManager.updateRemoteLogSegmentMetadata(segMetadataUpdate).get();
        ((RemotePartitionMetadataStore)Mockito.verify((Object)this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadataUpdate(segMetadataUpdate);
        return segmentMetadata.createWithUpdates(segMetadataUpdate);
    }

    private void checkListSegments(RemoteLogMetadataManager metadataManager, int leaderEpoch, RemoteLogSegmentMetadata expectedMetadata) throws RemoteStorageException {
        Iterator metadataIter = metadataManager.listRemoteLogSegments(this.topicIdPartition, leaderEpoch);
        Assertions.assertTrue((boolean)metadataIter.hasNext());
        Assertions.assertEquals((Object)expectedMetadata, metadataIter.next());
        Iterator allMetadataIter = metadataManager.listRemoteLogSegments(this.topicIdPartition);
        Assertions.assertTrue((boolean)allMetadataIter.hasNext());
        Assertions.assertEquals((Object)expectedMetadata, allMetadataIter.next());
    }

    @ClusterTest
    public void testCacheSegmentWithCopySegmentStartedState() throws Exception {
        try (RemoteLogMetadataManager metadataManager = this.createTopicBasedRemoteLogMetadataManager();){
            metadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentId segmentId = new RemoteLogSegmentId(this.topicIdPartition, Uuid.randomUuid());
            RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, 0L, 50L, -1L, 0, this.time.milliseconds(), 0x100000, Collections.singletonMap(0, 0L));
            metadataManager.addRemoteLogSegmentMetadata(segmentMetadata).get();
            ((RemotePartitionMetadataStore)Mockito.verify((Object)this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadata(segmentMetadata);
            Optional segMetadataForOffset0Epoch0 = metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 0L);
            Assertions.assertFalse((boolean)segMetadataForOffset0Epoch0.isPresent());
            this.checkListSegments(metadataManager, 0, segmentMetadata);
        }
    }

    @ClusterTest
    public void testCacheSegmentWithCopySegmentFinishedState() throws Exception {
        try (RemoteLogMetadataManager metadataManager = this.createTopicBasedRemoteLogMetadataManager();){
            metadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata segmentMetadata = this.upsertSegmentState(metadataManager, Collections.singletonMap(0, 101L), 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            Optional segMetadataForOffset150 = metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 150L);
            Assertions.assertEquals(Optional.of(segmentMetadata), (Object)segMetadataForOffset150);
            this.checkListSegments(metadataManager, 0, segmentMetadata);
        }
    }

    @ClusterTest
    public void testCacheSegmentWithDeleteSegmentStartedState() throws Exception {
        try (RemoteLogMetadataManager metadataManager = this.createTopicBasedRemoteLogMetadataManager();){
            metadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata segmentMetadata = this.upsertSegmentState(metadataManager, Collections.singletonMap(0, 201L), 201L, 300L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
            Optional segmentMetadataForOffset250Epoch0 = metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 250L);
            Assertions.assertFalse((boolean)segmentMetadataForOffset250Epoch0.isPresent());
            this.checkListSegments(metadataManager, 0, segmentMetadata);
        }
    }

    @ClusterTest
    public void testCacheSegmentsWithDeleteSegmentFinishedState() throws Exception {
        try (RemoteLogMetadataManager metadataManager = this.createTopicBasedRemoteLogMetadataManager();){
            metadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata segmentMetadata = this.upsertSegmentState(metadataManager, Collections.singletonMap(0, 301L), 301L, 400L, RemoteLogSegmentState.DELETE_SEGMENT_STARTED);
            Assertions.assertFalse((boolean)metadataManager.remoteLogSegmentMetadata(this.topicIdPartition, 0, 350L).isPresent());
            RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), this.time.milliseconds(), Optional.empty(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, 1);
            metadataManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
            ((RemotePartitionMetadataStore)Mockito.verify((Object)this.spyRemotePartitionMetadataStore)).handleRemoteLogSegmentMetadataUpdate(segmentMetadataUpdate);
            Assertions.assertFalse((boolean)metadataManager.listRemoteLogSegments(this.topicIdPartition, 0).hasNext());
            Assertions.assertFalse((boolean)metadataManager.listRemoteLogSegments(this.topicIdPartition).hasNext());
        }
    }

    @ClusterTest
    public void testCacheListSegments() throws Exception {
        try (RemoteLogMetadataManager metadataManager = this.createTopicBasedRemoteLogMetadataManager();){
            metadataManager.onPartitionLeadershipChanges(Collections.singleton(this.topicIdPartition), Collections.emptySet());
            RemoteLogSegmentMetadata segment0 = this.upsertSegmentState(metadataManager, Collections.singletonMap(0, 0L), 0L, 100L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            RemoteLogSegmentMetadata segment1 = this.upsertSegmentState(metadataManager, Collections.singletonMap(0, 101L), 101L, 200L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            HashMap<Integer, Long> leaderEpochSegment2 = new HashMap<Integer, Long>();
            leaderEpochSegment2.put(0, 201L);
            leaderEpochSegment2.put(1, 301L);
            RemoteLogSegmentMetadata segment2 = this.upsertSegmentState(metadataManager, leaderEpochSegment2, 201L, 400L, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
            List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch0 = Arrays.asList(segment0, segment1, segment2);
            Assertions.assertTrue((boolean)TestUtils.sameElementsWithOrder(expectedSegmentsForEpoch0.iterator(), (Iterator)metadataManager.listRemoteLogSegments(this.topicIdPartition, 0)));
            Assertions.assertTrue((boolean)TestUtils.sameElementsWithoutOrder(expectedSegmentsForEpoch0.iterator(), (Iterator)metadataManager.listRemoteLogSegments(this.topicIdPartition)));
            List<RemoteLogSegmentMetadata> expectedSegmentsForEpoch1 = Collections.singletonList(segment2);
            Assertions.assertTrue((boolean)TestUtils.sameElementsWithOrder(expectedSegmentsForEpoch1.iterator(), (Iterator)metadataManager.listRemoteLogSegments(this.topicIdPartition, 1)));
        }
    }
}

